# An on-line movie recommending service using Spark & Flask - Building the recommender

## Getting and processing the data

The list of tasks we can pre-compute includes:  

- Loading and parsing the dataset. Persisting the resulting RDD for later use.  
- Building the recommender model using the complete dataset. Persisting the model for later use.  

This notebook explains the first of these tasks.  

### File download

In [4]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

Define download locations.

In [5]:
import os

datasets_path = os.path.join('..', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

Now we can proceed with both downloads.

In [6]:
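import urllib.request

# urlretrieve does not create missing directories, so make sure ../datasets exists
os.makedirs(datasets_path, exist_ok=True)

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)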

Both of them are zip files containing a folder with ratings, movies, etc. We need to extract each of them into its own folder so we can use each file later on.  

In [7]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)
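Running the download cell again fetches both archives again, and the complete one is much larger than the small one. A minimal sketch, assuming you want to skip the download when the zip file is already on disk; `download_and_extract` is a hypothetical helper, and its `fetch` parameter exists only so it can be exercised without network access.

```python
import os
import urllib.request
import zipfile


def download_and_extract(url, zip_path, extract_to, fetch=urllib.request.urlretrieve):
    """Download `url` to `zip_path` (only if that file is missing), then
    extract the archive into `extract_to`. Returns True if a download happened.

    `fetch` defaults to urllib.request.urlretrieve; it is a parameter only
    so the function can be tested with a fake downloader.
    """
    # urlretrieve does not create parent directories on its own
    os.makedirs(os.path.dirname(zip_path) or ".", exist_ok=True)
    downloaded = False
    if not os.path.exists(zip_path):
        fetch(url, zip_path)
        downloaded = True
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(extract_to)
    return downloaded
```

In this notebook it could be called as `download_and_extract(complete_dataset_url, complete_dataset_path, datasets_path)`.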

### Loading and parsing datasets

Load the raw ratings data. I need to filter out the header line, which is included in each file.    

In [13]:
from pyspark import SparkConf, SparkContext

import os

conf = SparkConf().setAppName("RecommendationApp").setMaster("local[*]")

sc = SparkContext(conf=conf)

small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]


Now we can parse the raw data into a new RDD.  

In [14]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

For illustrative purposes, we can take the first few lines of our RDD to see the result. In the final script we don't call any Spark action (e.g. `take`) until needed, since they trigger actual computations in the cluster.  

In [15]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

I proceed in a similar way with the `movies.csv` file.

In [16]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]
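Titles that contain a comma are wrapped in double quotes in `movies.csv`, so `line.split(",")` cuts them in two. That is why titles such as `'"Housemaid'` and `'"Godfather: Part II'` appear truncated later in this notebook, and why the third token is not always the genres field. A minimal sketch using Python's `csv` module, which honours the quoting; `parse_movie_line` is a hypothetical helper and the sample line is illustrative.

```python
import csv


def parse_movie_line(line):
    """Parse one line of movies.csv into (movieId, title, genres).

    csv.reader understands the double quotes around titles that contain
    commas, which a plain line.split(",") does not.
    """
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres


line = '11,"American President, The (1995)",Comedy|Drama|Romance'
print(line.split(","))         # the title ends up split across two fields
print(parse_movie_line(line))  # (11, 'American President, The (1995)', 'Comedy|Drama|Romance')
```

With Spark, the function could replace the `split(",")` lambdas, e.g. `small_movies_raw_data.filter(lambda line: line != small_movies_raw_data_header).map(parse_movie_line)`. This assumes no quoted field spans several lines, since `textFile` reads the file line by line.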

## Selecting ALS parameters using the small dataset

In order to determine the best ALS parameters, I will use the small dataset. First we need to split it into training, validation, and test datasets.

In [18]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
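`randomSplit` normalizes its weights, so `[6, 2, 2]` asks for roughly 60% training, 20% validation and 20% test data, with every rating landing in exactly one split. The `*_for_predict_RDD` variables then drop the rating, keeping only `(user, movie)` pairs for the model to predict. The plain-Python sketch below illustrates the splitting idea only; it is not Spark's implementation (which samples per partition), and the resulting sizes are only approximately proportional to the weights.

```python
import random


def random_split(items, weights, seed=None):
    """Assign each item to exactly one split, with probability proportional
    to `weights` (normalized, so [6, 2, 2] means 60/20/20)."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(item)
                break
        else:
            splits[-1].append(item)  # guard against floating-point round-off
    return splits


ratings = [(u, m, 4.0) for u in range(50) for m in range(20)]
train, val, test = random_split(ratings, [6, 2, 2], seed=0)
val_for_predict = [(u, m) for u, m, _ in val]  # drop the rating, as in the cell above
print(len(train), len(val), len(test))
```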

Now we can proceed with the training phase. 

In [23]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9136647249990393
For rank 8 the RMSE is 0.9226236040056827
For rank 12 the RMSE is 0.9188432735769673
The best model was trained with rank 4


Let's explain this a little. First, let's look at what our predictions look like.  

In [24]:
predictions.take(3)

[((599, 1840), 2.81069881559431),
 ((11, 1840), 3.200947771281338),
 ((541, 464), 1.98838227782107)]

Basically we have the UserID, the MovieID, and the Rating, as in our ratings dataset, arranged as `((UserID, MovieID), Rating)` so they can be joined by user and movie. In this case the third element, the rating for that user and movie, is the one predicted by our ALS model.

Then we join these with our validation data (the one that includes ratings) and the result looks as follows:  

In [25]:
rates_and_preds.take(3)

[((1, 3441), (5.0, 3.1102836692112596)),
 ((2, 48516), (4.0, 4.031505125791396)),
 ((2, 80906), (5.0, 3.4547736654339793))]

To each joined pair we apply a squared difference, then we use the `mean()` action to get the MSE (mean squared error), and finally apply `sqrt` to get the RMSE.
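The join and the RMSE computation can be sketched in plain Python with dictionaries keyed by `(user, movie)`. This is an illustration of the arithmetic, not the Spark code: the set intersection plays the role of the inner `join`, so pairs with no prediction (for example a user or movie absent from the training split, which `predictAll` does not return) are silently left out. The values are illustrative.

```python
import math


def rmse(actual, predicted):
    """Root-mean-squared error over the (user, movie) keys present in both
    dicts: inner join, squared difference, mean, then square root."""
    common = actual.keys() & predicted.keys()  # inner join on (user, movie)
    if not common:
        raise ValueError("no overlapping (user, movie) pairs")
    mse = sum((actual[k] - predicted[k]) ** 2 for k in common) / len(common)
    return math.sqrt(mse)


actual = {(1, 3441): 5.0, (2, 48516): 4.0, (2, 80906): 5.0}
predicted = {(1, 3441): 3.0, (2, 48516): 4.0, (9, 9): 2.5}
# only two pairs overlap: sqrt((2**2 + 0**2) / 2) = sqrt(2)
print(rmse(actual, predicted))
```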

Finally we test the selected model.

In [27]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.908279077289237


## Using the complete dataset to build the final model

In order to build our recommender model, we will use the complete dataset. Therefore, we need to process it the same way we did with the small dataset.   

In [34]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s ratings in the complete dataset" % complete_ratings_data.count())

There are 33832162 ratings in the complete dataset


Now we are ready to train the recommender model.

In [None]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

Now we test on our testing set.  

In [None]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.82183583368


The test RMSE drops from about 0.908 with the small dataset to about 0.822 with the complete one, so I got a more accurate recommender by using a much larger dataset.  

## Make recommendations

Load the movies complete file for later use.

In [29]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are %s movies in the complete dataset" % complete_movies_titles.count())

There are 86537 movies in the complete dataset


Count the number of ratings per movie and compute each movie's average rating.  

In [30]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

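`groupByKey` gathers every rating of a movie into one list before `get_counts_and_averages` counts and averages it, but the same numbers only need a running (count, sum) per movie. The sketch below shows that accumulation in plain Python; the commented Spark version using `reduceByKey` and `mapValues` is an untested suggestion, not code from this notebook.

```python
from collections import defaultdict


def counts_and_averages(movie_rating_pairs):
    """Return {movie_id: (number_of_ratings, average_rating)} by keeping a
    running [count, sum] per movie instead of a full list of ratings."""
    acc = defaultdict(lambda: [0, 0.0])
    for movie_id, rating in movie_rating_pairs:
        acc[movie_id][0] += 1
        acc[movie_id][1] += rating
    return {m: (n, s / n) for m, (n, s) in acc.items()}


# Possible Spark equivalent (untested sketch):
# (complete_ratings_data.map(lambda x: (x[1], (1, x[2])))
#     .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
#     .mapValues(lambda t: (t[0], t[1] / t[0])))

print(counts_and_averages([(1, 4.0), (1, 2.0), (2, 5.0)]))  # {1: (2, 3.0), 2: (1, 5.0)}
```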

### Adding new user ratings

Now we need to rate some movies for the new user. We will put them in a new RDD and use the user ID 0, which is not assigned in the MovieLens dataset. Check the [dataset](http://grouplens.org/datasets/movielens/) movies file for the ID-to-title mapping (so you know which movies you are actually rating).   

In [None]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]
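MovieLens ratings go from 0.5 to 5.0 stars in half-star steps, while the ratings above use a 1-10 scale. Mixing both scales in one training set is consistent with the predicted ratings above 8 that show up in the top recommendations later on. If you prefer the new user's ratings on the dataset's own scale, a minimal sketch (the helper `to_half_star_scale` is hypothetical) halves each rating. Note that the outputs shown later in this notebook were produced without this conversion.

```python
def to_half_star_scale(rating_tuple):
    """Map a (user, movie, rating) tuple rated 1-10 onto the MovieLens
    0.5-5.0 half-star scale by halving the rating (hypothetical helper)."""
    user_id, movie_id, rating = rating_tuple
    if not 1 <= rating <= 10:
        raise ValueError("expected a rating between 1 and 10")
    return user_id, movie_id, rating / 2.0


sample = [(0, 260, 9), (0, 1, 8), (0, 858, 10)]
print([to_half_star_scale(r) for r in sample])
# [(0, 260, 4.5), (0, 1, 4.0), (0, 858, 5.0)]
```

In the notebook this would become `new_user_ratings_RDD = sc.parallelize([to_half_star_scale(r) for r in new_user_ratings])`.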


Now we add them to the data we will use to train our recommender model. We use Spark's `union()` transformation for this.  

In [31]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)


And finally we train the ALS model using all the parameters we selected before (when using the small dataset).

In [None]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt, 3))

New model trained in 56.61 seconds


It took some time, and we will need to repeat that every time a user adds new ratings. Ideally we will do this in batches, rather than retraining for every single rating that comes into the system.
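One way to batch retraining is to buffer incoming ratings and retrain only once enough of them have arrived. The class below is a hypothetical helper, not part of Spark or this notebook; `retrain` stands for any callable that receives the buffered ratings, for example one that unions them into the training RDD and calls `ALS.train` again.

```python
class RatingsBuffer:
    """Collect incoming ratings and call `retrain` once `batch_size`
    of them are pending (hypothetical helper)."""

    def __init__(self, batch_size, retrain):
        self.batch_size = batch_size
        self.retrain = retrain
        self.pending = []

    def add(self, rating):
        """Buffer one (user, movie, rating) tuple; return True if this
        addition triggered a retrain."""
        self.pending.append(rating)
        if len(self.pending) >= self.batch_size:
            batch, self.pending = self.pending, []
            self.retrain(batch)
            return True
        return False


batches = []
buf = RatingsBuffer(3, batches.append)
results = [buf.add((0, m, 4.0)) for m in range(5)]
print(results)  # [False, False, True, False, False]
```

A size threshold is the simplest policy; a time-based trigger (retrain every N minutes if anything is pending) would be a natural alternative.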

### Getting top recommendations

Let's now get some recommendations! For that we will get an RDD with all the movies the new user hasn't rated yet. We will then use them together with the model to predict ratings.  

In [None]:
# get just movie IDs; a set, because in Python 3 map() returns a one-shot iterator
new_user_ratings_ids = set(x[1] for x in new_user_ratings)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
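The movie IDs used in the filter need to support repeated membership tests. In Python 3, `map()` returns an iterator that each `in` check consumes, so once it is exhausted every later check returns `False`. A set avoids that and gives constant-time lookups. A plain-Python illustration (the movie list is illustrative; `unrated_pairs` is a hypothetical helper mirroring the filter and map above):

```python
new_user_ID = 0
new_user_ratings = [(0, 260, 9), (0, 1, 8), (0, 16, 7)]

# A one-shot iterator: the first `in` test consumes it up to 260
ids_iter = map(lambda x: x[1], new_user_ratings)
print(260 in ids_iter, 260 in ids_iter)  # True False

# A set can be tested any number of times
rated_ids = {movie_id for _, movie_id, _ in new_user_ratings}


def unrated_pairs(user_id, rated, all_movie_ids):
    """(user_id, movie_id) pairs for every movie the user has not rated."""
    return [(user_id, m) for m in all_movie_ids if m not in rated]


print(unrated_pairs(new_user_ID, rated_ids, [1, 2, 16, 260, 300]))
# [(0, 2), (0, 300)]
```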

We have our recommendations ready. Now we can print out the 25 movies with the highest predicted ratings. To do so, we join them with the movies RDD to get the titles, and with the ratings counts so we can keep only movies with a minimum number of ratings. First we will do the join and see what the result looks like.

In [None]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(87040, ((6.834512984654888, u'"Housemaid'), 14)),
 (8194, ((5.966704041954459, u'Baby Doll (1956)'), 79)),
 (130390, ((0.6922328127396398, u'Contract Killers (2009)'), 1))]

So we need to flatten this a bit in order to have `(Title, Rating, Ratings Count)`.

In [None]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
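Python 3 lambdas cannot unpack tuple parameters (PEP 3113), which is why the lambda above indexes `r[1][0][1]`. A named function with nested unpacking spells out the record shape instead and could be passed to `.map()` in its place; a minimal sketch, with `flatten_recommendation` as a hypothetical name and the record taken from the output above:

```python
def flatten_recommendation(record):
    """Turn (movie_id, ((predicted_rating, title), ratings_count)) into
    (title, predicted_rating, ratings_count)."""
    movie_id, ((predicted_rating, title), ratings_count) = record
    return title, predicted_rating, ratings_count


record = (8194, ((5.966704041954459, 'Baby Doll (1956)'), 79))
print(flatten_recommendation(record))
# ('Baby Doll (1956)', 5.966704041954459, 79)
```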

Finally, get the highest rated recommendations for the new user, filtering out movies with fewer than 25 ratings.

In [None]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print('TOP recommended movies (with at least 25 ratings):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with at least 25 ratings):
(u'"Godfather: Part II', 8.503749129186701, 29198)
(u'"Civil War', 8.386497469089297, 257)
(u'Frozen Planet (2011)', 8.372705479107108, 31)
(u'"Shawshank Redemption', 8.258510064442426, 67741)
(u'Cosmos (1980)', 8.252254825768972, 948)
(u'Band of Brothers (2001)', 8.225114960311624, 4450)
(u'Generation Kill (2008)', 8.206487040524653, 52)
(u"Schindler's List (1993)", 8.172761674773625, 53609)
(u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.166229786764168, 23915)
(u"One Flew Over the Cuckoo's Nest (1975)", 8.15617022970577, 32948)
(u'Casablanca (1942)', 8.141303207981174, 26114)
(u'Seven Samurai (Shichinin no samurai) (1954)', 8.139633165142612, 11796)
(u'Goodfellas (1990)', 8.12931139039048, 27123)
(u'Star Wars: Episode V - The Empire Strikes Back (1980)', 8.124225700242096, 47710)
(u'Jazz (2001)', 8.078538221315313, 25)
(u"Long Night's Journey Into Day (2000)", 8.050176820606127, 34)
(u'Lawrence of
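`filter(...)` followed by `takeOrdered(25, key=lambda x: -x[1])` keeps the eligible records and returns the 25 with the largest predicted rating. The same selection on a local list can be sketched with `heapq.nlargest`; `top_recommendations` is a hypothetical helper, the records are illustrative, and ties may come out in a different order than Spark's.

```python
import heapq


def top_recommendations(records, n=25, min_count=25):
    """Return the `n` (title, rating, count) records with the highest
    predicted rating among those with at least `min_count` ratings."""
    eligible = (r for r in records if r[2] >= min_count)
    return heapq.nlargest(n, eligible, key=lambda r: r[1])


recs = [('A', 8.5, 29198), ('B', 9.9, 3), ('C', 8.1, 25), ('D', 7.0, 100)]
print(top_recommendations(recs, n=2))
# [('A', 8.5, 29198), ('C', 8.1, 25)]  ('B' has too few ratings)
```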

### Getting individual ratings

Another useful use case is getting the predicted rating for a particular movie for a given user. The process is similar to the previous retrieval of top recommendations but, instead of using `predictAll` with every single movie the user hasn't rated yet, we will just pass the method a single entry with the movie we want to predict the rating for.  

In [None]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

The result is a list with a single `Rating` entry for user 0 and movie 500, holding the predicted rating for that movie. Obviously we can include as many movies as we need in that list!

## Persisting the model

Optionally, we might want to persist the base model for later use in our on-line recommendations. Although a new model is generated every time we have new user ratings, it might be worth storing the current one in order to save time when starting up the server, etc. We might also save time if we persist some of the RDDs we have generated, especially those that took longer to process. For example, the following lines save and load an ALS model.      

In [None]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)

Among other things, you will see in your filesystem that there are folders with product and user data in [Parquet](https://parquet.apache.org/) format files.  

## Genre and other fields

We haven't used the `genre` and `timestamp` fields, in order to simplify the transformations and the whole tutorial. Incorporating them should not be a problem. A good use could be filtering recommendations by either of them (e.g. recommendations by genre, or recent recommendations), as we have done with the minimum number of ratings.  
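In `movies.csv` the genres field is a pipe-separated list such as `Adventure|Animation|Children`. A minimal sketch of filtering recommendations by genre, assuming that field has been parsed correctly into a `movie_id -> genres` mapping; `filter_by_genre` and the sample data are illustrative, not part of this notebook.

```python
def filter_by_genre(recommendations, movie_genres, genre):
    """Keep (movie_id, predicted_rating) pairs whose movie lists `genre`.

    `movie_genres` maps movie_id to the pipe-separated genres string;
    movies missing from it are dropped.
    """
    return [
        (movie_id, rating)
        for movie_id, rating in recommendations
        if genre in movie_genres.get(movie_id, "").split("|")
    ]


genres = {1: "Adventure|Animation|Children|Comedy|Fantasy",
          16: "Crime|Drama",
          260: "Action|Adventure|Sci-Fi"}
recs = [(1, 4.1), (16, 3.9), (260, 4.7), (999, 5.0)]
print(filter_by_genre(recs, genres, "Adventure"))  # [(1, 4.1), (260, 4.7)]
```

On an RDD the same test could go inside a `filter()` call, e.g. `lambda r: genre in r[2].split("|")` on `(movieId, title, genres)` tuples.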