# Improving Movie Recommendations with Spark and the Surprise Library

Company X has used production recommenders for many years now. The current recommender, called Mean of Means, relies on per-user and per-item rating means and incorporates the global mean for smoothing. Mean of Means provides a significant revenue stream so product managers are hesitant to touch it. The issue is that these systems have been around a long time. The task to explore new solutions. The main goals are to improve the Root Mean Square Error (RMSE) on the predictions, create a working recommender written in Spark, and use the surprise library to explore other models that could be prototyped for production.

## 1. Current Production Recommender: the Baseline

First, I ran the current recommender used in production. It was provided in the Baselines.py file. I added slight updates to the code in order to cross-validate the results on five folds and print the standard deviation of the RMSE over the folds.

In [17]:
import Baselines
%run Baselines


Global Mean...
Evaluating RMSE, MAE of algorithm GlobalMean on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    1.1279  1.1183  1.1316  1.1217  1.1289  1.1257  0.0049  
MAE (testset)     0.9453  0.9399  0.9521  0.9416  0.9448  0.9447  0.0042  
Fit time          0.02    0.04    0.04    0.04    0.04    0.04    0.01    
Test time         0.06    0.06    0.05    0.05    0.12    0.07    0.02    

MeanOfMeans...
Evaluating RMSE, MAE of algorithm MeanofMeans on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    1.0144  1.0228  1.0150  1.0156  1.0195  1.0175  0.0032  
MAE (testset)     0.8353  0.8415  0.8336  0.8348  0.8423  0.8375  0.0036  
Fit time          0.18    0.21    0.21    0.21    0.21    0.20    0.01    
Test time         0.30    0.30    0.30    0.30    0.24    0.29    0.02    


The baseline recommender has a Root Mean Square Error of 1.0175, with a standard deviation of 0.0032. Just to make sure all changes in the RMSE are accounted for over all recommender models, I will use the coefficient of variation from now on. So our baseline is an RMSE of 1.0175 with a CV of 31.44%.

## 2. Surprise Library

Surprise’s recommenders fall into 4 basic types: 
* baseline, 
* k-Nearest Neighbors (with pearson and cosine similarity distance metric options), 
* matrix factorization-based recommendation engines (SVD and NMF, plus an extension of SVD for implicit ratings), and
* slope one.

See this guide for more detail on each: http://surprise.readthedocs.io/en/stable/basic_algorithms.html

In [2]:
# imports
from surprise import Dataset
from surprise import accuracy
from surprise.model_selection import KFold

In [1]:
# load smaller built in Movielens dataset for evaluating
from surprise import Dataset

data = Dataset.load_builtin('ml-100k')

## 2.1. Baseline

Although I have a baseline already (the "Mean of Means" algorithm currently in use and the RMSE level it produces), I started by running the baseline algorithm from surprise library, trying both Alternating Least Squares and Stochastic Gradient Descent.

In [22]:
from surprise import BaselineOnly
from surprise.model_selection import cross_validate

# try baseline with ALS
# tuning bsl_options
bsl_options = {'method': 'als',
               'n_epochs': 5,
               'reg_u': 12,
               'reg_i': 5
               }
base_als = BaselineOnly(bsl_options=bsl_options)

cross_validate(base_als, data, cv=5, verbose=True)

# try baseline with SGD
bsl_options = {'method': 'sgd',
               'learning_rate': .00005,
               }
base_sgd = BaselineOnly(bsl_options=bsl_options)

cross_validate(base_sgd, data, cv=5, verbose=True)

Estimating biases using als...
Estimating biases using als...
Estimating biases using als...
Estimating biases using als...
Estimating biases using als...
Evaluating RMSE, MAE of algorithm BaselineOnly on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9379  0.9471  0.9406  0.9379  0.9431  0.9413  0.0035  
MAE (testset)     0.7433  0.7499  0.7436  0.7439  0.7469  0.7455  0.0026  
Fit time          0.13    0.15    0.16    0.12    0.12    0.13    0.02    
Test time         0.14    0.14    0.14    0.08    0.13    0.12    0.02    
Estimating biases using sgd...
Estimating biases using sgd...
Estimating biases using sgd...
Estimating biases using sgd...
Estimating biases using sgd...
Evaluating RMSE, MAE of algorithm BaselineOnly on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    1.0795  1.0738  1.0876  1.0804  1.0703  1.0783  0.0059  
MAE (testset)     0.9026  0.8994  0.909

{'fit_time': (0.31844019889831543,
  0.34000301361083984,
  0.3462808132171631,
  0.4146289825439453,
  0.412550687789917),
 'test_mae': array([0.90257548, 0.89936624, 0.90915863, 0.90410012, 0.89269288]),
 'test_rmse': array([1.07946294, 1.07382762, 1.08760853, 1.08040708, 1.07028132]),
 'test_time': (0.12273168563842773,
  0.1241147518157959,
  0.07518982887268066,
  0.14624381065368652,
  0.12252092361450195)}

ALS did far better, with an RMSE of 94.06 (higher CV of 37.18%), achieving nearly a 7.5% improvement over the current recommender. That bodes well for building a production-ready recommender in Spark, since the Spark MLIb machine learning library only has one recommender option, which is ALS based. Next, I will take a look at Slope One, another easy baseline algorithm from the library (this one has no hyperparameters whatsoever). For more on this algorithm, see this Wikipedia article: https://en.wikipedia.org/wiki/Slope_One

## 2.2. Slope One

In [5]:
# No params!!!
from surprise import SlopeOne

cross_validate(SlopeOne(), data, cv=5, verbose=True)

Evaluating RMSE, MAE of algorithm SlopeOne on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9455  0.9416  0.9419  0.9519  0.9437  0.9449  0.0038  
MAE (testset)     0.7438  0.7386  0.7394  0.7486  0.7445  0.7430  0.0037  
Fit time          0.68    0.67    0.70    0.67    0.63    0.67    0.02    
Test time         2.20    2.03    1.97    1.85    1.90    1.99    0.12    


{'fit_time': (0.6811401844024658,
  0.6733529567718506,
  0.7013638019561768,
  0.6749172210693359,
  0.6329629421234131),
 'test_mae': array([0.74382186, 0.73862904, 0.73935682, 0.7486076 , 0.74451141]),
 'test_rmse': array([0.94547797, 0.94163869, 0.94186666, 0.95191744, 0.94370444]),
 'test_time': (2.198395013809204,
  2.029664993286133,
  1.9730260372161865,
  1.8469529151916504,
  1.8984408378601074)}

Even Slope One does better than the existing algorithm used in production. The average RMSE is 0.9449 with a CV of 40.22%. That means both baseline algorithms do better predictions than the existing recommender on average but the quality of their predictions have a bit wider range than the existing ones. They still both do much better than the existing system. Next, I will try co-clustering.

## 2.3. Co-Clustering

Co-clustering is a dynamic real-time collaborative filtering engine that simultaneously clusters users and items. Here, I used the default settings of 3 user clusters, 3 item clusters, and 20 iterations of the optimization loop. 

In [39]:
from surprise import CoClustering

cross_validate(CoClustering(), data, cv=5, verbose=True)

Evaluating RMSE, MAE of algorithm CoClustering on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9611  0.9700  0.9650  0.9622  0.9644  0.9645  0.0031  
MAE (testset)     0.7524  0.7575  0.7543  0.7561  0.7549  0.7550  0.0017  
Fit time          1.48    1.47    1.47    1.48    1.49    1.48    0.01    
Test time         0.08    0.19    0.08    0.19    0.08    0.12    0.05    


{'fit_time': (1.481929063796997,
  1.4680471420288086,
  1.4674088954925537,
  1.4812119007110596,
  1.4904828071594238),
 'test_mae': array([0.75236666, 0.75746018, 0.75427586, 0.7560816 , 0.7548954 ]),
 'test_rmse': array([0.96107374, 0.97000246, 0.96495001, 0.96216753, 0.96437651]),
 'test_time': (0.08111095428466797,
  0.19227886199951172,
  0.08017110824584961,
  0.1897718906402588,
  0.08066105842590332)}

The RMSE was lower than the baseline, but not particularly impressive.

## 2.4. KNN

kNN is a machine learning algorithm that finds clusters of similar users based on common item ratings, and makes predictions using the average rating of top k number nearest neighbors. In surprise, there are a number of hyperparameters that can be tuned besides k (the number of nearest neighbors): whether I want to use average rating, z-score rating, baseline rating to group the clusters and whether I want to use cosine similarity or pearson distance metrics for measuring "nearness." For a step-by-step tutorial on kNN not using surprise library, see: https://towardsdatascience.com/how-did-we-build-book-recommender-systems-in-an-hour-part-2-k-nearest-neighbors-and-matrix-c04b3c2ef55c 

In [38]:
# KNN has many versions, they take different similarity measures under sim_options param
# levers to tune similarity measures: cosine, Pearson
# other params: k, min_k
# knns.KNNWithMeans (mean rating of users)
from surprise import KNNWithMeans

sim_options = {
    'name': 'cosine',
    'user_based': False
}

knn_m = KNNWithMeans(sim_options=sim_options)

cross_validate(knn_m, data, cv=5, verbose=True)

Computing the cosine similarity matrix...
Done computing similarity matrix.
Computing the cosine similarity matrix...
Done computing similarity matrix.
Computing the cosine similarity matrix...
Done computing similarity matrix.
Computing the cosine similarity matrix...
Done computing similarity matrix.
Computing the cosine similarity matrix...
Done computing similarity matrix.
Evaluating RMSE, MAE of algorithm KNNWithMeans on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9361  0.9417  0.9395  0.9506  0.9409  0.9418  0.0048  
MAE (testset)     0.7374  0.7413  0.7364  0.7439  0.7430  0.7404  0.0030  
Fit time          1.39    1.42    1.51    1.29    1.39    1.40    0.07    
Test time         3.22    3.18    3.21    3.20    3.15    3.19    0.02    


{'fit_time': (1.3855597972869873,
  1.4217870235443115,
  1.5068080425262451,
  1.2944920063018799,
  1.3911669254302979),
 'test_mae': array([0.73741888, 0.74128558, 0.73639748, 0.74385914, 0.74295172]),
 'test_rmse': array([0.9360551 , 0.94170234, 0.93951627, 0.950621  , 0.94087471]),
 'test_time': (3.2197039127349854,
  3.1824898719787598,
  3.2069740295410156,
  3.1991658210754395,
  3.1527531147003174)}

In [40]:
# knns.KNNBaseline (baseline rating)
# has additional bsl_options
from surprise import KNNBaseline

sim_options = {
    'name': 'pearson',
    'user_based': False
}

knn_b = KNNBaseline(sim_options=sim_options)

cross_validate(knn_b, data, cv=5, verbose=True)

Estimating biases using als...
Computing the cosine similarity matrix...
Done computing similarity matrix.
Estimating biases using als...
Computing the cosine similarity matrix...
Done computing similarity matrix.
Estimating biases using als...
Computing the cosine similarity matrix...
Done computing similarity matrix.
Estimating biases using als...
Computing the cosine similarity matrix...
Done computing similarity matrix.
Estimating biases using als...
Computing the cosine similarity matrix...
Done computing similarity matrix.
Evaluating RMSE, MAE of algorithm KNNBaseline on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9458  0.9383  0.9460  0.9410  0.9369  0.9416  0.0038  
MAE (testset)     0.7424  0.7413  0.7461  0.7409  0.7392  0.7420  0.0023  
Fit time          1.63    1.62    1.45    1.49    1.59    1.55    0.07    
Test time         3.56    3.34    3.30    3.44    3.55    3.44    0.11    


{'fit_time': (1.6262061595916748,
  1.6176371574401855,
  1.45363187789917,
  1.4893670082092285,
  1.5877439975738525),
 'test_mae': array([0.74241672, 0.7412705 , 0.74611266, 0.74090039, 0.73923614]),
 'test_rmse': array([0.94578761, 0.9382898 , 0.94599386, 0.94095897, 0.93687012]),
 'test_time': (3.5579068660736084,
  3.3396248817443848,
  3.299196720123291,
  3.4366979598999023,
  3.554590940475464)}

I ran a variety of versions of kNN (only kept the code for two of the highest performing ones): RMSE of 0.9418 and CV of 50.97% for using average ratings vs. RMSE of 0.9416 and a CV of 40.36% for using baseline ratings.

## 2.5. Matrix Factorization

The last type of recommender systems in surprise relies on matrix factorization. Here's a breakdown of the two words in the name of this approach:
* Factor: The matrix factorization approach makes recommendations by finding latent factors that are not necessarily apparent in the data. (For example, when I log into Netflix, one of the latent factors in my recommendation feed is "international criminal investigation TV dramas." A label like that wouldn't necessarily be found on imdb.com, but it is a strongh enough feature to describe a number of items offered by Netflix.) 
* Matrix: Using linear algrebra, the user-item matrix can be decomposed into several parts: a user-feature matrix and a movie-feature matrix. Those two parts then can be recombined, giving us predicted “ratings” for movies not yet seen.

For good basic introductions on matrix factorization, see:
* https://blog.insightdatascience.com/explicit-matrix-factorization-als-sgd-and-all-that-jazz-b00e4d9b21ea
* https://www.datacamp.com/community/tutorials/matrix-factorization-names

In [31]:
# SVD
from surprise import SVD
from surprise.model_selection import cross_validate

svd = SVD()

# Run 5-fold cross-validation and print results.
cross_validate(svd, data, cv=5, verbose=True)

Evaluating RMSE, MAE of algorithm SVD on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9344  0.9287  0.9357  0.9397  0.9337  0.9344  0.0036  
MAE (testset)     0.7385  0.7303  0.7363  0.7409  0.7352  0.7363  0.0036  
Fit time          4.15    4.18    4.19    4.23    4.31    4.21    0.06    
Test time         0.12    0.18    0.18    0.12    0.19    0.16    0.03    


{'fit_time': (4.148401975631714,
  4.179248094558716,
  4.18828010559082,
  4.23301100730896,
  4.311578750610352),
 'test_mae': array([0.73852984, 0.73028014, 0.73633033, 0.74090837, 0.73520329]),
 'test_rmse': array([0.93442533, 0.92868259, 0.93570543, 0.93972205, 0.9336977 ]),
 'test_time': (0.11529302597045898,
  0.18133306503295898,
  0.18240094184875488,
  0.11872696876525879,
  0.19011807441711426)}

In [40]:
# NMF
from surprise import NMF
nmf = NMF()

# Run 5-fold cross-validation and print results.
cross_validate(nmf, data, cv=5, verbose=True)

Evaluating RMSE, MAE of algorithm NMF on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9620  0.9591  0.9664  0.9608  0.9696  0.9636  0.0039  
MAE (testset)     0.7564  0.7519  0.7605  0.7561  0.7641  0.7578  0.0042  
Fit time          4.41    4.53    4.48    4.51    4.64    4.52    0.07    
Test time         0.20    0.21    0.09    0.24    0.10    0.17    0.06    


{'fit_time': (4.412101984024048,
  4.534706115722656,
  4.484484910964966,
  4.512546062469482,
  4.6404478549957275),
 'test_mae': array([0.75643213, 0.75191407, 0.76046604, 0.7560633 , 0.76410932]),
 'test_rmse': array([0.96203534, 0.95912811, 0.96638438, 0.96077173, 0.96964493]),
 'test_time': (0.20378589630126953,
  0.21161890029907227,
  0.09108877182006836,
  0.24230217933654785,
  0.10178089141845703)}

Singular Vector Decomposition (SVD) did much better than Negative Matrix Factorization (NMF): RMSE of 0.9344 with a CV of 38.53% for SVD vs. RMSE of 0.9636 and CV of 40.47% for NMF.

# New Recommender in Spark

In [1]:
# initiate a spark session
import pyspark as ps

spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df lecture") \
            .getOrCreate()

## Load Data: Ratings DataFrame

ALS only requires an input dataset of existing ratings between user-item pairs, with three columns: a user ID column, an item ID column (e.g., a movie), and a rating column. So I am only reading in the ratings part of the MovieLens dataset.

In [2]:
# import the many data types
from pyspark.sql.types import *

# create a schema of your own
customSchema = StructType([
   StructField('userId',IntegerType(),True),
   StructField('movieId',IntegerType(),True),
   StructField('rating',FloatType(),True),
   StructField('timestamp',StringType(),True)
])

# use that schema in reading in df
ratings_df = spark\
                .read\
                .schema(customSchema)\
                .option("header", "true")\
                .csv('data/movies/ratings.csv')


# show the result
ratings_df.show()

# print the schema
ratings_df.printSchema()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestam

## Predict Ratings using Spark ML

For a quick overview of collaborative filtering in Spark ML, visit: https://spark.apache.org/docs/preview/ml-collaborative-filtering.html#examples (Also, the newly published Spark: The Definitive Guide is extremely helpful: http://shop.oreilly.com/product/0636920034957.do)

In [12]:
# ALS with GridSearch
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

# we already have a df called ratings_df from above
(training, test) = ratings_df.randomSplit([0.8, 0.2])

# always create a new instance of a model before creating a pipeline
# remember to set the coldStartStrategy back to 'NaN' for production
#rForm = RFormula()
als2 = ALS()\
            .setColdStartStrategy('drop')\
            .setUserCol("userId")\
            .setItemCol("movieId")\
            .setRatingCol("rating")

# add stages to pipeline
stages = [als2]
pipeline = Pipeline().setStages(stages)

# parameter tuning
params = ParamGridBuilder()\
        .addGrid(als2.rank, [10, 20, 40, 80])\
        .addGrid(als2.nonnegative, [True, False])\
        .addGrid(als2.regParam, [0.01, 0.02, 0.05, 0.08, 0.1, 0.15, 0.2])\
        .build()

# specify evaluation process   
evaluator = RegressionEvaluator()\
    .setMetricName("rmse")\
    .setPredictionCol("prediction")\
    .setLabelCol("rating")

# using crossvalidator for parameter search
cv = CrossValidator()\
        .setEstimator(pipeline)\
        .setEvaluator(evaluator)\
        .setEstimatorParamMaps(params)\
        .setNumFolds(5)
        
ALSmodelGS = cv.fit(training)

out = ALSmodelGS.transform(training)\
                .select("prediction", "rating").rdd\
                .map(lambda x: (float(x[0]), float(x[1])))
metrics = RegressionMetrics(out)

print("Root-mean-square error = " + str(metrics.rootMeanSquaredError))

# once trained, we can persist it to desk and use it on other data
# remember to set back coldStartStrategy to NaN
# ALSmodelGS.write.overwrite().save("prediction_model")

Root-mean-square error = 0.6254246953148832


Running GridSearch, Spark's recommender reduced the RMSE to 0.6254! This solution clearly outperforms the existing Mean of Means and should be considered as a replacememnt engine for production.