# DATA 612 - Project 5
**Derek G. Nokes**

## Introduction

In this project, we create a simple but scalable recommender system using PySpark and compare it to a similar recommender system created for Project 3. 

In previous projects for this course, we created our own functions to download the MovieLens dataset [@MovieLens]. We re-use these functions to acquire the required data.

In the first section, we provide instructions for replicating our computing environment using a custom docker image. We review the data acquisition in the second section. In the third section, we demonstrate how PySpark can be used to explore data, then in the fourth section we provide an overview of the theory underlying the two models we implement and compare in the fifth and sixth sections respectively. In the final section we provide our concluding remarks.  


## Computing Environment

To ensure that our work can easily be replicated, we have created a docker container with all of the required tools installed. In this section, we provide instructions for acquiring and running this project in a docker container.

To create our environment, we simply pulled the jupyter/all-spark-notebook:latest image and added the recommender systems package, 'surprise'.

### Creating the Custom Docker Image

On a system with docker installed, at the command line run:

docker pull jupyter/all-spark-notebook:latest

Start the container once it has downloaded:

docker run -it -p 8888:8888 -v /home/dnokes/projects/ms/github/DATA_612/Project_5:/home/dnokes/projects/ms/github/DATA_612/Project_5 jupyter/all-spark-notebook:latest

The first path is the local path on the host. The second path is the path inside the container.

Open another terminal window and at the command line type

docker ps 

Locate the container ID (e.g., 0d78bfb39601).

Open another terminal window and run the command

docker exec -it 0d78bfb39601 /bin/bash

Add the 'surprise' package. Type:

conda install -c conda-forge scikit-surprise

Type 'Y' when prompted.

Go back to the terminal window where we ran docker ps.

Now we create the custom docker image. Type:

docker commit 0d78bfb39601 dgn2/data_612_project_5:version1

Finally, we push the image to dockerhub:

### Running the Custom Docker Image




In [1]:
# import necessary libraries
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
# import MF library
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#from pyspark.ml.recommendation import ALS

# create sparksession
spark = SparkSession \
    .builder \
    .appName("Pysparkexample") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
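# SQLContext expects a SparkContext, so pass the one owned by the session
sqlContext = SQLContext(spark.sparkContext)
# confirm the Spark session is running by printing its version
print(spark.version)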

2.4.3


## Data Acquisition

In this section, we download the MovieLens data and read the data into a Spark dataframe.


In [3]:
import requests
import zipfile
from collections import defaultdict

We define a function to download the MovieLens data:


In [4]:
# function to download movielens data
def download(download_url, download_path):

    req = requests.get(download_url, stream=True)

    with open(download_path, 'wb') as fd:
        for chunk in req.iter_content(chunk_size=2**20):
            fd.write(chunk)

We download the data:

In [5]:
# define download path
#download_path='/home/dnokes/projects/ms/github/DATA_612/Project_5/ml-100k.zip'
download_path='ml-100k.zip'
# define download URL
download_url='http://files.grouplens.org/datasets/movielens/ml-100k.zip'
# download movielens data
download(download_url, download_path)


We extract the data from the zip file and read it into memory:


In [6]:
# define unzip path
unzip_path=''
# unzip movielens data
zf = zipfile.ZipFile(download_path)
user_by_item_file=zf.extract('ml-100k/u.data', unzip_path)
title_by_item_file=zf.extract('ml-100k/u.item', unzip_path)

# read movielens data into dataframe
df_user_by_item = pd.read_csv(user_by_item_file, sep='\t',header=None,
    names=['user_id','item_id','rating','titmestamp'])
df_title_by_item = pd.read_csv(title_by_item_file, sep='|',encoding='latin-1',
    header=None,usecols=[0,1],names=['item_id', 'title'])
# join the ratings by user and item to the title
df = pd.merge(df_title_by_item,df_user_by_item, on='item_id')


We check the dimensions of the component and merged datasets.

In [73]:
df_title_by_item.shape

(1682, 2)

In [74]:
df_user_by_item.shape

(100000, 4)

In [75]:
df.shape

(100000, 5)

Finally, we convert our Pandas dataframe to a Spark dataframe for use in the next sections of the project. This approach works for our small dataset; for a much larger dataset that does not fit in memory on a single machine, we would read the data directly into Spark instead.

In [7]:
from pyspark.sql import SQLContext
# SQLContext expects a SparkContext, so pass the one owned by the session
sql_sc = SQLContext(spark.sparkContext)
# convert the pandas dataframe to a Spark dataframe
s_df = sql_sc.createDataFrame(df)

## Data Exploration

In this section, we quickly explore the Spark functionality for manipulating and exploring data.


First, we show the top 5 lines of the pandas dataframe:

In [8]:
# display top 5 lines
df.head()

   item_id             title  user_id  rating  titmestamp
0        1  Toy Story (1995)      308       4   887736532
1        1  Toy Story (1995)      287       5   875334088
2        1  Toy Story (1995)      148       4   877019411
3        1  Toy Story (1995)      280       4   891700426
4        1  Toy Story (1995)       66       3   883601324


Next, we show the top 5 lines of the Spark dataframe:

In [9]:
s_df.head(5)

[Row(item_id=1, title='Toy Story (1995)', user_id=308, rating=4, titmestamp=887736532),
 Row(item_id=1, title='Toy Story (1995)', user_id=287, rating=5, titmestamp=875334088),
 Row(item_id=1, title='Toy Story (1995)', user_id=148, rating=4, titmestamp=877019411),
 Row(item_id=1, title='Toy Story (1995)', user_id=280, rating=4, titmestamp=891700426),
 Row(item_id=1, title='Toy Story (1995)', user_id=66, rating=3, titmestamp=883601324)]

We can see that the data is the same.

We can view the schema as follows:

In [10]:
s_df.printSchema()

root
 |-- item_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- titmestamp: long (nullable = true)



Using Spark we can summarize the data quickly:

In [11]:
s_df.describe().show()

+-------+------------------+--------------------+-----------------+------------------+-----------------+
|summary|           item_id|               title|          user_id|            rating|       titmestamp|
+-------+------------------+--------------------+-----------------+------------------+-----------------+
|  count|            100000|              100000|           100000|            100000|           100000|
|   mean|         425.53013|                null|        462.48475|           3.52986|8.8352885148862E8|
| stddev|330.79835632558525|                null|266.6144201275087|1.1256735991443156|5343856.189502792|
|    min|                 1|'Til There Was Yo...|                1|                 1|        874724710|
|    max|              1682|Á köldum klaka (C...|              943|                 5|        893286638|
+-------+------------------+--------------------+-----------------+------------------+-----------------+



We can find the mean rating by user:

In [12]:
s_df.groupby('user_id').agg({'rating': 'mean'}).show()

+-------+------------------+
|user_id|       avg(rating)|
+-------+------------------+
|     26|  2.94392523364486|
|    474|  4.08256880733945|
|     29|3.6470588235294117|
|    541|3.6240601503759398|
|     65|            3.9375|
|    558|               4.2|
|    191|3.6296296296296298|
|    418|               2.9|
|    222| 3.049095607235142|
|    293|3.0309278350515463|
|    730| 3.236842105263158|
|    938|3.2685185185185186|
|    270| 4.333333333333333|
|    243|3.6419753086419755|
|    705| 3.710526315789474|
|    442| 3.125874125874126|
|    367| 4.155172413793103|
|    278| 4.260869565217392|
|    720| 3.966666666666667|
|     54|3.6923076923076925|
+-------+------------------+
only showing top 20 rows



We can find the mean rating by item:

In [13]:
s_df.groupby('item_id').agg({'rating': 'mean'}).show()

+-------+------------------+
|item_id|       avg(rating)|
+-------+------------------+
|     26| 3.452054794520548|
|     29|2.6666666666666665|
|    474| 4.252577319587629|
|    964|3.3333333333333335|
|   1677|               3.0|
|     65|3.5391304347826087|
|    191| 4.163043478260869|
|    418|3.5813953488372094|
|    541| 2.877551020408163|
|    558|3.6714285714285713|
|   1010|              3.25|
|   1224|2.6666666666666665|
|   1258|2.5217391304347827|
|   1277|3.4210526315789473|
|   1360|               1.5|
|    222|  3.66027397260274|
|    270|3.5955882352941178|
|    293| 3.802721088435374|
|    730|               3.5|
|    938|              2.88|
+-------+------------------+
only showing top 20 rows
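To make explicit what the two grouped aggregations above compute, here is a minimal pure-Python sketch of a group-by mean. The rows are hypothetical toy data (not the MovieLens ratings), and `mean_rating_by` is an illustrative helper rather than part of the Spark API.

```python
from collections import defaultdict

# Hypothetical toy rows (item_id, user_id, rating); not the MovieLens data.
toy_rows = [(1, 308, 4), (1, 287, 5), (2, 308, 3), (2, 66, 2), (2, 287, 4)]

def mean_rating_by(rows, key_index, rating_index=2):
    """Return {key: mean rating}, grouping rows on the column at key_index."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of ratings, count]
    for row in rows:
        acc = totals[row[key_index]]
        acc[0] += row[rating_index]
        acc[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

mean_by_item = mean_rating_by(toy_rows, key_index=0)  # analogue of groupby('item_id')
mean_by_user = mean_rating_by(toy_rows, key_index=1)  # analogue of groupby('user_id')
print(mean_by_item)
print(mean_by_user)
```

Spark performs the same sum-and-count per key, but distributes the work across partitions before combining the partial results.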



We can cross-tabulate user id against rating level to count the number of ratings each user gave at each level:

In [14]:
s_df.crosstab('user_id', 'rating').show()

+--------------+---+---+---+---+---+
|user_id_rating|  1|  2|  3|  4|  5|
+--------------+---+---+---+---+---+
|           645|  2|  2| 29| 55| 34|
|           892|  2| 13| 40| 99| 72|
|            69|  2|  3| 21| 16| 23|
|           809|  2|  2|  6|  5|  5|
|           629|  1|  8| 24| 35| 53|
|           365|  5|  9| 12| 23|  9|
|           138|  0|  1|  3| 28| 19|
|           760|  4|  7| 11| 13|  6|
|           101|  3| 19| 28| 16|  1|
|           479| 24| 14| 48| 85| 31|
|           347| 20| 25| 37| 55| 62|
|           846| 10| 46| 89|154|106|
|           909|  0|  0|  5|  7| 14|
|           333|  1|  1|  5| 13|  6|
|           628|  0|  1|  1|  3| 22|
|           249|  1|  5| 31| 63| 61|
|           893|  1|  5| 28| 18|  7|
|           518|  4|  3| 28| 18| 20|
|           468|  0|  9| 31| 55| 48|
|           234| 14|103|205|126| 32|
+--------------+---+---+---+---+---+
only showing top 20 rows



We can cross-tabulate item id against rating level to count the number of ratings each item received at each level:

In [15]:
s_df.crosstab('item_id', 'rating').show()

+--------------+---+---+---+---+---+
|item_id_rating|  1|  2|  3|  4|  5|
+--------------+---+---+---+---+---+
|           645|  1|  0|  9| 10|  7|
|           892|  7| 13| 22|  8|  3|
|            69| 14| 23| 59|125|100|
|          1322|  1|  2|  3|  0|  0|
|          1665|  0|  1|  0|  0|  0|
|          1036|  2| 12|  2|  6|  2|
|          1586|  1|  0|  0|  0|  0|
|          1501|  1|  0|  2|  2|  0|
|           809|  4| 10| 16| 12|  1|
|          1337|  3|  0|  6|  0|  0|
|          1411|  6|  7|  8|  7|  0|
|           629|  3|  8| 33| 26|  7|
|          1024|  1|  1|  6|  2|  5|
|          1469|  2|  0|  5|  4|  1|
|           365|  3| 10| 16| 12|  7|
|          1369|  1|  0|  2|  1|  0|
|           138|  6|  3|  5|  3|  2|
|          1190|  0|  2|  5|  2|  2|
|          1168|  1|  4|  6|  9|  2|
|           760| 11|  8| 18|  4|  5|
+--------------+---+---+---+---+---+
only showing top 20 rows
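The cross-tabulations above count how many ratings fall into each (id, rating level) cell. A minimal pure-Python sketch of the same idea follows; the rows are hypothetical toy data and `crosstab` is an illustrative helper, not the Spark method of the same name.

```python
from collections import Counter

# Hypothetical toy rows (item_id, user_id, rating); not the MovieLens data.
toy_rows = [(1, 308, 4), (1, 287, 5), (2, 308, 4), (2, 66, 2), (2, 287, 4)]

def crosstab(rows, key_index, rating_index=2, levels=(1, 2, 3, 4, 5)):
    """Return {key: [count at level 1, ..., count at level 5]}."""
    counts = Counter((row[key_index], row[rating_index]) for row in rows)
    keys = sorted({row[key_index] for row in rows})
    # Counter returns 0 for (key, level) cells that never occur
    return {key: [counts[(key, level)] for level in levels] for key in keys}

user_by_rating = crosstab(toy_rows, key_index=1)
for user_id, row in user_by_rating.items():
    print(user_id, row)
```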



We can use SQL to run queries against our dataset as well.

In [16]:
# register dataset as table
s_df.createOrReplaceTempView('movielens')


We count the number of ratings for each distinct rating level:

In [17]:
sqlContext.sql('select rating,count(rating) as nRatings from movielens group by rating order by rating').show(5)

+------+--------+
|rating|nRatings|
+------+--------+
|     1|    6110|
|     2|   11370|
|     3|   27145|
|     4|   34174|
|     5|   21201|
+------+--------+
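The query above is ordinary SQL, so its logic can be checked without a Spark session. As a sketch, the same query text is run below against an in-memory SQLite table using Python's standard `sqlite3` module; the table contents are hypothetical toy rows, not the MovieLens ratings.

```python
import sqlite3

# Hypothetical toy ratings (user_id, item_id, rating); not the MovieLens data.
toy_ratings = [(1, 10, 4), (1, 11, 5), (2, 10, 4), (3, 12, 1), (3, 10, 4)]

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table movielens (user_id integer, item_id integer, rating integer)")
conn.executemany("insert into movielens values (?, ?, ?)", toy_ratings)

# Same query text as the Spark SQL cell above
query = ("select rating,count(rating) as nRatings from movielens "
         "group by rating order by rating")
rating_counts = conn.execute(query).fetchall()
conn.close()

print(rating_counts)
```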



Now that we have familiarized ourselves with some of the functionality of Spark for exploring data, we very briefly review the theory underlying the recommender model we will implement in later sections of the project.

## Theory

In Project 3, we used Probabilistic Matrix Factorization to create a recommender system. In the first sub-section of the 'Theory' section, we review the underlying theory for Probabilistic Matrix Factorization; in the second sub-section, we provide an overview of the Spark implementation.

### Probabilistic Matrix Factorization Using Stochastic Gradient Descent (SGD)

Probabilistic matrix factorization [@Salakhutdinov-2008a] is very similar to singular value decomposition. We define the probabilistic matrix factorization as follows: 

$$R = Q^{T}P$$

where predictions $\hat{r}_{ui}$ for user $u$ for item $i$,  are:

$$\hat{r}_{ui} = q_i^{T}p_u$$

where $q_i$ and $p_u$ are the latent factors for items and users respectively.

Unlike the classical SVD, this matrix factorization can be applied to sparse matrices. We minimize the regularized squared error using stochastic gradient descent to estimate all of the unknowns as follows:

$$\sum_{r_{ui} \in R_{train}} \left(r_{ui} - \hat{r}_{ui} \right)^2 + \lambda\left(\big\|q_i\big\|^{2} + \big\|p_u\big\|^{2}\right)$$

where $r_{ui} - \hat{r}_{ui}$ is the error and $\lambda$ is the regularization parameter.
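As an illustration of the minimization described above, the following is a minimal pure-Python sketch of stochastic gradient descent on the regularized squared error. It is not the Surprise implementation: the toy ratings, the learning rate `lr`, the regularization value `reg`, the number of factors, and the helper names are all assumptions made for the example. For each observed rating, the error $e_{ui} = r_{ui} - q_i^{T}p_u$ drives the updates $p_u \leftarrow p_u + \gamma(e_{ui}q_i - \lambda p_u)$ and $q_i \leftarrow q_i + \gamma(e_{ui}p_u - \lambda q_i)$, where $\gamma$ is the learning rate.

```python
import random
from math import sqrt

# Hypothetical toy ratings (user, item, rating); not the MovieLens data.
toy_ratings = [(1, 1, 5), (1, 2, 4), (1, 3, 1), (2, 1, 4), (2, 2, 5), (2, 4, 2),
               (3, 1, 1), (3, 3, 5), (3, 4, 4), (4, 2, 2), (4, 3, 4), (4, 4, 5)]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def init_factors(ratings, n_factors, seed):
    """Small random latent factors p_u (users) and q_i (items)."""
    rng = random.Random(seed)
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    p = {u: [rng.gauss(0, 0.1) for _ in range(n_factors)] for u in users}
    q = {i: [rng.gauss(0, 0.1) for _ in range(n_factors)] for i in items}
    return p, q

def sgd_epoch(ratings, p, q, lr, reg):
    """One pass over the observed ratings, updating p_u and q_i in place."""
    for u, i, r in ratings:
        err = r - dot(q[i], p[u])
        for f in range(len(p[u])):
            pu_f, qi_f = p[u][f], q[i][f]
            p[u][f] += lr * (err * qi_f - reg * pu_f)
            q[i][f] += lr * (err * pu_f - reg * qi_f)

def rmse(ratings, p, q):
    return sqrt(sum((r - dot(q[i], p[u])) ** 2 for u, i, r in ratings) / len(ratings))

p, q = init_factors(toy_ratings, n_factors=2, seed=0)
rmse_before = rmse(toy_ratings, p, q)
for _ in range(500):
    sgd_epoch(toy_ratings, p, q, lr=0.01, reg=0.02)
rmse_after = rmse(toy_ratings, p, q)
print(rmse_before, rmse_after)
```

Only observed ratings enter the loop, which is why this factorization can be applied to a sparse rating matrix.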

### Probabilistic Matrix Factorization Using Alternating Least Squares (ALS) With Weighted-$\lambda$-Regularization

As the rating matrix contains both signals and noise, it is important to remove noise and use the recovered signal to predict missing ratings. Singular Value Decomposition (SVD) is a natural approach that approximates the original user-movie rating matrix $R$ by the product of two rank-$k$ matrices, $\hat{R} = U^{T} \times M$. The solution given by the SVD minimizes the Frobenius norm of $R - \hat{R}$, which is equivalent to minimizing the RMSE over all elements of $R$. However, because many elements of the rating matrix $R$ are missing, standard SVD algorithms cannot find $U$ and $M$.

In this project, we use the *alternating least squares* (ALS) algorithm to solve the low rank matrix factorization problem using the following steps:

**Step 1**: Initialize matrix $M$ by assigning the average rating for that movie as the first row, and small random numbers for the remaining entries.

**Step 2**: Fix $M$, solve $U$ by minimizing the objective function (the sum of squared errors);

**Step 3**: Fix $U$, solve $M$ by minimizing the objective function similarly;

**Step 4**: Repeat Steps 2 and 3 until a stopping criterion is satisfied.

The stopping criterion used is based on the observed root mean squared error (RMSE) on the training dataset. After one round of updating both $U$ and $M$, if the difference between the observed RMSEs on the training dataset is less than 0.0001, the iteration stops and we use the obtained $U,M$ to make final predictions on the testing dataset. The same objective function was used previously by Salakhutdinov et al. and solved using gradient descent. In Project 3, we used the gradient descent approach; in this project, we compare the results of ALS and SGD.

Without regularization, ALS might lead to overfitting as there are many free parameters. A common fix is to use Tikhonov regularization, which penalizes large parameters. 

The following weighted-$\lambda$-regularization has been found to prevent overfitting as the number of features or number of iterations increase:

$$f(U,M)=\sum_{(i,j) \in I}\big(r_{i,j}-u_{i}^{T}m_{j}\big)^{2}+\lambda\bigg(\sum_{i}n_{u_{i}}\big\|u_{i}\big\|^{2}+\sum_{j}n_{m_{j}}\big\|m_{j}\big\|^{2}\bigg)$$

where

$I$ is the set of (user, movie) index pairs $(i,j)$ for which a rating $r_{i,j}$ is observed

$I_{i}$ is the set of movies $j$ that user $i$ rated, and $n_{u_{i}}$, the number of ratings of user $i$, is the cardinality of $I_{i}$

$I_{j}$ is the set of users who rated movie $j$, and $n_{m_{j}}$, the number of ratings of movie $j$, is the cardinality of $I_{j}$

This corresponds to Tikhonov regularization where $\Gamma_{U}=\text{diag}(n_{u_{i}})$ and $\Gamma_{M}=\text{diag}(n_{m_{j}})$.
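The four ALS steps and the weighted-$\lambda$ objective above can be sketched in pure Python. With $M$ fixed, minimizing $f(U,M)$ over a single $u_i$ is a ridge regression with the closed-form normal equations $\big(\sum_{j \in I_i} m_j m_j^{T} + \lambda n_{u_i} E\big)u_i = \sum_{j \in I_i} r_{i,j} m_j$, where $E$ is the identity matrix, and symmetrically for each $m_j$. The code below is an illustrative sketch under stated assumptions, not the Spark implementation: the toy ratings, the values of `k`, `lam`, `max_iter` and `tol`, and all helper names are hypothetical.

```python
import random
from math import sqrt

# Hypothetical toy ratings (user, movie, rating); not the MovieLens data.
toy_ratings = [(1, 1, 5), (1, 2, 4), (1, 3, 1), (2, 1, 4), (2, 2, 5), (2, 4, 2),
               (3, 1, 1), (3, 3, 5), (3, 4, 4), (4, 2, 2), (4, 3, 4), (4, 4, 5)]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def solve(a, b):
    """Solve the small dense system a x = b by Gauss-Jordan elimination."""
    n = len(b)
    aug = [row[:] + [b[r]] for r, row in enumerate(a)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(aug[r][col]))
        aug[col], aug[pivot] = aug[pivot], aug[col]
        for r in range(n):
            if r != col:
                factor = aug[r][col] / aug[col][col]
                aug[r] = [x - factor * y for x, y in zip(aug[r], aug[col])]
    return [aug[r][n] / aug[r][r] for r in range(n)]

def update_factors(fixed, rated_by_key, k, lam):
    """Ridge solve per user (or movie) with the other factor matrix held fixed."""
    updated = {}
    for key, rated in rated_by_key.items():  # rated: list of (other id, rating)
        n = len(rated)                       # n_{u_i} or n_{m_j}
        a = [[lam * n * (r == c) for c in range(k)] for r in range(k)]
        b = [0.0] * k
        for other, rating in rated:
            v = fixed[other]
            for r in range(k):
                b[r] += rating * v[r]
                for c in range(k):
                    a[r][c] += v[r] * v[c]
        updated[key] = solve(a, b)
    return updated

def train_rmse(ratings, u_f, m_f):
    return sqrt(sum((r - dot(u_f[i], m_f[j])) ** 2 for i, j, r in ratings) / len(ratings))

def als_wr(ratings, k=2, lam=0.05, max_iter=50, tol=1e-4, seed=0):
    rng = random.Random(seed)
    by_user, by_movie = {}, {}
    for i, j, r in ratings:
        by_user.setdefault(i, []).append((j, r))
        by_movie.setdefault(j, []).append((i, r))
    # Step 1: first entry is the movie's average rating, the rest small random numbers
    m_f = {j: [sum(r for _, r in rated) / len(rated)]
              + [rng.uniform(0, 0.1) for _ in range(k - 1)]
           for j, rated in by_movie.items()}
    previous = float("inf")
    for _ in range(max_iter):
        u_f = update_factors(m_f, by_user, k, lam)    # Step 2: fix M, solve U
        m_f = update_factors(u_f, by_movie, k, lam)   # Step 3: fix U, solve M
        current = train_rmse(ratings, u_f, m_f)
        if abs(previous - current) < tol:             # Step 4: stopping criterion
            break
        previous = current
    return u_f, m_f, current

u_f, m_f, final_rmse = als_wr(toy_ratings)
print(final_rmse)
```

Each user's solve depends only on the fixed movie factors and that user's own ratings, so the solves within a step are independent of one another, which is what makes the method straightforward to distribute.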


## Implementation

In this section, we use PySpark to create a distributed version of our SVD-based recommender system from Project 3. We do not create a full cluster, but instead run a local single node version, focusing on the implementation in PySpark rather than on cluster setup.

Although the objective function of the model is the same as that used in Project 3 for probabilistic matrix factorization, the Spark version of the model uses alternating least squares (ALS) to do the matrix factorization (i.e., solve for the user and item latent factors) rather than gradient descent. This method allows relatively easy scaling in a distributed environment.


### Probabilistic Matrix Factorization Using Stochastic Gradient Descent (SGD)

First, we review the results of the probabilistic matrix factorization using stochastic gradient descent from Project 3.

In [127]:
# import from Surprise package
from surprise import Reader, Dataset, SVD, accuracy
from surprise.model_selection import cross_validate
from surprise.model_selection import GridSearchCV
from surprise.model_selection import train_test_split
# import python packages
import numpy as np
from numpy.linalg import norm

import random
from math import sqrt

# set random seed
randomSeed = 12345678

We create the data object required for the modeling below as follows:

In [128]:
# load reader library
reader = Reader()
# load ratings dataset with Dataset library
data = Dataset.load_from_df(df[['user_id', 'item_id',
  'rating']], reader)

We set the random seed to ensure that our results are comparable across models and parameters and perform 5-fold cross validation using default parameters.

In [129]:
random.seed(randomSeed)
np.random.seed(randomSeed)

# select SVD algorithm
algo = SVD()

# compute RMSE and MAE of SVD algorithm using 5-fold cross 
# validation
result=cross_validate(algo, data, measures=['RMSE', 'MAE'], 
    cv=5, verbose=True,return_train_measures=True)
# extract mean test RMSE
meanTestRMSE=result[ 'test_rmse'].mean()

Evaluating RMSE, MAE of algorithm SVD on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9372  0.9367  0.9382  0.9338  0.9358  0.9363  0.0015  
MAE (testset)     0.7393  0.7408  0.7374  0.7367  0.7383  0.7385  0.0015  
RMSE (trainset)   0.6857  0.6826  0.6863  0.6841  0.6846  0.6847  0.0013  
MAE (trainset)    0.5435  0.5407  0.5447  0.5425  0.5424  0.5428  0.0013  
Fit time          5.57    5.58    5.53    5.52    5.51    5.54    0.03    
Test time         0.23    0.17    0.23    0.18    0.17    0.19    0.03    
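To clarify what the fold-by-fold table above reports, here is a minimal pure-Python sketch of 5-fold cross-validation with RMSE and MAE. It is not how Surprise implements `cross_validate`; the fold assignment, the toy ratings, and the use of the training-fold mean as the "model" are assumptions chosen to keep the example short.

```python
import random
from math import sqrt

def k_fold_indices(n_rows, n_folds=5, seed=12345678):
    """Shuffle row indices and yield (train_idx, test_idx) for each fold."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[f::n_folds] for f in range(n_folds)]
    for f in range(n_folds):
        test_idx = folds[f]
        train_idx = [i for g, fold in enumerate(folds) if g != f for i in fold]
        yield train_idx, test_idx

def rmse(errors):
    return sqrt(sum(e * e for e in errors) / len(errors))

def mae(errors):
    return sum(abs(e) for e in errors) / len(errors)

# Hypothetical toy ratings; not the MovieLens data.
toy_ratings = [4, 5, 4, 4, 3, 1, 2, 5, 3, 4]

splits = list(k_fold_indices(len(toy_ratings)))
fold_rmse, fold_mae = [], []
for train_idx, test_idx in splits:
    # Stand-in "model": predict the mean rating of the training folds
    train_mean = sum(toy_ratings[i] for i in train_idx) / len(train_idx)
    errors = [toy_ratings[i] - train_mean for i in test_idx]
    fold_rmse.append(rmse(errors))
    fold_mae.append(mae(errors))

mean_test_rmse = sum(fold_rmse) / len(fold_rmse)
print(fold_rmse, mean_test_rmse)
```

Every rating appears in exactly one test fold, so the mean across folds summarizes out-of-sample error over the whole dataset.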


Now we define a function to return the top-N recommendations for each user from a set of predictions:

In [130]:
def get_top_n(predictions, n=10):
    # return top-N recommendations for each user from 
    # a set of predictions.
    #
    # predictions(list of Prediction objects): list of 
    #   predictions, as returned by test method of an 
    #   algorithm.
    # n(int): number of recommendation to output for each 
    #   user

    # First map predictions to each user
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))

    # sort predictions for each user and retrieve 
    # n highest ones
    for uid, user_ratings in top_n.items():
        user_ratings.sort(key=lambda x: x[1], 
          reverse=True)
        top_n[uid] = user_ratings[:n]

    # return dict where keys are user (raw) ids and 
    # values are lists of tuples:
    # [(raw item id, rating estimation), ...] of size n.
    return top_n
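A short usage sketch may help show the shape of the input and output. The function is restated in condensed form so the example runs on its own, and the prediction tuples are hypothetical values laid out as (user id, item id, true rating, estimated rating, details), which is the order the function unpacks.

```python
from collections import defaultdict

def get_top_n(predictions, n=10):
    # condensed copy of the function above so this sketch is self-contained
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))
    for uid, user_ratings in top_n.items():
        user_ratings.sort(key=lambda x: x[1], reverse=True)
        top_n[uid] = user_ratings[:n]
    return top_n

# Hypothetical predictions: (uid, iid, true rating, estimate, details)
toy_predictions = [
    (1, 10, 4.0, 3.9, {}), (1, 11, 5.0, 4.6, {}), (1, 12, 2.0, 2.1, {}),
    (2, 10, 3.0, 3.2, {}), (2, 12, 4.0, 4.4, {}),
]

top_2 = get_top_n(toy_predictions, n=2)
for uid, recs in top_2.items():
    print(uid, recs)
```

Items are ranked by the estimated rating only; the true rating is carried along in each prediction but does not affect the ordering.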


In [131]:
random.seed(randomSeed)
np.random.seed(randomSeed)
# define the hyperparameter grid to search
print('Grid Search...')
param_grid = {'n_epochs': [10, 20, 30], 
    'lr_all': [0.002, 0.005, 0.01,0.015,0.02,0.05],
    'biased' : [True,False]}
grid_search = GridSearchCV(SVD, param_grid, measures=['rmse'], 
  cv=5)

random.seed(randomSeed)
np.random.seed(randomSeed)
# run the grid search (5-fold cross-validation for each combination)
grid_search.fit(data)
# extract the best estimator and its mean test RMSE
algo = grid_search.best_estimator['rmse']
bestRMSE = grid_search.best_score['rmse']


Grid Search...


In [141]:
bestRMSE

0.9363329339678333

In [142]:
meanTestRMSE

0.9363396424407643

Based on the grid search, our best mean RMSE across the 5 folds is 0.9363, which matches the performance of the default settings.

In [143]:
# create dataframe
results_df = pd.DataFrame.from_dict(grid_search.cv_results)
# extract required columns
results_table=results_df[['param_biased','param_lr_all',
    'param_n_epochs','mean_test_rmse','split0_test_rmse',
    'split1_test_rmse','split2_test_rmse','split3_test_rmse',
    'split4_test_rmse']].sort_values(by=['mean_test_rmse'])
columnMap={'param_biased' : 'biased',
    'param_lr_all' : 'lr_all',
    'param_n_epochs' : 'n_epochs',
    'mean_test_rmse' : 'Mean',
    'split0_test_rmse' : 'Fold 1', 
    'split1_test_rmse' : 'Fold 2',
    'split2_test_rmse' : 'Fold 3', 
    'split3_test_rmse' : 'Fold 4',
    'split4_test_rmse' : 'Fold 5'}
results_table.rename(columns=columnMap,inplace=True)


In [151]:
results_table

    biased  lr_all  n_epochs      Mean    Fold 1    Fold 2    Fold 3    Fold 4    Fold 5
 4    True    0.01        10  0.936333   0.93661   0.93812  0.939056  0.934613  0.933266
14    True   0.005        20  0.936745  0.935265  0.936795  0.942951  0.936236  0.932477
 6    True   0.015        10   0.94342  0.945438  0.939999  0.948176  0.941976  0.941512
26    True   0.005        30   0.94358  0.947548  0.940927  0.945648  0.946101  0.937675
24    True   0.002        30  0.944294  0.944029  0.944479  0.949286   0.94229  0.941385
 2    True   0.005        10  0.947223  0.947633  0.946636  0.951514  0.945079  0.945252
15   False   0.005        20  0.948613  0.951853  0.945324  0.954849  0.943987  0.947055
12    True   0.002        20  0.949987  0.950142  0.948434  0.956086  0.948348  0.946926
 5   False    0.01        10  0.953025  0.953494  0.948194   0.96316  0.948255  0.952023
27   False   0.005        30   0.95329  0.955076  0.947399  0.962642  0.950337  0.950995


- The 'biased' column indicates whether we include the biases $b_i$ and $b_u$ in our model: True corresponds to the extended probabilistic matrix factorization and False to the plain probabilistic matrix factorization.

- The 'lr_all' parameter is the learning rate applied to all parameters.

If we exclude the extended probabilistic matrix factorization model results, the best mean RMSE worsens to 0.9486:

In [152]:
nonBiasedIndex=results_table['biased']==False
results_table.loc[nonBiasedIndex]

    biased  lr_all  n_epochs      Mean    Fold 1    Fold 2    Fold 3    Fold 4    Fold 5
15   False   0.005        20  0.948613  0.951853  0.945324  0.954849  0.943987  0.947055
 5   False    0.01        10  0.953025  0.953494  0.948194   0.96316  0.948255  0.952023
27   False   0.005        30   0.95329  0.955076  0.947399  0.962642  0.950337  0.950995
 7   False   0.015        10  0.959367   0.96072  0.954012  0.969482  0.954134  0.958486
25   False   0.002        30  0.967097  0.970785  0.960351  0.975844    0.9625  0.966006
17   False    0.01        20  0.969302  0.976076  0.961909  0.975329  0.967677  0.965518
 9   False    0.02        10  0.975298  0.977871  0.970301  0.976564  0.975495   0.97626
 3   False   0.005        10  0.979137  0.982012  0.970966  0.988101   0.97607  0.978536
29   False    0.01        30  0.983669  0.988109  0.977487  0.991606  0.978516  0.982626
19   False   0.015        20  0.986083  0.989636  0.978324  0.998357  0.982093  0.982003


Now that we have looked at the 5-fold cross-validated performance for a range of parameters, we demonstrate how the model can be used to produce recommendations.

In [133]:
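# define the fraction of ratings held out for testing
testTrainSplit=0.75


We train an extended probabilistic matrix factorization model in-sample, then generate recommendations for all of the out-of-sample users. Because test_size is set to 0.75 in the split below, 75% of the ratings are held out for testing and the model is fit on the remaining 25%.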


In [134]:
random.seed(randomSeed)
np.random.seed(randomSeed)

# sample random trainset and testset
# test set is made of 75% of the ratings.
trainset, testset = train_test_split(data, test_size=testTrainSplit)
# fit the model on the training set and predict the test set
algo.fit(trainset)
predictions = algo.test(testset)
accuracy.rmse(predictions)

RMSE: 0.9759


0.9758846020775652

In [135]:
# define top N 
topN=10


We now find the top 10 recommendations for the test set:

In [136]:
iid='item_id'
# map item id to title 
item2Title=df_title_by_item.set_index(iid)['title'].to_dict()
# get top 10 recommendation for each user
top_n = get_top_n(predictions, n=topN)
# extract top N recommendations for each user
listTopN=list()
for uid, user_ratings in top_n.items():
    listTopN.append([uid,[item2Title[int(iid)] for (iid, 
    _) in user_ratings]])
# convert to dataframe
dfTopN=pd.DataFrame(listTopN)
# rename columns
dfTopN.rename(columns={0 : 'user_id',1 : 'recommendations'},
    inplace=True)
# set index to user ID
dfTopN=dfTopN.set_index('user_id')


We can test the recommendations out-of-sample. We can predict the rating for an item by a user that was not part of the training set and compare that prediction with their actual rating.

In [137]:
# define raw user id (string)
user_id = str(531)
# define raw item id (string)
item_id = str(421)  
trueRating=4.0
# get a prediction for specific users and items.
pred = algo.predict(user_id, item_id, r_ui=trueRating, 
  verbose=True)
print(pred)

user: 531        item: 421        r_ui = 4.00   est = 3.53   {'was_impossible': False}
user: 531        item: 421        r_ui = 4.00   est = 3.53   {'was_impossible': False}


In [138]:
print(dfTopN.head())

                                           recommendations
user_id                                                   
521      [Usual Suspects, The (1995), Star Wars (1977),...
405      [Shawshank Redemption, The (1994), Rear Window...
216      [Star Wars (1977), Usual Suspects, The (1995),...
478      [Star Wars (1977), Shawshank Redemption, The (...
454      [Rear Window (1954), One Flew Over the Cuckoo'...


We extract the top 10 recommendations for an example user as follows:

In [147]:
# define example user
user_id=5
# extract top N recommendations for example user
recommendationList=dfTopN['recommendations'].loc[user_id]

Finally, here are the recommendations:

In [148]:
sampleRecommendations=pd.DataFrame(recommendationList,
  columns=['recommendations'])
print(sampleRecommendations)


                             recommendations
0                      Close Shave, A (1995)
1                           Star Wars (1977)
2             Raiders of the Lost Ark (1981)
3                 Princess Bride, The (1987)
4           Silence of the Lambs, The (1991)
5               To Kill a Mockingbird (1962)
6                                Jaws (1975)
7     Monty Python and the Holy Grail (1974)
8                  Young Frankenstein (1974)
9  Indiana Jones and the Last Crusade (1989)


Although we have not delved into measures beyond the root mean squared error to understand the utility of these recommendations to our users, there are many more techniques that can be used to help us understand our data once we have the latent factor loadings.

### Probabilistic Matrix Factorization Using Alternating Least Squares (ALS) With Weighted-$\lambda$-Regularization

Next, we review the results of the probabilistic matrix factorization using alternating least squares (ALS) with weighted-$\lambda$-regularization. We use the PySpark implementation which can be scaled out using a cluster.

In [77]:
# create shape method for spark dataframe
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape


We set the random seed and apply a random split, using 75% of the data for training and 25% of the data for testing:

In [79]:
# train/test split (75%/25%)
X_train, X_test = s_df.randomSplit([0.75, 0.25],seed=randomSeed)


We double-check the dimension of the training and testing data:

In [89]:
X_train.shape()

(79972, 5)

In [90]:
X_test.shape()

(20028, 5)

We check the data structure:

In [19]:
X_train.head(5)

[Row(item_id=1, title='Toy Story (1995)', user_id=1, rating=5, titmestamp=874965758),
 Row(item_id=1, title='Toy Story (1995)', user_id=5, rating=4, titmestamp=875635748),
 Row(item_id=1, title='Toy Story (1995)', user_id=6, rating=4, titmestamp=883599478),
 Row(item_id=1, title='Toy Story (1995)', user_id=13, rating=3, titmestamp=882140487),
 Row(item_id=1, title='Toy Story (1995)', user_id=15, rating=1, titmestamp=879455635)]

In [20]:
X_test.head(5)

[Row(item_id=1, title='Toy Story (1995)', user_id=2, rating=4, titmestamp=888550871),
 Row(item_id=1, title='Toy Story (1995)', user_id=10, rating=4, titmestamp=877888877),
 Row(item_id=1, title='Toy Story (1995)', user_id=16, rating=5, titmestamp=877717833),
 Row(item_id=1, title='Toy Story (1995)', user_id=18, rating=5, titmestamp=880130802),
 Row(item_id=1, title='Toy Story (1995)', user_id=20, rating=3, titmestamp=879667963)]

We test the select method:

In [21]:
X_train.select(["user_id", "item_id", "rating"]).head(5)

[Row(user_id=1, item_id=1, rating=5),
 Row(user_id=5, item_id=1, rating=4),
 Row(user_id=6, item_id=1, rating=4),
 Row(user_id=13, item_id=1, rating=3),
 Row(user_id=15, item_id=1, rating=1)]

We test simple model fitting on the training data using sample hyperparameters:

In [154]:
from pyspark.ml.recommendation import ALS
# define ALS initial parameters
maxIterations=5
# define number of latent features
k=10
regParameter=0.01
# define model
als = ALS(maxIter=maxIterations, rank=k, regParam=regParameter, 
    userCol="user_id", itemCol="item_id", ratingCol="rating",
    coldStartStrategy="drop",nonnegative=True,implicitPrefs=False,seed=randomSeed)
# fit model
model = als.fit(X_train)

We evaluate our model out-of-sample using the testing data:

In [155]:
from pyspark.ml.evaluation import RegressionEvaluator
# evaluate model (compute RMSE on test data)
predictions = model.transform(X_test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
    predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error (RMSE) = " + str(rmse))

# generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# generate top 10 user recommendations for each item
itemRecs = model.recommendForAllItems(10)

Root-mean-square error (RMSE) = 0.9995291737343103


Without any parameter tuning, we achieve an RMSE of roughly 1.

We generate the top 10 movie recommendations for a specified set of users and for a specified set of movies as follows:

In [160]:
# generate top 10 movie recommendations for a specified set of users
users = s_df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# generate top 10 user recommendations for a specified set of movies
items =s_df.select(als.getItemCol()).distinct().limit(3)
itemSubsetRecs = model.recommendForItemSubset(items, 10)

In [161]:
userSubsetRecs.head(10)

[Row(user_id=26, recommendations=[Row(item_id=1368, rating=5.025524616241455), Row(item_id=1463, rating=4.919097900390625), Row(item_id=1449, rating=4.811394214630127), Row(item_id=1397, rating=4.704085350036621), Row(item_id=1427, rating=4.645888328552246), Row(item_id=1452, rating=4.438278675079346), Row(item_id=1458, rating=4.438278675079346), Row(item_id=1155, rating=4.377913951873779), Row(item_id=534, rating=4.334097862243652), Row(item_id=113, rating=4.311575889587402)]),
 Row(user_id=474, recommendations=[Row(item_id=1463, rating=8.219408988952637), Row(item_id=1394, rating=5.69248104095459), Row(item_id=1449, rating=5.663028240203857), Row(item_id=626, rating=5.428760051727295), Row(item_id=1558, rating=5.241504669189453), Row(item_id=1639, rating=5.227160930633545), Row(item_id=1645, rating=5.179046154022217), Row(item_id=1650, rating=5.179046154022217), Row(item_id=1585, rating=5.179046154022217), Row(item_id=1405, rating=5.120999813079834)]),
 Row(user_id=29, recommendation

In [162]:
itemSubsetRecs.head(10)

[Row(item_id=26, recommendations=[Row(user_id=863, rating=6.21252965927124), Row(user_id=609, rating=6.171199798583984), Row(user_id=260, rating=6.1130757331848145), Row(user_id=68, rating=6.040411472320557), Row(user_id=697, rating=5.93201208114624), Row(user_id=88, rating=5.791384696960449), Row(user_id=507, rating=5.740054607391357), Row(user_id=355, rating=5.581533432006836), Row(user_id=153, rating=5.5377960205078125), Row(user_id=775, rating=5.489548683166504)]),
 Row(item_id=474, recommendations=[Row(user_id=863, rating=7.1820502281188965), Row(user_id=310, rating=6.90852689743042), Row(user_id=803, rating=6.594876766204834), Row(user_id=273, rating=6.282232761383057), Row(user_id=353, rating=6.219252109527588), Row(user_id=68, rating=6.145155906677246), Row(user_id=720, rating=6.073712348937988), Row(user_id=895, rating=6.005275249481201), Row(user_id=697, rating=5.972476482391357), Row(user_id=818, rating=5.950571537017822)]),
 Row(item_id=29, recommendations=[Row(user_id=153,

Now that we have learned the basic methods for fitting and applying our model to generate recommendations, we move to hyperparameter tuning.

## Cross-Validation

In this section, we use the training data to evaluate a range of model hyperparameters. We iterate over a range of parameters and apply 5-fold cross-validation to select a model based on the lowest root mean squared error (RMSE).


In [165]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Build a generic ALS model; rank, maxIter and regParam are set by the grid below
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating",
    coldStartStrategy="drop", nonnegative=True, implicitPrefs=False,
    seed=randomSeed)

# Tell Spark what values to try for each hyperparameter
param_grid = (ParamGridBuilder()
    .addGrid(als.rank, [5, 40, 80, 120])
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 1.5])
    .build())

# Tell Spark how to evaluate model performance
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
    predictionCol="prediction")

# Build the cross-validation step using CrossValidator
nFolds = 5
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid,
    evaluator=evaluator, numFolds=nFolds, seed=randomSeed)

# Run cross-validation on the training data
model = cv.fit(X_train)
# Extract the best model found by cross-validation
best_model = model.bestModel
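
The selection rule applied by the cross-validator can be sketched in plain Python: average the metric over the folds for each parameter combination and keep the combination with the lowest value. In PySpark, the fitted `CrossValidatorModel` exposes the per-combination averages as `avgMetrics`, in the same order as the parameter grid. The dictionaries and RMSE values below are hypothetical stand-ins rather than results from our run, and `best_params` is an illustrative helper name.

```python
def best_params(param_maps, avg_metrics):
    """Return the (params, metric) pair with the lowest average metric."""
    if len(param_maps) != len(avg_metrics) or not param_maps:
        raise ValueError("need one metric per parameter map")
    return min(zip(param_maps, avg_metrics), key=lambda pair: pair[1])

# Hypothetical cross-validated RMSE values, for illustration only
param_maps = [
    {"rank": 5, "maxIter": 5, "regParam": 0.01},
    {"rank": 40, "maxIter": 10, "regParam": 0.1},
    {"rank": 120, "maxIter": 10, "regParam": 1.5},
]
avg_metrics = [1.10, 0.95, 1.30]

params, metric = best_params(param_maps, avg_metrics)
```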


Running on our local workstation, we were not able to run a higher number of iterations without our training process crashing, despite having 18 cores and 128 GB of RAM. All of the cores were used during the parameter search, but RAM utilization remained low throughout.
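
To put the cost of the search in perspective, the following sketch counts the model fits implied by the grid and fold count used above. It is plain Python rather than Spark code, and the variable names are illustrative. The extra final fit assumes that `CrossValidator` refits the best parameter combination on the full training set, which is its documented behaviour.

```python
from itertools import product

# Grid values copied from the ParamGridBuilder call above
ranks = [5, 40, 80, 120]
max_iters = [5, 10]
reg_params = [0.01, 0.05, 0.1, 1.5]
n_folds = 5

# Every combination of rank, maxIter and regParam is one candidate model
grid = list(product(ranks, max_iters, reg_params))
n_candidates = len(grid)

# Each candidate is fitted once per fold, then the winner is refit once
n_cv_fits = n_candidates * n_folds
n_total_fits = n_cv_fits + 1

print(n_candidates, n_cv_fits, n_total_fits)
```

Because a quarter of these candidates use 120 latent factors, a modest increase in the maximum number of iterations multiplies across many expensive fits.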

We generate the predicted ratings using the test set and compute the root mean squared error (RMSE) as follows:

In [166]:
# generate test set predictions
predictions = best_model.transform(X_test)
# evaluate using RMSE
rmseBest = evaluator.evaluate(predictions)
kBest=best_model.rank
maxNIterationsBest=best_model._java_obj.parent().getMaxIter()
regParameterBest=best_model._java_obj.parent().getRegParam()

In [167]:
print("Best RMSE: "+str(rmseBest))
print("Best # of Latent Factors: "+str(kBest))
print("Best Max # of Iterations: "+str(maxNIterationsBest))
print("Best Reg Parameter: "+str(regParameterBest))

Best RMSE: 0.9097099852148802
Best # of Latent Factors: 120
Best Max # of Iterations: 10
Best Reg Parameter: 0.1


Our best model - which uses 120 factors, a maximum of 10 iterations, and a regularization parameter of 0.1 - has a root mean squared error of 0.9097 on the test set.
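
For reference, the RMSE reported by the evaluator is the square root of the mean squared difference between actual and predicted ratings. The sketch below computes it in plain Python on a few made-up rating pairs. The function name `rmse` and the sample values are illustrative assumptions, not output from our model.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length rating sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical ratings, for illustration only
actual_ratings = [5.0, 3.0, 4.0, 1.0]
predicted_ratings = [4.0, 3.0, 5.0, 3.0]

example_rmse = rmse(actual_ratings, predicted_ratings)
```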

We extract the latent factors - both user and item - for our best model as follows:

In [169]:
# extract best model latent user factors
userFactors=best_model.userFactors.orderBy('id')
# extract best model latent item factors
itemFactors=best_model.itemFactors.orderBy('id')


In [123]:
userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
|  1|[0.009767955, 0.0...|
|  2|[0.06330102, 0.0,...|
|  3|[0.00480999, 0.0,...|
|  4|[0.037397504, 0.1...|
|  5|[0.0, 0.10382693,...|
|  6|[0.009057933, 4.9...|
|  7|[0.02530831, 0.0,...|
|  8|[0.028864225, 0.0...|
|  9|[0.0013958388, 0....|
| 10|[0.02542325, 0.04...|
| 11|[0.044405952, 0.1...|
| 12|[0.030377503, 0.0...|
| 13|[0.0, 0.02294527,...|
| 14|[0.0, 0.0, 0.0942...|
| 15|[0.0, 0.15974322,...|
| 16|[0.035361394, 0.0...|
| 17|[0.0, 0.027111717...|
| 18|[0.030113203, 0.0...|
| 19|[0.03618488, 0.08...|
| 20|[0.008329038, 0.0...|
+---+--------------------+
only showing top 20 rows



In [170]:
itemFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
|  1|[0.0, 0.051413972...|
|  2|[0.05016716, 0.05...|
|  3|[0.04314227, 0.04...|
|  4|[0.0, 0.0, 0.4614...|
|  5|[0.020771416, 0.0...|
|  6|[0.11821658, 0.27...|
|  7|[0.055226717, 0.0...|
|  8|[0.059798315, 0.0...|
|  9|[0.032114543, 0.0...|
| 10|[0.019956274, 0.0...|
| 11|[0.040037796, 0.0...|
| 12|[0.030063575, 0.0...|
| 13|[0.08434692, 0.13...|
| 14|[0.08940532, 0.05...|
| 15|[0.020502938, 0.1...|
| 16|[0.020317668, 0.0...|
| 17|[0.04921029, 0.10...|
| 18|[0.13009888, 0.19...|
| 19|[0.0, 0.0, 0.0624...|
| 20|[0.0, 0.004746076...|
+---+--------------------+
only showing top 20 rows



These latent factors allow us to approximate the sparse, high-dimensional user-by-item rating matrix as the product of two dense, lower-dimensional matrices: a user-by-factor matrix and an item-by-factor matrix. Using these factors, we can complete the user-by-item rating matrix. Ranking previously unrated items by predicted rating for each user then allows us to provide recommendations.
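
As a minimal sketch of this idea, a predicted rating is the dot product of a user's factor vector and an item's factor vector. The example below uses small hypothetical factor vectors (three factors rather than 120) in plain Python. The names `user_factors`, `item_factors`, `rated` and `recommend` are assumptions for illustration and are not part of the Spark API.

```python
def predict(user_vec, item_vec):
    """Predicted rating as the dot product of user and item latent factors."""
    return sum(u * v for u, v in zip(user_vec, item_vec))

def recommend(user_id, user_factors, item_factors, rated, n_top):
    """Rank the items a user has not rated by predicted rating."""
    seen = rated.get(user_id, set())
    scores = [
        (item_id, predict(user_factors[user_id], item_vec))
        for item_id, item_vec in item_factors.items()
        if item_id not in seen
    ]
    # Highest predicted rating first
    scores.sort(key=lambda pair: pair[1], reverse=True)
    return scores[:n_top]

# Hypothetical non-negative factors, for illustration only
user_factors = {1: [1.0, 0.0, 2.0]}
item_factors = {
    10: [2.0, 1.0, 1.0],
    20: [0.0, 3.0, 0.5],
    30: [1.0, 1.0, 0.5],
    40: [0.5, 0.0, 2.0],
}
# User 1 has already rated item 40, so it is excluded from the ranking
rated = {1: {40}}

top = recommend(1, user_factors, item_factors, rated, n_top=2)
```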

In [171]:
# create user factor table
userFactors.createOrReplaceTempView("ALS_user_factors")
# create item factor table
itemFactors.createOrReplaceTempView("ALS_item_factors")


Immediately above, we create temporary views of the user and item factor tables, which can be queried with Spark SQL to facilitate data manipulation.

We find the top 10 movies for each user using our best model:

In [172]:
nTop=10
ALS_recommendations=best_model.recommendForAllUsers(nTop)

In [173]:
ALS_recommendations.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    471|[[102, 4.604454],...|
|    463|[[20, 4.2555323],...|
|    833|[[1187, 4.4711742...|
|    496|[[56, 4.1801705],...|
|    148|[[169, 4.9311213]...|
|    540|[[1449, 4.774058]...|
|    392|[[1449, 4.985334]...|
|    243|[[1449, 4.673945]...|
|    623|[[496, 4.440423],...|
|    737|[[179, 4.81333], ...|
|    897|[[313, 4.679883],...|
|    858|[[127, 4.279135],...|
|     31|[[1449, 4.8956256...|
|    516|[[408, 4.73162], ...|
|    580|[[50, 4.654353], ...|
|    251|[[50, 4.745293], ...|
|    451|[[313, 4.1940765]...|
|     85|[[1463, 4.38562],...|
|    137|[[96, 5.17765], [...|
|    808|[[316, 5.304805],...|
+-------+--------------------+
only showing top 20 rows



We create a view of our recommendations:

In [174]:
ALS_recommendations.createOrReplaceTempView("ALS_recommendations")

In the next several cells, we rearrange the data into a 'long' format (i.e., user ID, item ID, rating prediction):

In [175]:
exploded_recommendations = spark.sql("SELECT user_id,explode(recommendations) AS recommendation FROM ALS_recommendations")

In [176]:
exploded_recommendations.show()

+-------+-----------------+
|user_id|   recommendation|
+-------+-----------------+
|    471|  [102, 4.604454]|
|    471|  [342, 4.549936]|
|    471| [140, 4.4436836]|
|    471| [477, 4.3312244]|
|    471|   [82, 4.323793]|
|    471|  [378, 4.317134]|
|    471| [422, 4.3069406]|
|    471| [465, 4.2862034]|
|    471|    [8, 4.259362]|
|    471| [1169, 4.253462]|
|    463|  [20, 4.2555323]|
|    463| [1449, 4.213945]|
|    463|[1167, 4.0801134]|
|    463| [302, 4.0746717]|
|    463|  [408, 4.050413]|
|    463|   [116, 4.01069]|
|    463|  [887, 3.991781]|
|    463| [253, 3.9703588]|
|    463|[1137, 3.9649327]|
|    463| [361, 3.9490182]|
+-------+-----------------+
only showing top 20 rows



In [177]:
topN_clean_recommendations = spark.sql("SELECT user_id,item_ids_and_ratings.item_id AS item_id,item_ids_and_ratings.rating AS prediction FROM ALS_recommendations LATERAL VIEW explode(recommendations) exploded_table AS item_ids_and_ratings")

In [178]:
topN_clean_recommendations.show()

+-------+-------+----------+
|user_id|item_id|prediction|
+-------+-------+----------+
|    471|    102|  4.604454|
|    471|    342|  4.549936|
|    471|    140| 4.4436836|
|    471|    477| 4.3312244|
|    471|     82|  4.323793|
|    471|    378|  4.317134|
|    471|    422| 4.3069406|
|    471|    465| 4.2862034|
|    471|      8|  4.259362|
|    471|   1169|  4.253462|
|    463|     20| 4.2555323|
|    463|   1449|  4.213945|
|    463|   1167| 4.0801134|
|    463|    302| 4.0746717|
|    463|    408|  4.050413|
|    463|    116|   4.01069|
|    463|    887|  3.991781|
|    463|    253| 3.9703588|
|    463|   1137| 3.9649327|
|    463|    361| 3.9490182|
+-------+-------+----------+
only showing top 20 rows



In [179]:
topN_clean_recommendations.shape()

(9430, 3)
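
The effect of `explode` can be illustrated without Spark. In the sketch below, each user's nested list of (item, rating) pairs becomes one row per pair. The nested input is a two-user excerpt whose values are taken from the output above, and the helper name `explode_recommendations` is illustrative.

```python
def explode_recommendations(nested):
    """Flatten {user_id: [(item_id, rating), ...]} into long-format rows."""
    rows = []
    for user_id, recommendations in nested.items():
        for item_id, rating in recommendations:
            rows.append((user_id, item_id, rating))
    return rows

# Two-user excerpt of the nested recommendations shown above
nested = {
    471: [(102, 4.604454), (342, 4.549936)],
    463: [(20, 4.2555323), (1449, 4.213945)],
}

long_rows = explode_recommendations(nested)
```

With 10 recommendations per user, the number of long-format rows is 10 times the number of users, which is consistent with the 9,430 rows reported above if recommendations were generated for 943 users.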

Now we add the title to the recommendations:

In [180]:
# distinct item ID / title pairs
item_info=sqlContext.sql('select distinct item_id,title from movielens order by item_id,title')
# all original ratings
ratings=sqlContext.sql('select * from movielens order by item_id,title,user_id')
# attach the title to each recommendation
topN_recommendations=topN_clean_recommendations.join(item_info, ["item_id"], "left")
# display the first 100 rows
topN_recommendations.show(100)


+-------+-------+----------+--------------------+
|item_id|user_id|prediction|               title|
+-------+-------+----------+--------------------+
|    474|    737| 4.5123734|Dr. Strangelove o...|
|    474|     31|  4.637323|Dr. Strangelove o...|
|    474|     85| 4.1466703|Dr. Strangelove o...|
|    474|    321| 4.0486994|Dr. Strangelove o...|
|    474|     76| 4.5824924|Dr. Strangelove o...|
|    474|    916| 4.2088294|Dr. Strangelove o...|
|    474|    409| 4.3826313|Dr. Strangelove o...|
|    474|    822| 3.8614087|Dr. Strangelove o...|
|    474|    601| 4.0764046|Dr. Strangelove o...|
|    474|    875| 4.7661343|Dr. Strangelove o...|
|    474|    686| 5.0842505|Dr. Strangelove o...|
|    474|    232|  4.526551|Dr. Strangelove o...|
|    474|    360|  4.588858|Dr. Strangelove o...|
|    474|    855| 3.7666855|Dr. Strangelove o...|
|    474|    305|  4.122358|Dr. Strangelove o...|
|    474|    747| 4.8431907|Dr. Strangelove o...|
|    474|    325| 4.6233015|Dr. Strangelove o...|


We add the actual rating:

In [181]:
topN_clean_recommendations.join(ratings, ["user_id", "item_id"], "left").show()


+-------+-------+----------+--------------------+------+----------+
|user_id|item_id|prediction|               title|rating|titmestamp|
+-------+-------+----------+--------------------+------+----------+
|      9|    172|   4.76347|                null|  null|      null|
|     15|    754| 3.8676977|   Red Corner (1997)|     5| 879455080|
|     25|     12|  4.458394|                null|  null|      null|
|     51|    172| 4.0368223|Empire Strikes Ba...|     5| 883498936|
|     56|     96| 4.5499263|Terminator 2: Jud...|     5| 892676429|
|     61|    169|  3.659671|                null|  null|      null|
|     79|   1159| 4.8410177|                null|  null|      null|
|     82|   1463| 3.9947956|                null|  null|      null|
|     96|    169| 4.7422895|                null|  null|      null|
|     99|    318| 4.3608117|                null|  null|      null|
|    114|    100| 3.9224412|        Fargo (1996)|     5| 881259927|
|    116|    127| 3.8134534|Godfather, The (1...

Finally, we keep only the originally unrated items for which we have generated predicted ratings using our best model:

In [182]:
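# left-join the actual ratings and keep only pairs with no existing rating
unrated_recommendations=topN_clean_recommendations.join(ratings, ["user_id", "item_id"], 
    "left").filter(ratings.rating.isNull())
# display the unrated recommendations
unrated_recommendations.show()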


+-------+-------+----------+-----+------+----------+
|user_id|item_id|prediction|title|rating|titmestamp|
+-------+-------+----------+-----+------+----------+
|      7|    963| 4.7649426| null|  null|      null|
|     23|    302| 4.3101945| null|  null|      null|
|     25|     12|  4.448552| null|  null|      null|
|     79|   1159|  4.856906| null|  null|      null|
|     81|     56| 4.1670837| null|  null|      null|
|     82|   1463| 3.9235337| null|  null|      null|
|     96|    169| 4.7215858| null|  null|      null|
|    123|   1449| 4.8516417| null|  null|      null|
|    170|   1192| 4.4448237| null|  null|      null|
|    199|    137| 4.4266148| null|  null|      null|
|    207|    496|  3.945725| null|  null|      null|
|    216|    113|  4.562116| null|  null|      null|
|    238|   1169|   4.04074| null|  null|      null|
|    259|    169| 4.6149435| null|  null|      null|
|    310|    408|   4.97844| null|  null|      null|
|    341|    603| 4.3532605| null|  null|     

In [101]:
test_recommendations.shape()

(5440, 6)
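
The left join followed by a null filter used above keeps only the recommendations with no matching row in the ratings table. A plain-Python sketch of the same logic is shown below. The sample rows are a small excerpt built from the outputs above, and `keep_unrated` is an illustrative helper name.

```python
def keep_unrated(recommendations, ratings):
    """Keep recommendations whose (user_id, item_id) pair has no known rating."""
    rated_pairs = {(user_id, item_id) for user_id, item_id, _ in ratings}
    return [
        row for row in recommendations
        if (row[0], row[1]) not in rated_pairs
    ]

# (user_id, item_id, prediction)
recommendations = [
    (9, 172, 4.76347),
    (15, 754, 3.8676977),
    (25, 12, 4.458394),
    (51, 172, 4.0368223),
]
# (user_id, item_id, rating)
ratings = [(15, 754, 5), (51, 172, 5)]

unrated = keep_unrated(recommendations, ratings)
```

In Spark, a join with `how="left_anti"` should express the same filter in one step, although it returns only the columns of the left-hand DataFrame.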

We retrieve sample recommendations for user 5:

In [191]:
topN_clean_recommendations_user_5 = spark.sql("SELECT user_id,item_ids_and_ratings.item_id AS item_id,item_ids_and_ratings.rating AS prediction FROM ALS_recommendations LATERAL VIEW explode(recommendations) exploded_table AS item_ids_and_ratings WHERE user_id=5")

In [197]:
topN_clean_recommendations_user_5.join(item_info, ["item_id"], "left").show()

+-------+-------+----------+--------------------+
|item_id|user_id|prediction|               title|
+-------+-------+----------+--------------------+
|     50|      5| 4.2129393|    Star Wars (1977)|
|    114|      5| 4.3055744|Wallace & Gromit:...|
|     89|      5| 4.1861587| Blade Runner (1982)|
|   1449|      5|  4.152075|Pather Panchali (...|
|    390|      5| 4.3298526|Fear of a Black H...|
|    175|      5| 4.1830025|       Brazil (1985)|
|    408|      5|  4.525527|Close Shave, A (1...|
|   1500|      5| 4.1414766|Santa with Muscle...|
|    189|      5|  4.179242|Grand Day Out, A ...|
|    169|      5| 4.5346627|Wrong Trousers, T...|
+-------+-------+----------+--------------------+



Notice that a couple of the recommendations are the same for both the SGD and ALS models, namely 'A Close Shave' and 'Star Wars'.


## Model Comparison

In this section, we provide a quick summary comparison of the performance of our original SVD-like recommender system (created for Project 3) with our more scalable SVD-like matrix factorization implemented using PySpark. The RMSE for the ALS model was 0.9097 versus 0.9486 for the SGD model. Although the random seed was the same for the cross-validation, the parameter grid used to search for 'best' parameters was necessarily different, so the two figures are not strictly comparable. We were not able to run as many iterations for the PySpark implementation.
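
As a quick worked comparison, the relative difference between the two reported RMSE values can be computed as follows. This is simple arithmetic on the figures quoted above and, given the differing parameter grids, should be read as indicative only.

```python
# RMSE values reported above
rmse_als = 0.9097   # PySpark ALS model
rmse_sgd = 0.9486   # SGD model from Project 3

# Gap between the two models, in rating units and as a fraction of the SGD RMSE
absolute_gap = rmse_sgd - rmse_als
relative_gap = absolute_gap / rmse_sgd

print(f"Absolute gap: {absolute_gap:.4f}")
print(f"Relative gap: {relative_gap:.1%}")
```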

The PySpark implementation was able to utilize all 18 CPU cores on our workstation and was much faster than the 'surprise' implementation.


Although using a docker container, in which the Python and Spark setup was handled almost entirely for us, reduced the effort required for the PySpark implementation, setting up a real cluster in order to scale out our recommender system is complex. This complexity is only warranted once the size of the dataset moves beyond the amount of memory that can cost-effectively be put in a single server.

## Conclusion

Although the predictive performance of both recommender systems was reasonably good, as our dataset size expands beyond what we can fit into memory on a single server, it becomes necessary to modify our approach to use a distributed computing environment. Our PySpark implementation allows us to scale beyond the memory size of a single server.


## References

Golub, G. H. and Van Loan, C. F. Matrix Computations, 3rd ed. Baltimore, MD: Johns Hopkins, 1996. 