#### Creating a distributed recommender system with Apache Spark using a collaborative filtering approach.

In [0]:
# Importing the CSV file

# File location and type
file_location = "/FileStore/tables/movies.csv"
file_type = "csv"

# CSV options
infer_schema = "true"          # schema refered here are the column types. inferSchema=false (default option) will give a dataframe where all columns are strings. By setting inferSchema=true, Spark will automatically go through the csv file and infer the schema of each column. This requires an extra pass over the file which will result in reading a file 
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_movies = spark.read.format(file_type) \
                 .option("inferSchema", infer_schema) \
                 .option("header", first_row_is_header) \
                 .option("sep", delimiter) \
                 .load(file_location)

display(df_movies)

movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


In [0]:
# Describing the data 
df_movies.summary().display()

summary,movieId,rating,userId
count,1501.0,1501.0,1501.0
mean,49.40572951365756,1.7741505662891406,14.383744170552964
stddev,28.937034065088994,1.187276166124803,8.591040424293272
min,0.0,1.0,0.0
25%,24.0,1.0,7.0
50%,50.0,1.0,14.0
75%,74.0,2.0,22.0
max,99.0,5.0,29.0


In [0]:
# checking the counts of the null values for each column
df_movies.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_movies.columns]).show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      0|     0|     0|
+-------+------+------+



In [0]:
# Showing the top 15 movies with highest ratings
df_movies.groupBy("movieId").agg(F.count("rating").alias("Movies With Highest Ratings")).orderBy(F.col("Movies With Highest Ratings").desc()).show(15)

+-------+---------------------------+
|movieId|Movies With Highest Ratings|
+-------+---------------------------+
|      6|                         20|
|     29|                         20|
|     51|                         20|
|     22|                         20|
|     50|                         20|
|     94|                         19|
|     55|                         19|
|     68|                         19|
|      2|                         19|
|     15|                         19|
|     85|                         18|
|     36|                         18|
|     86|                         18|
|     88|                         18|
|     45|                         18|
+-------+---------------------------+
only showing top 15 rows



In [0]:
# Showing the top 10 users who provided the highest ratings
df_movies.groupBy("userId").agg(F.count("rating").alias("Users that Provided the highest ratings")).orderBy(F.col("Users that Provided the highest ratings").desc()).display(10)

userId,Users that Provided the highest ratings
14,57
6,57
22,56
11,56
12,55
4,55
7,54
9,53
24,52
18,52


#### Splitting the dataset into train and test. Implementing 2 different combinations for the splitting of the dataset. The first split will follow a  70/30 ratio, while the second split will follow a 80/20 ratio. 

###### Model 1: Splitting the data into 70% Training and 30% Testing

In [0]:
# Spliting the Annual dataframe to test and train set
TrainDF_Movie, TestDF_Movie = df_movies.randomSplit([0.70, 0.3], seed=1628) 
print(f"There are {TrainDF_Movie.cache().count()} rows in the training set, and {TestDF_Movie.cache().count()} in the test set")

There are 1054 rows in the training set, and 447 in the test set


In [0]:
# Self-check: displaying the TrainDF_Movie DataFrame to see the content 
TrainDF_Movie.display()

movieId,rating,userId
0,1,3
0,1,5
0,1,6
0,1,8
0,1,11
0,1,15
0,1,19
0,1,20
0,1,21
0,1,22


In [0]:
# Finding the datatypes of the Dataframe
TrainDF_Movie.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- userId: integer (nullable = true)



In [0]:
# Showing the summary statistics for the ratings column 
display(TrainDF_Movie.select("rating").summary())

summary,rating
count,1054.0
mean,1.793168880455408
stddev,1.1818952442637929
min,1.0
25%,1.0
50%,1.0
75%,2.0
max,5.0


In [0]:
# For this recommeder system I am using PySpark's ALS

# import relevant library 
from pyspark.ml.recommendation import ALS

# Please Note: The "cold start strategy" is set to 'drop' so we can ensure we don't get NaN evaluation metrics
# Initiating the ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", maxIter=5, regParam=0.01, coldStartStrategy="drop")

# Fitting the Training Data 
als_Model = als.fit(TrainDF_Movie)

In [0]:
# Transforming the test data
als_Model_pred = als_Model.transform(TestDF_Movie)
als_Model_pred.display()

movieId,rating,userId,prediction
27,1,0,0.70845914
30,1,0,-0.9860109
34,1,0,-0.15463859
41,2,0,-1.1811534
48,1,0,2.207834
51,1,0,-0.7866589
55,1,0,-0.5651864
67,1,0,0.466084
68,1,0,-1.8226037
72,1,0,1.9398003


###### Model 2: Splitting the data into 80% Training and 20% Testing

In [0]:
# Spliting the Annual dataframe to test and train set
TrainDF_Movie_2, TestDF_Movie_2 = df_movies.randomSplit([0.80, 0.20], seed=1628) 
print(f"There are {TrainDF_Movie_2.cache().count()} rows in the training set, and {TestDF_Movie_2.cache().count()} in the test set")

There are 1220 rows in the training set, and 281 in the test set


In [0]:
# Fitting the Training Data 
als_Model_2 = als.fit(TrainDF_Movie_2)

In [0]:
# Transforming the test data
als_Model_2_pred = als_Model_2.transform(TestDF_Movie_2)
als_Model_2_pred.display()

movieId,rating,userId,prediction
27,1,0,1.3609326
34,1,0,1.495415
41,2,0,0.18261139
48,1,0,-0.10979253
51,1,0,0.3461388
55,1,0,-0.55955553
67,1,0,0.7637528
72,1,0,3.559665
92,4,0,3.865212
9,3,1,-0.37046823


####  Comparing and evaluating both of my models

In [0]:
# Evaluating the model using rmse, mse, and mae
from pyspark.ml.evaluation import RegressionEvaluator
RMSE_Eval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
MSE_Eval = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction")
MAE_Eval = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

##--------------------Evaluating Model 1: 70% Training Data 30% Test Data Using RMSE Evalation Metric--------------------------##

RMSE_Score_1 = RMSE_Eval.evaluate(als_Model_pred)
MSE_Score_1 = MSE_Eval.evaluate(als_Model_pred)
MAE_Score_1 = MAE_Eval.evaluate(als_Model_pred)

print("Summary of the Evaluation Metrics for the First Model:")
print("Root-Mean-Square-Error (RMSE) for the 1st model (70% Train, 30% Test) is: ", RMSE_Score_1)
print("Mean-Square-Error (MSE) for the 1st model (70% Train, 30% Test) is: ", MSE_Score_1)
print("Mean-Absolute-Error (MAE) for the 1st model (70% Train, 30% Test) is: ", MAE_Score_1)
print("")

##--------------------Evaluating Model 2: 80% Training Data 20% Test Data Using RMSE Evalation Metric--------------------------##

RMSE_Score_2 = RMSE_Eval.evaluate(als_Model_2_pred)
MSE_Score_2 = MSE_Eval.evaluate(als_Model_2_pred)
MAE_Score_2 = MAE_Eval.evaluate(als_Model_2_pred)
print("Summary of the Evaluation Metrics for the Second Model:")
print("Root-Mean-Square-Error (RMSE) for the 2nd model (80% Train, 20% Test) is: " + str(RMSE_Score_2))
print("Mean-Square-Error (MSE) for the 2nd model (80% Train, 20% Test) is: " + str(MSE_Score_2))
print("Mean-Absolute-Error (MAE) for the 2nd model (80% Train, 20% Test) is: " + str(MAE_Score_2))

Summary of the Evaluation Metrics for the First Model:
Root-Mean-Square-Error (RMSE) for the 1st model (70% Train, 30% Test) is:  2.1029129329660563
Mean-Square-Error (MSE) for the 1st model (70% Train, 30% Test) is:  4.422242803635902
Mean-Absolute-Error (MAE) for the 1st model (70% Train, 30% Test) is:  1.5942018524820707

Summary of the Evaluation Metrics for the Second Model:
Root-Mean-Square-Error (RMSE) for the 2nd model (80% Train, 20% Test) is: 1.9172701337510203
Mean-Square-Error (MSE) for the 2nd model (80% Train, 20% Test) is: 3.675924765773655
Mean-Absolute-Error (MAE) for the 2nd model (80% Train, 20% Test) is: 1.4385967661504007


As seen above from the summary of the evaluation metrics, the second model (80% Train, 20% Test), consistenly outperforms the first model based on all 3 evaluation metrics (RMSE, MSE, and MAE). This is because the second model has more training data (80% of the entire dataset) when compared to the first model's training data (70% of the entire dataset) and therefore the model can better learn the underlying patters and have more accurate predictions. RMSE can be used as intuitive metric of comparison for these types of models as it has the same units as the initial dataset. The RSME for model 1 is 2.1 where as the RSME for model 2 is 1.9, and therefore model 2 (80% Train, 20% Test) would be our optimal model.

#### Hyperparameter tuning my best model (second model)

In [0]:
# Importing relevant package 
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", maxIter=5, regParam=0.01, coldStartStrategy="drop")

# Defining ParamGridBuilder to find the optimai hyperparameters.
param_grid = ParamGridBuilder() \
           .addGrid(als.maxIter, [5, 10, 20]) \
           .addGrid(als.regParam, [.01, .05, .1]) \
           .addGrid(als.rank, [10, 50, 75]) \
           .build()

tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=RMSE_Eval, parallelism=1, seed= 1628)

# Running TrainValidationSplit, Using the second model (80% Train, 20% Test)
tvs_als = tvs.fit(TrainDF_Movie_2)

# Ref: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.TrainValidationSplit.html

In [0]:
# Displaying the optimal hyperparameters founded from TrainValidationSplit
print("The optimal hyperparameter for 'maxIter' is:", tvs_als.bestModel._java_obj.parent().getMaxIter())
print("The optimal hyperparameter for 'regParam' is:", tvs_als.bestModel._java_obj.parent().getRegParam())
print("The optimal hyperparameter for 'rank' is:", tvs_als.bestModel.rank)

The optimal hyperparameter for 'maxIter' is: 20
The optimal hyperparameter for 'regParam' is: 0.1
The optimal hyperparameter for 'rank' is: 10


In [0]:
# Transforming the test data
prediction = tvs_als.bestModel.transform(TestDF_Movie)
# Evaluating Model on the test data
RMSE_Score = RMSE_Eval.evaluate(prediction)
print("The Root-mean-square error (RMSE) is: ", (RMSE_Score))
print("")

MSE_Score = MSE_Eval.evaluate(prediction)
print("The Mean Square Error (MSE) is: ", (MSE_Score))
print("")

MAE_Score = MAE_Eval.evaluate(prediction)
print("The Mean Absolute Error (MAE) is: ", (MAE_Score))

The Root-mean-square error (RMSE) is:  0.8520076213246212

The Mean Square Error (MSE) is:  0.725916986795239

The Mean Absolute Error (MAE) is:  0.5553408196108454


In [0]:
# Displaying the predictions 
prediction.display()

movieId,rating,userId,prediction
31,1,5,1.342256
31,3,8,2.6736968
31,3,14,1.8006648
31,1,19,1.0121659
31,1,26,0.8298416
85,1,4,1.6550977
85,1,12,1.1602631
85,1,15,0.6912421
85,5,16,1.4710151
85,1,23,0.39597294


I have choose maxIter, regParam, and rank as the hyperparameters that I will tune using "TrainValidationSplit" for the second model (80% Trian, 20% Test). 

The definition of each parameter can be seen below:

1. maxIter: Max number of iterations used to run.
2. regParam: Specifies the regularization parameter
3. rank: Determines the latent factors that will be used in the model.

MaxIter is important to tune, since it specifies the amount of iterations for our model. The higher the iteration count, gives our model greater ability to learn from the training data. As we can see, maxIter of 20 was choosen in the hyper parameter tuning, which makes intuitive sense considering 20 is the largest value the model can select from. 

regParam is important to tune because it helps to prevent the model from overfitting. Using a L regularizer, it controls the model coeffieceint to prevent possible overfittings

rank is important to tune becuase because it determines the latent factors that will be used in the model.

#### Finding the top 10 movies recommendations for user id 9 and user id 13.

In [0]:
# Self-Check: Seeing how many total distinguish movies there are in the data set
df_movies.select('movieId').distinct().display()

movieId
31
85
65
53
78
34
81
28
76
26


Movie Recommendations for User ID 9:

In [0]:
# Finding the movies that user ID 9 has watched and rated 
Movies_ID_9 = df_movies.filter(df_movies.userId.isin([9]))
Movies_ID_9.display()

movieId,rating,userId
2,3,9
3,1,9
4,1,9
5,1,9
6,1,9
7,5,9
9,1,9
12,1,9
14,3,9
15,1,9


In [0]:
# Self-Check: Seeing the number of the distinguish movies user Id 9 has watched 
Movies_ID_9 = df_movies.filter(df_movies.userId.isin([9])).select('movieId').distinct()
Movies_ID_9.display()

movieId
53
81
26
12
22
6
86
3
94
54


In [0]:
# Converting the dataframe of movies that user ID 9 has watched to a list
Movies_ID_9_list = Movies_ID_9.select('movieId').rdd.flatMap(lambda x: x).collect()

In [0]:
# Filtering the original DataFrame for moives that User ID 9 has not watched 
Movies_NotWatched_ID9 = df_movies.filter(~df_movies.movieId.isin(Movies_ID_9_list))
# Self_Check: There should only be 47 unique movies displayed as there is only 47 movies that user ID 9 has not watched 
Movies_NotWatched_ID9.select('movieId').distinct().display() 

movieId
31
85
65
78
34
28
76
27
44
91


In [0]:
# Displaying the New DataFrame that contains movies that user ID 9 has not watched 
Movies_NotWatched_ID9.display()

movieId,rating,userId
11,1,0
17,1,0
23,1,0
27,1,0
28,1,0
29,1,0
31,1,0
34,1,0
41,2,0
44,1,0


In [0]:
# Droping columns userId and ratings 
Movies_NotWatched_ID9 = Movies_NotWatched_ID9.drop("userId","rating")
# Adding a column of 9 to the DataFrame 
Movies_NotWatched_ID9 = Movies_NotWatched_ID9.withColumn("userId", F.lit(9))
# Removing all the duplicate movie ID's 
Movies_NotWatched_ID9 = Movies_NotWatched_ID9.distinct()
Movies_NotWatched_ID9.display()

movieId,userId
27,9
65,9
34,9
93,9
8,9
44,9
92,9
56,9
85,9
55,9


In [0]:
# Finding the top 10 movies recommendations for user id 9
New_recommendation = tvs_als.bestModel.transform(Movies_NotWatched_ID9).orderBy(F.col("prediction").desc())
New_recommendation.display()

movieId,userId,prediction
62,9,3.1563108
46,9,3.1197858
29,9,2.8996568
55,9,2.8918211
48,9,2.822268
52,9,2.7866275
72,9,2.4929464
18,9,2.4867187
65,9,2.4864635
23,9,2.4200373


Movie Recommendations for User ID 13:

In [0]:
# Finding the movies that user ID 13 has watched and rated 
Movies_ID_13 = df_movies.filter(df_movies.userId.isin([13])).select('movieId').distinct()
# Converting the dataframe of movies that user ID 13 has watched to a list
Movies_ID_13_list = Movies_ID_13.select('movieId').rdd.flatMap(lambda x: x).collect()
# Filtering the original DataFrame for moives that User ID 13 has not watched 
Movies_NotWatched_ID13 = df_movies.filter(~df_movies.movieId.isin(Movies_ID_13_list))
# Droping columns userId and ratings 
Movies_NotWatched_ID13 = Movies_NotWatched_ID13.drop("userId","rating")
# Adding a column of 9 to the DataFrame 
Movies_NotWatched_ID13 = Movies_NotWatched_ID13.withColumn("userId", F.lit(13))
# Removing all the duplicate movie ID's 
Movies_NotWatched_ID13 = Movies_NotWatched_ID13.distinct()


# Finding the top 10 movies recommendations for user id 13
New_recommendation = tvs_als.bestModel.transform(Movies_NotWatched_ID13).orderBy(F.col("prediction").desc())
New_recommendation.display()

movieId,userId,prediction
30,13,2.3710365
39,13,2.3147454
2,13,2.2062283
76,13,2.0045745
37,13,1.9945801
92,13,1.9047711
38,13,1.8848507
75,13,1.8592632
32,13,1.8548797
89,13,1.8395025
