In [3]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when, rank, sum as spark_sum, count as spark_count, log2
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

spark = SparkSession.builder \
    .appName("MovieLensALS") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")



In [None]:
ratings_path = "hdfs://namenode:9000/movielens/32M/ratings.csv"
movies_path = "hdfs://namenode:9000/movielens/32M/movies.csv"

ratings_df = spark.read.csv(
    ratings_path,
    header=True,
    inferSchema=True
)

movies_df = spark.read.csv(
    movies_path,
    header=True,
    inferSchema=True
)

als_data = ratings_df.select(
    F.col("userId").cast("integer").alias("userId"),
    F.col("movieId").cast("integer").alias("movieId"),
    F.col("rating").cast("float").alias("rating"),
    F.col("timestamp").alias("timestamp_orig")  # Giữ lại timestamp để chia train/test
).dropna()
als_data.show(5)

                                                                                

+------+-------+------+--------------+
|userId|movieId|rating|timestamp_orig|
+------+-------+------+--------------+
|     1|     17|   4.0|     944249077|
|     1|     25|   1.0|     944250228|
|     1|     29|   2.0|     943230976|
|     1|     30|   5.0|     944249077|
|     1|     32|   5.0|     943228858|
+------+-------+------+--------------+
only showing top 5 rows



In [5]:
user_counts = als_data.groupBy("userId").count()
item_counts = als_data.groupBy("movieId").count()

min_user_ratings = 5
min_item_ratings = 10

filtered_als_data = als_data \
    .join(user_counts.filter(F.col("count") >= min_user_ratings), "userId") \
    .join(item_counts.filter(F.col("count") >= min_item_ratings), "movieId") \
    .select("userId", "movieId", "rating", "timestamp_orig")

In [None]:
# Lưu lại mapping
user_mapping = filtered_als_data.select("userId").distinct().withColumn(
    "new_userId",
    F.row_number().over(Window.orderBy("userId")) - 1
)
user_mapping.write.mode("overwrite").parquet("/app/src/batch/mappings/users.parquet")


# Lưu lại mapping
movie_mapping = filtered_als_data.select("movieId").distinct().withColumn(
    "new_movieId",
    F.row_number().over(Window.orderBy("movieId")) - 1
)
movie_mapping.write.mode("overwrite").parquet("/app/src/batch/mappings/movies.parquet")

indexed_data = filtered_als_data \
    .join(user_mapping, "userId") \
    .join(movie_mapping, "movieId") \
    .select(
        F.col("new_userId").alias("userId"),
        F.col("new_movieId").alias("movieId"),
        "rating",
        "timestamp_orig"
    ).cache()

indexed_data.show(5)

25/11/23 03:18:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:18:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:18:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:18:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:18:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:19:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 0

+------+-------+------+--------------+
|userId|movieId|rating|timestamp_orig|
+------+-------+------+--------------+
|   147|   1060|   0.5|    1471747769|
|   147|   2265|   1.0|    1471747783|
|   147|   4375|   1.0|    1471747756|
|   495|   1519|   3.5|    1633649130|
|   832|   1519|   2.5|    1193952315|
+------+-------+------+--------------+
only showing top 5 rows



In [7]:
split_ratio = 0.8
total_count = indexed_data.count()
split_point = indexed_data.select(
    F.percentile_approx("timestamp_orig", F.lit(split_ratio), 1000)
).collect()[0][0]

train_set = indexed_data.filter(F.col("timestamp_orig") <= split_point)
test_set = indexed_data.filter(F.col("timestamp_orig") > split_point)

train_set = train_set.select("userId", "movieId", "rating")
test_set = test_set.select("userId", "movieId", "rating")

test_set_count = test_set.count()

print(f"Tổng số ratings: {total_count:,}")
print(f"Train Set (Rating cũ): {train_set.count():,}")
print(f"Test Set (Rating mới): {test_set_count:,}")

25/11/23 03:23:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 0

Tổng số ratings: 31,842,705




Train Set (Rating cũ): 25,461,934
Test Set (Rating mới): 6,380,771


                                                                                

In [8]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=False,
    rank=100,                
    maxIter=15,               
    regParam=0.05,
    numUserBlocks=100,        
    numItemBlocks=100,        
    intermediateStorageLevel="MEMORY_AND_DISK", 
    finalStorageLevel="MEMORY_AND_DISK_SER"     
)
num_partitions = 100
train_set_repartitioned = train_set.repartition(num_partitions).cache()
model_32m = als.fit(train_set_repartitioned)

25/11/23 03:23:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:23:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [None]:
model_path = "file:///app/src/batch/als_model_32m"
model_32m.write().overwrite().save(model_path)

                                                                                

In [10]:
# Đánh giá mô hình (RMSE)
predictions = model_32m.transform(test_set)

predictions = predictions.filter(F.col("prediction").isNotNull())

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)

print(f"RMSE trên tập kiểm tra: {rmse:.4f}")

[Stage 187:>                                                        (0 + 4) / 4]

RMSE trên tập kiểm tra: 3.5335


                                                                                

In [None]:
from math import log2 as math_log2
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, rank, sum as spark_sum, count as spark_count, log2, lit, avg
from pyspark.sql.types import DoubleType

K = 10
GOOD_RATING_THRESHOLD = 3.5


test_with_binary_label = test_set.withColumn(
    "is_good_item",
    when(col("rating") >= GOOD_RATING_THRESHOLD, 1).otherwise(0)
)

# Dự đoán rating cho các item trong test
user_items_to_rank = test_with_binary_label.select("userId", "movieId", "is_good_item")
predictions_for_ranking = model_32m.transform(user_items_to_rank)
predictions_for_ranking = predictions_for_ranking.filter(col("prediction").isNotNull())


window_spec = Window.partitionBy("userId").orderBy(col("prediction").desc())

ranked_predictions = predictions_for_ranking.withColumn(
    "rank",
    rank().over(window_spec)
)


top_k_recommendations = ranked_predictions.filter(col("rank") <= K)


users_hit_status = top_k_recommendations.groupBy("userId").agg(
    F.max("is_good_item").alias("has_hit")
)

total_users = test_set.select("userId").distinct().count()
hit_rate = users_hit_status.select(avg("has_hit")).collect()[0][0]

print(f"Hit Rate @{K} (HR@{K}): {hit_rate:.4f}")



25/11/23 03:26:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 0

Hit Rate @10 (HR@10): 0.8203


                                                                                

In [None]:
ndcg_scores = ranked_predictions \
    .filter(col("rank") <= K) \
    .withColumn(
        "dcg_gain",
        col("is_good_item") / log2(col("rank") + 1)
    ) \
    .groupBy("userId") \
    .agg(spark_sum("dcg_gain").alias("DCG"))

def calculate_idcg(num_good_items, k):
    idcg = 0.0
    for i in range(1, min(num_good_items, k) + 1):
        idcg += 1.0 / math_log2(i + 1)
    return idcg

user_good_item_counts = test_with_binary_label \
    .filter(col("is_good_item") == 1) \
    .groupBy("userId").agg(spark_count("*").alias("num_good_items"))

idcg_udf = F.udf(lambda num_good: calculate_idcg(num_good, K), DoubleType())

ndcg_data = ndcg_scores.join(user_good_item_counts, "userId", "inner") \
    .withColumn("IDCG_score", idcg_udf(col("num_good_items"))) \
    .filter(col("IDCG_score") > 0)

ndcg_result = ndcg_data.withColumn(
    "NDCG",
    when(col("IDCG_score") > 0, col("DCG") / col("IDCG_score")).otherwise(0.0)
)

avg_ndcg = ndcg_result.select(avg("NDCG")).collect()[0][0]
print(f"NDCG @{K}: {avg_ndcg:.4f}")


25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 03:26:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/23 0

NDCG @10: 0.7785


                                                                                