# Spark ALS Recommendations: MovieLens

We will use the [MovieLens dataset](http://grouplens.org/datasets/movielens/) to build a movie recommender using [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) with [Spark's Alternating Least Squares](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) implementation.

## Overview

We will use Spark's MLlib API for this lab, because some of the features we need are not available in the newer ML API.

The following sections introduce *Collaborative Filtering* and explain how to use *Spark MLlib* to build a recommender model. We will close the tutorial by explaining how a model such as this is used to make recommendations, and how to persist it for later use (e.g. in our Python/Flask web service).

## 1 - Data download

GroupLens Research has collected and made available rating data sets from the [MovieLens web site](http://movielens.org). The data sets were collected over various periods of time, depending on the size of the set. They can be found [here](http://grouplens.org/datasets/movielens/).   

In our case, we will use the latest datasets:  

- Small: 100,000 ratings and 2,488 tag applications applied to 8,570 movies by 706 users. Last updated 4/2015.  
- Full: 21,000,000 ratings and 470,000 tag applications applied to 27,000 movies by 230,000 users. Last updated 4/2015.  


Run the file `download.sh` in the directory `/data/movielens`. This will download the files.

In [1]:
print('Spark UI running on http://YOURIPADDRESS:' + sc.uiWebUrl.split(':')[2])

Spark UI running on http://YOURIPADDRESS:4040


## 2 - Understand Data

We have
- movies
- ratings
- links
- tags 

See below to understand the data format

### Movies

In [2]:
### Movie Data
movies = spark.read.csv("/data/movielens/ml-latest-small/movies.csv", 
                         header=True, inferSchema=True)
print ('movie count {:,}'.format(movies.count()))
movies.show(5, False)

movie count 9,125
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



### Ratings data

In [3]:
### ratings Data
ratings = spark.read.csv("/data/movielens/ml-latest-small/ratings.csv", 
                         header=True, inferSchema=True)
print ('rating count {:,}'.format(ratings.count()))
ratings.show(5, False)

rating count 100,004
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
+------+-------+------+----------+
only showing top 5 rows



### Links

In [4]:
### links Data
links = spark.read.csv("/data/movielens/ml-latest-small/links.csv", 
                         header=True, inferSchema=True)
print ('links count {:,}'.format(links.count()))
links.show(5, False)

links count 9,125
+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|1      |114709|862   |
|2      |113497|8844  |
|3      |113228|15602 |
|4      |114885|31357 |
|5      |113041|11862 |
+-------+------+------+
only showing top 5 rows



### Tags

In [5]:
### tags Data
tags = spark.read.csv("/data/movielens/ml-latest-small/tags.csv", 
                         header=True, inferSchema=True)
print ('tags count {:,}'.format(tags.count()))
tags.show(5, False)

tags count 1,296
+------+-------+-----------------------+----------+
|userId|movieId|tag                    |timestamp |
+------+-------+-----------------------+----------+
|15    |339    |sandra 'boring' bullock|1138537770|
|15    |1955   |dentist                |1193435061|
|15    |7478   |Cambodia               |1170560997|
|15    |32892  |Russian                |1170626366|
|15    |34162  |forgettable            |1141391765|
+------+-------+-----------------------+----------+
only showing top 5 rows



## 3 - Parsing Data

The format of these files is uniform and simple, so we can use Python [`split()`](https://docs.python.org/2/library/stdtypes.html#str.split) to parse their lines once they are loaded into RDDs. Parsing the movies and ratings files yields two RDDs:  

* For each line in the ratings dataset, we create a tuple of `(UserID, MovieID, Rating)`. We drop the *timestamp* because we do not need it for this recommender.  
* For each line in the movies dataset, we create a tuple of `(MovieID, Title)`. We drop the *genres* because we do not use them for this recommender.  

So let's load the raw ratings data. We need to filter out the header, included in each file.    

### 3.1 - Ratings

In [6]:
small_ratings_raw_data = sc.textFile('/data/movielens/ml-latest-small/ratings.csv')
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [7]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [8]:
small_ratings_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

### 3.2 - Movies

We proceed in a similar way with the `movies.csv` file.

In [9]:
small_movies_raw_data = sc.textFile('/data/movielens/ml-latest-small/movies.csv')
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]
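
One caveat with `split(",")`: titles that contain a comma are quoted in `movies.csv`, so a plain split cuts them short. The following is a minimal sketch using Python's standard `csv` module; the helper `parse_movie_line` and the sample line are illustrative assumptions, not part of the original notebook.

```python
import csv

def parse_movie_line(line):
    """Parse one movies.csv line into (movie_id, title, genres), honoring quotes."""
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres

# Illustrative line in the movies.csv layout (movieId,title,genres).
sample_line = '11,"American President, The (1995)",Comedy|Drama|Romance'

naive_tokens = sample_line.split(",")    # the quoted title is cut at its comma
parsed = parse_movie_line(sample_line)   # the title stays in one piece

print(naive_tokens[1])
print(parsed[1])
```

A function like this could be passed to `map()` on the RDD in place of the `split(",")` lambda if full titles are needed.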

## 4 - Collaborative Filtering

In Collaborative filtering we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if a user A has the same opinion as a user B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a user chosen randomly.  

The image below (from [Wikipedia](https://en.wikipedia.org/?title=Collaborative_filtering)) shows an example of collaborative filtering. At first, people rate different items (like videos, images, games). Then, the system makes predictions about a user's rating for an item not rated yet. The new predictions are built upon the existing ratings of other users whose ratings are similar to those of the active user. In the image, the system predicts that the user will not like the video.  

![collaborative filtering](https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif)

Spark's MLlib machine learning library provides a [Collaborative Filtering](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) implementation based on [Alternating Least Squares](http://dl.acm.org/citation.cfm?id=1608614). The implementation in MLlib has the following parameters:  

- numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).  
- rank is the number of latent factors in the model.  
- iterations is the number of iterations to run.  
- lambda specifies the regularization parameter in ALS.  
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.  
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.  
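
To make the roles of rank, iterations, and lambda more concrete, here is a toy plain-Python sketch of the alternating idea with a single latent factor (rank 1). It is a simplification for illustration only and is not how MLlib implements ALS; the function name `als_rank1` and the tiny ratings list are assumptions made up for this example.

```python
def als_rank1(ratings, n_users, n_items, iterations=20, lam=0.01):
    """Toy ALS with one latent factor on (user, item, rating) triples."""
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(iterations):
        # Step 1: hold item factors fixed; each user's factor then has a
        # closed-form regularized least-squares solution.
        num = [0.0] * n_users
        den = [lam] * n_users
        for u, i, r in ratings:
            num[u] += r * item_f[i]
            den[u] += item_f[i] ** 2
        user_f = [n / d for n, d in zip(num, den)]
        # Step 2: hold user factors fixed and solve for the item factors.
        num = [0.0] * n_items
        den = [lam] * n_items
        for u, i, r in ratings:
            num[i] += r * user_f[u]
            den[i] += user_f[u] ** 2
        item_f = [n / d for n, d in zip(num, den)]
    return user_f, item_f

# Five observed ratings from a 2x3 matrix whose rows are (1, 2, 3) and (2, 4, 6);
# the entry for user 1, item 2 is held out.
observed = [(0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (1, 0, 2.0), (1, 1, 4.0)]
user_f, item_f = als_rank1(observed, n_users=2, n_items=3)
held_out_prediction = user_f[1] * item_f[2]
print(held_out_prediction)
```

The prediction for the held-out entry should come out close to the 6 that the rank-1 pattern implies; a larger `lam` shrinks the factors and pulls the prediction further away from it.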


## 5 - Selecting ALS parameters using the small dataset

In order to determine the best ALS parameters, we will use the small dataset. We need first to split it into train, validation, and test datasets.

In [10]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

print("training data size {:,}".format(training_RDD.count()) )
print("validation data size {:,}".format(validation_RDD.count()) )
print("test data size {:,}".format(test_RDD.count()) )

training data size 60,220
validation data size 19,844
test data size 19,940
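
The split sizes are close to, but not exactly, 60/20/20 because `randomSplit` normalizes the weights and assigns each record at random. The plain-Python sketch below illustrates that idea under simplifying assumptions; `random_split` is a hypothetical helper and does not reproduce Spark's sampling.

```python
import random

def random_split(items, weights, seed=0):
    """Assign each item to one bucket with probability weight / sum(weights)."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [6, 2, 2] -> roughly [0.6, 0.8, 1.0].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for item in items:
        x = rng.random()
        for idx, bound in enumerate(bounds):
            # The last bucket catches any floating-point shortfall in the bounds.
            if x < bound or idx == len(bounds) - 1:
                buckets[idx].append(item)
                break
    return buckets

train, validation, test = random_split(range(10000), [6, 2, 2])
print(len(train), len(validation), len(test))
```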


## 6 - Training

In [11]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank ', best_rank)

For rank 4 the RMSE is 0.9317078532174138
For rank 8 the RMSE is 0.9421828839911472
For rank 12 the RMSE is 0.9432158182181839
The best model was trained with rank  4


## 7 - Understanding Predictions
But let's explain this a little bit. First, let's have a look at how our predictions look.  

In [12]:
predictions.take(3)

[((468, 6400), 1.9129122963991014),
 ((452, 3272), 2.451792839331191),
 ((262, 3272), 3.239204886744039)]

Basically we have the UserID, the MovieID, and the Rating, as we have in our ratings dataset. In this case the third element of each prediction, the rating for that movie and user, is the one predicted by our ALS model.

Then we join these with our validation data (the one that includes ratings) and the result looks as follows:  

In [13]:
rates_and_preds.take(3)

[((1, 1129), (2.0, 2.886518225200327)),
 ((3, 44191), (3.5, 3.768400838261381)),
 ((4, 616), (5.0, 4.227537834403311))]

To that, we apply a squared difference, then we use the `mean()` action to get the MSE and apply `sqrt` to obtain the RMSE.
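
As a small worked example of that calculation outside Spark, the sketch below computes the RMSE for the three `(actual, predicted)` pairs shown in the sample output above. The helper name `rmse` is an assumption introduced for illustration.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The three (actual, predicted) pairs from the rates_and_preds sample.
sample_pairs = [
    (2.0, 2.886518225200327),
    (3.5, 3.768400838261381),
    (5.0, 4.227537834403311),
]
sample_rmse = rmse(sample_pairs)
print(sample_rmse)
```

On these three pairs alone the error is roughly 0.7; the figure reported by the notebook is computed over the whole validation set.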

Finally we test the selected model.

In [14]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is ', (error))

For testing data the RMSE is  0.9450330628023882


## 8 - Using the complete dataset to build the final model

### 8.1 - Download the data

In [15]:
! (cd /data/movielens; ./download.sh)

ml-latest.zip exists
ml-latest-full directory exists


### 8.2 - Use full dataset
In order to build our recommender model, we will use the complete dataset. Therefore, we need to process it the same way we did with the small dataset.   

In [16]:
# Load the complete dataset file
complete_ratings_raw_data = sc.textFile('/data/movielens/ml-latest-full/ratings.csv')
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are {:,} recommendations in the complete dataset".format((complete_ratings_data.count())))

There are 27,753,444 recommendations in the complete dataset


In [17]:
sample_fraction = 0.2
sample_data = complete_ratings_data.sample(False, sample_fraction)
print("sample data size : {:,}".format((sample_data.count())))

sample data size : 5,551,000


### 8.3 - Train on full data

In [18]:
training_RDD, test_RDD = sample_data.randomSplit([0.7, 0.3], seed=0)
print("training data size {:,}".format(training_RDD.count()) )
print("test data size {:,}".format(test_RDD.count()) )

training data size 3,884,787
test data size 1,666,213


In [19]:
%%time
print("training starting...")
complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)
print("training done")

training starting...
training done
CPU times: user 35.2 ms, sys: 12.5 ms, total: 47.7 ms
Wall time: 16.7 s


### 8.4 - Test

In [20]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is ', error)

For testing data the RMSE is  0.8921419608302642


We can see how we got a more accurate recommender when using a much larger dataset.  

## 9 - How to make recommendations

Although we aim at building an online movie recommender, now that we know how to get our recommender model ready, we can try it out by providing some movie recommendations. This will help us code the recommendation engine later on when building the web service, and will show how to use the model in other circumstances.  

When using collaborative filtering, getting recommendations is not as simple as predicting for the new entries using a previously generated model. Instead, we need to train the model again, including the new user preferences, in order to compare them with other users in the dataset. That is, the recommender needs to be retrained every time we have new user ratings (although a single model can of course be used by multiple users). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution). Once we have our model trained, we can reuse it to obtain top recommendations for a given user or an individual rating for a particular movie. These are less costly operations than training the model itself.    

So let's first load the movies complete file for later use.

In [21]:
complete_movies_file = '/data/movielens/ml-latest-full/movies.csv'
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are {:,} movies in the complete dataset".format(complete_movies_titles.count()))

There are 58,098 movies in the complete dataset


Another thing we want to do is recommend only movies with a certain minimum number of ratings. For that, we need to count the number of ratings per movie.  

In [22]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (sample_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

print (movie_rating_counts_RDD.take(10))

[(828, 305), (23, 958), (736, 7572), (1127, 3935), (1196, 13219), (1242, 2723), (1909, 2915), (4025, 2709), (4232, 1153), (552, 3076)]
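
`groupByKey()` collects every rating of a movie before counting. An alternative is to accumulate a running `(count, sum)` per movie, which is the pattern `aggregateByKey` is designed for. The sketch below defines the three small functions in plain Python and checks them locally; the function names and the two sample partitions are assumptions for illustration, and the Spark wiring is shown only as a comment.

```python
from functools import reduce

ZERO = (0, 0.0)  # (rating count, rating sum)

def add_rating(acc, rating):
    """Fold one rating into a (count, sum) accumulator."""
    return acc[0] + 1, acc[1] + rating

def merge_accumulators(a, b):
    """Combine two partial (count, sum) accumulators."""
    return a[0] + b[0], a[1] + b[1]

def to_count_and_average(acc):
    """Turn (count, sum) into (count, average)."""
    return acc[0], acc[1] / acc[0]

# Local check on two hypothetical partitions of ratings for one movie.
partition_a = reduce(add_rating, [4.0, 3.5], ZERO)
partition_b = reduce(add_rating, [5.0], ZERO)
count, average = to_count_and_average(merge_accumulators(partition_a, partition_b))
print(count, average)

# With Spark, the same functions could be plugged in roughly like this:
# movie_ID_with_avg_ratings_RDD = (sample_data.map(lambda x: (x[1], x[2]))
#     .aggregateByKey(ZERO, add_rating, merge_accumulators)
#     .mapValues(to_count_and_average))
```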


## 10 - Adding new user ratings

Now we need to rate some movies for the new user. We will put them in a new RDD and use the user ID 0, which is not assigned in the MovieLens dataset. Check the [dataset](http://grouplens.org/datasets/movielens/) movies file for the ID-to-title assignment (so you know which movies you are actually rating).   

In [23]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: ', new_user_ratings_RDD.take(10))

New user ratings:  [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


Now we add them to the data we will use to train our recommender model. We use Spark's `union()` transformation for this.  

In [24]:
complete_data_with_new_ratings_RDD = sample_data.union(new_user_ratings_RDD)

And finally we train the ALS model using all the parameters we selected before (when using the small dataset).

In [25]:
%%time 
print("training starting...")
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
print ("training done")

training starting...
training done
CPU times: user 48.3 ms, sys: 19.9 ms, total: 68.3 ms
Wall time: 24.4 s


It took some time. We will need to repeat that every time a user adds new ratings. Ideally we will do this in batches, and not for every single rating that comes into the system for every user.

## 11 - Getting top recommendations

Let's now get some recommendations! For that we will get an RDD with all the movies the new user hasn't rated yet. We will use them together with the model to predict ratings.  

In [26]:
new_user_ratings_ids = {x[1] for x in new_user_ratings} # get just movie IDs, as a set for repeated lookups
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
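
A note on the list of already-rated IDs: in Python 3, `map()` returns a one-shot iterator, so a membership test against it is only reliable once. A list or set is the safer container for repeated `not in` checks inside a filter. The plain-Python illustration below uses three of the ratings from above; the variable names are assumptions for this example.

```python
sample_ratings = [(0, 260, 9), (0, 1, 8), (0, 16, 7)]

# A lazy map object is consumed by membership tests.
lazy_ids = map(lambda x: x[1], sample_ratings)
first_check = 1 in lazy_ids      # scans past 260 and stops at 1
second_check = 260 in lazy_ids   # 260 was already consumed by the first test

# A set can be tested any number of times.
id_set = {movie_id for _, movie_id, _ in sample_ratings}
set_checks = (1 in id_set, 260 in id_set)

print(first_check, second_check, set_checks)
```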

We have our recommendations ready. Now we can print out the 25 movies with the highest predicted ratings. We join them with the movies RDD to get the titles, and with the ratings counts in order to keep only movies with a minimum number of ratings. First we will do the join and see what the result looks like.

In [27]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(136680, ((3.147308239394963, 'Battle For SkyArk (2015)'), 5)),
 (55080, ((5.810976549948867, '"Brave One'), 125)),
 (26520,
  ((7.711563585045221,
    'Full Moon in Paris (Les nuits de la pleine lune) (1984)'),
   7))]

So we need to flatten this a bit in order to have `(Title, Rating, Ratings Count)`.

In [28]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

Finally, get the highest rated recommendations for the new user, filtering out movies with fewer than 25 ratings.

In [29]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n',
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
 ('Gangs of Wasseypur (2012)', 9.34319143829024, 32)
('56 Up (2012)', 9.301834930364095, 42)
('"Unvanquished', 9.212360108876751, 72)
('"Civil War', 9.102088893288368, 93)
('"Sorrow and the Pity', 9.086578086599339, 27)
('The Adventures of Sherlock Holmes and Dr. Watson: Bloody Signature (1979)', 9.038043704969603, 29)
("Empire of Dreams: The Story of the 'Star Wars' Trilogy (2004)", 8.938093515505592, 33)
('Shoah (1985)', 8.92026510034303, 68)
('Cosmos', 8.917868551669244, 36)
('Planet Earth (2006)', 8.901130125562844, 294)
('When the Levees Broke: A Requiem in Four Acts (2006)', 8.843454892554306, 28)
('Alone in the Wilderness (2004)', 8.837415975275022, 79)
('The Fool (2014)', 8.821698469253732, 25)
('"World of Apu', 8.809171957277087, 169)
('Seven Up! (1964)', 8.77984827995737, 93)
('Casablanca (1942)', 8.715574680102954, 6305)
('Close-Up (Nema-ye Nazdik) (1990)', 8.70562785789081, 30)
('Sherlock Jr. (1924)', 8.685331656439345, 12

## 12 - Getting individual ratings

Another useful use case is getting the predicted rating for a particular movie for a given user. The process is similar to the previous retrieval of top recommendations but, instead of using `predictAll` with every single movie the user hasn't rated yet, we will just pass the method a single entry with the movie we want to predict the rating for.  

In [30]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

The result is a list with a single `Rating(user, product, rating)` entry for the requested user and movie. Obviously we can include as many movies as we need in that list!