# Big Data - Movie Recommender System

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import  Imputer, StringIndexer, VectorAssembler
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS # Alternating least square  
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator


In [0]:
#instantiate the spark session
spark = SparkSession.builder.appName("A2-PartB").getOrCreate()
 
#set the shuffle partition same as number of cpu cores to improve performance 
# spark.conf.set("spark.sql.shuffle.partitions")

In [0]:
# File location and type
file_location = "/FileStore/tables/movies.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)


movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


In [0]:
df.rdd.getNumPartitions()


In [0]:
df.printSchema()


In [0]:
# check the count of null values for each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [0]:
rows = df.count()
columns = len(df.columns)
shape = (rows,columns)
print(shape)

In [0]:
# Looking at top 10 Movies which recieved the most ratings
q1a = df.select('movieId','rating').groupby('movieId').agg(count('rating').alias('count')).orderBy(desc('count')).limit(10)
q1a.show()


In [0]:
# Looking at top 10 Movies based on highest count of highest rating they have recieved
Extra1  = df.select('movieId','rating').groupby('movieId','rating').agg(count('*').alias('count')).orderBy(desc('rating'),desc('count')).limit(10)
Extra1.show()


In [0]:
# fltr = (df['movieID'] == 2)

# df.where(fltr).show()

In [0]:
# Looking at top 10 Users who have provided the most ratings
q1b = df.select('userId','rating').groupby('userId').agg(count('rating').alias('count')).orderBy(desc('count')).limit(10)
q1b.show()


In [0]:
# Looking at Users who have provided the highest rating max amount of time

Extra2 = df.select('userId','rating').groupby('userId','rating').agg(count('*').alias('count')).orderBy(desc('rating'),desc('count')).limit(10)
Extra2.show()


In [0]:
(training1, test1) = df.randomSplit([0.7, 0.3])
(training2, test2) = df.randomSplit([0.8, 0.2])

In [0]:
MOVIEID = 'movieId'
RATING = 'rating'
USERID = 'userId'

als = ALS(maxIter=15 , regParam=0.01, userCol=USERID, itemCol=MOVIEID, ratingCol=RATING)


In [0]:
model1 = als.fit(training1)

predictions1 = model1.transform(test1)
display(predictions1)

movieId,rating,userId,prediction
31,1,4,5.1812396
31,3,14,3.7451963
31,1,29,0.3347944
85,3,1,0.06412296
85,1,4,1.9018631
85,1,5,1.3831215
85,3,6,3.5036194
85,5,8,3.0272744
85,3,21,1.1056397
85,1,26,2.3982472


In [0]:
model2 = als.fit(training2)

predictions2 = model2.transform(test2)
display(predictions2)

movieId,rating,userId,prediction
31,1,0,3.2766697
31,1,13,1.0848343
31,1,19,2.4677005
31,1,27,0.6288843
31,1,29,1.5578259
85,1,2,-1.682748
85,1,15,0.21277595
85,2,20,-0.12727948
85,3,21,1.8258545
65,1,2,1.866737


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_rmse = RegressionEvaluator(
           metricName="rmse", 
           labelCol=RATING, 
           predictionCol="prediction") 
evaluator_mse = RegressionEvaluator(
           metricName="mse", 
           labelCol=RATING, 
           predictionCol="prediction") 
evaluator_mae = RegressionEvaluator(
           metricName="mae", 
           labelCol=RATING, 
           predictionCol="prediction") 


In [0]:
RMSE1 = evaluator_rmse.evaluate(predictions1)
print("RMSE for Model 1: ", RMSE1)
MSE1 = evaluator_mse.evaluate(predictions1)
print("MSE for Model 1: ", MSE1)
MAE1 = evaluator_mae.evaluate(predictions1)
print("MAE for Model 1: ", MAE1)

In [0]:
RMSE2 = evaluator_rmse.evaluate(predictions2)
print("RMSE for Model 2: ", RMSE2)
MSE2 = evaluator_mse.evaluate(predictions2)
print("MSE for Model 2: ", MSE2)
MAE2 = evaluator_mae.evaluate(predictions2)
print("MAE for Model 2: ", MAE2)

Looking at the results above we can see that model 2 performed better with lower RMSE and MSE which could be as due to the higer training set data. in model 1 (70-30) split of the dataset was not enough and the model could be slightly underfitting causing higher bias which results in higher error. 

However, mean absolute  error (MAE) for both model is similar.

We will be using cross validation and along with gridsearch for finding the best parameters

In [0]:
als = ALS(userCol=USERID, itemCol=MOVIEID, ratingCol=RATING)

In [0]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [50, 100]) \
            .addGrid(als.maxIter, [15, 25])\
            .addGrid(als.regParam, [.01, .1]) \
            .build()

#### Using Cross Validation

In [0]:
# Using Corss Validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator_rmse, numFolds=2)

In [0]:
# (training2, test2) = df.randomSplit([0.8, 0.2])
cv_model = cv.fit(training2)

In [0]:
#Extract best model from the cv model above
best_model = cv_model.bestModel


In [0]:
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

In [0]:
# View the predictions
test2_predictions_tv = best_model.transform(test2)
RMSE_cv = evaluator_rmse.evaluate(test2_predictions_tv)
print("RMSE Value is: ", RMSE_cv)

In [0]:
display(test2_predictions_tv)

movieId,rating,userId,prediction
31,1,0,1.1813673
31,1,13,1.4173347
31,1,19,1.2622163
31,1,27,1.0882876
31,1,29,1.68816
85,1,2,1.1283349
85,1,15,0.98939586
85,2,20,2.3875895
85,3,21,2.6229606
65,1,2,1.0328238


In [0]:
trainvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator_rmse)

In [0]:
tv_model = trainvs.fit(training2)

In [0]:
#Extract best model from the cv model above
best_model_tV = tv_model.bestModel


In [0]:
# Print "Rank"
print("  Rank:", best_model_tV._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model_tV._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model_tV._java_obj.parent().getRegParam())

In [0]:
# View the predictions
test2_predictions_tv = best_model_tV.transform(test2)
RMSE_tv = evaluator_rmse.evaluate(test2_predictions_tv)
print("RMSE Value is: ", RMSE_tv)

In [0]:
display(test2_predictions_tv)

movieId,rating,userId,prediction
31,1,0,1.1813673
31,1,13,1.4173347
31,1,19,1.2622163
31,1,27,1.0882876
31,1,29,1.68816
85,1,2,1.1283349
85,1,15,0.98939586
85,2,20,2.3875895
85,3,21,2.6229606
65,1,2,1.0328238


# QUESTION 5: Recommendation Using Cross Validation

In [0]:

movie_recommendations = best_model.recommendForAllUsers(60)
movie_recommendations.show()

In [0]:
# display(movie_recommendations)

In [0]:
cv_recommendations = movie_recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

In [0]:
cv_recommendations.show()

In [0]:
cv_filter = cv_recommendations['userId'] == 11

cv_recommendations_11 = cv_recommendations.where(cv_filter)
cv_recommendations_11.show()

In [0]:
# Movies already watched by User 11
watched_user11 = df.select(['userId','movieId',  'rating']).filter(df.userId == 11).orderBy('rating', ascending = False)
watched_user11.show()

In [0]:
# Extracting movie id for watched movies
movie_list = []
for movie in watched_user11.collect():
  movie_list.append(movie.movieId)

In [0]:
print(len(movie_list))

In [0]:
Unwatched_recommended_user11 = (cv_recommendations_11.filter(cv_recommendations_11.movieId.isin(movie_list) == False))
Unwatched_recommended_user11.show()

In [0]:
print(Unwatched_recommended_user11.count())

In [0]:
Unwatched_recommended_user11.orderBy(desc('rating')).limit(15).show()

In [0]:
cv_filter2 = cv_recommendations['userId'] == 23

cv_recommendations_23 = cv_recommendations.where(cv_filter2)
cv_recommendations_23.show()

In [0]:
# Movies already watched by User 23
watched_user23 = df.select(['userId','movieId',  'rating']).filter(df.userId == 23).orderBy('rating', ascending = False)
# Extracting movie id for watched movies
movie_list23 = []
for movie in watched_user23.collect():
  movie_list23.append(movie.movieId)
print(len(movie_list23))

In [0]:
Unwatched_recommended_user23 = (cv_recommendations_23.filter(cv_recommendations_23.movieId.isin(movie_list23) == False))
Unwatched_recommended_user23.orderBy(desc('rating')).limit(15).show()

In [0]:
watched_user11 = df.select(['userId','movieId',  'rating']).filter(df.userId == 11).orderBy('rating', ascending = False)

In [0]:
# MOVIES ALREADY WATCHED BY THE USER
watched_user11.show()

In [0]:
# Extracting movie id for watched movies
movie_list = []
for movie in watched_user11.collect():
  movie_list.append(movie.movieId)

In [0]:
Unwatched_user11 = (df.filter(df.movieId.isin(movie_list) == False))

In [0]:
Unwatched_user11.select('movieId','rating').show()

In [0]:
# Prediction based on Best ALS model from Cross Validation
movies_15_recommendation_user_11 = best_model.transform(Unwatched_user11)
display(movies_15_recommendation_user_11.select('movieId','prediction'))

movieId,prediction
31,1.1811305
31,1.2988654
31,1.2270397
31,2.6040952
31,2.7394843
31,3.3450334
31,1.417555
31,2.5627148
31,1.0549653
31,1.2622926


In [0]:
top15Movies = movies_15_recommendation_user_11.select('movieId','prediction').orderBy('prediction', ascending = False).withColumn('userId', lit(11)).show(15)


In [0]:
watched_user23 = df.select(['userId','movieId',  'rating']).filter(df.userId == 23).orderBy('rating', ascending = False)
watched_user23.select('movieId','rating').show()

In [0]:
# Extracting movie id for watched movies
movie_list_23 = []
for movie in watched_user23.collect():
  movie_list_23.append(movie.movieId)

In [0]:
# Getting Unwatched movies
Unwatched_user23 = (df.filter(df.movieId.isin(movie_list_23) == False))

movies_15_recommendation_user_23 = best_model.transform(Unwatched_user23)
display(movies_15_recommendation_user_23.select('movieId','prediction'))

movieId,prediction
31,1.1811305
31,1.2988654
31,1.2270397
31,2.6040952
31,2.7394843
31,3.3450334
31,1.417555
31,2.5627148
31,1.0549653
31,1.2622926


In [0]:
top15Movies_23 = movies_15_recommendation_user_23.select('movieId','prediction').orderBy('prediction', ascending = False).withColumn('userId', lit(23)).show(15)
