## Implementing a Recommender System on Spark
### Harris Dupre
### Data 612, Summer 2020

#### Introduction

In this project we implement a recommender system on Databricks, a distributed computing platform built on Apache Spark.

We will convert the matrix factorization recommender system written in Python here: https://raw.githubusercontent.com/hdupre/rec_sys/master/Project4/Project4.ipynb

The original project used the SVD algorithm, but because PySpark's readily available matrix factorization algorithm is ALS (alternating least squares), this project will use that instead.
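For intuition about what ALS does, the sketch below alternates between two steps: hold the item factors fixed and solve a small regularized least-squares problem for each user, then hold the user factors fixed and do the same for each item. This is a toy rank-1 version in plain Python on made-up ratings. It is not Spark's implementation, and the names `als_rank1` and `toy` are illustrative only.

```python
def als_rank1(ratings, n_iters=20, reg=0.1):
    """Toy rank-1 ALS. ratings maps (user, item) -> rating.

    Each user u gets a scalar factor x[u] and each item i a scalar y[i];
    the predicted rating is x[u] * y[i].
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    x = dict.fromkeys(users, 1.0)
    y = dict.fromkeys(items, 1.0)
    for _ in range(n_iters):
        # Item factors fixed: closed-form ridge solution for each user.
        for u in users:
            obs = [(i, r) for (uu, i), r in ratings.items() if uu == u]
            x[u] = sum(r * y[i] for i, r in obs) / (reg + sum(y[i] ** 2 for i, _ in obs))
        # User factors fixed: closed-form ridge solution for each item.
        for i in items:
            obs = [(u, r) for (u, ii), r in ratings.items() if ii == i]
            y[i] = sum(r * x[u] for u, r in obs) / (reg + sum(x[u] ** 2 for u, _ in obs))
    return x, y


# Made-up ratings, not taken from MovieLens.
toy = {(1, 10): 4.0, (1, 20): 2.0, (2, 10): 5.0,
       (2, 30): 3.0, (3, 20): 2.5, (3, 30): 3.5}
x, y = als_rank1(toy)

for (u, i), r in toy.items():
    print(f"user {u}, item {i}: actual {r}, predicted {x[u] * y[i]:.2f}")
```

With a higher rank, each scalar becomes a vector and each division becomes a small linear solve, but the alternating structure stays the same.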

#### Data

We are using the movies and ratings data from the MovieLens dataset, specifically the ml-latest-small.zip archive, which contains roughly 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users.

This small dataset suited a locally run recommender system, where larger datasets could make processing time excessive. In theory, a distributed computing cluster should be able to run these processes much more efficiently.

#### Loading the Data

The data files were dropped into Databricks from the local environment, but can also be found here:

Ratings CSV: https://raw.githubusercontent.com/hdupre/rec_sys/master/Project3/ratings.csv

Movie Titles CSV: https://raw.githubusercontent.com/hdupre/rec_sys/master/Project3/movies.csv

In [2]:
# File location and type
file_location = "/FileStore/tables/ratings-1.csv"
file_type = "csv"

# CSV options
# we will allow Spark to infer the datatypes
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
movie_ratings = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(movie_ratings)

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041
1,157,5.0,964984100


#### PySpark ALS Algorithm to Create a Prediction Matrix

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
import timeit

# split the set into training and test sets 80/20
(training, test) = movie_ratings.randomSplit([0.8,0.2])

# configure the ALS estimator; coldStartStrategy="drop" removes rows whose prediction would be NaN
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

# build the parameter grid and evaluator functions
param_grid = ParamGridBuilder().addGrid(als.rank,[10]).addGrid(als.maxIter,[10]).addGrid(als.regParam,[0.1]).build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,evaluator=evaluator)

We can now use the train-validation split to fit the model on the training set.

In [6]:
import mlflow

tic = timeit.default_timer()
model = tvs.fit(training)
toc = timeit.default_timer()
toc - tic  # elapsed time in seconds

The time to fit the model was 106 seconds.
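Because the comparison between the two systems rests on these timings, it can help to measure both fits the same way. The following is a minimal sketch of a reusable timer. The helper name `timed` and the `timings` dictionary are hypothetical and not part of the original notebook.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Record the wall-clock seconds spent inside the with-block under results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("stand_in_workload", timings):
    # Placeholder work; in the notebook this would be the model fit.
    total = sum(range(1_000_000))

print(timings)
```

In the notebook, the placeholder line would be replaced by `model = tvs.fit(training)`, and later by `svd.fit(trainset)`, so both durations end up in the same dictionary.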

In [8]:
# select the best model for the predictions matrix
best_model = model.bestModel
# create the predictions matrix
predictions = best_model.transform(test)
# calculate RMSE
rmse = evaluator.evaluate(predictions)

print("RMSE = " + str(rmse))

RMSE was decent at 0.87.
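For reference, RMSE is the square root of the mean squared difference between predicted and actual ratings, so it is expressed in the same units as the ratings (stars). The sketch below shows the calculation on a few made-up values, not on the model's actual predictions.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings for illustration only.
actual = [4.0, 3.0, 5.0]
predicted = [3.5, 3.0, 4.0]
example_rmse = rmse(actual, predicted)
print(round(example_rmse, 4))
```

Because the errors are squared before averaging, a few large misses raise RMSE more than many small ones.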

#### Original recommendation system using Python Surprise library

In [11]:
import pandas as pd
import numpy as np
import surprise
from surprise import Reader, Dataset, SVD, accuracy
from surprise.model_selection import train_test_split

Converting the Spark dataframe into a Pandas dataframe

In [13]:
pandas_ratings_df = movie_ratings.select("*").toPandas()

In [14]:
pandas_ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [15]:
# set up the Surprise SVD variables and load the Pandas df into the Surprise format
# MovieLens ratings run from 0.5 to 5.0 in half-star steps
reader = Reader(rating_scale=(0.5, 5))
data = Dataset.load_from_df(pandas_ratings_df[['userId', 'movieId', 'rating']], reader)

# split into train and test set, 80/20 ratio
trainset, testset = train_test_split(data, test_size=0.2)

svd = SVD()

# fit the SVD model on the training set
tic = timeit.default_timer()
svd.fit(trainset)
toc = timeit.default_timer()
toc - tic  # elapsed time in seconds

Training the model took only 5 seconds.

In [17]:
# creating the predictions matrix
predictions_svd = svd.test(testset)
# calculating RMSE
accuracy.rmse(predictions_svd)

#### Conclusions

The PySpark system and the original system had nearly identical RMSEs, though the PySpark model required some tuning to get there. Initially I tried higher ranks and more iterations, but processing time was prohibitive: I cancelled training after more than 40 minutes without a finished model. I doubt that the extra processing time would have resulted in a significantly better RMSE.

At that point, I experimented with fitting the model with a rank and max iterations of 1. This allowed the process to complete in about 2 minutes, but the RMSE was approximately 3.4, which is not usable.

By setting rank to 10, max iterations to 10, and regParam to 0.1 in the param_grid (these values match the defaults of PySpark's ALS), I managed to achieve a sub-two-minute runtime with a decent RMSE.
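One reason tuning runs get expensive is that a parameter grid is a Cartesian product: every added value multiplies the number of candidate models. `TrainValidationSplit` fits one model per combination and then refits the best combination on the full training set. The sketch below counts those fits in plain Python. The `wider` grid is hypothetical and is not the set of values tried in this project.

```python
from itertools import product


def count_fits(grid):
    """Return (number of parameter combinations, total fits including the final refit)."""
    combos = list(product(*grid.values()))
    return len(combos), len(combos) + 1


# The single-point grid used in this notebook.
single_point = {"rank": [10], "maxIter": [10], "regParam": [0.1]}
# A hypothetical wider grid, for comparison only.
wider = {"rank": [10, 50, 100], "maxIter": [10, 20], "regParam": [0.01, 0.1]}

print(count_fits(single_point))
print(count_fits(wider))
```

On top of the larger number of fits, each individual fit also takes longer as rank and maxIter grow, so the two effects compound.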

Prediction time was also faster on the original system.

A distributed platform would likely become necessary only with a much greater volume of data, more algorithms (as the system would be hybridized), and many concurrent prediction requests. In this scenario, the Spark system would load the bigger volume of data and multiple models into memory (likely with faster processing available in the cloud) and would be able to rapidly generate predictions as necessary.