### Movie Recommendation Model 
The objective of this project is to predict users' movie ratings and recommend to each user the 10 movies with the highest predicted ratings. We use the [MovieLens](https://grouplens.org/datasets/movielens/latest/) dataset to train the model.

Collaborative filtering (CF) is commonly used for recommender systems. In this notebook, the Alternating Least Squares (ALS) algorithm from the spark.ml package is used to develop a recommendation model with the Spark APIs. ALS describes users and products by a small set of latent factors, which can be used to predict ratings. Compared with item-based or user-based CF recommenders, which recommend similar products, ALS can recommend products fitted to a given user.
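
As a minimal illustration of the latent-factor idea, a predicted rating is the dot product of a user's factor vector and a movie's factor vector. The factor values and the names `user_factors`, `movie_factors` and `predict_rating` below are made up for the example; they are not taken from the trained model.

```python
# Hypothetical latent factors with rank 3; real values come from a fitted ALS model.
user_factors = {"u1": [0.9, 0.1, 1.2], "u2": [0.2, 1.5, 0.3]}
movie_factors = {"m1": [1.1, 0.4, 2.0], "m2": [0.3, 2.1, 0.5]}

def predict_rating(user, movie):
    """Predicted rating = dot product of the user and movie factor vectors."""
    return sum(u * m for u, m in zip(user_factors[user], movie_factors[movie]))

# Print the predicted rating for every user-movie pair.
for user in user_factors:
    for movie in movie_factors:
        print(user, movie, round(predict_rating(user, movie), 2))
```

Each user scores highest on the movie whose factors point in a similar direction to their own, which is what makes the recommendation personal.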

## Data ETL and Data Exploration Analysis

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
movies = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
movies.createOrReplaceTempView("sqlmovies")
ratings.createOrReplaceTempView("sqlratings")

In [5]:
display(movies)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [6]:
display(ratings)

userId,movieId,rating,timestamp
1,307,3.5,1256677221
1,481,3.5,1256677456
1,1091,1.5,1256677471
1,1257,4.5,1256677460
1,1449,4.5,1256677264
1,1590,2.5,1256677236
1,1591,1.5,1256677475
1,2134,4.5,1256677464
1,2478,4.0,1256677239
1,2840,3.0,1256677500


In [7]:
ratings.select('rating').summary().show()

In [8]:
display(ratings.select('rating').groupBy('rating').count().orderBy('rating'))

rating,count
0.5,442388
1.0,886233
1.5,441354
2.0,1850627
2.5,1373419
3.0,5515668
3.5,3404360
4.0,7394710
4.5,2373550
5.0,4071135


In [9]:
print('Total number of users who rated movies:', ratings.select('userId').distinct().count())
print('Total number of movies rated by users:', ratings.select('movieId').distinct().count())
print('Total number of movies:', movies.select('movieId').count())
print('Number of movies not rated by any user:', spark.sql("select * from sqlmovies where movieId not in (select distinct(movieId) from sqlratings)").count())

In [10]:
tmp1 = ratings.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [11]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

List of Movie Genres:

In [13]:
%sql select genres, count(genres) from (select title, explode(split(genres, '[|]')) as genres from sqlmovies) group by genres order by 2 DESC

genres,count(genres)
Drama,24144
Comedy,15956
Thriller,8216
Romance,7412
Action,7130
Horror,5555
Documentary,5118
Crime,5105
(no genres listed),4266
Adventure,4067


### Build Model

In [15]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.sql import functions

In [16]:
# Data type convert for model training
from pyspark.sql.types import IntegerType, FloatType
ratings = ratings.withColumn("userId", ratings["userId"].cast(IntegerType()))
ratings = ratings.withColumn("movieId", ratings["movieId"].cast(IntegerType()))
ratings = ratings.withColumn("rating", ratings["rating"].cast(FloatType()))

In [17]:
ratings = ratings.drop('timestamp')

Split 20% of data for testing

In [19]:
(training,test) = ratings.randomSplit([0.8,0.2])

In [20]:
print('training set size: ', training.count())
print('testing set size: ', test.count())

In [21]:
#cache dataset to memory for faster model training
training.cache()
test.cache()

Since the model is trained on a user-movie matrix and aims to fill in the missing values, it is important to know the sparsity of the matrix. In theory, a denser matrix gives better results because it feeds more information to the model.

In [23]:
numOfRatings = training.select('rating').count()
numOfMovies = training.select('movieId').distinct().count()
numOfUsers = training.select('userId').distinct().count()
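# Density is the share of user-movie cells that hold a rating; sparsity is the rest
density = numOfRatings / (numOfMovies * numOfUsers)
print('The sparsity of user-movie matrix: {:.2f} %'.format((1 - density) * 100))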
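
Note that the ratio of observed ratings to all user-movie cells is the density of the matrix, and sparsity is one minus that ratio. A toy example with made-up ratings (the `toy_ratings` triples are purely illustrative) shows the distinction:

```python
# Toy ratings: (userId, movieId, rating) triples invented for illustration.
toy_ratings = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0)]

num_ratings = len(toy_ratings)
num_users = len({user for user, _, _ in toy_ratings})
num_movies = len({movie for _, movie, _ in toy_ratings})

density = num_ratings / (num_users * num_movies)  # share of cells that hold a rating
sparsity = 1 - density                            # share of cells that are empty

print("density:  {:.2%}".format(density))
print("sparsity: {:.2%}".format(sparsity))
```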

The model can be tuned with the pyspark.ml.tuning package. Due to limited computational power, however, we define our own hyperparameters instead.

In [25]:
# Tune the model with ParamGridBuilder and CrossValidator (not run here due to limited compute)
# The estimator must exist before the grid can reference its params
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [6, 8, 10, 12]) \
    .addGrid(als.maxIter, [10, 15, 20]) \
    .addGrid(als.regParam, [0.05, 0.1, 0.2]) \
    .build()

# Define the evaluator with RMSE as the metric
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# Build the 3-fold cross-validator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# Fit the ALS models to the training data
model = cv.fit(training)

# Extract the best model found by the grid search
best_model = model.bestModel
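
To get a feel for why this grid search is expensive, the number of ALS fits it requires can be counted in plain Python. This is a rough sketch: the grid values are copied from the cell above, and the variable names are illustrative.

```python
from itertools import product

# Grid values copied from the ParamGridBuilder cell.
ranks = [6, 8, 10, 12]
max_iters = [10, 15, 20]
reg_params = [0.05, 0.1, 0.2]
num_folds = 3

param_combinations = list(product(ranks, max_iters, reg_params))
cv_fits = len(param_combinations) * num_folds  # one fit per combination per fold

print("parameter combinations:", len(param_combinations))
print("ALS fits during cross-validation:", cv_fits)
```

CrossValidator then refits the best combination once more on the full training set, so the total is one fit higher than the count printed here.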

### Train Model

When making predictions with an ALSModel, it is common to encounter users and/or items in the test dataset that were not present when the model was trained. The same happens during cross-validation, where the data is split into training and evaluation sets. By default the model predicts NaN for such unseen users or items, which makes the evaluation metric NaN as well. Here we set coldStartStrategy="drop" so that those rows are dropped from the predictions.
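
The effect of a single NaN prediction on the metric can be shown without Spark. The sketch below uses made-up (actual, predicted) pairs and a hand-written `rmse` helper; it only imitates what dropping cold-start rows does to the evaluation.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

# Made-up pairs; the NaN stands for a user or movie that was unseen in training.
pairs = [(4.0, 3.5), (3.0, 3.2), (5.0, float("nan")), (2.0, 2.9)]

# A single NaN prediction turns the whole metric into NaN.
print("RMSE with NaN rows:", rmse(pairs))

# Dropping the NaN rows mirrors the effect of coldStartStrategy="drop".
kept = [(a, p) for a, p in pairs if not math.isnan(p)]
print("RMSE after dropping NaN rows: {:.3f}".format(rmse(kept)))
```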

In [27]:
import time

als = ALS(maxIter=20, regParam=0.2, rank=6, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
start_time = time.time()
model = als.fit(training)
print('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

In [28]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error: {:.2f}".format(rmse))
print('Normalized error:  {:.2f} %'.format(rmse/5*100))

In [29]:
display(predictions)

userId,movieId,rating,prediction
107339,148,4.0,3.1317124
253535,148,4.0,2.991921
52620,148,1.0,2.7181563
60382,148,4.0,3.3250985
275860,148,3.0,2.615859
8350,148,4.0,2.8657644
51571,148,3.0,2.966929
52772,148,3.0,3.4331298
73492,148,0.5,2.5985732
60950,148,2.0,2.4365332


### Apply model to recommend movies

In [31]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
display(userSubsetRecs)

userId,recommendations
471,"List(List(177209, 5.2675586), List(107434, 4.963147), List(189473, 4.7630672), List(192261, 4.722142), List(157789, 4.7133274), List(190707, 4.6376963), List(146724, 4.602815), List(144202, 4.563069), List(166812, 4.542925), List(143422, 4.4984303))"
463,"List(List(185659, 3.5780106), List(177209, 3.4935086), List(107434, 3.450104), List(155923, 3.4241645), List(187873, 3.4241645), List(173871, 3.3588603), List(177325, 3.3480835), List(155713, 3.3258576), List(135057, 3.3188667), List(171777, 3.3074894))"
148,"List(List(177209, 5.6212225), List(107434, 5.4577627), List(157789, 5.1778603), List(155923, 5.1658907), List(187873, 5.1658907), List(185659, 5.1535153), List(177325, 5.116991), List(173871, 5.053779), List(144202, 5.0441837), List(192261, 5.0390935))"
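
Conceptually, a top-N recommendation scores every movie for a user and keeps the N highest scores. The sketch below shows that idea in plain Python with hypothetical rank-2 factors and illustrative names (`user_vector`, `movie_factors`, `recommend`); it is not how Spark implements `recommendForAllUsers` internally. Because the score is an unbounded dot product, values above 5 such as those in the table are possible.

```python
import heapq

# Hypothetical rank-2 factors; in the notebook these live inside the fitted ALS model.
user_vector = [1.2, 0.4]
movie_factors = {
    101: [3.0, 1.0],
    102: [1.0, 4.0],
    103: [2.0, 2.0],
    104: [0.5, 0.5],
}

def score(movie_vector):
    """Predicted rating: dot product of the user and movie factor vectors."""
    return sum(u * m for u, m in zip(user_vector, movie_vector))

def recommend(k):
    """Return the k (movieId, score) pairs with the highest predicted rating."""
    scored = [(movie_id, score(vec)) for movie_id, vec in movie_factors.items()]
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])

print(recommend(2))
```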


In [32]:
# Generate the top 10 users for each movie in a specified set of movies
movieRecs = model.recommendForAllItems(10)
# Use a new name so the movies DataFrame loaded earlier is not overwritten
movieSubset = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movieSubset, 10)
display(movieSubSetRecs)

movieId,recommendations
1580,"List(List(53, 4.703871), List(543, 4.5154057), List(12, 4.34483), List(276, 4.302899), List(452, 4.268827), List(43, 4.2588677), List(93, 4.225232), List(413, 4.171688), List(523, 4.1124067), List(475, 4.1022787))"
3175,"List(List(53, 4.6932282), List(43, 4.4675694), List(12, 4.3700953), List(276, 4.3195143), List(452, 4.248333), List(93, 4.1994324), List(1, 4.162988), List(99, 4.1585407), List(371, 4.1501093), List(171, 4.127697))"
2366,"List(List(53, 4.3477855), List(236, 4.312768), List(122, 4.1948223), List(251, 4.1464543), List(371, 4.1429095), List(375, 4.1326103), List(171, 4.1321754), List(515, 4.121808), List(595, 4.1216), List(1, 4.1112175))"


### Summary

The recommendation model was trained with the ALS algorithm on 80% of the data; the remaining 20% was used to evaluate it.

The RMSE is 0.87, which is 17.4% of the highest rating of 5. To improve model performance, hyperparameters including the rank (number of latent factors), the number of iterations, and the regularization parameter can be tuned. Tuning these parameters balances bias against variance and helps reduce overfitting. To do this, we can apply grid search with k-fold cross-validation to select the best model.

##### Explicit Data vs Implicit Data

In this project, explicit data is provided for analysis. Explicit data is data that carries some sort of rating, such as the 0.5 to 5 star ratings in this MovieLens dataset or the 1 to 5 ratings in the Netflix dataset. Here we know how much a user likes or dislikes an item, but such data is hard to come by: users might not take the time to rate items, or the app might not suit a rating approach in the first place. In many real-world use cases only implicit feedback (e.g. views, clicks, purchases, likes, shares) is available. Implicit feedback can be treated as a signal of a user's preference for an item, and the model then tries to find latent factors that predict the expected preference of a user for an item.
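
Spark's ALS exposes this setting through the `implicitPrefs=True` option and a confidence weight `alpha`. As a rough sketch of the idea behind that formulation, each interaction count can be turned into a binary preference plus a confidence that grows with the count. The view counts, the `alpha` value and the helper name below are made up for illustration, and the linear confidence formula is an assumption based on the common implicit-feedback formulation rather than a description of Spark's internals.

```python
# Made-up interaction counts: (userId, movieId) -> number of views.
view_counts = {(1, 10): 5, (1, 20): 0, (2, 10): 1, (2, 30): 12}
alpha = 40.0  # hypothetical confidence scaling, a tunable hyperparameter

def to_preference_and_confidence(count, alpha):
    """Binary preference (did the user interact?) and confidence 1 + alpha * count."""
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence

# Show the derived preference and confidence for every observed pair.
for (user, movie), count in view_counts.items():
    pref, conf = to_preference_and_confidence(count, alpha)
    print(user, movie, pref, conf)
```

A pair with no views keeps a preference of 0 but only a low confidence, so the model is not penalized heavily for predicting that the user might still like the movie.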