In a previous post, I explored the [MovieLens dataset](https://grouplens.org/datasets/movielens/), a repository of over 26,000,000 movie ratings given to 45,000 movies by 270,000 users, and built a Collaborative-Filtering recommendation system that was able to predict ratings with a mean absolute error (MAE) of 0.66.

I recommend that you read [the initial post](https://hexhamallstar.github.io/movie-rec.html) for a detailed exploration of the dataset and the details of implementing a Collaborative-Filtering recommendation system with NumPy.

In this notebook, I will use PySpark's Alternating Least Squares (ALS) algorithm for model-based recommendations. 

## This notebook contains the following content:
* [1. Model-based recommendations](#first-bullet)
* [2. Working with PySpark](#second-bullet)
* [3. Parameter search](#third-bullet)
* [4. Conclusions](#fourth-bullet)

## 1. Model-based recommendations <a class="anchor" id="first-bullet"></a>
When building the Collaborative-Filtering model in my previous post, I ran into a number of limitations/issues:

**Dataset size**
* With 45,000 movies and 270,000 users, the non-sparse user-user similarity matrix would have required over 290GB of RAM to hold in memory.
* Because of this, I had to use a much smaller sample of the data to build and test my model.
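
The memory figure above can be checked with a quick back-of-the-envelope calculation. This is only a sketch: it assumes each similarity is stored as a 32-bit float, which is my assumption here rather than something fixed by the dataset.

```python
# Back-of-the-envelope size of a dense user-user similarity matrix.
n_users = 270_000
bytes_per_value = 4  # assumption: each similarity stored as a 32-bit float

matrix_bytes = n_users ** 2 * bytes_per_value
matrix_gb = matrix_bytes / 1e9  # decimal gigabytes

print(f"{matrix_gb:.1f} GB")  # prints "291.6 GB"
```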

**Similarity metric**
* Cosine similarity was used, which measures the cosine of the angle between two rating vectors. Missing ratings were treated as zeros in order to compute the similarity, but this implies that the user did not like that movie (on our scale).
* If 2 users have rated the exact same subset of the movies and their ratings are multiples of one another, e.g. user1 = (1, 1, 0, 0, 0, 0, 0) and user2 = (5, 5, 0, 0, 0, 0, 0), then the angle between the vectors is zero (a cosine similarity of 1). These users will be deemed to be exactly the same despite them clearly not having the same preferences.
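
The second point is easy to verify with the two example vectors. The helper below is a minimal plain-Python sketch written for this illustration (`cosine_similarity` is not the function from the previous post):

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length rating vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Missing ratings are encoded as 0, as described above.
user1 = (1, 1, 0, 0, 0, 0, 0)
user2 = (5, 5, 0, 0, 0, 0, 0)

similarity = cosine_similarity(user1, user2)
print(round(similarity, 6))  # prints 1.0
```

The metric only sees the direction of the vectors, so a user who rates everything 1 and a user who rates everything 5 look identical.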

For these reasons, I decided to try a model-based system. In this type of system, the user-item rating matrix is decomposed into a user-feature matrix and an item-feature matrix:

<img src="https://4.bp.blogspot.com/-95QD5t9Lha4/Wd7uWnBZBeI/AAAAAAAADg4/xB4VnnxM0UgUp15lNmB3aHCXYGejpm4OACLcBGAs/s1600/matrix_factorization.png">

In this case, d would be the number of features that we 'learn' for each user and movie. We can use an optimization algorithm to learn the features that minimize the error in the matrix calculation shown above. The estimated user-item rating matrix is:

\begin{equation*}
\hat{R} = U \times V
\end{equation*}

And the residual sum of squares (RSS), summed over the observed ratings, is:
\begin{equation*}
RSS = \sum (R - U \times V)^{2}
\end{equation*}

We therefore want to find the values of U and V that minimize the RSS. This looks very similar to the optimization of RSS for linear regression. However, neither U nor V is fixed, which makes this a more complex problem (in linear regression we use the known input features and 'learn' the weights required for each one).

The way this is implemented in Spark is to:
* Fix the values of U and solve the resulting least-squares problem for V.
* Fix the values of V and solve the resulting least-squares problem for U.
* Repeat for a set number of iterations (the `maxIter` parameter in Spark) or, more generally, until a convergence criterion is met.

With one of the feature matrices held fixed, each sub-problem is a convex least-squares problem that is easily solved (and also easy to parallelize, since each user or movie can be solved independently). This method is called Alternating Least Squares (ALS).
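
To make the alternating scheme concrete, here is a small NumPy sketch on a toy rating matrix. It is not Spark's implementation: the function names, the toy ratings, and the choice to encode missing ratings as 0 are all assumptions made for this illustration, and it already includes the regularization term `lam` that is discussed in section 3.

```python
import numpy as np

def objective(R, mask, U, V, lam):
    """Squared error on observed ratings plus an L2 penalty on U and V."""
    err = (R - U @ V)[mask]
    return float(err @ err + lam * ((U ** 2).sum() + (V ** 2).sum()))

def als(R, mask, d=2, lam=0.1, n_iters=20, seed=0):
    """Toy ALS: factorize R (users x items) as U @ V using observed entries only."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, d))
    V = rng.normal(scale=0.1, size=(d, n_items))
    reg = lam * np.eye(d)
    history = []
    for _ in range(n_iters):
        # Fix V: each user's features solve a small regularized least-squares problem.
        for i in range(n_users):
            obs = mask[i]
            if obs.any():
                Vo = V[:, obs]  # features of the items this user rated
                U[i] = np.linalg.solve(Vo @ Vo.T + reg, Vo @ R[i, obs])
        # Fix U: same idea for each item's features.
        for j in range(n_items):
            obs = mask[:, j]
            if obs.any():
                Uo = U[obs]  # features of the users who rated this item
                V[:, j] = np.linalg.solve(Uo.T @ Uo + reg, Uo.T @ R[obs, j])
        history.append(objective(R, mask, U, V, lam))
    return U, V, history

# Made-up ratings for 4 users and 4 movies; 0 marks a missing rating.
R = np.array([[5, 4, 0, 1],
              [4, 0, 1, 1],
              [1, 1, 5, 0],
              [0, 1, 4, 5]], dtype=float)
mask = R > 0

U, V, history = als(R, mask)
train_mae = np.abs(R - U @ V)[mask].mean()
print("objective per iteration:", np.round(history, 4))
print("MAE on the observed entries: %.3f" % train_mae)
```

Because each half-step solves its sub-problem exactly, the objective recorded in `history` never increases from one iteration to the next.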

## 2. Working with PySpark <a class="anchor" id="second-bullet"></a>
PySpark is a convenient Python library that interfaces with Spark. For large datasets, a Spark-based system has advantages because:
* Data imported into Spark RDDs/DataFrames is partitioned and can easily be worked on in parallel.
* Spark operations are lazily evaluated: defining operations creates a set of instructions that are only executed when some result is requested. Spark figures out how to distribute the computations to different cores and (often) does not need to materialize intermediate results, which saves both time and memory.
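
Lazy evaluation can be illustrated without Spark at all. The snippet below is only an analogy using a plain Python generator (it is not Spark code, and `double` is a made-up function): defining the pipeline does no work, and the computation happens only when a result is requested.

```python
calls = []

def double(x):
    """Record each call so we can see when the work actually happens."""
    calls.append(x)
    return 2 * x

# Defining the pipeline builds a recipe; nothing is computed yet.
pipeline = (double(x) for x in range(5))
calls_before = len(calls)

# Requesting a result triggers the computation, one element at a time,
# without ever building the intermediate list of doubled values.
total = sum(pipeline)

print(calls_before, total)  # prints "0 20"
```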

In [1]:
# import the SparkConfiguration and SparkContext
from pyspark import SparkConf, SparkContext
# if we wanted to change any configuration settings for this session only we would define them here
conf = SparkConf() 
# create a SparkContext using the above configuration
sc = SparkContext(conf=conf)

In [2]:
# this command shows the current configuration settings
sc._conf.getAll()

[('spark.driver.port', '51292'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.app.id', 'local-1525363027861'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.1.7'),
 ('spark.driver.memory', '14g'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.ui.showConsoleProgress', 'true')]

The important settings here are:
* spark.rdd.compress - Compresses serialized RDD partitions (PySpark enables this by default). This costs some computation time but can substantially reduce the memory used.
* spark.master - The master URL, i.e. where Spark runs. In this case 'local\[*\]' means we are running on a local machine with as many worker threads as there are logical cores. On my CPU this is 8.
* spark.driver.memory - In local mode all of the work happens inside the driver process, and we are repeatedly performing matrix operations on the data, so the driver needs a large amount of memory. I have assigned a maximum of 14GB out of my laptop's 16GB total.

In [3]:
# import an SQL spark-session so that we can use dataframes
from pyspark.sql import SparkSession
# import the ALS algorithm we will be using
from pyspark.ml.recommendation import ALS
# instantiate the SQL spark session
spark = SparkSession.builder.getOrCreate()

In [4]:
# read in the data, specifying that there is a header in the csv file 
# and that spark should auto-detect the variable types for each column
data = spark.read.option("header","true").option("inferSchema","true").format("csv")\
       .load("Movie Recommender/ratings.csv")

In [5]:
# print the schema of the dataframe
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



As in the previous post, the column names are userId, movieId, rating and timestamp. We will not be needing the timestamp column so we can safely drop it:

In [6]:
data = data.drop('timestamp')
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



We can ask Spark how many partitions it has split the dataframe into. We can see below that there are 8 partitions (because my laptop's processor has 8 threads).

In [7]:
data.rdd.getNumPartitions()

8

We can also show the first N rows by using the take method:

In [8]:
data.take(1)

[Row(userId=1, movieId=110, rating=1.0)]

Now we can fit an ALS model. At first we will fit a model using the default parameters, specifying the required column names and the strategy for cold starts. Cold starts occur when we attempt to predict a rating for a user-movie pair but the user or the movie had no ratings in the training set. There are 2 options for dealing with this:
* "nan" (the default) - return NaN as the prediction. In production systems this may be useful because it identifies new users/items to the system and allows it to fall back onto another recommendation system, such as a content-based system. In development, however, a single NaN prediction makes the evaluation metric NaN as well, which prevents us from evaluating the system.
* "drop" - this option removes any rows with a NaN prediction from the dataframe of predictions. Our result will therefore only contain valid numbers that can be used for evaluation.
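
The effect of the two strategies on the evaluation can be sketched in plain Python. The ratings below are made-up toy values and `mean_absolute_error` is a helper written for this illustration; the point is only that one NaN prediction poisons the metric unless it is dropped first.

```python
import math

def mean_absolute_error(pairs):
    """Average of |actual - predicted| over (actual, predicted) pairs."""
    return sum(abs(actual - pred) for actual, pred in pairs) / len(pairs)

# (actual, predicted) pairs; the last user was never seen in training,
# so the "nan" strategy would return NaN for that prediction.
pairs = [(4.0, 3.5), (2.0, 2.5), (5.0, math.nan)]

mae_with_nan = mean_absolute_error(pairs)

# The "drop" strategy corresponds to filtering those rows out first.
kept = [(actual, pred) for actual, pred in pairs if not math.isnan(pred)]
mae_after_drop = mean_absolute_error(kept)

print(mae_with_nan, mae_after_drop)  # prints "nan 0.5"
```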

In [9]:
#instantiate the model, with the "drop" cold start strategy
model = ALS(coldStartStrategy="drop")

In [10]:
# set the column names for the required data
model.setItemCol("movieId")\
    .setUserCol("userId")\
    .setRatingCol("rating")

ALS_4c41b64f738125c14df6

Now that we have a model, we need to split our data such that we can evaluate the system on a set of data points that were not used to train the model. As in the previous post, I will perform a random split with 90% training data and 10% test data held out for evaluation. Dataframes in spark have a handy randomSplit method for doing just that:

In [11]:
# split data into train and test sets with 90:10 proportions
(train, test) = data.randomSplit([0.9, 0.1], seed=10)
# since the train dataframe will be used many times, forcing spark to cache it could 
# reduce time taken, as we don't have to read from disk as much
train.cache()

DataFrame[userId: int, movieId: int, rating: double]

In [12]:
# fit the model to the training set
model = model.fit(train)

In [13]:
# calculate predictions by using the model to transform the test set
predictions = model.transform(test)

In [14]:
predictions.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = false)



We now have a dataframe that contains all of the data points in the test set (that weren't dropped due to cold start) with an extra column that contains the prediction. To evaluate the predictions, we compare them to the rating column.

As in the previous post, we will use Mean Absolute Error as our measure of performance:

In [15]:
# import the regression evaluator
from pyspark.ml.evaluation import RegressionEvaluator

In [16]:
# instantiate evaluator, specifying the desired metric "mae" and the columns
# that contain the predictions and the actual values
evaluator = RegressionEvaluator(metricName="mae", predictionCol="prediction", labelCol="rating")

In [17]:
# evaluate the output of our model
mae = evaluator.evaluate(predictions)

In [18]:
print('The Mean Absolute Error is %.3f' % (mae))

The Mean Absolute Error is 0.635
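
For reference, the MAE is the average absolute difference between the actual and predicted ratings. Writing N for the number of test ratings that were not dropped due to cold start, r_k for the k-th actual rating and \hat{r}_k for its prediction (symbols introduced here for illustration):

\begin{equation*}
MAE = \frac{1}{N} \sum_{k=1}^{N} \left| r_k - \hat{r}_k \right|
\end{equation*}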


We have achieved an MAE of 0.635, which is lower than the 0.66 achieved by the collaborative-filtering method. This was with the default parameters; now we can try to improve the model by searching over a grid of parameter values.

## 3. Parameter search <a class="anchor" id="third-bullet"></a>

The parameters we will search over are:
* Rank - The number of hidden features that we will use to describe the users/movies. This is equal to the value d in the diagram in section 1.
* RegParam - The regularization parameter applied to the cost function. In actual fact the cost function is not the simple RSS function shown in section 1, but includes a regularizing term:
\begin{equation*}
Cost(U) = \sum (R - U \times V)^{2} + \lambda \Vert U \Vert ^{2}_{2}
\end{equation*}

\begin{equation*}
Cost(V) = \sum (R - U \times V)^{2} + \lambda \Vert V \Vert ^{2}_{2}
\end{equation*}

If we have no regularization i.e. lambda is 0, we get the RSS result. If lambda is large, the values of the matrices U and V are forced to be small. This has the effect of preventing overfitting to the training set, which can improve performance if we were previously overfitting.
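
With V held fixed, the regularized cost for U has a closed-form minimizer for each user. As a sketch in the notation above, write u_i for row i of U, V_{\Omega_i} for the columns of V belonging to the movies that user i has rated, and r_i for that user's observed ratings (these symbols are introduced here for illustration):

\begin{equation*}
u_i^{T} = (V_{\Omega_i} V_{\Omega_i}^{T} + \lambda I)^{-1} V_{\Omega_i} r_i^{T}
\end{equation*}

Setting lambda to 0 recovers the ordinary least-squares solution, and the update for each column of V is symmetric. As far as I am aware, Spark additionally scales lambda by the number of ratings each user or movie has, which this simplified form leaves out.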

To decide which combination of the above parameters we should use, we will section off a validation set from the training set. We will fit the model with each combination of parameters to the training set and choose the combination that has the lowest error on the validation set. This will then be used to transform the test set for final evaluation.

If we were to use the test set instead of the validation set in this situation, information from the test set would be 'leaking' into our model and the model could potentially overfit to it and not generalize well to future data.
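
The selection procedure can be sketched in plain Python. The grid values match the ones used in the search below, but `toy_validation_error` is a purely hypothetical stand-in for "fit on the training set, score on the validation set": its numbers carry no information about the real model.

```python
from itertools import product

# Every combination of the two hyperparameters.
ranks = [5, 10, 15]
reg_params = [1, 0.1, 0.01]
grid = [{"rank": r, "regParam": lam} for r, lam in product(ranks, reg_params)]

def select_best(grid, validation_error):
    """Return the grid entry with the lowest validation error."""
    return min(grid, key=validation_error)

def toy_validation_error(params):
    """Hypothetical stand-in: a real search would fit and evaluate a model here."""
    return abs(params["rank"] - 10) / 10 + abs(params["regParam"] - 0.1)

best = select_best(grid, toy_validation_error)
print(len(grid), best)  # prints "9 {'rank': 10, 'regParam': 0.1}"
```

The test set plays no part in this loop; it is only touched once, after the best combination has been chosen.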

In [19]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [20]:
#create a new ALS estimator
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
#define a grid for both parameters
#this will test 9 different combinations of the 2 parameters
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.regParam, [1, 0.1, 0.01]) \
    .build()

In [21]:
# split the data with a ratio of 90% training, 10% validation
# define the estimator and evaluator to use to determine the best model
# also pass in the parameter grid to search over
trainValSplit = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(metricName="mae", predictionCol="prediction", labelCol="rating"),
    trainRatio=0.9,
    parallelism=4)

In [22]:
# fit the model to the training data
model = trainValSplit.fit(train)

In [23]:
# retrieve the best model
bestModel = model.bestModel

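The fitted model does not report the winning combination of hyperparameters directly. In recent Spark versions it can be recovered by pairing `model.validationMetrics` with the entries of `paramGrid` (the metrics are stored in the same order as the grid), and `bestModel.rank` gives the chosen rank. We now use the best model to transform the test data, compute predictions and evaluate.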
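
One way to look up the winning combination is to pair each parameter map with its validation metric. The helper below is a hypothetical sketch (`best_param_map` is my own name, not a Spark function) and is demonstrated on stand-in values so that it runs without Spark.

```python
def best_param_map(param_maps, metrics, larger_is_better=False):
    """Pair each parameter map with its validation metric and return the best pair."""
    pairs = list(zip(param_maps, metrics))
    pick = max if larger_is_better else min
    return pick(pairs, key=lambda pair: pair[1])

# Stand-in values for illustration only; these are not results from the notebook.
example_maps = [{"rank": 5}, {"rank": 10}, {"rank": 15}]
example_metrics = [0.70, 0.60, 0.65]

best_map, best_metric = best_param_map(example_maps, example_metrics)
print(best_map, best_metric)  # prints "{'rank': 10} 0.6"
```

In the notebook, assuming the Spark version in use exposes `validationMetrics` on the fitted model, the call would be `best_param_map(paramGrid, model.validationMetrics)`. MAE is an error, so the smallest value wins, and the keys of the returned map are Spark `Param` objects whose `.name` attribute gives the parameter name.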

In [24]:
# transform test data using bestModel
predictions = bestModel.transform(test)
# evaluate the predictions
mae = evaluator.evaluate(predictions)
print('The Mean Absolute Error is %.3f' % (mae))

The Mean Absolute Error is 0.628


This is a slight improvement over our previous MAE of 0.635! Further tuning could be carried out, as currently only a small range of values has been tested.

## 4. Conclusions <a class="anchor" id="fourth-bullet"></a>
To summarize:
* We have built a recommender system that uses over 26 million data points to predict movie ratings for users, achieving an MAE of 0.628.
* The approach used is highly scalable, and can be used with computational clusters using HDFS for much larger data files.
* By using a parallel architecture we make better use of the hardware than a serial, single-process Python calculation would. This reduces runtimes for larger calculations.
* We have improved upon our previous recommender system that used collaborative-filtering (the previous model achieved an MAE of 0.66). This could be due to a combination of having more training data available and the Matrix Factorization approach used.

Thanks for reading!