## Collaborative Filtering Movie Recommendation System with Explicit Rating

In [1]:
## 1. Content-based recommendation (built on item attributes and user profiles) and collaborative filtering (built on the user-item rating history) are the two major approaches to recommendation systems. Collaborative filtering is more common and more widely used.
## 2. Use the Alternating Least Squares (ALS) method to estimate (factorize) the rating matrix.
## 3. Depending on the number of latent factors, the number of free parameters is usually very large, which makes overfitting likely. Regularization can be added to penalize large parameter values.
## 4. Common difficulties in rating estimation: a. sparsity, b. cold start, c. computational cost
### a. Sparsity: choose rating measures carefully, either explicit ratings (reviews, star ratings, like/dislike) or implicit ratings (number of views, viewing time, etc.)
####    challenges of implicit feedback: no negative feedback, noisy, no explicit preference level or ordering, and it can't be evaluated with RMSE (although a squared-error loss can still serve as the optimization objective)
### b. Cold start: users or movies with no ratings in the training data have no latent factors, so they need to be handled differently in validation and in production
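
A minimal NumPy sketch of notes 2 and 3, assuming a tiny dense toy matrix in which 0 marks a missing rating. Each half-step fixes one factor matrix and solves a small ridge regression per user (or per movie); the penalty is scaled by that row's number of ratings, which is the ALS-WR weighting that Spark's ALS documentation describes. The helpers `als_wr` and `objective` and the toy data are hypothetical illustrations, not Spark's distributed implementation.

```python
import numpy as np


def objective(R, mask, U, V, reg):
    """ALS-WR loss: squared error on observed cells plus count-scaled L2 penalties."""
    err = np.where(mask, R - U @ V.T, 0.0)
    n_u = mask.sum(axis=1)  # number of ratings per user
    n_i = mask.sum(axis=0)  # number of ratings per movie
    penalty = reg * (n_u @ (U ** 2).sum(axis=1) + n_i @ (V ** 2).sum(axis=1))
    return float((err ** 2).sum() + penalty)


def als_wr(R, mask, rank=2, reg=0.1, n_iters=10, seed=0):
    """Factorize R ~ U @ V.T using only the cells where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    losses = []
    for _ in range(n_iters):
        # Fix V and solve one small ridge regression per user.
        for u in range(n_users):
            idx = mask[u]
            n_u = idx.sum()
            if n_u == 0:
                continue  # cold-start user: nothing to learn from
            Vi = V[idx]
            U[u] = np.linalg.solve(Vi.T @ Vi + reg * n_u * eye, Vi.T @ R[u, idx])
        # Fix U and solve one small ridge regression per movie.
        for i in range(n_items):
            idx = mask[:, i]
            n_i = idx.sum()
            if n_i == 0:
                continue
            Ui = U[idx]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg * n_i * eye, Ui.T @ R[idx, i])
        losses.append(objective(R, mask, U, V, reg))
    return U, V, losses


# Toy 4 users x 4 movies; 0 marks a missing rating.
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, V, losses = als_wr(R, mask, rank=2, reg=0.1, n_iters=10)
print(np.round(U @ V.T, 2))  # dense predictions, including the missing cells
```

Because every half-step minimizes the loss exactly over one block of factors, the recorded loss never increases; the `reg * n_u` term is what keeps the many free parameters from fitting noise.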

### 1. Initialize the Spark App and Load Raw Data

In [2]:
from pyspark.sql import SparkSession

spark=SparkSession\
    .builder\
    .appName('Collaborative Filtering Movie Recommendation System')\
    .getOrCreate()

In [3]:
ratingRawData=spark.read.format('csv').option('header','true').load('02/demos/datasets/movielens/ratings.csv')

In [4]:
ratingRawData.toPandas().head()

|   | userId | movieId | rating | timestamp |
|---|---|---|---|---|
| 0 | 1 | 31 | 2.5 | 1260759144 |
| 1 | 1 | 1029 | 3.0 | 1260759179 |
| 2 | 1 | 1061 | 3.0 | 1260759182 |
| 3 | 1 | 1129 | 2.0 | 1260759185 |
| 4 | 1 | 1172 | 4.0 | 1260759205 |


In [5]:
## keep userId, movieId and rating (drop timestamp); the CSV reader loads every column as a string, so cast each one to a numeric type
from pyspark.sql.functions import col

dataset=ratingRawData.select(col('userId').cast('int'),
                             col('movieId').cast('int'),
                             col('rating').cast('float'))

dataset.toPandas().head()

|   | userId | movieId | rating |
|---|---|---|---|
| 0 | 1 | 31 | 2.5 |
| 1 | 1 | 1029 | 3.0 |
| 2 | 1 | 1061 | 3.0 |
| 3 | 1 | 1129 | 2.0 |
| 4 | 1 | 1172 | 4.0 |


In [6]:
## This is a clean explicit-rating dataset, so no further feature engineering is needed.
## Check the distribution of the ratings

dataset.select('rating').toPandas().describe()

|   | rating |
|---|---|
| count | 100004.0 |
| mean | 3.543608 |
| std | 1.058064 |
| min | 0.5 |
| 25% | 3.0 |
| 50% | 4.0 |
| 75% | 4.0 |
| max | 5.0 |
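
The summary above counts ratings but does not show how sparse the user x movie matrix is (note 4a). A minimal sketch of that density measure, assuming the data is available as (userId, movieId) pairs; `matrix_density` and the toy pairs are hypothetical. On the Spark DataFrame the same three numbers could come from `dataset.count()`, `dataset.select('userId').distinct().count()` and `dataset.select('movieId').distinct().count()`.

```python
def matrix_density(pairs):
    """Share of the user x movie matrix that holds an observed rating.

    `pairs` is an iterable of (userId, movieId) tuples; duplicates count once.
    """
    pairs = set(pairs)
    if not pairs:
        return 0.0
    users = {u for u, _ in pairs}
    movies = {m for _, m in pairs}
    return len(pairs) / (len(users) * len(movies))


# Hypothetical toy data: 3 users, 3 movies, 4 ratings.
toy = [(1, 31), (1, 1029), (2, 31), (3, 1061)]
print(matrix_density(toy))  # 4 / (3 * 3)
```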


In [7]:
## Split into training (80%) and test (20%) datasets; no seed is passed, so the split differs between runs

(trainingData, testData)=dataset.randomSplit([0.8,0.2])
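
Because `randomSplit` assigns rows independently, a user or movie with only a few ratings can end up with all of them in testData, which is exactly the cold-start case from note 4b. A minimal pure-Python sketch that counts such test rows; the helper `unseen_in_test` and the toy rows are hypothetical.

```python
def unseen_in_test(train, test):
    """Count test rows whose user or movie never appears in the training rows.

    Rows are (userId, movieId, rating) tuples. ALS learns no factors for
    such users/movies, so it cannot score these rows.
    """
    train_users = {u for u, _, _ in train}
    train_movies = {m for _, m, _ in train}
    return sum(1 for u, m, _ in test
               if u not in train_users or m not in train_movies)


train = [(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0)]
test = [(2, 20, 4.0), (3, 10, 2.0), (1, 30, 1.0)]
print(unseen_in_test(train, test))  # 2: user 3 and movie 30 are new
```

In Spark the same check could be written as a `left_anti` join of testData against the distinct userId (or movieId) values of trainingData.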

### 2. Define CF model with ALS-WR method

In [8]:
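## maxIter: the maximum number of iterations to run (defaults to 10)
## regParam: the regularization parameter in ALS (defaults to 0.1)
## coldStartStrategy: 'nan' (the default) keeps NaN predictions for users/movies unseen in training; 'drop' removes those rows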

In [9]:
from pyspark.ml.recommendation import ALS

als=ALS(maxIter=10,
        regParam=0.1,
        userCol='userId',
        itemCol='movieId',
        ratingCol='rating',
        coldStartStrategy='drop'
       )
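
With the default `coldStartStrategy='nan'`, a fitted ALS model returns NaN for any user or movie it never saw in training, and a single NaN makes the RMSE NaN; `'drop'` removes those rows so the metric is computed over the rest. A pure-Python mimic of the two strategies, assuming `None` stands for a missing prediction; the helpers are hypothetical and only imitate Spark's behavior.

```python
import math


def apply_cold_start_strategy(rows, strategy="nan"):
    """Mimic ALS coldStartStrategy on (rating, prediction) rows.

    A prediction of None stands for a user or movie with no learned factors.
    """
    if strategy not in ("nan", "drop"):
        raise ValueError("strategy must be 'nan' or 'drop'")
    out = []
    for rating, pred in rows:
        if pred is None:
            if strategy == "drop":
                continue  # 'drop': remove the row entirely
            pred = math.nan  # 'nan': keep the row with a NaN prediction
        out.append((rating, pred))
    return out


def rmse(rows):
    return math.sqrt(sum((r - p) ** 2 for r, p in rows) / len(rows))


rows = [(4.0, 3.5), (3.0, None), (5.0, 4.0)]
print(rmse(apply_cold_start_strategy(rows, "nan")))   # nan: one NaN poisons the mean
print(rmse(apply_cold_start_strategy(rows, "drop")))  # sqrt((0.25 + 1.0) / 2), about 0.79
```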

In [10]:
## build the ALS Model with training dataset

model=als.fit(trainingData)

In [11]:
## transform test dataset with predictions

predictions=model.transform(testData)
predictions.toPandas().head(10)

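|   | userId | movieId | rating | prediction |
|---|---|---|---|---|
| 0 | 242 | 463 | 4.0 | 3.723727 |
| 1 | 452 | 471 | 3.0 | 3.710159 |
| 2 | 537 | 471 | 5.0 | 3.959364 |
| 3 | 241 | 471 | 4.0 | 3.653464 |
| 4 | 311 | 471 | 0.5 | 2.926611 |
| 5 | 521 | 471 | 3.5 | 4.110173 |
| 6 | 547 | 496 | 3.0 | 2.510504 |
| 7 | 580 | 1088 | 3.0 | 2.840365 |
| 8 | 133 | 1088 | 1.5 | 1.99029 |
| 9 | 52 | 1088 | 4.0 | 3.015393 |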
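
Each value in the `prediction` column is the dot product of a learned user vector and a learned movie vector; the fitted ALSModel exposes them as `model.userFactors` and `model.itemFactors` (DataFrames with `id` and `features` columns). A minimal pure-Python sketch of that scoring step, with hypothetical rank-2 factors that are not taken from the model above:

```python
def predict(user_factors, item_factors, user_id, item_id):
    """Score = dot product of the user's and the movie's latent vectors.

    Returns None when either id has no factors (the cold-start case).
    """
    u = user_factors.get(user_id)
    v = item_factors.get(item_id)
    if u is None or v is None:
        return None
    return sum(a * b for a, b in zip(u, v))


# Hypothetical toy factors, for illustration only.
user_factors = {242: [1.2, 0.5]}
item_factors = {463: [2.0, 1.0], 471: [1.5, -0.4]}
print(predict(user_factors, item_factors, 242, 463))  # 1.2*2.0 + 0.5*1.0, i.e. about 2.9
```

Nothing in the dot product keeps the score inside the 0.5 to 5 rating scale.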


### 3. Model Evaluation and Selection

#### 3.1 Model Evaluation

In [12]:
## Compare the distributions of the true ratings and the predictions
### Predictions are not constrained to the rating scale: they can be negative or above 5 (see min and max below).

predictions.select('rating','prediction').toPandas().describe()

|   | rating | prediction |
|---|---|---|
| count | 19330.0 | 19330.0 |
| mean | 3.559571 | 3.387051 |
| std | 1.04937 | 0.749801 |
| min | 0.5 | -0.100073 |
| 25% | 3.0 | 2.942954 |
| 50% | 4.0 | 3.470544 |
| 75% | 4.0 | 3.909906 |
| max | 5.0 | 5.696434 |
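
The notebook leaves out-of-range predictions as they are. One optional post-processing step (an assumption, not something done here) is to clamp them to the rating scale: since every true rating lies in [0.5, 5], moving a prediction into that interval can only bring it closer to the true value, so the RMSE cannot get worse. A minimal sketch with hypothetical toy values; `clip_predictions` is a hypothetical helper.

```python
def clip_predictions(preds, low=0.5, high=5.0):
    """Clamp raw ALS scores to the rating scale [low, high]."""
    return [min(max(p, low), high) for p in preds]


ratings = [4.0, 0.5, 5.0, 3.0]  # hypothetical true ratings
raw = [3.7, -0.1, 5.7, 3.2]     # hypothetical raw ALS scores
clipped = clip_predictions(raw)
print(clipped)  # [3.7, 0.5, 5.0, 3.2]
```

On the Spark DataFrame the same clamp could be expressed with `pyspark.sql.functions.least` and `greatest` on the `prediction` column.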


In [13]:
## Compute the root-mean-square error (RMSE) on the test data
## RMSE works for explicit ratings, but not for implicit feedback datasets

from pyspark.ml.evaluation import RegressionEvaluator

evaluator=RegressionEvaluator(metricName='rmse',
                              labelCol='rating',
                              predictionCol='prediction'
                              )

rmse=evaluator.evaluate(predictions)

In [14]:
rmse

0.9165988993924484
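
The `'rmse'` metric of `RegressionEvaluator` is the square root of the mean squared difference between `rating` and `prediction`, so a value around 0.92 means predictions are off by roughly one rating point on a typical row. A minimal pure-Python sketch of the formula; the `rmse` helper is hypothetical and the sample values are the first three rows of the prediction table above.

```python
import math


def rmse(ratings, predictions):
    """Root-mean-square error between true ratings and predictions."""
    if len(ratings) != len(predictions) or not ratings:
        raise ValueError("need two non-empty sequences of equal length")
    mse = sum((r - p) ** 2 for r, p in zip(ratings, predictions)) / len(ratings)
    return math.sqrt(mse)


print(rmse([4.0, 3.0, 5.0], [3.723727, 3.710159, 3.959364]))
```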

#### 3.2 Hyperparameter Tuning with Cross-Validation

In [15]:
## Use CrossValidator from Spark ML to tune the regParam hyperparameter
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

cvals=ALS(maxIter=5,
          userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          coldStartStrategy='drop'
         )

paramGrid=ParamGridBuilder()\
          .addGrid(cvals.regParam,[0.01,0.1,0.2,0.5,1])\
          .build()

In [16]:
crossval=CrossValidator(estimator=cvals,
                        estimatorParamMaps=paramGrid,
                        evaluator=RegressionEvaluator(metricName='rmse',labelCol='rating'),
                        numFolds=3
                       )
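
CrossValidator fits one model per (param map, fold), averages the evaluator's metric over the folds for each param map, keeps the best one (the lowest RMSE here) and refits it on the whole training set. A minimal pure-Python sketch of that loop, assuming a hypothetical toy model (a global mean plus a per-movie offset shrunk by `reg`) in place of ALS to keep it short; the final refit step is omitted.

```python
import math
import random


def fit_item_offsets(rows, reg):
    """Toy stand-in for ALS: global mean plus a per-movie offset shrunk by `reg`."""
    mu = sum(r for _, _, r in rows) / len(rows)
    sums, counts = {}, {}
    for _, movie, r in rows:
        sums[movie] = sums.get(movie, 0.0) + (r - mu)
        counts[movie] = counts.get(movie, 0) + 1
    offsets = {m: sums[m] / (counts[m] + reg) for m in sums}
    return lambda movie: mu + offsets.get(movie, 0.0)  # unseen movie -> global mean


def rmse(pairs):
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))


def cross_validate(rows, reg_grid, num_folds=3, seed=0):
    """Return (best_reg, avg_metrics); avg_metrics follows the order of reg_grid."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    folds = [shuffled[k::num_folds] for k in range(num_folds)]
    avg_metrics = []
    for reg in reg_grid:
        fold_scores = []
        for k in range(num_folds):
            train = [row for j, fold in enumerate(folds) if j != k for row in fold]
            predict = fit_item_offsets(train, reg)
            fold_scores.append(rmse([(r, predict(m)) for _, m, r in folds[k]]))
        avg_metrics.append(sum(fold_scores) / num_folds)
    # RMSE: smaller is better, so keep the grid value with the lowest average.
    best = min(range(len(reg_grid)), key=lambda j: avg_metrics[j])
    return reg_grid[best], avg_metrics


# Hypothetical toy ratings: (userId, movieId, rating) with ratings 1..5.
toy = [(u, m, float((u * 7 + m * 3) % 5 + 1)) for u in range(10) for m in range(6)]
best_reg, metrics = cross_validate(toy, [0.01, 0.1, 0.2, 0.5, 1])
print(best_reg, [round(x, 3) for x in metrics])
```

On the Spark side, `cvModel.avgMetrics` holds the averaged metric for each entry of `paramGrid` in the same order, and `cvModel.bestModel` is the refit model.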

In [17]:
cvModel = crossval.fit(trainingData)

In [18]:
cvPredictions=cvModel.transform(testData)
cvPredictions.toPandas().head(10)

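|   | userId | movieId | rating | prediction |
|---|---|---|---|---|
| 0 | 242 | 463 | 4.0 | 3.736043 |
| 1 | 452 | 471 | 3.0 | 3.458832 |
| 2 | 537 | 471 | 5.0 | 4.021946 |
| 3 | 241 | 471 | 4.0 | 3.3785 |
| 4 | 311 | 471 | 0.5 | 3.016191 |
| 5 | 521 | 471 | 3.5 | 4.041764 |
| 6 | 547 | 496 | 3.0 | 2.518697 |
| 7 | 580 | 1088 | 3.0 | 2.865585 |
| 8 | 133 | 1088 | 1.5 | 1.835583 |
| 9 | 52 | 1088 | 4.0 | 3.034947 |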


In [19]:
cvPredictions.select('rating','prediction').toPandas().describe()

|   | rating | prediction |
|---|---|---|
| count | 19330.0 | 19330.0 |
| mean | 3.559571 | 3.313087 |
| std | 1.04937 | 0.652553 |
| min | 0.5 | 0.147619 |
| 25% | 3.0 | 2.923459 |
| 50% | 4.0 | 3.369169 |
| 75% | 4.0 | 3.76386 |
| max | 5.0 | 5.381654 |


In [20]:
cvEvaluator=RegressionEvaluator(metricName='rmse',
                                labelCol='rating',
                                predictionCol='prediction'
                                )

cvrmse=cvEvaluator.evaluate(cvPredictions)

In [21]:
cvrmse

0.9142911512097865

### 4. Movie Recommendation

#### 4.1 Recommendations for all users/items

In [22]:
## 3 recommendations for each user
recForUsers=model.recommendForAllUsers(3)
recForUsers.toPandas().head()

|   | userId | recommendations |
|---|---|---|
| 0 | 471 | [(3414, 5.168346881866455), (93320, 4.84812355... |
| 1 | 463 | [(67504, 5.030383110046387), (83411, 5.0303831... |
| 2 | 496 | [(8530, 5.495335578918457), (9010, 5.317599296... |
| 3 | 148 | [(67504, 5.672274589538574), (83411, 5.6722745... |
| 4 | 540 | [(5765, 5.854146957397461), (3437, 5.854146957... |


In [23]:
## top 3 users for each movie
userForMovie=model.recommendForAllItems(3)
userForMovie.toPandas().head()

|   | movieId | recommendations |
|---|---|---|
| 0 | 1580 | [(46, 5.092299938201904), (287, 4.841249942779... |
| 1 | 5300 | [(545, 3.9724838733673096), (4, 3.923589706420... |
| 2 | 6620 | [(545, 4.954958438873291), (123, 4.88732242584... |
| 3 | 7340 | [(46, 4.900078296661377), (568, 4.479588985443... |
| 4 | 54190 | [(156, 4.271191596984863), (301, 4.18521738052... |
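
Both calls rank candidates by the same dot-product score and keep the k highest: `recommendForAllUsers` ranks movies for each user, and `recommendForAllItems` ranks users for each movie. The fitted ALSModel keeps only the factor matrices, so it scores every movie, including movies the user already rated. A minimal pure-Python sketch of the per-user ranking with an optional exclusion list; `recommend_for_user` and the toy factors are hypothetical.

```python
import heapq


def recommend_for_user(user_vec, item_factors, k, exclude=()):
    """Top-k (movieId, score) pairs by dot-product score, best first.

    `exclude` can hold movies the user already rated, which the
    model itself does not filter out.
    """
    skip = set(exclude)
    scores = ((movie, sum(a * b for a, b in zip(user_vec, vec)))
              for movie, vec in item_factors.items() if movie not in skip)
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


item_factors = {10: [1.0, 0.0], 20: [0.5, 0.5], 30: [0.0, 1.0]}
print(recommend_for_user([2.0, 1.0], item_factors, k=2))               # [(10, 2.0), (20, 1.5)]
print(recommend_for_user([2.0, 1.0], item_factors, k=2, exclude=[10]))  # [(20, 1.5), (30, 1.0)]
```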


#### 4.2 Recommendations for a specific user

In [24]:
userMovieList=recForUsers.filter(recForUsers.userId==148).select('recommendations')
recMovieList=userMovieList.collect()[0].recommendations

In [25]:
recMovieDF=spark.createDataFrame(recMovieList)
recMovieDF.toPandas()

|   | movieId | rating |
|---|---|---|
| 0 | 67504 | 5.672275 |
| 1 | 83411 | 5.672275 |
| 2 | 83318 | 5.672275 |


In [26]:
## Load movie info

movieDF=spark.read.csv('02/demos/datasets/movielens/movies.csv',header=True,ignoreLeadingWhiteSpace=True)
movieDF.toPandas().head()

|   | movieId | title | genres |
|---|---|---|---|
| 0 | 1 | Toy Story (1995) | Adventure\|Animation\|Children\|Comedy\|Fantasy |
| 1 | 2 | Jumanji (1995) | Adventure\|Children\|Fantasy |
| 2 | 3 | Grumpier Old Men (1995) | Comedy\|Romance |
| 3 | 4 | Waiting to Exhale (1995) | Comedy\|Drama\|Romance |
| 4 | 5 | Father of the Bride Part II (1995) | Comedy |


In [27]:
## join recMovieDF and movieDF 

recMovieFinalDF=movieDF.join(recMovieDF, on=['movieId']).orderBy('rating',ascending=False).select('title','genres','rating')
recMovieFinalDF.toPandas()

|   | title | genres | rating |
|---|---|---|---|
| 0 | Land of Silence and Darkness (Land des Schweig... | Documentary | 5.672275 |
| 1 | Cops (1922) | Comedy | 5.672275 |
| 2 | Goat, The (1921) | Comedy | 5.672275 |


#### 4.3 Recommendation Engine  

In [28]:
## Combine 4.1 and 4.2 into a recommendation engine for movies
## This project runs on Spark 2.2; Spark 2.3 adds ALSModel.recommendForUserSubset, which scores only
## the requested users and is more efficient here than computing recommendForAllUsers on every call

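def getMovieRecommendationsForUser(userId,numRecs):
    ## uses the global model, spark and movieDF defined above
    allUserRecs=model.recommendForAllUsers(numRecs)

    userMovieList=allUserRecs.filter(allUserRecs.userId==userId).select('recommendations').collect()
    if not userMovieList:
        ## cold-start user: not in the training data, so the model has no factors for them
        raise ValueError('no recommendations for userId {}'.format(userId))
    recMovieList=userMovieList[0].recommendations
    recMovieDF=spark.createDataFrame(recMovieList)

    recMovieFinalDF=movieDF.join(recMovieDF, on=['movieId']).orderBy('rating',ascending=False).select('title','genres','rating')

    return recMovieFinalDF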

In [29]:
getMovieRecommendationsForUser(219,5).toPandas()

|   | title | genres | rating |
|---|---|---|---|
| 0 | 7th Voyage of Sinbad, The (1958) | Action\|Adventure\|Fantasy | 5.659566 |
| 1 | Mortal Thoughts (1991) | Mystery\|Thriller | 5.464806 |
| 2 | Albino Alligator (1996) | Crime\|Thriller | 5.417367 |
| 3 | New World, The (2005) | Adventure\|Drama\|Romance | 5.320665 |
| 4 | Lake of Fire (2006) | Documentary | 5.236992 |
