# Lab Instruction #6: Building a Movie Recommendation System Using PySpark and Spark MLlib  
## Lab Assignment: Spark MLlib – Book Recommendation    

**Student Information**  
- Name: Thái Hồ Phú Gia  
- Class: 23MMT  
- Student ID: 11012302891  

**Objective**  
- Load and process the Book-Crossing dataset using PySpark.  
- Perform data cleaning and transformation to structure the dataset for recommendations.  
- Use Spark MLlib’s ALS (Alternating Least Squares) to build a book recommendation system.  
- Tune hyperparameters to optimize the recommendation model.  
- Evaluate model performance using Root Mean Squared Error (RMSE).  

**Instructions**  
Download this dataset: Book-Crossing Dataset.  
This dataset contains user ratings for books, which will be used to build a recommendation system using Spark MLlib. Your goal is to process the dataset using Spark and apply ALS (or similar) collaborative filtering to build a book recommendation system.  

- Load and preprocess the dataset, ensuring valid user ratings.  
- Filter out books with very few ratings to improve model performance.  
- Train an ALS model using PySpark MLlib to generate book recommendations.  
- Evaluate the model using Root Mean Squared Error (RMSE).  
- Tune hyperparameters (rank, lambda_, iterations; named `rank`, `regParam`, and `maxIter` in the DataFrame-based `pyspark.ml` ALS) to optimize the recommendation model.  
- Generate and display the top 5 book recommendations for a given user.  

**Submission**  
- Submission deadline: 2 weeks from the assignment date.  
- Submission Format: Upload the Executed Notebook (or similar) to LMS (lms.siu.edu.vn).  

**Suggested Resources**  
- [Spark Collaborative Filtering Documentation](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html)  
- [Spark SQL Documentation](https://spark.apache.org/sql/)  

In [1]:
from pyspark.sql import SparkSession

# Initialize the SparkSession
spark = SparkSession.builder \
    .appName("BookRecommendation") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Reduce log verbosity
spark.sparkContext.setLogLevel("WARN")




In [2]:
# Path to the directory that contains the dataset
data_path = "./dataset"

# Read the CSV files with a header row, schema inference, and a semicolon delimiter
books_df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .csv(f"{data_path}/Books.csv")

ratings_df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .csv(f"{data_path}/Ratings.csv")

users_df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("sep", ";") \
    .csv(f"{data_path}/Users.csv")

# Check the column names
print("Books columns:", books_df.columns)
print("Ratings columns:", ratings_df.columns)
print("Users columns:", users_df.columns)

# Preview a few sample rows
books_df.show(3, truncate=50)
ratings_df.show(3)
users_df.show(3)


                                                                                

Books columns: ['ISBN', 'Title', 'Author', 'Year', 'Publisher']
Ratings columns: ['User-ID', 'ISBN', 'Rating']
Users columns: ['User-ID', 'Age']
+----------+--------------------+--------------------+----+-----------------------+
|      ISBN|               Title|              Author|Year|              Publisher|
+----------+--------------------+--------------------+----+-----------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|2002|Oxford University Press|
|0002005018|        Clara Callan|Richard Bruce Wright|2001|  HarperFlamingo Canada|
|0060973129|Decision in Normandy|        Carlo D'Este|1991|        HarperPerennial|
+----------+--------------------+--------------------+----+-----------------------+
only showing top 3 rows

+-------+----------+------+
|User-ID|      ISBN|Rating|
+-------+----------+------+
| 276725|034545104X|     0|
| 276726|0155061224|     5|
| 276727|0446520802|     0|
+-------+----------+------+
only showing top 3 rows

+-------+----+
|User-

In [3]:
from pyspark.sql.functions import col

# ratings_df, books_df, and users_df were loaded above with sep=";".
# Rename the columns to names that are easier to work with (no hyphens).

# Ratings
ratings_df = ratings_df \
    .withColumnRenamed("User-ID", "UserID") \
    .withColumnRenamed("Rating", "BookRating")

# Books
books_df = books_df \
    .withColumnRenamed("Title", "BookTitle") \
    .withColumnRenamed("Author", "BookAuthor") \
    .withColumnRenamed("Year", "PublishYear") \
    .withColumnRenamed("Publisher", "BookPublisher")

# Users (if needed)
users_df = users_df \
    .withColumnRenamed("User-ID", "UserID")

# Check the renamed columns
print("Ratings columns:", ratings_df.columns)
print("Books columns:",   books_df.columns)
print("Users columns:",   users_df.columns)


Ratings columns: ['UserID', 'ISBN', 'BookRating']
Books columns: ['ISBN', 'BookTitle', 'BookAuthor', 'PublishYear', 'BookPublisher']
Users columns: ['UserID', 'Age']


In [4]:
from pyspark.sql.functions import col, count

# 1. Drop rows with nulls in the rating columns
ratings_clean = ratings_df.dropna(subset=["UserID", "ISBN", "BookRating"])

# 2. Count the number of ratings per ISBN
book_rating_counts = ratings_clean.groupBy("ISBN").agg(count("*").alias("rating_count"))

# 3. Keep only books with at least 50 ratings
popular_books = book_rating_counts.filter(col("rating_count") >= 50).select("ISBN")

# 4. Join back so that only ratings for popular books remain
ratings_filtered = ratings_clean.join(popular_books, on="ISBN", how="inner")

print(f"Original ratings: {ratings_df.count()}, After filtering: {ratings_filtered.count()}")




Original ratings: 1149780, After filtering: 237293
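
The sample rows shown earlier include ratings of 0. In Book-Crossing, a rating of 0 marks an implicit interaction, while explicit ratings use the 1 to 10 scale, so "ensuring valid user ratings" may also mean dropping the zeros before counting. The following is a minimal sketch of that logic in plain Python (no Spark needed); `filter_ratings`, the toy rows, and the threshold of 2 are hypothetical and only illustrate the idea. In Spark the same step would presumably be a filter on `BookRating > 0` ahead of the `groupBy`.

```python
from collections import Counter


def filter_ratings(rows, min_count=2):
    """Keep explicit ratings (> 0) for items with at least min_count explicit ratings.

    rows: iterable of (user_id, isbn, rating) tuples.
    """
    # Step 1: drop missing ratings and implicit (0) ratings.
    explicit = [r for r in rows if r[2] is not None and r[2] > 0]
    # Step 2: count the remaining ratings per ISBN.
    counts = Counter(isbn for _, isbn, _ in explicit)
    # Step 3: keep only ratings whose ISBN meets the threshold.
    return [r for r in explicit if counts[r[1]] >= min_count]


# Hypothetical toy data: (user, isbn, rating)
sample = [
    (1, "A", 0), (1, "B", 7), (2, "A", 8),
    (3, "A", 9), (3, "B", 0), (4, "C", 5),
]
kept = filter_ratings(sample, min_count=2)
print(kept)
```

Only item "A" keeps two explicit ratings here, so the rows for "B" and "C" are dropped along with the zeros.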


                                                                                

In [5]:
from pyspark.sql.functions import col

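# Map the columns to the user/item/rating names used for ALS.
# Note: this selects from ratings_df, so the popularity filter from the
# previous cell (ratings_filtered) is not applied to the ALS input.
als_data = ratings_df.select(
    col("UserID").alias("user"),
    col("ISBN").alias("item"),
    col("BookRating").alias("rating")
)

# Split into train/test sets
train_df, test_df = als_data.randomSplit([0.8, 0.2], seed=42)
print(f"Train count = {train_df.count()}, Test count = {test_df.count()}")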




Train count = 919771, Test count = 230009


                                                                                

In [6]:
from pyspark.ml.feature import StringIndexer

# 1. Index ISBN strings into numeric itemIndex values (ALS needs numeric ids)
item_indexer = StringIndexer(inputCol="item", outputCol="itemIndex", handleInvalid="skip") \
    .fit(als_data)
als_idx = item_indexer.transform(als_data)

# 2. If user is still a string (checked via als_idx.dtypes), index it the same way
if dict(als_idx.dtypes)["user"] == "string":
    user_indexer = StringIndexer(inputCol="user", outputCol="userIndex", handleInvalid="skip") \
        .fit(als_idx)
    als_idx = user_indexer.transform(als_idx)
    user_col = "userIndex"
else:
    user_col = "user"

# 3. Cast to int/float for ALS
als_numeric = als_idx.select(
    col(user_col).cast("int").alias("user"),
    col("itemIndex").cast("int").alias("item"),
    col("rating").cast("float").alias("rating")
)

# 4. Split into train/test sets (replaces the split from the previous cell)
train_df, test_df = als_numeric.randomSplit([0.8, 0.2], seed=42)
print(f"Train: {train_df.count()}, Test: {test_df.count()}")



Train: 919771, Test: 230009


                                                                                

In [7]:
from pyspark.ml.recommendation import ALS

# Configure ALS
als = ALS(
    userCol="user",
    itemCol="item",
    ratingCol="rating",
    coldStartStrategy="drop",   # drop rows with NaN predictions (unseen users/items) during evaluation
    nonnegative=True
)

# Fit the model on train_df
als_model = als.fit(train_df)



In [8]:
from pyspark.ml.evaluation import RegressionEvaluator

# Predict ratings for the test set
predictions = als_model.transform(test_df)

# Evaluate with RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE = {rmse:.4f}")




Test RMSE = 4.0261
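
For reference, RMSE is the square root of the mean squared difference between the label and the prediction. The sketch below computes it in plain Python on made-up numbers; the `rmse` helper and its inputs are illustrative and are not part of the notebook. With `coldStartStrategy="drop"`, rows whose prediction would be NaN are removed before this average is taken.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length, non-empty sequences."""
    if len(labels) != len(predictions) or len(labels) == 0:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_error = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    return math.sqrt(squared_error / len(labels))


# Hypothetical ratings and predictions: errors are 1, 0 and 2.
example = rmse([0.0, 5.0, 10.0], [1.0, 5.0, 8.0])
print(f"{example:.4f}")
```

Because the errors are squared before averaging, a few large misses (for example, predicting a mid-range score for a rating of 0) weigh heavily on the result.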


                                                                                

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Build the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .addGrid(als.maxIter, [5, 10]) \
    .build()

# Set up 3-fold cross-validation
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# Run cross-validation (this can take a while)
# small_train = train_df.sample(fraction=0.2, seed=42)
cv_model = cv.fit(train_df)

# Get the best model and its parameters
best_model = cv_model.bestModel
print("Best params:", 
      f"rank={best_model._java_obj.parent().getRank()},",
      f"regParam={best_model._java_obj.parent().getRegParam()},",
      f"maxIter={best_model._java_obj.parent().getMaxIter()}"
)



Best params: rank=20, regParam=0.1, maxIter=5
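
To see why this cell is slow, it helps to count the model fits. The sketch below enumerates the same grid in plain Python; the dictionary layout is only an illustration and is not how `ParamGridBuilder` stores its maps. `CrossValidator` fits every combination once per fold, picks the combination with the best average metric across folds, and then refits that combination on the full training set.

```python
from itertools import product

# Same candidate values as the grid above.
ranks = [5, 10, 20]
reg_params = [0.01, 0.1]
max_iters = [5, 10]
num_folds = 3

# Cartesian product of all candidate values.
grid = [
    {"rank": r, "regParam": reg, "maxIter": it}
    for r, reg, it in product(ranks, reg_params, max_iters)
]

cv_fits = len(grid) * num_folds   # one fit per combination per fold
total_fits = cv_fits + 1          # plus the final refit of the best combination

print(len(grid), cv_fits, total_fits)
```

Since the winner is chosen by its average score on the validation folds, it is not guaranteed to score best on a separate test set.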


                                                                                

In [10]:
best_predictions = best_model.transform(test_df)
best_rmse = evaluator.evaluate(best_predictions)
print(f"Best model Test RMSE = {best_rmse:.4f}")



Best model Test RMSE = 4.1556



In [11]:
# Pick an arbitrary user from the test set
sample_user = test_df.select("user").distinct().limit(1).collect()[0][0]
print(f"Sample user ID = {sample_user}")

# Get the top 5 recommended books
user_df = spark.createDataFrame([(sample_user,)], ["user"])
recs = best_model.recommendForUserSubset(user_df, 5)

# Show the result: item index + predicted score
recs.select("user", "recommendations").show(truncate=False)



Sample user ID = 8




+----+---------------------------------------------------------------------------+
|user|recommendations                                                            |
+----+---------------------------------------------------------------------------+
|8   |[{228742, 0.0}, {228732, 0.0}, {228722, 0.0}, {228612, 0.0}, {228602, 0.0}]|
+----+---------------------------------------------------------------------------+



                                                                                

In [12]:
from pyspark.sql.functions import explode

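# Explode the recommendations array into one row per recommended item.
# Note: rec.item is the integer itemIndex produced by StringIndexer, not the
# original ISBN string, even though it is aliased to "ISBN" here.
recs_exploded = recs \
    .withColumn("rec", explode("recommendations")) \
    .select("user", col("rec.item").alias("ISBN"), col("rec.rating").alias("score"))

# Join with books_df to look up the title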
final_recs = recs_exploded.join(
    books_df.select("ISBN", "BookTitle"), on="ISBN"
).select("user", "BookTitle", "score")

final_recs.show(truncate=False)



+----+---------+-----+
|user|BookTitle|score|
+----+---------+-----+
+----+---------+-----+
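
The empty table is what one would expect when the join key does not match: the model was trained on the integer `itemIndex` from `StringIndexer`, so `rec.item` is an index, not an ISBN string. The index has to be mapped back to its label before the titles can be looked up. The sketch below shows that mapping in plain Python. The `labels` order and the recommendation tuples are hypothetical, and the two titles come from the `books_df` preview earlier in the notebook. In Spark, the fitted indexer's `labels` list (position equals index) would play the role of `labels` here.

```python
# Hypothetical label list: position i holds the ISBN that was indexed as i.
labels = ["0195153448", "0002005018", "0060973129"]

# ISBN -> title lookup (two rows taken from the books_df preview).
titles = {
    "0195153448": "Classical Mythology",
    "0002005018": "Clara Callan",
}

# Hypothetical recommendations as (itemIndex, score) pairs.
recs = [(1, 0.0), (0, 0.0)]

# Joining on the raw index finds nothing, because no ISBN equals 0 or 1.
naive_matches = [(idx, titles[idx], score) for idx, score in recs if idx in titles]


def decode(recs, labels, titles):
    """Map each itemIndex back to its ISBN, then look up the title."""
    decoded = []
    for idx, score in recs:
        isbn = labels[idx]
        decoded.append((isbn, titles.get(isbn), score))
    return decoded


decoded = decode(recs, labels, titles)
print(naive_matches)
print(decoded)
```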




In [13]:
spark.stop()
