In [1]:
import os
import time
import traceback
from functools import partial

import matplotlib.pyplot as plt
import pyspark
import seaborn as sns
from pyspark.sql import SparkSession 
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.fpm import FPGrowth
from wordcloud import WordCloud, STOPWORDS



In [2]:
class SparkConfig :
    """Factory for the SparkSession shared by every stage of the notebook."""

    @staticmethod
    def create_sparksession() :
        """Build the "IMDB movie" session, or return the one that is already active.

        The builder ends with ``getOrCreate``, so repeated calls hand back the
        existing session instead of starting a second one.

        Returns:
            SparkSession: the configured session.
        """
        # Maven coordinates, joined into the single comma-separated value Spark expects.
        # NOTE(review): the Kafka connector is a Scala 2.13 build while spark-nlp is a
        # Scala 2.12 build - confirm both match the Scala version of the installed Spark.
        packages = ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1",
            "com.johnsnowlabs.nlp:spark-nlp_2.12:5.4.0",
        ])

        spark = (
            SparkSession.builder.appName("IMDB movie")
            .config("spark.driver.memory" , "8g")
            # The property is `spark.executor.memory`; the former key
            # `spark.executors.memory` is not a Spark setting, so the 4g never applied.
            .config("spark.executor.memory" , "4g")
            .config("spark.streaming.stopGracefullyOnShutdown" , True)
            .config("spark.sql.shuffle.partitions" , "20")
            .config("spark.jars.packages" , packages)
            # Local PostgreSQL JDBC driver jar (the notebook writes results over JDBC).
            .config("spark.jars" , "/home/enovo/prj/test/postgresql-42.7.3.jar")
            .getOrCreate()
        )
        return spark

In [3]:
class IMDB_Schema :
    """Spark schemas for the JSON payloads of the movie, actor and review topics.

    Each schema is an ArrayType of structs: DataLoader parses a Kafka message
    value with ``from_json`` and then explodes the resulting array into rows.
    Every field is nullable. The methods are static so they work both on the
    class and on an instance.
    """

    @staticmethod
    def movie_schema() :
        """Return the schema of one movie message: an array of movie records.

        Only ``rating`` (float) and ``year`` (integer) are typed; every other
        field, including the numeric-looking ``vote_count``, ``budget`` and
        ``revenue``, is declared as a string.
        """
        fields = [
            StructField('movie_id' , StringType() , True) ,
            StructField("title" , StringType() , True) ,
            StructField("rating" , FloatType() , True) ,
            StructField("year" , IntegerType() , True) ,
            StructField("vote_count" , StringType() , True) ,
            StructField("runtime" , StringType() , True) ,
            StructField("items" , StringType() , True ) ,
            StructField("country" , StringType() , True) ,
            StructField("language" , StringType() , True) ,
            StructField("company" , StringType() , True) ,
            StructField("budget" , StringType() , True) ,
            StructField("revenue" , StringType() , True) ,
            StructField("plot" , StringType() , True) ,
            StructField("poster" , StringType() , True) ,
            StructField("url" , StringType() , True) ,
        ]
        return ArrayType(StructType(fields))

    @staticmethod
    def actor_schema() :
        """Return the schema of one actor message: an array of all-string credit records."""
        fields = [
            StructField("actor_id" , StringType() , True) ,
            StructField("director" , StringType() , True) ,
            StructField("writers" , StringType() , True) ,
            StructField("stars" , StringType() , True) ,
            StructField("movie_id" , StringType() , True) ,
        ]
        return ArrayType(StructType(fields))

    @staticmethod
    def review_schema() :
        """Return the schema of one review message: an array of all-string review records."""
        fields = [
            StructField("review_id" , StringType() , True) ,
            StructField("title_review" , StringType() , True) ,
            StructField("comment" , StringType() , True) ,
            StructField("star" , StringType() , True) ,
            StructField("like" , StringType() , True) ,
            StructField("dislike" , StringType() , True) ,
            StructField("date" , StringType() , True) ,
            StructField("user_name" , StringType() , True) ,
            StructField("movie_id" , StringType() , True) ,
        ]
        return ArrayType(StructType(fields))

In [4]:
class DataLoader :
    """Opens the Kafka topics as streaming DataFrames and flattens their JSON payloads."""

    def __init__(self , bootstrap_servers = "localhost:9092" , watermark_delay = "10 minutes") :
        """Attach to the shared Spark session.

        Args:
            bootstrap_servers: Kafka ``host:port`` list used for every topic.
            watermark_delay: lateness threshold applied to each stream's
                event-time column.
        """
        self.spark = SparkConfig.create_sparksession()
        self.bootstrap_servers = bootstrap_servers
        self.watermark_delay = watermark_delay

    def _load_topic(self , topic , schema , ts_col) :
        """Read one topic from the earliest offset and return one row per JSON record.

        Args:
            topic: Kafka topic to subscribe to.
            schema: array-of-struct schema used to parse the message value.
            ts_col: name given to the copy of the Kafka ``timestamp`` column;
                it carries the watermark and is kept in the output.

        Returns:
            Streaming DataFrame with the struct fields as columns plus ``ts_col``.
        """
        raw = (
            self.spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers" , self.bootstrap_servers)
            .option("subscribe" , topic)
            .option("startingOffsets" , "earliest")
            .load()
        )

        # The watermark is declared on the Kafka timestamp before the payload is exploded.
        stamped = raw.withColumn(ts_col , col("timestamp")) \
                     .withWatermark(ts_col , self.watermark_delay)

        # The value holds a JSON array; explode turns each element into its own row.
        records = explode(from_json(col("value").cast("string") , schema))
        return stamped.withColumn("value" , records).select("value.*" , ts_col)

    def movie_load(self) :
        """Return the "movie" topic as a streaming DataFrame with a ``movie_ts`` column."""
        print("\n" + "="*60)
        print("=== Starting load movie ===")
        print("="*60)

        movie_generate = self._load_topic("movie" , IMDB_Schema.movie_schema() , "movie_ts")

        print("=== Loaded movie successfully ===")
        return movie_generate

    def actor_load(self) :
        """Return the "actor" topic as a streaming DataFrame with an ``actor_ts`` column."""
        print("\n" + "="*60)
        print("=== Starting load actor ===")
        print("="*60)

        actor_generate = self._load_topic("actor" , IMDB_Schema.actor_schema() , "actor_ts")

        print("=== Loaded actor successfully ===")
        return actor_generate

    def review_load(self) :
        """Return the "review" topic as a streaming DataFrame with a ``review_ts`` column."""
        print("\n" + "="*60)
        # This banner used to read "Starting review actor", a copy-paste slip.
        print("=== Starting load review ===")
        print("="*60)

        review_generate = self._load_topic("review" , IMDB_Schema.review_schema() , "review_ts")

        print("=== Loaded review successfully ===")
        return review_generate

In [5]:
class DataClean :
    """Column-level cleaning of the parsed movie, actor and review frames.

    Every method is static, takes a DataFrame and returns a new one.
    """

    @staticmethod
    def _k_suffixed_to_int(name) :
        """Build the int expression for a counter column such as ``like``.

        "1.2K" becomes 1200, "" becomes 0 and any other value is trimmed and
        cast. All branches are doubles before the final int cast, so the
        result no longer depends on how Spark coerces mixed double / string
        CASE branches.
        """
        raw = col(name)
        return (
            when(raw.endswith("K") , trim(regexp_replace(raw , "K" , "")).cast("double") * 1_000)
            .when(raw == "" , lit(0.0))
            .otherwise(trim(raw).cast("double"))
            .cast("int")
        )

    @staticmethod
    def movie_clean(movie) :
        """Drop junk rows and turn revenue, budget and vote_count into numbers.

        Args:
            movie: DataFrame with string columns movie_id, revenue, budget and
                vote_count.

        Returns:
            DataFrame where revenue and budget are longs and vote_count is an int.
        """
        print("\n" + "="*60)
        print("=== Starting clean movie ===")
        print("="*60)

        # Filter out junk records: rows whose movie_id starts with "{".
        movie = movie.filter(~col("movie_id").startswith("{"))

        # revenue: strip "$" and ",", trim, map "" to 0, then cast.
        # Raw strings keep the regex identical without the invalid "\$" escape warning.
        # long instead of int: an int cannot hold values above 2,147,483,647.
        revenue_text = trim(regexp_replace(col("revenue") , r"[\$,]" , ""))
        movie_revenue_clean = movie.withColumn(
            "revenue" ,
            when(revenue_text == "" , "0").otherwise(revenue_text).cast("long"))

        # budget: map "" to "0", keep the first space-separated token, strip "$" and ",".
        budget_token = split(when(col("budget") == "" , "0").otherwise(col("budget")) , " ").getItem(0)
        movie_budget_clean = movie_revenue_clean.withColumn(
            "budget" ,
            trim(regexp_replace(budget_token , r"[\$,]" , "")).cast("long"))

        # vote_count: expand the "M" / "K" suffixes. Plain counts such as "842"
        # used to fall into the catch-all branch and become 0; they are now parsed.
        # Anything else (null, empty, unrecognised) still maps to 0.
        votes = col("vote_count")
        movie_clean = movie_budget_clean.withColumn(
            "vote_count" ,
            when(votes.endswith("M") , regexp_replace(votes , "M" , "").cast("double") * 1_000_000)
            .when(votes.endswith("K") , regexp_replace(votes , "K" , "").cast("double") * 1_000)
            .when(votes.rlike(r"^\d[\d,]*$") , regexp_replace(votes , "," , "").cast("double"))
            .otherwise(0)
            .cast("int"))
        print("=== Cleaned movie successfully ===")
        return movie_clean

    @staticmethod
    def actor_clean(actor) :
        """Drop junk rows and strip list punctuation from writers and stars.

        Args:
            actor: DataFrame with string columns actor_id, writers and stars.

        Returns:
            DataFrame where ``[``, ``]`` and ``"`` are removed from writers and stars.
        """
        print("\n" + "="*60)
        print("=== Starting clean actor ===")
        print("="*60)

        # Filter out junk records: rows whose actor_id starts with "{".
        actor = actor.filter(~col("actor_id").startswith("{"))

        # Raw string: same pattern as before, without the invalid "\[" / "\]" escapes.
        list_punctuation = r'[\[\]"]'
        actor_clean = actor.withColumn("writers" , regexp_replace(col("writers") , list_punctuation , "")) \
                           .withColumn("stars" , regexp_replace(col("stars") , list_punctuation , ""))
        print("=== Cleaned actor successfully ===")
        return actor_clean

    @staticmethod
    def review_clean(review) :
        """Turn star, like and dislike into ints and drop the date column.

        Args:
            review: DataFrame with string columns star, like, dislike and date.

        Returns:
            DataFrame with int star / like / dislike columns and no date column.
        """
        print("\n" + "="*60)
        print("=== Starting clean review ===")
        print("="*60)

        # star: "" counts as 0, everything else is cast as-is.
        review_star_clean = review.withColumn(
            "star" ,
            when(col("star") == "" , "0").otherwise(col("star")).cast("int"))

        # like / dislike: expand the "K" suffix, map "" to 0.
        review_like_clean = review_star_clean.withColumn("like" , DataClean._k_suffixed_to_int("like"))
        review_dislike_clean = review_like_clean.withColumn("dislike" , DataClean._k_suffixed_to_int("dislike"))

        # Drop the column that is not needed downstream.
        review_clean = review_dislike_clean.drop("date")
        print("=== Cleaned review successfully ===")

        return review_clean

    

  movie_revenue_clean = movie.withColumn("revenue" , regexp_replace(col("revenue") , "[\$,]" , "")) \
  .withColumn("budget" , regexp_replace(col("budget") , "[\$,]" , "")) \
  actor_clean = actor.withColumn("writers" , regexp_replace(col("writers") , '[\[\]"]' , "") ) \
  .withColumn("stars" , regexp_replace(col("stars") , '[\[\]"]' , "") )


In [17]:
class AnalysisSentiment :
    """Aggregations and exploratory mining over the cleaned movie / actor / review frames.

    All methods are static and are called on the class itself. ``sum``,
    ``count`` and ``avg`` below are the pyspark.sql.functions versions brought
    in by the notebook's star import, not the Python builtins.
    """

    # ===================================================================
    # 1 : Top ratings, revenue and profit of the movies in each country
    # ===================================================================
    @staticmethod
    def top_country(movie) :
        """Aggregate rating, revenue, budget and profit per country / language / movie_ts.

        Args:
            movie: DataFrame with movie_id, country, language, movie_ts,
                rating, revenue and budget columns.

        Returns:
            DataFrame with avg_rating, total_revenue, total_movie,
            total_budget and avg_profit per group.
        """
        print("Analysis 1 : Top ratings, revenue, and profits of films in each country")
        # Profit is revenue minus budget when revenue is positive, otherwise 0.
        profit = when(col("revenue") > 0 , col("revenue") - col("budget")).otherwise(0)
        movie_profit = movie.withColumn("profit" , profit)

        top_country = movie_profit.groupBy("country" , "language" , "movie_ts").agg(
            avg("rating").alias("avg_rating") ,
            sum("revenue").alias("total_revenue") ,
            count("movie_id").alias("total_movie") ,
            sum("budget").alias("total_budget") ,
            avg("profit").alias("avg_profit") ,
        )
        return top_country

    # ===================================================================
    # 2 : User sentiment analysis
    # ===================================================================
    @staticmethod
    def top_user_sentiment(review , movie) :
        """Summarise review counts and like / dislike totals per movie.

        Args:
            review: DataFrame with review_id, movie_id, like, dislike and review_ts.
            movie: DataFrame with movie_id, title and rating; joined with a
                broadcast hint.

        Returns:
            DataFrame grouped by title, rating and review_ts with total_review,
            total_like, total_dislike and like_ratio (0.0 when there are no
            likes or dislikes).
        """
        print("Analysis 2 : Analytics user Sentiment")

        user_sentiment = review.join(broadcast(movie) , "movie_id" , "inner")

        likes = sum("like")
        dislikes = sum("dislike")
        reactions = likes + dislikes
        top_user_sentiment = user_sentiment.groupBy("title" , "rating" , "review_ts").agg(
            count("review_id").alias("total_review") ,
            likes.alias("total_like") ,
            dislikes.alias("total_dislike") ,
            # The alias is applied to the whole CASE expression so the column is
            # literally named like_ratio.
            when(reactions > 0 , likes / reactions).otherwise(0.0).alias("like_ratio") ,
        )

        return top_user_sentiment

    # ===================================================================
    # 3 : Directors with the highest rating and revenue
    # ===================================================================
    @staticmethod
    def top_director_rate(actor , movie) :
        """Aggregate rating, movie count and revenue per director and actor_ts.

        Args:
            actor: DataFrame with director, movie_id and actor_ts.
            movie: DataFrame with movie_id, rating and revenue; joined with a
                broadcast hint.

        Returns:
            DataFrame with avg_rating, total_movies and total_revenue per group.
        """
        print("Analysis 3 : Top directors with the highest ratings and revenue.")

        director_rate = actor.join(broadcast(movie) , "movie_id" , "inner")

        top_director_rate = director_rate.groupBy("director" , "actor_ts").agg(
            avg("rating").alias("avg_rating") ,
            count("movie_id").alias("total_movies") ,
            sum("revenue").alias("total_revenue") ,
        )

        return top_director_rate

    # ===================================================================
    # 4 : Rating trend per year
    # ===================================================================
    @staticmethod
    def rating_per_year(review , movie) :
        """Aggregate the movie rating and review volume per release year and review_ts.

        Args:
            review: DataFrame with review_id, movie_id and review_ts.
            movie: DataFrame with movie_id, year and rating; joined with a
                broadcast hint.

        Returns:
            DataFrame with avg_rating, total_movie and total_review per group.
        """
        print("Analysis 4 : Annual evaluation trends")

        review_join = review.join(broadcast(movie) , "movie_id" , "inner")

        review_dt = review_join.dropDuplicates(["review_id" , "movie_id"])

        # NOTE(review): total_movie counts joined rows with a movie_id, not distinct
        # movies, so it tracks total_review - confirm this is the intended meaning.
        rating_per_year = review_dt.groupBy("year" , "review_ts").agg(
            avg("rating").alias("avg_rating") ,
            count("movie_id").alias("total_movie") ,
            count("review_id").alias("total_review") ,
        )

        return rating_per_year

    # ===================================================================
    # 5 : Data mining and user sentiment prediction
    # ===================================================================
    @staticmethod
    def _save_figure(path , **savefig_kwargs) :
        """Apply tight_layout to the current matplotlib figure, write it to ``path`` and close it."""
        plt.tight_layout()
        plt.savefig(path , **savefig_kwargs)
        plt.close()

    @staticmethod
    def data_mining(review , output_dir = "../models") :
        """Draw exploratory charts and FP-Growth association rules for the reviews.

        The frame is collected with ``toPandas``. Nine PNG files are written
        under ``output_dir``; nothing is returned.

        Args:
            review: DataFrame with star, like, dislike and comment columns.
            output_dir: directory that receives the images; created if missing.
        """
        # savefig does not create directories, so make sure the target exists.
        os.makedirs(output_dir , exist_ok=True)

        review_label = review.withColumn("sentiment" , when(col("star") >= 8 , "Positive") \
                                                            .when(col("star") >= 5 , "Neutral") \
                                                            .otherwise("Negative"))
        # Collected once; the word cloud and the heatmap reuse this frame instead of
        # running two more toPandas() jobs over the same rows.
        pdf = review_label.toPandas()

        # ----------------------------
        # 1. Star distribution
        # ----------------------------
        plt.figure(figsize=(6,4))
        pdf["star"].hist(bins=12)
        plt.xlabel("Star")
        plt.ylabel("Count")
        plt.title("Star Distribution")
        AnalysisSentiment._save_figure(f"{output_dir}/star_distribution.png")

        # ----------------------------
        # 2. Sentiment distribution
        # ----------------------------
        plt.figure(figsize=(6,4))
        pdf["sentiment"].value_counts().plot(kind="bar")
        plt.xlabel("Sentiment")
        plt.ylabel("Count")
        plt.title("Sentiment Distribution")
        AnalysisSentiment._save_figure(f"{output_dir}/sentiment_distribution.png")

        # ----------------------------
        # 3. Scatter Like - Dislike
        # ----------------------------
        plt.figure(figsize=(6,4))
        plt.scatter(pdf["like"], pdf["dislike"])
        plt.xlabel("Like")
        plt.ylabel("Dislike")
        plt.title("Like vs Dislike")
        AnalysisSentiment._save_figure(f"{output_dir}/like_dislike_scatter.png")

        # ----------------------------
        # 4. Boxplot of likes per sentiment
        # ----------------------------
        plt.figure(figsize=(6,4))
        sns.boxplot(x="sentiment", y="like", data=pdf)
        plt.title("Like Distribution by Sentiment")
        AnalysisSentiment._save_figure(f"{output_dir}/like_by_sentiment.png")

        print("✔ Ảnh đã lưu vào: models", )

        # 5. Word cloud over every comment (missing comments count as empty text)
        text = " ".join(pdf["comment"].fillna(""))

        wc = WordCloud(
            width=1200,
            height=800,
            background_color="white",
            stopwords=STOPWORDS,
            collocations=True
        ).generate(text)

        plt.figure(figsize=(12,8))
        plt.imshow(wc, interpolation="bilinear")
        plt.axis("off")
        path_wc = f"{output_dir}/wordcloud.png"
        AnalysisSentiment._save_figure(path_wc)

        print("✔ Saved WordCloud:", path_wc)

        # 6. Correlation heatmap of the numeric columns
        corr = pdf[["star", "like", "dislike"]].corr()

        plt.figure(figsize=(6,5))
        sns.heatmap(corr, annot=True, fmt=".2f", cmap="Blues")
        plt.title("Correlation Heatmap")
        path_heat = f"{output_dir}/correlation_heatmap.png"
        AnalysisSentiment._save_figure(path_heat)

        print("✔ Saved Heatmap:", path_heat)

        # ============================
        #  Prepare the transactions for FP-Growth
        # ============================
        # One basket per review: sentiment label, star value and like / dislike buckets.
        fp_df = review_label.select(
            array(
                "sentiment",
                col("star").cast("string"),
                when(col("like") > 500, "like_high").otherwise("like_low"),
                when(col("dislike") > 100, "dislike_high").otherwise("dislike_low")
            ).alias("items")
        )

        # ============================
        #  Train FP-Growth
        # ============================
        fp = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.1)
        model = fp.fit(fp_df)

        rules_pdf = model.associationRules.toPandas()
        if rules_pdf.empty :
            # Nothing to chart; the rule plots below need at least one row.
            print("No association rules found; skipping rule charts")
            return

        # Render each rule as a readable "antecedent → consequent" string
        rules_pdf["rule"] = rules_pdf["antecedent"].apply(lambda x: ",".join(x)) + " → " + \
                            rules_pdf["consequent"].apply(lambda x: ",".join(x))

        # ============================
        #  Chart 7: confidence per rule (bar chart)
        # ============================

        # Shorten rule labels longer than max_len characters
        def shorten(text, max_len=40):
            return text if len(text) <= max_len else text[:max_len] + "..."

        rules_pdf["rule_short"] = rules_pdf["rule"].apply(shorten)

        # Keep the 20 most confident rules so the labels stay legible
        rules_top = rules_pdf.sort_values("confidence", ascending=False).head(20)

        plt.figure(figsize=(14, 10))   # larger canvas for readability

        sns.barplot(
            y="rule_short",
            x="confidence",
            data=rules_top,
            palette="Blues_r"
        )

        plt.title("Association Rules – Confidence", fontsize=16)
        plt.xlabel("Confidence", fontsize=13)
        plt.ylabel("Rule", fontsize=13)

        # Widen the left margin so the labels are not clipped
        plt.subplots_adjust(left=0.40)

        # Smaller tick labels
        plt.yticks(fontsize=10)

        save_path1 = f"{output_dir}/association_rules_confidence.png"
        AnalysisSentiment._save_figure(save_path1, dpi=200, bbox_inches="tight")

        print("✔ Saved:", save_path1)

        # ============================
        #  Chart 8: scatter Support - Confidence
        # ============================
        plt.figure(figsize=(7,6))
        plt.scatter(rules_pdf["support"], rules_pdf["confidence"])
        plt.xlabel("Support")
        plt.ylabel("Confidence")
        plt.title("Support vs Confidence")

        save_path2 = f"{output_dir}/support_vs_confidence.png"
        AnalysisSentiment._save_figure(save_path2)

        print("✔ Saved:", save_path2)

        # ============================
        #  Chart 9: scatter Lift - Confidence
        # ============================
        plt.figure(figsize=(7,6))
        plt.scatter(rules_pdf["lift"], rules_pdf["confidence"])
        plt.xlabel("Lift")
        plt.ylabel("Confidence")
        plt.title("Lift vs Confidence")

        save_path3 = f"{output_dir}/lift_vs_confidence.png"
        AnalysisSentiment._save_figure(save_path3)

        print("✔ Saved:", save_path3)

    @staticmethod
    def predict_sentiment(review) :
        """Train a TF-IDF + logistic-regression classifier on the comments and show test predictions.

        Labels: 1 when star >= 8, 2 when star >= 5, otherwise 0. The data is
        split 80/20 with seed 42; predictions for the test part are printed
        with ``show`` and nothing is returned.

        Args:
            review: DataFrame with comment and star columns.
        """
        review_label =  review.withColumn("label" , when(col("star") >= 8 , 1) \
                                                        .when(col("star") >= 5 , 2) \
                                                        .otherwise(0)) \
                                                        .drop("sentiment")
        # Tokenize the comment and remove stop words
        tokenizer = Tokenizer(inputCol="comment", outputCol="words")
        remover = StopWordsRemover(inputCol="words", outputCol="filtered")

        # Term frequency
        vectorizer = CountVectorizer(inputCol="filtered", outputCol="raw_features")

        # Inverse document frequency
        idf = IDF(inputCol="raw_features", outputCol="features")

        # Classifier
        lr = LogisticRegression(featuresCol="features", labelCol="label")

        # Full pipeline
        pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, idf, lr])

        train, test = review_label.randomSplit([0.8, 0.2], seed=42)
        model = pipeline.fit(train)
        predictions = model.transform(test)

        predictions.select("comment", "label", "prediction", "probability").show(truncate=False)

    # ===================================================================
    # 6 : Sentiment totals per movie
    # ===================================================================
    @staticmethod
    def top_sentiment(review , movie) :
        """Count positive, neutral and negative reviews per movie.

        A review is Positive when star >= 8, Neutral when 5 <= star < 8 and
        Negative when star < 5.

        Args:
            review: DataFrame with review_id, movie_id, star and review_ts.
            movie: DataFrame with movie_id, title, year and rating; joined with
                a broadcast hint.

        Returns:
            DataFrame grouped by title, year, rating and review_ts with
            total_review, Positive, Negative and Neutral counts.
        """
        print("Analysis 5 : Total ratings sentiment for each movie")

        review_sentiment =  review.withColumn("Positive" , when(col("star") >= 8 , 1).otherwise(0)) \
                                    .withColumn("Neutral" , when((col("star") >= 5) & (col("star") < 8) , 1).otherwise(0)) \
                                        .withColumn("Negative" , when(col("star") < 5 , 1).otherwise(0))

        review_join = review_sentiment.join(broadcast(movie) , "movie_id" , "inner")

        review_dt = review_join.dropDuplicates(["review_id"])

        # Aggregate the de-duplicated frame. The previous code built review_dt and
        # then grouped review_join, so duplicate reviews were still counted.
        top_sentiment = review_dt.groupBy("title" , "year" , "rating" , "review_ts") \
                                    .agg(count("review_id").alias("total_review") ,
                                        sum("Positive").alias("Positive") ,
                                        sum("Negative").alias("Negative") ,
                                        sum("Neutral").alias("Neutral"))

        return top_sentiment


[Stage 334:>                                                      (0 + 12) / 20]

                                                                                

Count in batch = 0


                                                                                

+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `rating`, `review_ts

25/11/23 20:00:12 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 13315 milliseconds


Batch id  18  :
=== Save into postgresql ===


                                                                                

Count in batch = 0


                                                                                

+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `rating`, `review_ts

25/11/23 20:00:19 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6156 milliseconds


Batch id  19  :
=== Save into postgresql ===


                                                                                

Count in batch = 0
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `

25/11/23 20:00:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6039 milliseconds


Batch id  20  :
=== Save into postgresql ===


                                                                                

Count in batch = 0
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `

25/11/23 20:00:31 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6071 milliseconds


Batch id  21  :
=== Save into postgresql ===


                                                                                

Count in batch = 0


In [7]:
class ImdbPipeline :
    """End-to-end driver: load and clean the streams, build the analyses, sink them to PostgreSQL."""

    def __init__(self):
        """Create the session and loader; ``result`` maps analysis name to its DataFrame."""
        self.spark = SparkConfig.create_sparksession()
        self.load = DataLoader()
        self.clean = DataClean
        self.analytics = AnalysisSentiment
        self.result = {}

    def load_data(self) :
        """Load and clean the three streams into ``self.movie`` / ``self.actor`` / ``self.review``."""
        print("\n" + "="*60)
        print("=== DATA LOADING PHASE ===")
        print("="*60)

        self.movie = self.clean.movie_clean(self.load.movie_load())
        self.actor = self.clean.actor_clean(self.load.actor_load())
        self.review = self.clean.review_clean(self.load.review_load())

        print("\n All data loaded successfully")

    def _run_analysis(self , number , name , *frame_attrs) :
        """Run ``self.analytics.<name>`` on the named frames and store the result under ``name``.

        Args:
            number: label used in the error message ("Error in Analysis <number>").
            name: method name on ``self.analytics``; also the key in ``self.result``.
            frame_attrs: attribute names on ``self`` passed, in order, as arguments.

        Returns:
            The DataFrame returned by the analysis.

        Raises:
            Exception: whatever the analysis raised, re-raised after being printed.
        """
        try:
            # Lookups happen inside the try so a missing frame is reported like any
            # other analysis error.
            analysis = getattr(self.analytics , name)
            frames = [getattr(self , attr) for attr in frame_attrs]
            result = analysis(*frames)
            self.result[name] = result
            return result
        except Exception as e :
            print(f"Error in Analysis {number} : " , str(e))
            raise

    def run_analysis_1(self) :
        """Build ``top_country`` from the movie frame."""
        return self._run_analysis(1 , "top_country" , "movie")

    def run_analysis_2(self) :
        """Build ``rating_per_year`` from the review and movie frames."""
        return self._run_analysis(2 , "rating_per_year" , "review" , "movie")

    def run_analysis_3(self) :
        """Build ``top_user_sentiment`` from the review and movie frames."""
        return self._run_analysis(3 , "top_user_sentiment" , "review" , "movie")

    def run_analysis_4(self) :
        """Build ``top_director_rate`` from the actor and movie frames."""
        return self._run_analysis(4 , "top_director_rate" , "actor" , "movie")

    def run_analysis_5(self) :
        """Build ``top_sentiment`` from the review and movie frames."""
        return self._run_analysis(5 , "top_sentiment" , "review" , "movie")

    def run_all_analysis(self) :
        """Run the five analyses (order 1, 3, 4, 2, 5) and print the elapsed time."""
        print("\n" + "="*60)
        print("=== ANALYSIS PHASE - RUNNING ALL ANALYSES ===")
        print("="*60)

        total_start = time.time()

        self.run_analysis_1()
        self.run_analysis_3()
        self.run_analysis_4()
        self.run_analysis_2()
        self.run_analysis_5()

        total_elapsed = time.time() - total_start

        print("\n" + "="*60)
        print(f"ALL ANALYSIS COMPLETED IN {total_elapsed:.2f}s")
        print("="*60)

    @staticmethod
    def _jdbc_settings() :
        """Return (url, user, password) for the PostgreSQL sink.

        The IMDB_JDBC_URL / IMDB_JDBC_USER / IMDB_JDBC_PASSWORD environment
        variables take precedence.
        SECURITY: the literal fallbacks only keep existing runs working - move
        the password into the environment and delete the fallback.
        """
        url = os.environ.get("IMDB_JDBC_URL" , "jdbc:postgresql://192.168.1.3:5432/imdb_sentiment")
        user = os.environ.get("IMDB_JDBC_USER" , "postgres")
        password = os.environ.get("IMDB_JDBC_PASSWORD" , "minhhai123")
        return url , user , password

    @staticmethod
    def _write_batch(df , batch_id , name) :
        """foreachBatch sink: log one micro-batch and append it to ``public.<name>``.

        The batch is persisted because it is used by three actions (count, show
        and the JDBC write); without caching each action recomputes the batch.
        """
        print("Batch id " , str(batch_id) , " :")
        print(f"=== Save {name} into postgresql ===")
        url , user , password = ImdbPipeline._jdbc_settings()

        df.persist()
        try :
            print("Count in batch =", df.count())
            df.show(5, truncate=False)

            try :
                df.write.format("jdbc").mode("append") \
                            .option("driver" ,"org.postgresql.Driver") \
                            .option("url" , url) \
                            .option("user" , user) \
                            .option("password" , password) \
                            .option("dbtable" , f"public.{name}") \
                            .save()
            except Exception as e :
                # Best effort, as before: a failed write is only logged.
                # NOTE(review): the stream presumably still treats this batch as
                # processed, so its rows are not retried - confirm that is acceptable.
                print(f"Error to save {name} : " , str(e))
        finally :
            df.unpersist()

    def save_sql(self , checkpoint_root = "/home/enovo/prj/test/check_points") :
        """Start one streaming query per stored analysis and block until they terminate.

        Args:
            checkpoint_root: directory under which each query keeps its
                ``<name>/`` checkpoint.
        """
        print("\n" + "="*60)
        print("=== SAVING TO POSTGRESQL ===")
        print("="*60)

        queries = []
        for name , df in self.result.items() :
            query =  df.writeStream.outputMode("append") \
                            .foreachBatch(partial(ImdbPipeline._write_batch , name = name)) \
                            .option("checkpointLocation" , f"{checkpoint_root}/{name}/") \
                            .trigger(processingTime = "5 seconds") \
                            .start()
            queries.append(query)
            print(f"Streaming {name} started -> to postgreSQL")

        # Waits on the queries in start order.
        for q in queries :
            q.awaitTermination()

    def spark_stop(self) :
        """Stop the Spark session."""
        self.spark.stop()
        print("=== Spark session stop ===")

            


In [8]:
def main() :
    """Run the IMDB pipeline end to end: load data, run all analyses, save to PostgreSQL.

    Any exception raised by a pipeline step is caught here: its message is
    printed and the traceback is emitted with ``traceback.print_exc()``, so
    the exception does not propagate to the caller.
    """
    rule = "=" * 90

    print("\n" + rule)
    print("=== IMDB ANALYSIS SYSTEM ===")
    print(rule)
    try :
        # Lower-case local name so it does not reuse the name of the imported
        # pyspark.ml Pipeline class.
        pipeline = ImdbPipeline()

        pipeline.load_data()
        pipeline.run_all_analysis()
        pipeline.save_sql()

        print("\n" + rule)
        print("=== ALL PIPELINE COMPLETED ===")
        print(rule)
    except Exception as err :
        print("Pipeline failed : " , str(err))
        traceback.print_exc()
# if __name__ == "__main__" :
#     main()

In [18]:
# Create (or reuse, via getOrCreate) the Spark session for this cell.
spark = SparkConfig.create_sparksession()

# Load each dataset through DataLoader and clean it with DataClean.
# The previous direct spark.read.json("../data/*.json") calls were dropped:
# their results were overwritten by these assignments before ever being used,
# so they only added an extra read of each file.
movie = DataClean.movie_clean(DataLoader().movie_load())
actor = DataClean.actor_clean(DataLoader().actor_load())
review = DataClean.review_clean(DataLoader().review_load())

def save_psql(df , batch_id ):
    """foreachBatch sink: log a micro-batch and append it to ``public.top_user_sentiment``.

    Args:
        df: the micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: the micro-batch id handed over by ``foreachBatch``.

    Connection settings can be overridden with the environment variables
    ``IMDB_PG_URL``, ``IMDB_PG_USER`` and ``IMDB_PG_PASSWORD``; when a variable
    is unset the value that used to be hard-coded here is used. A failed JDBC
    write is printed and not re-raised.
    """
    import os  # local import: only this function needs it

    # NOTE(review): the fallbacks keep existing runs working, but a real
    # password should not live in the notebook - export IMDB_PG_PASSWORD and
    # drop the default.
    jdbc_options = {
        "driver" : "org.postgresql.Driver" ,
        "url" : os.environ.get("IMDB_PG_URL" , "jdbc:postgresql://192.168.1.3:5432/imdb_sentiment") ,
        "user" : os.environ.get("IMDB_PG_USER" , "postgres") ,
        "password" : os.environ.get("IMDB_PG_PASSWORD" , "minhhai123") ,
        "dbtable" : "public.top_user_sentiment" ,
    }

    print("Batch id " , str(batch_id) , " :")
    print("=== Save into postgresql ===")

    # count(), show() and the JDBC write are three separate actions on the same
    # micro-batch; cache it so it is not recomputed each time.
    df.persist()
    try :
        print("Count in batch =", df.count())
        df.show(5, truncate=False)
        try :
            writer = df.write.format("jdbc").mode("append")
            for key , value in jdbc_options.items() :
                writer = writer.option(key , value)
            writer.save()
        except Exception as e :
            # Best effort: the error is only logged, so the streaming query
            # that calls this sink keeps running.
            print("Error to save  : " , str(e))
    finally :
        df.unpersist()
# Build the sentiment DataFrame from the cleaned reviews and movies; later cells
# inspect it with printSchema() and stream it through save_psql.
# NOTE(review): the captured output in this notebook shows JDBC writes to
# public.top_user_sentiment failing with COLUMN_NOT_DEFINED_IN_TABLE because the
# ratio column carried the whole CASE WHEN expression as its name, while the
# later printSchema() output shows it as `like_ratio` - confirm the alias fix
# is in place before streaming.
df = AnalysisSentiment.top_user_sentiment(review , movie)


                                                                                

+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `rating`, `review_ts

25/11/23 20:00:37 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6124 milliseconds



=== Starting load movie ===
=== Loaded movie successfully ===

=== Starting clean movie ===
=== Cleaned movie successfully ===

=== Starting load actor ===
Batch id  22  :
=== Save into postgresql ===
=== Loaded actor successfully ===

=== Starting clean actor ===
=== Cleaned actor successfully ===

=== Starting review actor ===
=== Loaded review successfully ===

=== Starting clean review ===
=== Cleaned review successfully ===
Analysis 2 : Analytics user Sentiment


                                                                                

Count in batch = 0
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `

25/11/23 20:00:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6057 milliseconds


Batch id  23  :
=== Save into postgresql ===




In [None]:
# Show the column names and types of `df`, to compare against the columns of
# the target table it is written to.
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- review_ts: timestamp (nullable = true)
 |-- total_review: long (nullable = false)
 |-- total_like: long (nullable = true)
 |-- total_dislike: long (nullable = true)
 |-- like_ratio: double (nullable = true)



                                                                                

Count in batch = 0
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
|title|rating|review_ts|total_review|total_like|total_dislike|CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END|
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+
+-----+------+---------+------------+----------+-------------+-----------------------------------------------------------------------------------------------------------------+

Error to save  :  [COLUMN_NOT_DEFINED_IN_TABLE] "DOUBLE" column `CASE WHEN ((sum(like) + sum(dislike)) > 0) THEN (sum(like) / (sum(like) + sum(dislike))) AS like_ratio ELSE 0 END` is not defined in table `public`.`top_user_sentiment`, defined table columns are: `title`, `

25/11/23 20:00:59 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000} milliseconds, but spent 6203 milliseconds


Batch id  25  :
=== Save into postgresql ===


[Stage 472:>                                                      (0 + 12) / 20]

In [None]:
# Start a streaming query that hands each micro-batch of `df` to save_psql on a
# 5-second processing-time trigger, then block this cell until it terminates.
(
    df.writeStream
      .outputMode("append")
      .foreachBatch(save_psql)
      .option("checkpointLocation" , "../check_points/")
      .trigger(processingTime = "5 seconds")
      .start()
      .awaitTermination()
)

25/11/23 20:01:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Batch id  0  :
=== Save into postgresql ===


[Stage 522:>                                                        (0 + 1) / 1]