##BigData Project
#Building a Movie Recommendation System Using PySpark

**Authors:**

Amir Sadeghi,
Behnam Yaghoubi

**Supervisor:** Professor Marco Maggini


##Objective:

Develop a movie recommendation system.

Utilize collaborative filtering with Alternating Least Squares (ALS).

Implement hyperparameter tuning for model optimization.

#1. Installing PySpark library

Install PySpark to enable the development of the recommendation system.

PySpark is the Python API for Apache Spark.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=156dfb217c9cca58c43f4d629f596a949ca39d50db11acefc84349e5511c478d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


#2. Importing the needed libraries



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import explode

#3. Initializing Spark Session

The session is the entry point for DataFrame and SQL functionality.

In [None]:
spark = SparkSession.builder.appName("Movie Recommendation System").getOrCreate()

#4. Load ratings and movies datasets from CSV files.

header=True indicates that the first row is a header.

inferSchema=True automatically infers data types.

In [None]:
ratings = spark.read.csv("./ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("./movies.csv", header=True, inferSchema=True)

#5. Data Caching and Cleaning

Cache both DataFrames, since they are accessed multiple times in the later steps.

Drop rows with null values to ensure data quality.

In [None]:
ratings.cache()
movies.cache()

ratings = ratings.na.drop()
movies = movies.na.drop()

#6. Preparing Data for ALS
Convert data types to integers for userId and movieId, and float for rating.

Spark's ALS expects numeric user and item ID columns whose values fit in the integer range, and a numeric rating column.


In [None]:
ratings = ratings.selectExpr("cast(userId as int) userId",
                             "cast(movieId as int) movieId",
                             "cast(rating as float) rating")

#7. Splitting the Data
Split the ratings data into training (80%) and test (20%) sets.

A seed value ensures reproducibility.

In [None]:
(training, test) = ratings.randomSplit([0.8, 0.2], seed=1234)

#8. Building and Training the ALS Model

ALS (Alternating Least Squares) is a popular algorithm used in collaborative filtering for building recommendation systems. It is particularly well-suited for scenarios involving large datasets of user-item interactions, such as movie ratings, product purchases, or any situation where recommendations are made based on user behavior.

###Collaborative Filtering:

* ALS is a collaborative filtering technique, meaning it makes recommendations based on patterns of user-item interactions (e.g., which users rated which movies and how).
It focuses on finding latent factors that explain observed user-item interactions.

###Matrix Factorization:
* The core idea behind ALS is to factorize a large user-item interaction matrix $R$ into two lower-dimensional matrices: $U$ (user factors) and $M$ (item factors).

 $R≈U⋅M^T$, where:

 * $R$ is the original user-item interaction matrix.
 * $U$ is the user-factor matrix (users x latent factors).
 * $M$ is the item-factor matrix (items x latent factors).
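
As a rough illustration of the shapes involved, the sketch below multiplies a small user-factor matrix by the transpose of an item-factor matrix in plain Python. The values and the helper name `predict_matrix` are made up for illustration and do not come from the notebook's data.

```python
# Toy factor matrices with made-up values (2 latent factors).
U = [[1.0, 0.5],   # user 0
     [0.2, 1.5],   # user 1
     [0.9, 0.9]]   # user 2
M = [[2.0, 1.0],   # item 0
     [0.5, 2.0],   # item 1
     [1.0, 1.0],   # item 2
     [3.0, 0.0]]   # item 3

def predict_matrix(user_factors, item_factors):
    """Return U . M^T: one predicted rating per (user, item) pair."""
    return [[sum(uk * mk for uk, mk in zip(u_row, m_row))
             for m_row in item_factors]
            for u_row in user_factors]

R_hat = predict_matrix(U, M)   # 3 users x 4 items
for row in R_hat:
    print([round(value, 2) for value in row])
```

Each entry of `R_hat` is the dot product of one user vector and one item vector, so three users and four items with two latent factors give a 3 x 4 matrix of predictions.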

###Alternating Optimization:

* ALS iteratively optimizes one matrix while keeping the other fixed, alternating between the two until convergence.
* First, it fixes the item matrix $M$ and solves for the user matrix $U$.
* Then, it fixes the user matrix $U$ and solves for the item matrix $M$.
* This process is repeated until the algorithm converges to a solution.

###Loss Function:
* ALS minimizes the regularized least squares error:

  $∑_{(u,i)∈R}(R_{ui}−U_u⋅M_i)^2+λ(∥U∥^2+∥M∥^2)$
 * $(u,i)∈R$ are the observed user-item interactions.
 * $R_{ui}$ is the observed rating of user $u$ for item $i$.
 * $U_u$ is the latent factor vector for user $u$.
 * $M_i$ is the latent factor vector for item $i$.
 * $λ$ is a regularization parameter to prevent overfitting.
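
The alternating updates and the loss can be sketched in plain Python for the simplest case of a single latent factor (rank 1), where each update has a one-line closed form. This is only a toy illustration of the formula above, with made-up ratings and hypothetical helper names (`loss`, `solve`); Spark's distributed implementation is considerably more elaborate.

```python
# Observed ratings as (user, item) -> rating; made-up values.
observed = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0,
            (1, 2): 1.0, (2, 1): 2.0, (2, 2): 1.0}
n_users, n_items, lam = 3, 3, 0.1

def loss(u, m):
    """Regularized squared error over the observed ratings (rank 1)."""
    err = sum((r - u[a] * m[b]) ** 2 for (a, b), r in observed.items())
    return err + lam * (sum(x * x for x in u) + sum(x * x for x in m))

def solve(fixed, axis, size):
    """Closed-form least-squares update for one side while the other is fixed."""
    updated = []
    for idx in range(size):
        num = den = 0.0
        for key, r in observed.items():
            if key[axis] == idx:
                other = fixed[key[1 - axis]]
                num += r * other
                den += other * other
        updated.append(num / (den + lam))
    return updated

u, m = [1.0] * n_users, [1.0] * n_items
losses = [loss(u, m)]
for _ in range(10):
    u = solve(m, axis=0, size=n_users)   # fix M, solve for U
    m = solve(u, axis=1, size=n_items)   # fix U, solve for M
    losses.append(loss(u, m))

print([round(x, 4) for x in losses])
```

Because each half-step exactly minimizes the loss over one set of factors, the recorded loss values never increase from one iteration to the next.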

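Configure the ALS model: maxIter=5 limits training to five alternating iterations, coldStartStrategy="drop" removes predictions for users or movies not seen during training (these would otherwise be NaN and make the RMSE undefined), and nonnegative=True constrains the latent factors to be nonnegative.

Train the model using the training dataset.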

In [None]:
als = ALS(maxIter=5, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)
model = als.fit(training)

#9. Making Predictions

Generate predictions on the test dataset. Evaluating the untuned model first gives a quick read on its initial performance and provides a baseline for comparison with the refined model after hyperparameter tuning.

Display the first five predictions.

In [None]:
predictions = model.transform(test)
predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|    356|   4.0| 3.7171543|
|   148|   4896|   4.0| 3.5658805|
|   148|   4993|   3.0|  3.719834|
|   148|   7153|   3.0| 3.7453744|
|   148|   8368|   4.0| 3.8152897|
+------+-------+------+----------+
only showing top 5 rows



#10. Evaluating the Model

Evaluate the model using RMSE (Root Mean Square Error).

Lower RMSE indicates better model performance.

In [None]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.8797059001059495
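
For reference, the metric can be reproduced by hand. The sketch below applies the RMSE definition to the five predictions displayed in step 9; the helper name `rmse` is chosen here for illustration. Because it covers only those five rows, its result is not expected to match the value computed over the whole test set.

```python
import math

# (rating, prediction) pairs copied from the five rows shown in step 9.
rows = [(4.0, 3.7171543), (4.0, 3.5658805), (3.0, 3.719834),
        (3.0, 3.7453744), (4.0, 3.8152897)]

def rmse(pairs):
    """Square root of the mean squared difference between label and prediction."""
    return math.sqrt(sum((label - pred) ** 2 for label, pred in pairs) / len(pairs))

sample_rmse = rmse(rows)
print(f"RMSE on the five sample rows = {sample_rmse:.4f}")
```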


#11. Hyperparameter Tuning

Build a parameter grid to search for optimal ALS hyperparameters.

Use cross-validation for robust evaluation.

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50, 100]) \
    .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
    .build()

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
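
To get a feel for the cost of this search, the sketch below enumerates the same grid in plain Python and counts the model fits that cross-validation performs. The dictionary layout is only an illustration; the real paramGrid holds Spark Param objects as keys.

```python
from itertools import product

ranks = [10, 50, 100]            # values passed to als.rank
reg_params = [0.01, 0.05, 0.1]   # values passed to als.regParam
num_folds = 5

# Every combination of rank and regParam is one candidate model.
grid = [{"rank": r, "regParam": reg} for r, reg in product(ranks, reg_params)]
fits_during_search = len(grid) * num_folds

print(len(grid), "combinations,", fits_during_search, "fits during cross-validation")
```

After the search, CrossValidator fits the best combination once more on all the data passed to fit, so the total is one more than the count printed here.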

#12. Initial Tuning on a Smaller Sample

Perform initial hyperparameter tuning on a smaller subset (about 10%) of the training data, as a quick check before the full search.

Extract and display the best parameters.

In [None]:
small_training, _ = training.randomSplit([0.1, 0.9], seed=1234)
initial_cvModel = crossval.fit(small_training)
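
The cell above fits the search but does not print the winning combination. One possible way to read it off is to pick the parameter map with the lowest average RMSE. The helper `best_param_map` below is hypothetical, and the example inputs are placeholders for illustration only, not results from this notebook.

```python
def best_param_map(avg_metrics, param_maps, larger_is_better=False):
    """Return (index, params, metric) of the best entry; for RMSE, lower is better."""
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return best_index, param_maps[best_index], avg_metrics[best_index]

# Placeholder values for illustration only; not results from this notebook.
example_maps = [{"rank": 10, "regParam": 0.01},
                {"rank": 10, "regParam": 0.1},
                {"rank": 50, "regParam": 0.1}]
example_metrics = [1.10, 0.95, 0.97]

index, params, metric = best_param_map(example_metrics, example_maps)
print(index, params, metric)
```

With the notebook's objects, the corresponding call would be best_param_map(initial_cvModel.avgMetrics, paramGrid), since avgMetrics lists the average metric for each parameter map in the same order as paramGrid. The keys of each map are Param objects, so printing their name attribute gives readable labels.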

#13. Refine Tuning on Full Dataset

Conduct full hyperparameter tuning using the entire training dataset.

Evaluate the refined model's performance on the test dataset.

In [None]:
cvModel = crossval.fit(training)
bestModel = cvModel.bestModel
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Best model's root-mean-square error = {rmse}")

Best model's root-mean-square error = 0.8705656722581655


#14. Generating Recommendations

Generate the top 10 movie recommendations for every user, then filter the result for a specific user.

Example shown for user_id = 123.

In [None]:
user_id = 123
user_recs = bestModel.recommendForAllUsers(10)
user_recs.filter(user_recs.userId == user_id).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   123|[{171495, 4.80851...|
+------+--------------------+



#15. Displaying Recommended Movie Titles

Explode the recommendations column to get individual movie recommendations.

Join with the movies dataset to display movie titles alongside the predicted ratings.

In [None]:
user_recs = user_recs.filter(user_recs.userId == user_id).select("recommendations")
user_recs = user_recs.withColumn("recommendation", explode("recommendations"))
user_recs = user_recs.select("recommendation.*")
user_recs = user_recs.join(movies, on="movieId")
user_recs.select("movieId", "title", "rating").show()
spark.stop()

+-------+--------------------+--------+
|movieId|               title|  rating|
+-------+--------------------+--------+
| 171495|              Cosmos|4.808518|
|  78836|Enter the Void (2...| 4.78131|
| 184245|De platte jungle ...|4.739649|
| 179135|Blue Planet II (2...|4.739649|
| 138966|Nasu: Summer in A...|4.739649|
| 117531|    Watermark (2014)|4.739649|
|  86237|  Connections (1978)|4.739649|
|  84273|Zeitgeist: Moving...|4.739649|
|  74226|Dream of Light (a...|4.739649|
|  26928|Summer's Tale, A ...|4.739649|
+-------+--------------------+--------+
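
The explode and join steps can be mimicked in plain Python to show what happens to the data. The sketch below uses three of the ten recommendations from the output above (those whose titles are displayed in full); the variable names are chosen here for illustration.

```python
# One user's row: an array of (movieId, predicted rating) pairs.
# Only three of the ten recommendations shown above are included.
user_row = {"userId": 123,
            "recommendations": [(171495, 4.808518),
                                (117531, 4.739649),
                                (86237, 4.739649)]}
titles = {171495: "Cosmos",
          117531: "Watermark (2014)",
          86237: "Connections (1978)"}

# "Explode": one output record per element of the array column.
exploded = [{"movieId": movie_id, "rating": score}
            for movie_id, score in user_row["recommendations"]]

# Inner join on movieId: keep only records whose movieId has a title.
joined = [{**rec, "title": titles[rec["movieId"]]}
          for rec in exploded if rec["movieId"] in titles]

for rec in joined:
    print(rec["movieId"], rec["title"], rec["rating"])
```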

