In [1]:
from pyspark.sql import SparkSession

# Định nghĩa constants
HDFS_HOST = "hdfs://localhost:9000"  
HDFS_PATH = f"{HDFS_HOST}/hadoop/data/parquet/"

def init_spark(app_name="IMDb Analytics"):
    """
    Tạo và cấu hình SparkSession với các thiết lập phù hợp.
    
    Parameters:
        app_name (str): Tên của ứng dụng Spark
        
    Returns:
        SparkSession: SparkSession đã được cấu hình
    """
    return SparkSession.builder \
        .appName(app_name) \
        .master("spark://localhost:7077") \
        .config("spark.executor.memory", "2g") \
        .config("spark.driver.memory", "2g") \
        .config("spark.hadoop.fs.defaultFS", HDFS_HOST) \
        .config("spark.sql.warehouse.dir", f"{HDFS_HOST}/user/hive/warehouse") \
        .config("spark.executor.cores", "2") \
        .config("spark.driver.cores", "2") \
        .config("spark.sql.files.maxPartitionBytes", "64MB") \
        .config("spark.driver.maxResultSize", "2g") \
        .config("spark.sql.shuffle.partitions", "4") \
        .config("spark.executor.instances", "2")   \
        .getOrCreate()
        # .config("spark.dynamicAllocation.minExecutors", "2") \
        # .config("spark.dynamicAllocation.maxExecutors", "2") \
        #  .config("spark.dynamicAllocation.enabled", "true") 
       

# Hàm tiện ích để kiểm tra kết nối HDFS
def test_hdfs_connection(spark):
    """
    Kiểm tra kết nối tới HDFS bằng cách đọc thử một file parquet
    
    Parameters:
        spark (SparkSession): SparkSession đã được khởi tạo
        
    Returns:
        bool: True nếu kết nối thành công, False nếu thất bại
    """
    try:
        # Thử đọc một file parquet bất kỳ
        test_df = spark.read.parquet(f"{HDFS_PATH}/title_basics_parquet")
        test_df.printSchema()
        return True
    except Exception as e:
        print(f"Lỗi kết nối HDFS: {str(e)}")
        return False

In [6]:
class IMDbDataLoader:
    def __init__(self, spark, base_path):
        self.spark = spark
        self.base_path = base_path
    
    def load_titles(self):
        return self.spark.read.parquet(f"{self.base_path}/title_basics_parquet") # basic in4 about titles
    
    def load_ratings(self):
        return self.spark.read.parquet(f"{self.base_path}/title_ratings_parquet") # in4 about ratings and vote counts for titles
    
    def load_names(self):
        return self.spark.read.parquet(f"{self.base_path}/name_basics_parquet") # Basic in4 about individuals

    def load_akas(self):
        return self.spark.read.parquet(f"{self.base_path}/title_akas_parquet") # In4 about alternative titles of movies or shows
        
    def load_episodes(self):
        return self.spark.read.parquet(f"{self.base_path}/title_episode_parquet") # About episodoes in a series

    def load_principals(self):
        return self.spark.read.parquet(f"{self.base_path}/title_principals_parquet") # In4 about key indivisuals related to a title
    
    def load_crews(self):
        return self.spark.read.parquet(f"{self.base_path}/title_crew_parquet") # In4 about the creative team behind the film



In [7]:
# When have 2 workers
spark = init_spark()

loader = IMDbDataLoader(spark, "hdfs:///hadoop/data/parquet/")

# Sử dụng SparkContext để lấy thông tin về các executors
workers_info = spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()

# Đếm số lượng workers (executors)
num_workers = len(workers_info) - 1

# In ra số lượng workers và thông tin chi tiết
print(f"Số lượng workers đang hoạt động: {num_workers}")
for worker in workers_info:
    print(f"Executor Info: {worker}")


Số lượng workers đang hoạt động: 4
Executor Info: org.apache.spark.SparkExecutorInfoImpl@1c4bf79
Executor Info: org.apache.spark.SparkExecutorInfoImpl@6b719de5
Executor Info: org.apache.spark.SparkExecutorInfoImpl@d247f24
Executor Info: org.apache.spark.SparkExecutorInfoImpl@332e001e
Executor Info: org.apache.spark.SparkExecutorInfoImpl@17354e75


In [8]:
titles_df = loader.load_titles()
rating_df = loader.load_ratings()
names_df = loader.load_names()
akas_df = loader.load_akas()
episode_df = loader.load_episodes()
principals_df = loader.load_principals()
crew_df = loader.load_crews()
principal_df = loader.load_principals()




                                                                                



In [5]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec, VectorAssembler, Normalizer, StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import split, col, udf, lit
from pyspark.sql.types import ArrayType, StringType, FloatType
from scipy.spatial.distance import cosine



def load_and_preprocess_data(titles_df,rating_df):
    
    # Split genres into array
    titles_df = titles_df.fillna({'genres': ''})
    
    titles_df = titles_df.withColumn(
        "genres_array",
        split(col("genres"), ",")
    )
    titles_df = titles_df.filter(
    (col("genres_array").isNotNull()) & 
    (col("genres_array").getItem(0) != "")
    )   
    # Join with ratings
    movie_data = titles_df.join(rating_df, "tconst", "inner")
    
    return movie_data

def create_genre_embeddings(movie_data):
    # Configure Word2Vec model
    word2vec = Word2Vec(
        vectorSize=100,
        minCount=1,
        inputCol="genres_array",
        outputCol="genre_embedding"
    )
    
    # Fit and transform
    model = word2vec.fit(movie_data)
    movie_data_with_embeddings = model.transform(movie_data)
    
    return movie_data_with_embeddings

def train_als_model(movie_data):
    # Convert ratings to float
    movie_data = movie_data.withColumn(
        "averageRating", 
        col("averageRating").cast("float")
    )
    # Thêm cột userId với giá trị cố định (1) cho tất cả các phim
    movie_data = movie_data.withColumn("userId", lit(1))
    
    # Chuyển đổi 'tconst' từ string sang numeric
    indexer = StringIndexer(inputCol="tconst", outputCol="tconst_index")
    movie_data = indexer.fit(movie_data).transform(movie_data)
    # Initialize ALS model
    als = ALS(
        userCol="userId",
        itemCol="tconst_index",
        ratingCol="averageRating",
        coldStartStrategy="drop",
        nonnegative=True,
        rank=10,
        maxIter=10
    )
    
    

    
    # Train model
    model = als.fit(movie_data)
    
    return model , indexer

def compute_similarity_matrix(movie_data_with_embeddings):

    limited_data = movie_data_with_embeddings.limit(5000)

    # Convert to pandas for easier similarity computation
    embeddings_pd = limited_data.select(
        "tconst", 
        "primaryTitle", 
        "genre_embedding"
    ).toPandas()
    
    embeddings_pd['genre_embedding'] = embeddings_pd['genre_embedding'].apply(lambda x: x.toArray())
    
    n_movies = len(embeddings_pd)
    similarity_matrix = np.zeros((n_movies, n_movies))
    
    # Compute cosine similarity between all pairs
    for i in range(n_movies):
        for j in range(i+1, n_movies):
            sim = 1 - cosine(
                embeddings_pd.iloc[i]["genre_embedding"],
                embeddings_pd.iloc[j]["genre_embedding"]
            )
            similarity_matrix[i,j] = sim
            similarity_matrix[j,i] = sim
    
    return similarity_matrix, embeddings_pd

def get_recommendations(movie_id, similarity_matrix, embeddings_pd, als_model, indexer, spark, alpha=0.5):
    """
    Lấy đề xuất phim dựa trên nội dung và collaborative filtering.
    
    Parameters:
        movie_id (str): ID của phim muốn lấy đề xuất
        similarity_matrix (ndarray): Ma trận tương đồng giữa các phim
        embeddings_pd (DataFrame): DataFrame pandas chứa thông tin phim và embeddings
        als_model (ALSModel): Mô hình ALS đã được huấn luyện
        indexer (StringIndexerModel): Mô hình StringIndexer đã được huấn luyện
        spark (SparkSession): SparkSession hiện tại
        alpha (float): Hệ số cân bằng giữa content-based và collaborative filtering
    
    Returns:
        DataFrame pandas: Các phim được đề xuất
    """
    # Lấy index của phim dựa trên 'tconst'
    tconst_index_row = indexer.transform(spark.createDataFrame([(movie_id,)], ["tconst"]))
    tconst_index = tconst_index_row.select("tconst_index").first()["tconst_index"]
    
    # Get content-based recommendations
    movie_idx = embeddings_pd[embeddings_pd["tconst"] == movie_id].index[0]
    content_scores = similarity_matrix[movie_idx]
    
    # Get collaborative filtering recommendations
    cf_predictions = als_model.recommendForItemSubset(
        spark.createDataFrame([(tconst_index,)], ["tconst_index"]), 
        100
    )
    cf_scores = cf_predictions.toPandas()
    
    # Chuyển đổi cột 'recommendations' từ Struct thành list
    if 'recommendations' in cf_scores.columns:
        cf_scores = cf_scores.explode('recommendations')
        cf_scores['tconst_index_cf'] = cf_scores['recommendations'].apply(lambda x: x['tconst_index'])
        cf_scores['rating_cf'] = cf_scores['recommendations'].apply(lambda x: x['rating'])
    else:
        cf_scores['tconst_index_cf'] = cf_scores['tconst_index']
        cf_scores['rating_cf'] = 0
    
    # Kết hợp điểm số
    final_scores = {}
    for idx, row in embeddings_pd.iterrows():
        movie = row["tconst"]
        content_score = content_scores[idx]
        
        # Lấy điểm từ collaborative filtering
        # Chuyển 'tconst' sang 'tconst_index'
        movie_index_row = indexer.transform(spark.createDataFrame([(movie,)], ["tconst"]))
        movie_index = movie_index_row.select("tconst_index").first()["tconst_index"]
        
        cf_row = cf_scores[cf_scores["tconst_index_cf"] == movie_index]
        cf_score = cf_row["rating_cf"].values[0] if not cf_row.empty else 0
        
        final_scores[movie] = alpha * content_score + (1 - alpha) * cf_score
    
    # Sắp xếp và lấy top 10 đề xuất
    recommendations = sorted(
        final_scores.items(), 
        key=lambda x: x[1], 
        reverse=True
    )[:10]
    
    # Lấy thông tin phim chi tiết
    recommended_movies = embeddings_pd[
        embeddings_pd["tconst"].isin([r[0] for r in recommendations])
    ][["tconst", "primaryTitle"]]
    
    return recommended_movies

class MovieRecommender:
    def __init__(self):
        self.spark = init_spark()
        self.movie_data = None
        self.als_model = None
        self.similarity_matrix = None
        self.embeddings_pd = None
        self.indexer = None
    
    def fit(self):
        # Load and preprocess data
        titles_df = loader.load_titles()
        rating_df = loader.load_ratings()
        self.movie_data = load_and_preprocess_data(titles_df,rating_df)
        
        # Create genre embeddings
        movie_data_with_embeddings = create_genre_embeddings(self.movie_data)
        
        # Train ALS model
        self.als_model,self.indexer = train_als_model(self.movie_data)
        
        self.similarity_matrix, self.embeddings_pd = compute_similarity_matrix(movie_data_with_embeddings)
    
    def recommend(self, movie_id, alpha=0.5):
        return get_recommendations(
            movie_id,
            self.similarity_matrix,
            self.embeddings_pd,
            self.als_model,
            self.indexer,
            self.spark,
            alpha
        )
recommender = MovieRecommender()
recommender.fit()

# Lấy đề xuất cho một bộ phim
recommendations = recommender.recommend("tt0111161")  # Ví dụ với ID của phim The Shawshank Redemption
print(recommendations)

25/01/15 23:57:42 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/15 23:58:25 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/15 23:59:08 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/15 23:59:18 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/15 23:59:38 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/15 23:59:46 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:00:03 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:00:16 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:00:40 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:00:59 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:01:33 WARN DAGScheduler: Broadcasting large task binary with size 59.5 MiB
25/01/16 00:01:56 WARN DAGScheduler: Broadc

Py4JJavaError: An error occurred while calling o304.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 6 tasks (1139.8 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4160)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4157)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


