# **Big Data Assignment Week 9**
## Collaborative Filtering
---

Name: Farros Hilmi Syafei 
<br>
Student ID: 5025201012
<br>
Class: Big Data A
<br>
Lecturer: Abdul Munif, S.Kom., M.Sc.


Using: JupyterLab </br>
Reference: https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

## Initialization

In [40]:
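```python
# SparkSession initialization
# -Xss8g raises the JVM thread stack size (it does not change heap memory) on the driver and executors;
# a larger stack is a common workaround for StackOverflowError caused by the long lineage of iterative ALS

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.driver.extraJavaOptions", "-Xss8g") \
    .config("spark.executor.extraJavaOptions", "-Xss8g") \
    .getOrCreate()
```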

In [41]:
```python
# Import necessary modules
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row

# Read the sample MovieLens ratings bundled with this PySpark installation
# (the path is specific to the author's environment) and split each line on "::"
lines = spark.read.text("miniconda3/lib/python3.9/site-packages/pyspark/data/mllib/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))

# Convert the parsed fields into Rows, then into a DataFrame with userId, movieId, rating, and timestamp columns
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

# Randomly split the data into training (~80%) and test (~20%) sets.
# No seed is given, so the split (and the RMSE values below) change from run to run
(training, test) = ratings.randomSplit([0.8, 0.2])
```
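The parsing and splitting steps above can be sketched in plain Python, without Spark, to show what each stage produces. This is a minimal illustration under assumptions: the three sample lines are made up in the `userId::movieId::rating::timestamp` format the cell expects, and `random_split` is a hypothetical helper that mimics the idea behind `randomSplit`, where each row is assigned independently according to the weights, so the resulting sizes are only approximately 80/20.

```python
import random
from typing import List, NamedTuple, Tuple


class Rating(NamedTuple):
    userId: int
    movieId: int
    rating: float
    timestamp: int


def parse_line(line: str) -> Rating:
    """Split one 'userId::movieId::rating::timestamp' line into typed fields."""
    p = line.split("::")
    return Rating(int(p[0]), int(p[1]), float(p[2]), int(p[3]))


def random_split(rows: List[Rating], train_fraction: float = 0.8,
                 seed: int = 42) -> Tuple[List[Rating], List[Rating]]:
    """Hypothetical helper: assign each row to train or test independently,
    so the two sizes are only approximately train_fraction : (1 - train_fraction)."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


# Made-up lines in the same format as sample_movielens_ratings.txt
sample = ["0::2::3::1424380312", "0::3::1::1424380312", "1::5::2::1424380312"]
rows = [parse_line(line) for line in sample]
print(rows[0])  # Rating(userId=0, movieId=2, rating=3.0, timestamp=1424380312)

train, test = random_split(rows * 100)  # 300 rows in total
print(len(train), len(test))  # roughly 240 and 60, not exactly
```

Passing a fixed `seed` to `randomSplit` in Spark gives the same kind of repeatability as the fixed `seed` here.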

## Build Recommendation Model using ALS

In [42]:
```python
# Hyperparameter values to search over
max_iters = [5, 10, 15, 20]
reg_params = [0.1, 0.5, 1.0]

# Empty dictionary to store the RMSE for each (maxIter, regParam) combination
results = {}
```
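The nested loop in the next cell visits every combination of the two lists, i.e. a full grid of 4 × 3 = 12 models. A minimal standard-library sketch with `itertools.product` shows the same enumeration order (outer `maxIter`, inner `regParam`); it only illustrates the grid and is not what the notebook runs. Spark's own `ParamGridBuilder`, imported above but not used, builds this kind of grid for `CrossValidator` or `TrainValidationSplit`.

```python
from itertools import product

max_iters = [5, 10, 15, 20]
reg_params = [0.1, 0.5, 1.0]

# product() yields pairs in the same order as the nested for-loops:
# maxIter in the outer loop, regParam in the inner loop
grid = list(product(max_iters, reg_params))
print(len(grid))  # 12
print(grid[:3])   # [(5, 0.1), (5, 0.5), (5, 1.0)]
```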

In [43]:
```python
# RegressionEvaluator computes the RMSE between the "rating" and "prediction" columns
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Train and evaluate one ALS model for every (maxIter, regParam) combination
for max_iter in max_iters:
    for reg_param in reg_params:
        # Build the recommendation model using ALS on the training data.
        # coldStartStrategy="drop" removes test rows whose user or movie was not seen in training,
        # so their NaN predictions do not turn the RMSE into NaN
        als = ALS(maxIter=max_iter, regParam=reg_param, userCol="userId", itemCol="movieId", ratingCol="rating",
                  coldStartStrategy="drop")
        model = als.fit(training)

        # Predict ratings for the test set and compute the RMSE (root-mean-square error)
        predictions = model.transform(test)
        rmse = evaluator.evaluate(predictions)

        # Store the RMSE keyed by the (max_iter, reg_param) tuple, then print it
        results[(max_iter, reg_param)] = rmse
        print(f"Root-mean-square error for maxIter={max_iter}, regParam={reg_param} = {rmse}")
```

```
Root-mean-square error for maxIter=5, regParam=0.1 = 0.921784586565725
Root-mean-square error for maxIter=5, regParam=0.5 = 1.119076748103717
Root-mean-square error for maxIter=5, regParam=1.0 = 1.4053489513207658
Root-mean-square error for maxIter=10, regParam=0.1 = 0.8625981642804895
Root-mean-square error for maxIter=10, regParam=0.5 = 1.116410578960977
Root-mean-square error for maxIter=10, regParam=1.0 = 1.405356756253438
Root-mean-square error for maxIter=15, regParam=0.1 = 0.8521670970628911
Root-mean-square error for maxIter=15, regParam=0.5 = 1.11668869893221
Root-mean-square error for maxIter=15, regParam=1.0 = 1.405356758474302
Root-mean-square error for maxIter=20, regParam=0.1 = 0.849335755687161
Root-mean-square error for maxIter=20, regParam=0.5 = 1.1168895104815277
Root-mean-square error for maxIter=20, regParam=1.0 = 1.405356762997726
```
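`RegressionEvaluator` with `metricName="rmse"` computes RMSE = sqrt(mean((rating - prediction)^2)). `coldStartStrategy="drop"` matters because a user or movie that appears only in the test split has no learned factors, so its prediction is NaN, and a single NaN makes the whole RMSE NaN. The sketch below mimics that behavior in plain Python with made-up numbers: rows whose prediction is NaN are dropped before the metric is computed. `rmse_drop_nan` is a hypothetical helper, not a Spark function.

```python
import math
from typing import Iterable, Tuple


def rmse_drop_nan(pairs: Iterable[Tuple[float, float]]) -> float:
    """RMSE over (rating, prediction) pairs, skipping NaN predictions
    the way coldStartStrategy='drop' removes them before evaluation."""
    kept = [(r, p) for r, p in pairs if not math.isnan(p)]
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))


# Made-up (rating, prediction) pairs; the last movie was never seen in training
pairs = [(4.0, 3.5), (2.0, 2.5), (5.0, 4.0), (3.0, float("nan"))]
print(rmse_drop_nan(pairs))  # sqrt((0.25 + 0.25 + 1.0) / 3) = sqrt(0.5), about 0.7071

# Without dropping, the NaN propagates into the metric
print(math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs)))  # nan
```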


In [44]:
```python
# Pick the (maxIter, regParam) pair with the lowest RMSE: min() ranks the dictionary keys by results.get
best_params = min(results, key=results.get)
best_rmse = results[best_params]
print(f"\nBest hyperparameters: maxIter={best_params[0]}, regParam={best_params[1]} with RMSE={best_rmse}")
```


```
Best hyperparameters: maxIter=20, regParam=0.1 with RMSE=0.849335755687161
```
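Replaying the selection on the RMSE values printed above (rounded to four decimals) reproduces the reported choice and shows the runners-up. The numbers also show a pattern: with regParam=0.1 the RMSE keeps falling as maxIter grows (0.9218 to 0.8493), while the regParam=0.5 and 1.0 rows barely change. One caveat: because the same test split is used both to pick the hyperparameters and to report the RMSE, the reported value is likely somewhat optimistic; a separate validation split would avoid that.

```python
# RMSE values copied from the grid-search output above, rounded to 4 decimals
results = {
    (5, 0.1): 0.9218, (5, 0.5): 1.1191, (5, 1.0): 1.4053,
    (10, 0.1): 0.8626, (10, 0.5): 1.1164, (10, 1.0): 1.4054,
    (15, 0.1): 0.8522, (15, 0.5): 1.1167, (15, 1.0): 1.4054,
    (20, 0.1): 0.8493, (20, 0.5): 1.1169, (20, 1.0): 1.4054,
}

# min() iterates over the keys and ranks them by results[key]
best_params = min(results, key=results.get)
print(best_params, results[best_params])  # (20, 0.1) 0.8493

# Sorting by the same key lists the three best combinations
for params in sorted(results, key=results.get)[:3]:
    print(params, results[params])
```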


## Generate Movie Recommendations

In [46]:
```python
# Build the final recommendation model with the best hyperparameters found above.
# coldStartStrategy="drop" is kept so that any later evaluation does not produce NaN metrics
best_als = ALS(maxIter=best_params[0], regParam=best_params[1], userCol="userId", itemCol="movieId", ratingCol="rating",
               coldStartStrategy="drop")
best_model = best_als.fit(training)
```
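`best_als.fit(training)` learns a latent-factor vector for each user and each movie so that their dot product approximates the observed ratings. ALS alternates between solving for all user factors with the movie factors held fixed, and vice versa; each half-step is an ordinary regularized least-squares problem. The sketch below is a deliberately simplified version of that idea, not Spark's implementation: it assumes rank 1 (Spark's default `rank` is 10) so each subproblem has a closed-form scalar solution, and, following the regularization scaling described in Spark's ALS documentation, multiplies `reg` by the number of ratings each user or movie has. `als_rank1` and `predict` are hypothetical names.

```python
import math
import random
from collections import defaultdict
from typing import Dict, List, Tuple

# (userId, movieId, rating) triples
Ratings = List[Tuple[int, int, float]]


def als_rank1(ratings: Ratings, reg: float = 0.1, iters: int = 10,
              seed: int = 0) -> Tuple[Dict[int, float], Dict[int, float]]:
    """Hypothetical rank-1 ALS: one latent factor per user and per movie.
    reg is multiplied by each user's / movie's rating count."""
    by_user: Dict[int, List[Tuple[int, float]]] = defaultdict(list)
    by_item: Dict[int, List[Tuple[int, float]]] = defaultdict(list)
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    rng = random.Random(seed)
    item_f = {i: rng.uniform(0.5, 1.5) for i in by_item}  # random positive start
    user_f: Dict[int, float] = {}
    for _ in range(iters):
        # Hold movie factors fixed: minimizing sum (r - u*v)^2 + reg*n*u^2 gives u in closed form
        for u, rs in by_user.items():
            num = sum(r * item_f[i] for i, r in rs)
            den = sum(item_f[i] ** 2 for i, _ in rs) + reg * len(rs)
            user_f[u] = num / den
        # Hold user factors fixed and solve for each movie's factor the same way
        for i, rs in by_item.items():
            num = sum(r * user_f[u] for u, r in rs)
            den = sum(user_f[u] ** 2 for u, _ in rs) + reg * len(rs)
            item_f[i] = num / den
    return user_f, item_f


def predict(user_f: Dict[int, float], item_f: Dict[int, float], u: int, i: int) -> float:
    """Predicted rating = user factor * movie factor; NaN for unseen ids (a cold start)."""
    if u not in user_f or i not in item_f:
        return math.nan
    return user_f[u] * item_f[i]


# Made-up ratings generated from user factors (1, 2, 1.5) and movie factors (2, 1, 2.5);
# user 2's rating of movie 12 (true value 1.5 * 2.5 = 3.75) is held out
ratings = [(0, 10, 2.0), (0, 11, 1.0), (0, 12, 2.5),
           (1, 10, 4.0), (1, 11, 2.0), (1, 12, 5.0),
           (2, 10, 3.0), (2, 11, 1.5)]
user_f, item_f = als_rank1(ratings, reg=0.0, iters=50)
print(round(predict(user_f, item_f, 2, 12), 4))  # 3.75
print(predict(user_f, item_f, 99, 10))           # nan (user 99 never seen)
```

With `reg=0.0` the held-out rating is recovered because the toy data are exactly rank 1; a positive `reg` shrinks the factors toward zero, trading training fit for smoother predictions.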

## Print Results and Show Output

In [47]:
```python
# Generate top 10 movie recommendations for each user
userRecs = best_model.recommendForAllUsers(10)
userRecs.show()
```



```
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 3.9041224},...|
|    10|[{2, 3.3204358}, ...|
|     0|[{9, 2.7876582}, ...|
|     1|[{68, 3.253139}, ...|
|    21|[{29, 4.6004267},...|
|    11|[{18, 4.7046185},...|
|    12|[{46, 4.4934826},...|
|    22|[{75, 4.6521616},...|
|     2|[{8, 4.5137844}, ...|
|    13|[{93, 2.8385494},...|
|     3|[{51, 4.0174823},...|
|    23|[{55, 4.711238}, ...|
|     4|[{2, 3.4940147}, ...|
|    24|[{52, 4.4041543},...|
|    14|[{52, 4.6952863},...|
|     5|[{55, 4.0255384},...|
|    15|[{46, 3.9096065},...|
|    25|[{47, 3.1707156},...|
|    26|[{88, 4.5196896},...|
|     6|[{25, 3.6736743},...|
+------+--------------------+
only showing top 20 rows
```
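Conceptually, `recommendForAllUsers(10)` scores every movie for each user by the dot product of their learned factors and keeps the 10 highest, which is why each row holds an array of `{movieId, rating}` pairs sorted by score. A minimal sketch of that ranking step, assuming hypothetical two-dimensional factors (the trained model's factors have `rank` dimensions); `recommend_for_all` is a hypothetical helper, and swapping its arguments gives the per-movie view that `recommendForAllItems` returns.

```python
import heapq
from typing import Dict, List, Sequence, Tuple

Vector = Sequence[float]


def dot(a: Vector, b: Vector) -> float:
    return sum(x * y for x, y in zip(a, b))


def recommend_for_all(src: Dict[int, Vector], dst: Dict[int, Vector],
                      k: int) -> Dict[int, List[Tuple[int, float]]]:
    """For every id in src, score all ids in dst by dot product and keep the top k,
    highest score first."""
    return {
        s: heapq.nlargest(k, ((d, dot(sv, dv)) for d, dv in dst.items()),
                          key=lambda pair: pair[1])
        for s, sv in src.items()
    }


# Hypothetical rank-2 factors (made up, not taken from the trained model)
user_factors = {0: [1.0, 0.0], 1: [0.5, 1.0]}
movie_factors = {10: [2.0, 0.0], 11: [1.0, 1.0], 12: [0.0, 3.0]}

user_recs = recommend_for_all(user_factors, movie_factors, k=2)
print(user_recs[0])  # [(10, 2.0), (11, 1.0)]
print(user_recs[1])  # [(12, 3.0), (11, 1.5)]

# Swapping the arguments ranks users for each movie
movie_recs = recommend_for_all(movie_factors, user_factors, k=1)
print(movie_recs[12])  # [(1, 3.0)]
```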


In [48]:
```python
# Generate top 10 user recommendations for each movie
movieRecs = best_model.recommendForAllItems(10)
movieRecs.show()
```

```
+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.118328}, ...|
|     40|[{28, 3.6088238},...|
|     10|[{17, 3.4879637},...|
|     50|[{23, 3.8541417},...|
|     80|[{3, 3.323558}, {...|
|     70|[{4, 3.0526662}, ...|
|     60|[{21, 2.6623228},...|
|     90|[{17, 4.5872664},...|
|     30|[{11, 4.622165}, ...|
|      0|[{28, 2.3792202},...|
|     31|[{12, 3.1533713},...|
|     81|[{28, 4.175014}, ...|
|     91|[{12, 2.8104184},...|
|      1|[{12, 3.1678045},...|
|     41|[{4, 3.239939}, {...|
|     61|[{6, 2.1180947}, ...|
|     51|[{22, 4.2877326},...|
|     21|[{26, 2.7936552},...|
|     11|[{18, 3.4344501},...|
|     71|[{25, 2.885117}, ...|
+-------+--------------------+
only showing top 20 rows
```
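The `recommendations` column is a nested array, which `show()` truncates. To inspect every recommendation, it is often easier to flatten it into one row per recommendation; in PySpark, `pyspark.sql.functions.explode` on the `recommendations` column does this. The plain-Python sketch below shows the same reshaping on made-up, truncated lists, adding a 1-based rank; `flatten_recs` is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def flatten_recs(recs: Dict[int, List[Tuple[int, float]]]) -> List[Tuple[int, int, int, float]]:
    """Turn {id: [(otherId, score), ...]} into one (id, rank, otherId, score)
    row per recommendation, with rank starting at 1."""
    return [
        (key, rank, other, score)
        for key, items in sorted(recs.items())
        for rank, (other, score) in enumerate(items, start=1)
    ]


# Made-up, truncated recommendation lists keyed by userId
recs = {20: [(22, 3.9), (5, 3.1)], 10: [(2, 3.3)]}
for row in flatten_recs(recs):
    print(row)
# (10, 1, 2, 3.3)
# (20, 1, 22, 3.9)
# (20, 2, 5, 3.1)
```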



In [49]:
```python
# Generate top 10 movie recommendations for a subset of users
# (distinct().limit(3) keeps whichever three user IDs Spark returns first)
users = ratings.select(best_als.getUserCol()).distinct().limit(3)
userSubsetRecs = best_model.recommendForUserSubset(users, 10)
userSubsetRecs.show()
```

```
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{88, 4.5196896},...|
|    19|[{22, 3.402011}, ...|
|    29|[{90, 3.8608525},...|
+------+--------------------+
```



In [50]:
```python
# Generate top 10 user recommendations for a subset of movies
# (distinct().limit(3) keeps whichever three movie IDs Spark returns first)
movies = ratings.select(best_als.getItemCol()).distinct().limit(3)
movieSubsetRecs = best_model.recommendForItemSubset(movies, 10)
movieSubsetRecs.show()
```

```
+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     65|[{23, 4.0409894},...|
|     26|[{15, 2.2858176},...|
|     29|[{8, 4.632507}, {...|
+-------+--------------------+
```
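The subset methods score the same learned factors as the all-users and all-items methods, only for the requested IDs; this is visible in the outputs, where user 26's first recommendation (movie 88, 4.5196896) is identical in both user tables. Because `distinct().limit(3)` returns whichever IDs Spark produces first, the chosen users and movies may differ between runs; sorting before limiting (in Spark, `orderBy(...)` before `limit`) makes the choice repeatable. A minimal plain-Python sketch of both ideas, using the first entries from the tables above rounded to two decimals (lists truncated); `first_n_ids` and `recommend_for_subset` are hypothetical helpers.

```python
from typing import Dict, Iterable, List, Tuple

Recs = Dict[int, List[Tuple[int, float]]]


def first_n_ids(ids: Iterable[int], n: int) -> List[int]:
    """Deterministic choice of n distinct ids (smallest first)."""
    return sorted(set(ids))[:n]


def recommend_for_subset(all_recs: Recs, ids: Iterable[int]) -> Recs:
    """Keep only the requested ids; each kept list is identical to the full result."""
    return {i: all_recs[i] for i in ids if i in all_recs}


# First recommendation per user from the tables above, rounded; lists truncated
all_recs = {26: [(88, 4.52)], 19: [(22, 3.40)], 29: [(90, 3.86)], 0: [(9, 2.79)]}
chosen = first_n_ids([26, 19, 29, 0, 26], 3)
print(chosen)                                       # [0, 19, 26]
print(recommend_for_subset(all_recs, chosen)[26])   # [(88, 4.52)]
```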



## Summary

* Collaborative Filtering: </br>
Collaborative filtering is a recommendation technique that predicts a user's preferences from the past behavior (for example, ratings) of many users, on the assumption that users who agreed in the past tend to agree in the future.

* User-based vs Item-based Collaborative Filtering: </br>
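Neighborhood-based collaborative filtering can be user-based (recommend what similar users liked) or item-based (recommend items similar to those the user already rated). ALS, used in this notebook, is neither: it is a model-based approach that learns latent factors for users and items.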

* Implementation of Collaborative Filtering using ALS Algorithm: </br>
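Apache Spark's MLlib library (here its DataFrame-based pyspark.ml API) implements collaborative filtering with the Alternating Least Squares (ALS) algorithm, which learns latent factors for users and movies whose products approximate the known ratings and predict the missing ones.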

To use ALS in Spark, we initialize a SparkSession and import the modules the notebook actually uses: ALS, RegressionEvaluator, and Row.

* Loading Data for Collaborative Filtering: </br>
The ratings file is read into a DataFrame with Spark's read.text method, converted to an RDD, and each line is split on "::" into its individual fields.

The RDD can then be converted into a DataFrame with columns for userId, movieId, rating, and timestamp.

* Splitting Data and Model Evaluation: </br>
The data are randomly split into training (about 80%) and test (about 20%) sets: the training set is used to fit the ALS model, and the test set to measure its RMSE.

* Finding Optimal Hyperparameters: </br>
The notebook loops over every combination of maxIter and regParam, stores each model's RMSE in a dictionary keyed by the hyperparameter pair, and selects the combination with the lowest RMSE (maxIter=20, regParam=0.1 in this run).

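* Training the Final Model and Generating Recommendations: </br>
The best hyperparameters are used to train the final ALS model with the fit method. The recommendForAllUsers and recommendForAllItems methods then produce the top 10 movies for each user and the top 10 users for each movie, and recommendForUserSubset and recommendForItemSubset do the same for selected users or movies.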
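For contrast with ALS's latent factors, a neighborhood method compares items (or users) directly. A minimal item-based sketch, assuming cosine similarity over the users who rated both items (one common choice among several) and a made-up ratings table; `transpose` and `cosine` are hypothetical helpers. The user-based variant is the same computation applied to the untransposed table.

```python
import math
from typing import Dict

# userId -> {movieId: rating}
Ratings = Dict[int, Dict[int, float]]


def transpose(ratings: Ratings) -> Ratings:
    """userId -> {movieId: r} becomes movieId -> {userId: r}."""
    out: Ratings = {}
    for u, items in ratings.items():
        for i, r in items.items():
            out.setdefault(i, {})[u] = r
    return out


def cosine(a: Dict[int, float], b: Dict[int, float]) -> float:
    """Cosine similarity computed over the keys both vectors share."""
    common = a.keys() & b.keys()
    if not common:
        return 0.0
    num = sum(a[k] * b[k] for k in common)
    den = math.sqrt(sum(a[k] ** 2 for k in common)) * math.sqrt(sum(b[k] ** 2 for k in common))
    return num / den


# Made-up ratings: users 0 and 1 like movies 10 and 11, user 2 prefers movie 12
ratings = {0: {10: 5.0, 11: 4.0, 12: 1.0},
           1: {10: 4.0, 11: 5.0, 12: 2.0},
           2: {10: 1.0, 11: 2.0, 12: 5.0}}
by_item = transpose(ratings)
print(round(cosine(by_item[10], by_item[11]), 3))  # 0.966 (similar items)
print(round(cosine(by_item[10], by_item[12]), 3))  # 0.507 (less similar)
```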
