# Recommendation System with Spark
By Gabriel Garcez Barros Sousa

Dataset acquired from [GroupLens](http://grouplens.org/datasets/movielens/)

<hr>

## Introduction

Recommendation systems are a hot topic nowadays and are nearly ubiquitous: they appear in online stores, movie databases and even job finders. In this notebook, we will use Spark's machine learning library, MLlib, to build a collaborative filtering recommendation system based on the [Alternating Least Squares](http://cs229.stanford.edu/proj2014/Christopher%20Aberger,%20Recommender.pdf) (ALS) model.

## 0. Acquiring the Data

To acquire and extract the data, run the following shell commands:

In [None]:
!wget -O moviedataset.zip http://files.grouplens.org/datasets/movielens/ml-1m.zip

In [None]:
!unzip -o moviedataset.zip -d /resources/data

## 1. Loading in the data

First and foremost, let's get all of the imports out of the way.

In [1]:
#MLlib's collaborative filtering API: the ALS trainer, the trained model class and the Rating record type
#(`sc`, the SparkContext, is provided by the notebook environment)
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#Math functions. We only need sqrt (for the RMSE), so let's import just that
from math import sqrt

Now let's read the file and extract the information that we need.

For the movies file, let's only extract the movieId and its title since we won't be using genres for this recommendation system.

In [2]:
#Load movies.dat in with Spark
movies_raw = sc.textFile('/resources/data/ml-1m/movies.dat')
#The ml-1m .dat files have no header row: every line, including the first, is a record.
#Each line looks like "movieId::title::genres", so split every line by '::'
#and keep only the first two fields: the movieId (converted to an int) and the title
movies_data = movies_raw.map(lambda line: line.split("::"))\
    .map(lambda tokens: (int(tokens[0]), tokens[1])).cache()

This is what the data structure looks like:

In [3]:
movies_data.take(5)

[(1, u'Toy Story (1995)'),
 (2, u'Jumanji (1995)'),
 (3, u'Grumpier Old Men (1995)'),
 (4, u'Waiting to Exhale (1995)'),
 (5, u'Father of the Bride Part II (1995)')]

Next, let's load the ratings file and extract the userId, the movieId and the rating the user gave to that movie.
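The per-line parsing that the next cell performs with Spark can be made concrete in plain Python. This minimal sketch assumes the `userId::movieId::rating::timestamp` layout of `ratings.dat`. The helper name `parse_rating` is hypothetical and not part of the notebook, and the sample line is used for illustration only.

```python
def parse_rating(line):
    """Parse one 'userId::movieId::rating::timestamp' line into (userId, movieId, rating).

    Hypothetical helper mirroring the Spark lambdas; the timestamp field is discarded.
    """
    tokens = line.split("::")
    return (int(tokens[0]), int(tokens[1]), float(tokens[2]))


# Example line in the ratings.dat format (values chosen for illustration)
sample_line = "1::661::3::978302109"
print(parse_rating(sample_line))  # (1, 661, 3.0)
```

The same pattern applies to `movies.dat`, keeping the first two fields instead of three.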

In [4]:
#Load in the ratings file
ratings_raw = sc.textFile('/resources/data/ml-1m/ratings.dat')
#Like movies.dat, ratings.dat has no header row.
#Each line looks like "userId::movieId::rating::timestamp", so split every line by '::'
#and keep only the first three fields (userId, movieId and rating); the timestamp is dropped
ratings_data = ratings_raw.map(lambda line: line.split("::"))\
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()

And this is the final ratings data structure:

In [5]:
ratings_data.take(5)

[(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0), (1, 3408, 4.0), (1, 2355, 5.0)]

## 2. Tuning the Recommendation System

Now, let's start training our recommendation model. But before we actually train the final model, we have to find out what parameters are optimal for it by training the model with varying parameters and selecting the best configuration.

Let's do this by first taking a random sample of a tenth of the whole dataset and then splitting that sample into training and validation data.

The training data will be used to train the model, and the validation data will be used to test the model and measure its error. The configuration with the smallest validation error is selected as the best one. We will measure the error with the Root Mean Square Error (RMSE), defined below:

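RMSE = $\sqrt{\frac{1}{n}\sum_{t=1}^{n}e_t^2}$

where $n$ is the number of predictions and $e_t = r_t - \hat{r}_t$ is the error of the $t$-th prediction: the actual rating $r_t$ minus the predicted rating $\hat{r}_t$.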
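Here is a quick worked example with made-up numbers. Take three validation ratings whose actual values are 4, 2 and 5, and whose predicted values are 3.5, 3 and 5. The errors (actual minus predicted) are $e_1 = 0.5$, $e_2 = -1$ and $e_3 = 0$, so:

$$\text{RMSE} = \sqrt{\frac{1}{3}\left(0.5^2 + (-1)^2 + 0^2\right)} = \sqrt{\frac{1.25}{3}} \approx 0.645$$

Squaring the errors before averaging penalizes large mistakes more heavily than small ones. Taking the square root brings the result back to the rating scale.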

In [6]:
#Now let's split our data into training and validation data for machine learning
#takeSample needs an integer sample size, so use floor division
randomSample = sc.parallelize(ratings_data.takeSample(False, ratings_data.count() // 10))
training, validation = randomSample.randomSplit([0.8, 0.2])
#We only need the first two elements (userId, movieId) for the validation predictions, so let's extract them
validationPredict = validation.map(lambda x: (x[0], x[1]))
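The sampling and splitting in the cell above can be mimicked on a plain Python list, which may help show what `takeSample` and `randomSplit` do. This is only a rough sketch. `sample_and_split` is a hypothetical helper. Spark's `randomSplit` assigns each element at random according to the weights, so its split sizes are only approximately 80/20, and the sketch reproduces that behaviour instead of making an exact cut. The toy variable names are chosen so they do not overwrite the notebook's `training` and `validation` RDDs.

```python
import random


def sample_and_split(ratings, divisor=10, train_weight=0.8, seed=42):
    """Hypothetical helper mimicking takeSample + randomSplit on a Python list.

    Draws an exact-size sample of len(ratings) // divisor items without replacement,
    then sends each sampled item to the training part with probability train_weight
    and to the validation part otherwise, so the split is only approximately 80/20.
    """
    rng = random.Random(seed)
    sample = rng.sample(ratings, len(ratings) // divisor)
    train_part, valid_part = [], []
    for rating in sample:
        (train_part if rng.random() < train_weight else valid_part).append(rating)
    return train_part, valid_part


# Illustrative data: 1,000 fake (userId, movieId, rating) tuples
fake_ratings = [(u, m, float((u + m) % 5 + 1)) for u in range(1, 51) for m in range(1, 21)]
toy_training, toy_validation = sample_and_split(fake_ratings)
# Keep only (userId, movieId) for prediction, like validationPredict
toy_validation_pairs = [(u, m) for u, m, _ in toy_validation]
print(len(toy_training) + len(toy_validation))  # 100
```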

Let's now set the hyperparameters for training the model.

Spark's `ALS.train` function takes in these parameters (among others):

* `iterations`: Number of ALS iterations to run.
* `rank`: Number of latent factors in the model.
* `lambda_`: The regularization parameter used in ALS.
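The following toy, pure-Python sketch of explicit-feedback ALS shows what `rank`, `iterations` and `lambda_` control. It is **not** Spark's implementation: it runs on one machine with Python dictionaries, and every function name in it is hypothetical. The MLlib documentation says Spark scales λ by the number of ratings each user or product has (an approach it calls "ALS-WR"), and the sketch does the same. Other details, such as initialization, blocking and the linear solver, almost certainly differ from Spark's.

```python
import random


def solve_small(a, b):
    """Solve the square system a x = b with Gauss-Jordan elimination and partial pivoting."""
    n = len(a)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        p = m[col][col]
        m[col] = [v / p for v in m[col]]
        for r in range(n):
            if r != col:
                f = m[r][col]
                m[r] = [vr - f * vc for vr, vc in zip(m[r], m[col])]
    return [m[i][n] for i in range(n)]


def update_factors(fixed, grouped, rank, lam):
    """One ALS half-step: with the other side's factors fixed, each row solves
    (Y^T Y + lam * n * I) x = Y^T r, where n is that row's number of ratings."""
    new = {}
    for key, pairs in grouped.items():
        a = [[0.0] * rank for _ in range(rank)]
        b = [0.0] * rank
        for other, rating in pairs:
            y = fixed[other]
            for i in range(rank):
                b[i] += y[i] * rating
                for j in range(rank):
                    a[i][j] += y[i] * y[j]
        for i in range(rank):
            a[i][i] += lam * len(pairs)  # lambda scaled by the number of ratings
        new[key] = solve_small(a, b)
    return new


def toy_als(ratings, rank=2, iterations=10, lam=0.1, seed=0):
    """Factor (user, movie, rating) triples into user and movie vectors of length rank."""
    rng = random.Random(seed)
    by_user, by_movie = {}, {}
    for user, movie, rating in ratings:
        by_user.setdefault(user, []).append((movie, rating))
        by_movie.setdefault(movie, []).append((user, rating))
    movie_factors = {m: [rng.random() for _ in range(rank)] for m in by_movie}
    user_factors = {}
    for _ in range(iterations):
        user_factors = update_factors(movie_factors, by_user, rank, lam)   # fix movies, solve users
        movie_factors = update_factors(user_factors, by_movie, rank, lam)  # fix users, solve movies
    return user_factors, movie_factors


def toy_predict(user_factors, movie_factors, user, movie):
    """Predicted rating = dot product of the user's and the movie's latent vectors."""
    return sum(x * y for x, y in zip(user_factors[user], movie_factors[movie]))


def toy_objective(ratings, user_factors, movie_factors, lam):
    """Regularized squared error that each ALS half-step minimizes exactly."""
    loss = sum((r - toy_predict(user_factors, movie_factors, u, m)) ** 2 for u, m, r in ratings)
    counts_u, counts_m = {}, {}
    for u, m, _ in ratings:
        counts_u[u] = counts_u.get(u, 0) + 1
        counts_m[m] = counts_m.get(m, 0) + 1
    loss += lam * sum(n * sum(v * v for v in user_factors[u]) for u, n in counts_u.items())
    loss += lam * sum(n * sum(v * v for v in movie_factors[m]) for m, n in counts_m.items())
    return loss


# Made-up ratings: (userId, movieId, rating)
toy_ratings = [(1, 10, 5.0), (1, 20, 4.0), (2, 10, 4.0), (2, 30, 1.0), (3, 20, 2.0), (3, 30, 5.0)]
toy_users, toy_movies = toy_als(toy_ratings, rank=2, iterations=10, lam=0.1)
# Predicted rating for a (user, movie) pair that does not appear in the data
print(round(toy_predict(toy_users, toy_movies, 1, 30), 2))
```

Each half-step solves a small regularized least-squares problem exactly, so the regularized error never increases from one iteration to the next. A larger `rank` gives each vector more latent factors, and a larger `lambda_` shrinks the vectors toward zero.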

In [7]:
#Parameter definition for Alternating Least Squares
#You can play around with these values to find the best configuration
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]

#Variables for error tracking
min_error = float('inf')
best_rank = -1

Now let's train the model with each rank and find out which configuration is the best one based on the validation error.

* `model` stores the trained model.
* `predictions` stores the ratings the model predicts for each (userId, movieId) pair in the validation data.
* `ratings_and_preds` stores the predictions alongside the actual ratings given by the users.
* `error` stores the RMSE computed from those pairs.
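The join-then-RMSE logic of the next cell can be sketched with plain Python dictionaries. The sketch assumes predictions are keyed by `(userId, movieId)`, as they are after the `map` applied to `predictAll`'s output. Pairs without a prediction are skipped, which is what Spark's inner `join` does. `join_and_rmse` is a hypothetical name.

```python
from math import sqrt


def join_and_rmse(actual, predicted):
    """Inner-join actual and predicted ratings on (userId, movieId), then compute the RMSE.

    actual: list of (userId, movieId, rating); predicted: dict {(userId, movieId): rating}.
    """
    # Like Spark's join, keep only keys present on both sides: ((user, movie), (actual, predicted))
    joined = [((u, m), (r, predicted[(u, m)])) for u, m, r in actual if (u, m) in predicted]
    squared_errors = [(a - p) ** 2 for _, (a, p) in joined]
    return sqrt(sum(squared_errors) / len(squared_errors))


toy_actual = [(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0), (3, 99, 3.0)]
toy_predicted = {(1, 10): 3.0, (1, 20): 3.0, (2, 10): 4.0}  # no prediction for (3, 99)
print(round(join_and_rmse(toy_actual, toy_predicted), 3))  # 0.816
```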

In [8]:
errorList = []
#Now let's figure out how many latent factors we should use by training the model several times, changing the rank each time
for rank in ranks:
    #Train the model on the training data
    model = ALS.train(training, rank, iterations=iterations, lambda_=regularization_parameter)
    #Now let's use the validation data to have the model predict ratings
    #r[0] is the userId, r[1] is the movieId and r[2] is the predicted value
    predictions = model.predictAll(validationPredict).map(lambda r: ((r[0], r[1]), r[2]))
    #Key the actual validation ratings by (userId, movieId) and join them with the predictions,
    #giving ((userId, movieId), (actual, predicted)); pairs without a prediction are dropped by the join
    ratings_and_preds = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    #Compute the error with the Root Mean Square Error (RMSE) function
    error = sqrt(ratings_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errorList.append(error)
    #Check whether the current error is the lowest one so far;
    #if so, store it along with the rank that produced it
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank ' + str(best_rank) + ' and possessed error: ' + str(min_error))
print('Every error value produced: ' + str(errorList))

The best model was trained with rank 4 and possessed error: 1.07857459098
Every error value produced: [1.0785745909816142, 1.0814910411108178, 1.0812052119806697]


With that, let's use our best configuration to train the ALS model on our full dataset while using part of it for testing.

So, let's repeat the process used for creating the training and validation data above:

In [9]:
training, test = ratings_data.randomSplit([0.8, 0.2])
#We only need the first two elements (userId, movieId) for testing, so let's extract them
testPredict = test.map(lambda x: (x[0], x[1]))

With the training and test data set, let's train the model and see its error value:

In [10]:
#Retrain it using the best found configuration
full_model = ALS.train(training, best_rank, iterations=iterations, lambda_=regularization_parameter)
#Predict the ratings for the test data
predictions = full_model.predictAll(testPredict).map(lambda r: ((r[0], r[1]), r[2]))
#Get the actual ratings from the test data
ratings_and_preds = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
#Now calculate the RMSE
error = sqrt(ratings_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('For testing data the RMSE is ' + str(error) + '.')

For testing data the RMSE is 0.877947641725.


## 3. Recommendation

With all the parameters set, let's start recommending movies to an input user.

Let's start by defining an input user in the same format as the rest of the ratings data structure, but with an ID that doesn't already appear in the data. In the MovieLens 1M data, user IDs range from 1 to 6040, so 0 is free.

In [11]:
new_user_ID = 0

#Insert the ratings of the new user to recommend movies to here
#The format of each line is (userID, movieID, rating)
new_user = [
     (new_user_ID, 1, 3.5),    #Toy Story
     (new_user_ID, 2, 2.0),    #Jumanji
     (new_user_ID, 296, 5.0),  #Pulp Fiction
     (new_user_ID, 1274, 4.5), #Akira
     (new_user_ID, 1968, 5.0)  #The Breakfast Club
    ]
new_user_ratings = sc.parallelize(new_user)

Let's also count the number of ratings per movie, so we can recommend only movies with at least a certain number of ratings.
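In plain Python, counting ratings per movie takes one line with `collections.Counter`. This sketch uses made-up tuples to show the `(movieID, number of ratings)` pairs that the next cell produces with Spark. It also shows how a minimum-count threshold, like the one used later, could be applied. The threshold value here is arbitrary.

```python
from collections import Counter

# Made-up (userId, movieId, rating) tuples
toy_ratings_list = [(1, 10, 4.0), (2, 10, 3.0), (3, 10, 5.0), (1, 20, 2.0), (2, 30, 4.0), (3, 30, 1.0)]

# Count how many ratings each movie received: movieId -> number of ratings
toy_counts = Counter(movie for _, movie, _ in toy_ratings_list)
print(sorted(toy_counts.items()))  # [(10, 3), (20, 1), (30, 2)]

# Keep only movies with at least min_ratings ratings (threshold chosen for illustration)
min_ratings = 2
print(sorted(m for m, n in toy_counts.items() if n >= min_ratings))  # [10, 30]
```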

In [12]:
#Map every rating to (movieID, 1) and sum the ones per movie to get (movieID, number of ratings)
movie_rating_counts = ratings_data.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

Now, we're going to add the input user to the full dataset.

In [13]:
#Create a new RDD holding every rating, including the new user's
data_with_new_user = ratings_data.union(new_user_ratings)

And train the model with the new dataset:

In [14]:
#And retrain the model with the best rank on the data that includes the new user
new_ratings_model = ALS.train(data_with_new_user, best_rank, iterations=iterations, lambda_=regularization_parameter)

Now, before we recommend something to our input user, let's get the movies the user hasn't rated yet and run them through the model to predict ratings.
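This filtering step builds a `(userID, movieID)` candidate pair for every movie the user has not rated. The plain-Python sketch below uses hypothetical data. Storing the rated IDs in a set keeps each membership test constant-time on average.

```python
# Hypothetical catalogue of (movieId, title) pairs and one user's ratings
toy_movies_list = [(1, "Movie A"), (2, "Movie B"), (3, "Movie C"), (4, "Movie D")]
toy_user_id = 0
toy_user_ratings = [(toy_user_id, 1, 3.5), (toy_user_id, 3, 5.0)]

# IDs of the movies the user has already rated
rated_ids = {movie for _, movie, _ in toy_user_ratings}

# Candidate (userId, movieId) pairs for every unrated movie, ready to be scored by a model
toy_candidates = [(toy_user_id, movie) for movie, _ in toy_movies_list if movie not in rated_ids]
print(toy_candidates)  # [(0, 2), (0, 4)]
```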

In [24]:
#Get the IDs of the movies the input user has rated (a set makes the membership test fast)
new_user_ratings_ids = set(x[1] for x in new_user)
#Now keep only the movies the user hasn't rated, as (userID, movieID) pairs
new_user_unrated_movies = (movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

#Now predict the input user's ratings for those movies
new_user_recommendations = new_ratings_model.predictAll(new_user_unrated_movies)

Here's what our recommendations look like:

In [25]:
new_user_recommendations.take(10)

[Rating(user=0, product=1084, rating=3.874892121279405),
 Rating(user=0, product=3456, rating=3.4479192521546027),
 Rating(user=0, product=3764, rating=2.232790850369536),
 Rating(user=0, product=3272, rating=3.002177863628324),
 Rating(user=0, product=1724, rating=3.1338470117069774),
 Rating(user=0, product=428, rating=4.123986330616036),
 Rating(user=0, product=1900, rating=3.5348965411706272),
 Rating(user=0, product=1328, rating=1.3924031856122951),
 Rating(user=0, product=464, rating=2.5028695946299937),
 Rating(user=0, product=1040, rating=2.5440409840778058)]

Let's reformat the data structure, add each movie's title and number of ratings, and clean it up a bit:
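Each `join` nests the values in a new tuple, so the element shape after two joins is easy to get wrong. This dictionary-based sketch uses made-up values to reproduce the `(movieID, ((predicted rating, title), number of ratings))` nesting. It then flattens each element to `(title, predicted rating, number of ratings)`. `inner_join` is a hypothetical helper that imitates an RDD inner join on keys.

```python
def inner_join(left, right):
    """Imitate an RDD inner join on (key, value) pairs: emit (key, (left_value, right_value))."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


toy_predictions = [(2, 4.1), (4, 3.2)]                         # (movieId, predicted rating)
toy_titles = [(2, "Movie B"), (4, "Movie D"), (5, "Movie E")]  # (movieId, title)
toy_rating_counts = [(2, 150), (4, 12)]                        # (movieId, number of ratings)

toy_joined = inner_join(inner_join(toy_predictions, toy_titles), toy_rating_counts)
print(toy_joined[0])  # (2, ((4.1, 'Movie B'), 150))

# Flatten to (title, predicted rating, number of ratings), as the cell below does
toy_flat = [(r[1][0][1], r[1][0][0], r[1][1]) for r in toy_joined]
print(toy_flat)  # [('Movie B', 4.1, 150), ('Movie D', 3.2, 12)]
```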

In [26]:
#Now reformat new_user_recommendations to the form (movieID, predicted rating)
new_user_recommendations = new_user_recommendations.map(lambda x: (x.product, x.rating))
#Joining with the titles and then the counts gives (movieID, ((predicted rating, title), number of ratings))
new_user_recommendations = new_user_recommendations.join(movies_data).join(movie_rating_counts)
#Clean it up a bit: flatten each element into (title, predicted rating, number of ratings)
new_user_recommendations = new_user_recommendations.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

Now all that's left is to recommend something, so let's filter out movies with fewer than 20 ratings and order the rest from highest to lowest predicted rating:
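`takeOrdered(n, key=lambda x: -x[1])` returns the `n` elements with the highest predicted rating. On a local list, the same filter-then-top-N step can be written with `heapq.nlargest`. The sketch below shows this with made-up tuples; the threshold and `n` are arbitrary here.

```python
import heapq

# Made-up (title, predicted rating, number of ratings) tuples
toy_recs = [("Movie A", 4.9, 5), ("Movie B", 4.7, 300), ("Movie C", 3.1, 80), ("Movie D", 4.2, 45)]

min_count, top_n = 20, 2
eligible = [r for r in toy_recs if r[2] >= min_count]          # drop movies with too few ratings
toy_top = heapq.nlargest(top_n, eligible, key=lambda r: r[1])  # highest predicted ratings first
print(toy_top)  # [('Movie B', 4.7, 300), ('Movie D', 4.2, 45)]
```

"Movie A" has the highest predicted rating but is dropped by the threshold. That is the purpose of filtering on rating counts, since predictions for rarely rated movies are likely less reliable.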

In [27]:
#Now get the top 25 movies with at least 20 ratings, ordered from highest to lowest predicted rating
top_movies = new_user_recommendations.filter(lambda r: r[2]>=20).takeOrdered(25, key=lambda x: -x[1])

print('Top 25 recommended movies with at least 20 ratings:\n%s' % '\n'.join(map(str, top_movies)))

Top 25 recommended movies with at least 20 ratings:
(u'For All Mankind (1989)', 4.878450594112355, 27)
(u'Dear Diary (Caro Diario) (1994)', 4.862527061004506, 28)
(u'Sanjuro (1962)', 4.860741545687377, 69)
(u'American Beauty (1999)', 4.8193084503413655, 3428)
(u'Usual Suspects, The (1995)', 4.792900578839774, 1783)
(u'GoodFellas (1990)', 4.739372913523984, 1657)
(u'Godfather, The (1972)', 4.735397920456402, 2223)
(u'Last Days, The (1998)', 4.698700108200825, 27)
(u'Reservoir Dogs (1992)', 4.686128645014773, 1259)
(u'Monty Python and the Holy Grail (1974)', 4.654967948786981, 1599)
(u'Idiots, The (Idioterne) (1998)', 4.648629307901874, 37)
(u'Shawshank Redemption, The (1994)', 4.639626104173599, 2227)
(u'Apocalypse Now (1979)', 4.634400637504494, 1176)
(u'Fight Club (1999)', 4.619109706608868, 1451)
(u'Matrix, The (1999)', 4.591628605400125, 2590)
(u'Godfather: Part II, The (1974)', 4.590559252383079, 1692)
(u'Star Wars: Episode IV - A New Hope (1977)', 4.559225143047281, 2991)
(u'Clockwork


## References

* [Recommender: An Analysis of Collaborative Filtering Techniques](http://cs229.stanford.edu/proj2014/Christopher%20Aberger,%20Recommender.pdf)
* [Root Mean Square Error](https://www.kaggle.com/wiki/RootMeanSquaredError)
* [Alternating Least Squares for Collaborative Filtering](http://bugra.github.io/work/notes/2014-04-19/alternating-least-squares-method-for-collaborative-filtering/)
* [ALS function](https://spark.apache.org/docs/0.8.1/api/mllib/org/apache/spark/mllib/recommendation/ALS)
* [MLlib Guide](http://spark.apache.org/docs/latest/mllib-guide.html)