In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
import pandas as pd
import os

In [17]:
spark = SparkSession.builder \
    .appName("RecommenderDemo") \
    .master(os.environ.get("SPARK_MASTER", "local[*]")) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [18]:
RECOMMENDATION_PATH = "hdfs://namenode:9000/recommendations/batch_top10"
METADATA_PATH = "hdfs://namenode:9000/data/movies_metadata.csv"

In [21]:
def load_recommendations(recommendations_path):
    user_recs = spark.read.parquet(recommendations_path)
    user_recs.printSchema()
    
    recs_exp = (
        user_recs
        .withColumn("rec", explode(col("recommendations")))
        .select(
            col("user_id"),
            col("rec.movie_id").alias("movie_id"),
            col("rec.rating").alias("score")
        )
    )
    return recs_exp.toPandas()

def load_metadata(metadata_path):
    movies_metadata = spark.read.csv(metadata_path, header=True, inferSchema=True)
    movies_metadata.printSchema()
    
    selected_metadata = (
        movies_metadata
        .select(
            col("id").alias("movie_id"),
            col("name").alias("title"),
            col("genres")
        )
    )
    
    return selected_metadata.toPandas()

In [22]:
recommendations_pd = load_recommendations(RECOMMENDATION_PATH)
metadata_pd = load_metadata(METADATA_PATH)

root
 |-- user_id: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movie_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- IMDb_score: double (nullable = true)
 |-- MoMo_score: double (nullable = true)



In [23]:
recommendations_pd

Unnamed: 0,user_id,movie_id,score
0,2,1,9.895300
1,2,268,6.122416
2,2,220,6.048351
3,2,640,6.013707
4,2,649,5.904694
...,...,...,...
865415,101055,239,9.422239
865416,101055,464,9.413363
865417,101055,566,9.215281
865418,101055,282,9.205427


In [24]:
metadata_pd

Unnamed: 0,movie_id,title,genres
0,1,Marita: Vong Nữ Đoạt Hồn,Kinh dị
1,2,Giờ Thả Máu,"Gay cấn, Kinh dị"
2,3,Nắm Đấm Trời Ban,"Hài, Hành động"
3,4,Cô Gái Năm Ấy Chúng Ta Từng Theo Đuổi,"Chính kịch, Hài, Lãng mạn"
4,5,GIA ĐÌNH x ĐIỆP VIÊN MÃ: TRẮNG,"Phiêu lưu, Hài, Hành động, Hoạt hình"
...,...,...,...
849,850,Những Đứa Trẻ Trong Sương,Tài liệu
850,851,Thanh Xuân 18×2: Lữ Trình Hướng Về Em,Lãng mạn
851,852,Madame Web,"Khoa học - Viễn tưởng, Hành động"
852,853,Domino: Lối Thoát Cuối Cùng,"Tâm lý, Tội phạm, Hành động"


In [28]:
user_ids = recommendations_pd['user_id'].unique()
selected_user = 36

# MERGE 2 TABLES RECOMMENDATIONS AND METADATA
recommendations_pd["movie_id"] = recommendations_pd["movie_id"].astype(str)
metadata_pd["movie_id"] = metadata_pd["movie_id"].astype(str)

merged_pd = recommendations_pd.merge(
    metadata_pd[['movie_id', 'title', 'genres']],
    on="movie_id",
    how="inner"
)

user_recs = merged_pd[merged_pd['user_id'] == selected_user]
top_n = 10
top_recs = user_recs.sort_values("score", ascending=False).head(top_n)

top_recs

Unnamed: 0,user_id,movie_id,score,title,genres
86712,36,814,9.97937,Phim Shin Cậu Bé Bút Chì: Nóng Bỏng Tay! Những...,"Âm Nhạc, Hoạt hình"
100054,36,274,9.877183,Phim Shin Cậu Bé Bút Chì: Bí Ẩn! Học Viện Hoa ...,Hoạt hình
103411,36,754,9.7575,Trốn Chạy Tử Thần,"Gay cấn, Khoa học - Viễn tưởng, Hành động"
103741,36,776,9.707589,3DCG! SHIN - CẬU BÉ BÚT CHÌ: ĐẠI CHIẾN SIÊU NĂ...,Hoạt hình
2,36,1,9.619936,Marita: Vong Nữ Đoạt Hồn,Kinh dị
104004,36,556,9.062175,Bí Kíp Luyện Rồng,"Gia đình, Phiêu lưu, Giả tưởng, Hành động"
107794,36,259,9.000944,Xì Trum,"Gia đình, Giả tưởng, Hoạt hình"
2904,36,438,8.893117,Nhà Có Năm Nàng Dâu,"Hài, Lãng mạn, Hoạt hình"
108213,36,74,8.813324,Mộng Du,"Bí ẩn, Kinh dị, Tâm lý"
108265,36,711,8.579103,Lật Mặt 8: Vòng Tay Nắng,"Chính kịch, Gia đình, Âm Nhạc"


In [29]:
spark.stop()