# Movie Recommendation Engine Tutorial

This notebook adapts a MovieLens-based example from [Codementor](https://www.codementor.io/@jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw) and the [Spark Documentation](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) on collaborative filtering.

Before running this notebook, make sure that you have run `make download` in the repo root to download the movie recommendation data files.

## Setup

Configure pandas, which is used here only for display and debugging.

In [1]:
# define display options
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 200)

Work out the paths to the files and directories that are relevant to the model build

In [2]:
import os
small_ratings_file = os.path.join('/data', 'ml-latest-small', 'ratings.csv')
complete_ratings_file = os.path.join('/data', 'ml-latest', 'ratings.csv')
complete_movies_file = os.path.join('/data', 'ml-latest', 'movies.csv')

model_path = os.path.join('/model', 'movie_lens_als')
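Since the later cells fail with a fairly opaque Spark error when a file is absent, it can help to check the paths up front. The following is a minimal sketch, not part of the original notebook; `missing_paths` is a hypothetical helper name, and the paths simply repeat the ones defined above.

```python
import os

def missing_paths(paths):
    """Return the subset of `paths` that does not exist on disk (hypothetical helper)."""
    return [p for p in paths if not os.path.exists(p)]

# Same locations as the cell above; adjust if your data lives elsewhere.
required = [
    os.path.join('/data', 'ml-latest-small', 'ratings.csv'),
    os.path.join('/data', 'ml-latest', 'ratings.csv'),
    os.path.join('/data', 'ml-latest', 'movies.csv'),
]

missing = missing_paths(required)
if missing:
    print('Missing files (has `make download` been run?):')
    for path in missing:
        print(f'  {path}')
else:
    print('All data files found.')
```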

Set up the Spark session

In [3]:
# instantiate the spark instance
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "10G") \
    .getOrCreate()    

## Build & Train on Small Dataset

This section loads a small dataset to tune the model, i.e. to find the parameters that yield the best ALS result. The parameters chosen here are used in the next stage to retrain the model on a larger dataset for release.

In [4]:
raw_data = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load(small_ratings_file)
raw_data.limit(5).toPandas()

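|   | userId | movieId | rating | timestamp |
|---|--------|---------|--------|-----------|
| 0 | 1 | 1 | 4.0 | 964982703 |
| 1 | 1 | 3 | 4.0 | 964981247 |
| 2 | 1 | 6 | 4.0 | 964982224 |
| 3 | 1 | 47 | 5.0 | 964983815 |
| 4 | 1 | 50 | 5.0 | 964982931 |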


Split the dataset into two subsets: training and test. The model is trained on the training set below and then evaluated against the test set to check that it has not overfitted.

In [5]:
# randomly split the dataset [75%, 25%] into two datasets [train_df, test_df]
# test_df must be held back so the model can be checked against unseen data, i.e. to test that it generalises
train_df, test_df = raw_data.randomSplit([3.0, 1.0], seed=42)
train_df.cache()
test_df.cache()

print(f'train_df records: {train_df.count()}\ntest_df records: {test_df.count()}')

train_df records: 75681
test_df records: 25155
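The weights `[3.0, 1.0]` do not sum to 1; Spark normalises them, which is how they become the 75%/25% split mentioned in the comment. Because rows are assigned at random, the resulting sizes are only approximately in that ratio. A small sketch of the arithmetic follows; `normalised_weights` is a hypothetical helper for illustration, not a Spark function, and the counts are the ones printed above.

```python
def normalised_weights(weights):
    """Scale weights so they sum to 1 (illustrative helper, not a Spark API)."""
    total = sum(weights)
    return [w / total for w in weights]

fractions = normalised_weights([3.0, 1.0])
print(fractions)  # [0.75, 0.25]

# Record counts printed by the cell above.
train_count, test_count = 75681, 25155
observed_train_fraction = train_count / (train_count + test_count)
print(f'observed training fraction: {observed_train_fraction:.4f}')
```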


Train the ALS recommendation model with three different ranks.

An ALS model is trained for each rank to determine which is most accurate. The best rank is carried forward into the model that will be used in production.
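To give a feel for what "alternating least squares" means, here is a deliberately tiny, pure-Python sketch for rank 1 only. It is not Spark's implementation: Spark's ALS is distributed, supports any rank, and scales the regularisation by the number of ratings per user or item, whereas this sketch uses plain ridge regularisation. The function names are hypothetical, and the ratings for user 2 are made up for illustration (user 1's ratings match the sample rows shown earlier).

```python
def als_rank1(ratings, reg=0.1, iters=10):
    """Rank-1 ALS on a dict {(user, item): rating}; returns (user_factors, item_factors)."""
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}
    for _ in range(iters):
        # Hold item factors fixed: each user factor is a 1-D ridge regression with a closed-form solution.
        for u in users:
            rated = [(i, r) for (uu, i), r in ratings.items() if uu == u]
            num = sum(item_f[i] * r for i, r in rated)
            den = sum(item_f[i] ** 2 for i, _ in rated) + reg
            user_f[u] = num / den
        # Hold user factors fixed: solve for each item factor in the same way.
        for i in items:
            raters = [(u, r) for (u, ii), r in ratings.items() if ii == i]
            num = sum(user_f[u] * r for u, r in raters)
            den = sum(user_f[u] ** 2 for u, _ in raters) + reg
            item_f[i] = num / den
    return user_f, item_f

def rmse(ratings, user_f, item_f):
    """Root mean square error of the predictions user_f[u] * item_f[i] over the known ratings."""
    squared_error = sum((r - user_f[u] * item_f[i]) ** 2 for (u, i), r in ratings.items())
    return (squared_error / len(ratings)) ** 0.5

toy_ratings = {
    (1, 1): 4.0, (1, 3): 4.0, (1, 6): 4.0, (1, 47): 5.0, (1, 50): 5.0,  # sample rows above
    (2, 1): 3.0, (2, 47): 4.0,                                          # made-up second user
}
user_f, item_f = als_rank1(toy_ratings)
print(f'rmse on the toy ratings: {rmse(toy_ratings, user_f, item_f):.4f}')
```

A higher rank replaces each scalar factor with a vector, so each update becomes a small linear system rather than a single division; the `rank` parameter tuned below controls the length of that vector.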

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# define the model type to train - in this case an Alternating Least Squares (ALS) model that predicts a continuous variable 'prediction'
# https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
# see https://spark.apache.org/docs/latest/ml-collaborative-filtering.html for implementation details
# pay attention to coldStartStrategy, which controls how to handle users or movies with no ratings in the training data (with 'drop', rows whose prediction would be NaN are removed)
als = ALS(
  rank=10, 
  maxIter=10, 
  regParam=0.1, 
  numUserBlocks=10, 
  numItemBlocks=10, 
  implicitPrefs=False, 
  alpha=1.0, 
  userCol='userId', 
  itemCol='movieId', 
  seed=42, 
  ratingCol='rating', 
  nonnegative=False, 
  checkpointInterval=10, 
  intermediateStorageLevel='MEMORY_AND_DISK', 
  finalStorageLevel='MEMORY_AND_DISK', 
  coldStartStrategy='drop')    

# define a sequence of stages
# in this case there is only one stage, but normally some feature engineering would happen before the model (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature)
pipeline = Pipeline(stages=[als])

# create a matrix of parameters to try whilst training
paramGrid = ParamGridBuilder() \
    .addGrid(param=als.rank, values=[4, 8, 12]) \
    .build()

# define the evaluation
# this measures the difference between 'prediction' and the actual 'rating' using the root mean square error (RMSE) metric
regressionEvaluator = RegressionEvaluator(
    predictionCol='prediction', 
    labelCol='rating', 
    metricName='rmse')

# set up the model for training
crossValidator = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = paramGrid,
    evaluator = regressionEvaluator,
    numFolds = 3,
    collectSubModels=True)
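It is worth estimating how many models this search will train before running it. The sketch below counts the fits implied by the grid and fold settings above: one fit per parameter combination per fold, plus the final refit of the best combination on the whole training set that `CrossValidator` performs. The second grid, which adds `regParam` values, is purely hypothetical and is not used anywhere in this notebook.

```python
from itertools import product

def count_fits(param_grid, num_folds):
    """Number of model fits for an exhaustive grid search with k-fold cross-validation."""
    combinations = list(product(*param_grid.values()))
    # One fit per combination per fold, plus one final refit of the best combination.
    return len(combinations) * num_folds + 1

# The grid used in this notebook: three ranks, three folds.
print(count_fits({'rank': [4, 8, 12]}, num_folds=3))  # 10

# Hypothetical larger grid, for illustration only.
print(count_fits({'rank': [4, 8, 12], 'regParam': [0.01, 0.1, 1.0]}, num_folds=3))  # 28
```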

In [7]:
# run the training on the train_df
crossValidatorModel = crossValidator.fit(train_df)

In [8]:
# to show how the search works, loop through all the sub-models (available because collectSubModels=True) and print each one's rank and error (RMSE) against the full training set
# note that RMSE on the full training set is not what CrossValidator uses to choose a model: it compares models on the held-out part of each fold, which favours a model that generalises
# this is a brute-force parameter search, so more .addGrid() values and a larger CrossValidator.numFolds mean longer training time but potentially a better model
for fold, foldModel in enumerate(crossValidatorModel.subModels, start=1):
    for grid, gridModel in enumerate(foldModel, start=1):
        prediction = gridModel.transform(train_df)
        rmse = regressionEvaluator.evaluate(prediction)
        rank = gridModel.stages[-1].rank
        print(f'{{"fold": {fold}, "grid": {grid}, "rank": {rank}, "rmse": {rmse}}}')

{"fold": 1, "grid": 1, "rank": 4, "rmse": 0.7262748350505489}
{"fold": 1, "grid": 2, "rank": 8, "rmse": 0.6774952928110416}
{"fold": 1, "grid": 3, "rank": 12, "rmse": 0.6548876426568273}
{"fold": 2, "grid": 1, "rank": 4, "rmse": 0.7269762973982319}
{"fold": 2, "grid": 2, "rank": 8, "rmse": 0.6821879249409497}
{"fold": 2, "grid": 3, "rank": 12, "rmse": 0.6596626134414807}
{"fold": 3, "grid": 1, "rank": 4, "rmse": 0.7296546594659135}
{"fold": 3, "grid": 2, "rank": 8, "rmse": 0.6859068076829552}
{"fold": 3, "grid": 3, "rank": 12, "rmse": 0.6585555992240535}


In [9]:
# test the best trained model against the reserved test set to verify no overfitting
bestModel = crossValidatorModel.bestModel
prediction = bestModel.transform(test_df)
rmse = regressionEvaluator.evaluate(prediction)
rank = bestModel.stages[-1].rank
print(f'{{"rank": {rank}, "rmse": {rmse}}}')

{"rank": 4, "rmse": 0.8862707366654035}


## Build & Train on Large Dataset

If the model performs well on the small dataset, change the data load in cell `In [4]` above to use the complete dataset and rerun the notebook from there. Change:

```python
raw_data = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load(small_ratings_file)
raw_data.limit(5).toPandas()
```

to:

```python
raw_data = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load(complete_ratings_file)
raw_data.limit(5).toPandas()
```

## Export the model for production

If you are happy with the RMSE reported above, export the model so that it is ready for use.

In [None]:
bestModel.write() \
  .overwrite() \
  .save(model_path)