# Ripe Pumpkins: Pumpkinmeter

### Source: [GroupLens Research](https://grouplens.org/datasets/movielens/latest/)


#### Create a SparkContext configured for local mode

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

#### File download

 - Small: 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users. Last updated 9/2018.
 
 - Complete: 27,000,000 ratings and 1,100,000 tag applications applied to 58,000 movies by 280,000 users. Includes tag genome data with 14 million relevance scores across 1,100 tags. Last updated 9/2018.

In [2]:
small_dataset_url = 'https://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
complete_dataset_url = 'https://files.grouplens.org/datasets/movielens/ml-latest.zip'

#### Download locations

In [3]:
import os
datasets_path = os.path.join('/home/jovyan', 'work')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')

#### Getting files

In [4]:
import urllib.request
small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)

#### Extracting files

In [5]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

### Loading and parsing datasets

#### ratings.csv from small dataset

In [6]:
# Load the small dataset file
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
print(small_ratings_raw_data.take(5))

# Parse
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print('There are {} ratings in the small dataset'.format(small_ratings_data.count()))
small_ratings_data.take(3)

['userId,movieId,rating,timestamp', '1,1,4.0,964982703', '1,3,4.0,964981247', '1,6,4.0,964982224', '1,47,5.0,964983815']
There are 100836 ratings in the small dataset


[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]

#### movies.csv from small dataset

In [7]:
# Load the small dataset file
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

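# Parse. movies.csv wraps titles that contain commas in double quotes,
# so csv.reader is used instead of split(",") to keep those titles intact
import csv
small_movies_data = small_movies_raw_data.filter(lambda line: line != small_movies_raw_data_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()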

small_movies_titles = small_movies_data.map(lambda x: (int(x[0]),x[1]))
print ('There are {} movies in the small dataset'.format(small_movies_titles.count()))
small_movies_data.take(3)

There are 9742 movies in the small dataset


[(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 (2, 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance')]
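Splitting each line on "," works for ratings.csv, but movies.csv wraps any title that itself contains a comma in double quotes, so a plain split cuts such titles apart. Python's standard `csv` module understands that quoting. The sketch below compares both approaches on an illustrative line in the movies.csv format (the line is an assumed example, not copied from the file).

```python
import csv

# Illustrative movies.csv-style line: the title contains a comma, so it is quoted
line = '11,"American President, The (1995)",Comedy|Drama|Romance'

naive = line.split(",")
parsed = next(csv.reader([line]))  # csv.reader accepts any iterable of lines

print(naive)   # ['11', '"American President', ' The (1995)"', 'Comedy|Drama|Romance']
print(parsed)  # ['11', 'American President, The (1995)', 'Comedy|Drama|Romance']
```

In Spark the same call can be used per line, e.g. `.map(lambda line: next(csv.reader([line])))`, as long as no quoted field spans several lines (textFile splits records on newlines).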

### Collaborative Filtering

Spark's MLlib machine learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS). The implementation in MLlib has the following parameters:

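 - numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure). In PySpark's ALS.train this argument is called blocks.
 - rank is the number of latent factors in the model.
 - iterations is the number of iterations to run.
 - lambda specifies the regularization parameter in ALS. In PySpark it is passed as lambda_, since lambda is a Python keyword.
 - implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data. In PySpark's pyspark.mllib API, the implicit variant is a separate method, ALS.trainImplicit.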
 - alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
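To make rank, iterations and lambda more concrete, here is a toy, single-machine version of explicit-feedback ALS written with NumPy. It is a sketch for intuition, not Spark's implementation: it assumes a tiny in-memory list of ratings with 0-based integer IDs, has no notion of numBlocks, and ignores implicitPrefs and alpha. Following the convention described in the MLlib documentation ("ALS-WR"), it scales lambda by the number of ratings of each user or item. Each half-step solves a small ridge regression exactly, so the training objective never increases.

```python
import numpy as np

def als_explicit(ratings, n_users, n_items, rank=2, iterations=10, lambda_=0.1, seed=0):
    """Toy explicit-feedback ALS on (user, item, rating) triples with 0-based integer IDs."""
    rng = np.random.default_rng(seed)
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item latent factors
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    def solve(fixed, observed):
        # Ridge regression for one row: (A^T A + lambda * n * I) x = A^T b,
        # with lambda scaled by that row's rating count n (ALS-WR)
        A = fixed[[j for j, _ in observed]]
        b = np.array([r for _, r in observed])
        n = len(observed)
        return np.linalg.solve(A.T @ A + lambda_ * n * np.eye(rank), A.T @ b)

    def objective():
        # Squared error on observed ratings plus count-weighted L2 penalty
        sq = sum((r - U[u] @ V[i]) ** 2 for u, i, r in ratings)
        reg = sum(len(by_user[u]) * (U[u] @ U[u]) for u in range(n_users))
        reg += sum(len(by_item[i]) * (V[i] @ V[i]) for i in range(n_items))
        return sq + lambda_ * reg

    history = [objective()]
    for _ in range(iterations):
        for u in range(n_users):   # item factors fixed: solve each user's row
            if by_user[u]:
                U[u] = solve(V, by_user[u])
        for i in range(n_items):   # user factors fixed: solve each item's row
            if by_item[i]:
                V[i] = solve(U, by_item[i])
        history.append(objective())
    return U, V, history

ratings = [(0, 0, 5.0), (0, 1, 3.0), (0, 2, 1.0), (1, 0, 4.0), (1, 2, 1.0),
           (2, 1, 1.0), (2, 2, 5.0), (3, 0, 1.0), (3, 2, 4.0)]
U, V, history = als_explicit(ratings, n_users=4, n_items=3, rank=2, iterations=10)
print(U[1] @ V[1])  # predicted rating for user 1 on item 1, which user 1 never rated
```

Raising rank gives every user and item more latent factors (more capacity, but more risk of overfitting), iterations sets how many user/item alternations run, and a larger lambda_ shrinks the factors toward zero.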

#### Selecting ALS parameters using the small dataset

In order to determine the best ALS parameters, we will use the small dataset. We first need to split it into training, validation, and test datasets.

In [8]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
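randomSplit normalizes the weights, so [6, 2, 2] means roughly 60% / 20% / 20%. Each rating is assigned at random, so the split sizes are only approximately proportional and depend on the seed. The plain-Python sketch below illustrates the idea (a uniform draw per item compared against cumulative weight boundaries); it is a conceptual model, not Spark's actual code.

```python
import random

def random_split(items, weights, seed=0):
    """Assign each item to one split with probability weight / sum(weights)."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()  # uniform in [0, 1)
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(item)
                break
    return splits

train, validation, test = random_split(range(10000), [6, 2, 2], seed=0)
print(len(train), len(validation), len(test))  # roughly 6000 / 2000 / 2000
```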

In [9]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []  # validation RMSE for each rank, in the same order as ranks

min_error = float('inf')
best_rank = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # ((user, movie), predicted rating) for every validation pair the model can score
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # ((user, movie), (actual rating, predicted rating))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4


Let's take a closer look at what this code does. First, here is what our predictions look like.

In [10]:
predictions.take(3)

[((372, 1084), 3.42419871162954),
 ((4, 1084), 3.866749726695713),
 ((402, 1084), 3.4099577968422152)]

Each prediction has the form ((UserID, MovieID), Rating), matching the keys in our ratings dataset. Here, the rating is the one predicted by our ALS model for that user and movie.

Then we join these with our validation data (which contains the actual ratings), and the result looks as follows:



In [11]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.381060760461434)),
 ((1, 1025), (5.0, 4.705295366590298)),
 ((1, 1089), (5.0, 4.979982471805129))]

To each joined record we apply the squared difference between the actual and predicted ratings, use the mean() action to get the mean squared error (MSE), and take its square root to get the RMSE.
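The same join-then-RMSE pipeline, written in plain Python over small lists so it runs without Spark. It uses the three joined records shown above plus one hypothetical pair, (2, 10), that has no prediction, to show that an inner join (like RDD.join) silently drops such pairs. In the Spark code, predictAll likewise returns nothing for users or movies the model never saw during training, so those pairs do not count toward the RMSE.

```python
import math

def join(left, right):
    """Inner join of two lists of (key, value) pairs, like RDD.join (unmatched keys are dropped)."""
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

def rmse(rates_and_preds):
    """Root mean squared error over ((user, movie), (actual, predicted)) records."""
    squared = [(actual - predicted) ** 2 for _, (actual, predicted) in rates_and_preds]
    return math.sqrt(sum(squared) / len(squared))

actual = [((1, 457), 5.0), ((1, 1025), 5.0), ((1, 1089), 5.0), ((2, 10), 3.0)]
predicted = [((1, 457), 4.381060760461434), ((1, 1025), 4.705295366590298),
             ((1, 1089), 4.979982471805129)]  # hypothetical: no prediction for (2, 10)
rates_and_preds = join(actual, predicted)
print(len(rates_and_preds))  # 3: the unmatched (2, 10) pair is dropped by the inner join
print(rmse(rates_and_preds))
```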

Finally, we retrain with the selected rank and evaluate the model on the test set.

In [12]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                  lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9113780946334407


#### Using the complete dataset to build the final model

In order to build our recommender model, we will use the complete dataset. Therefore, we need to process it the same way we did with the small dataset.

In [13]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print('There are {} ratings in the complete dataset'.format(complete_ratings_data.count()))
complete_ratings_data.take(3)

There are 27753444 ratings in the complete dataset


[(1, 307, 3.5), (1, 481, 3.5), (1, 1091, 1.5)]

Now we are ready to train the recommender model.

In [14]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

Now we evaluate the model on the test set.

In [15]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8318265262101795


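The test RMSE drops from about 0.91 on the small dataset to about 0.83 on the complete one, so the much larger dataset gives a more accurate recommender (bearing in mind that the two models were evaluated on different test splits).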

### How to make recommendations

Although our goal is an online movie recommender, now that the model is ready we can try it out by producing some movie recommendations. This will help when coding the recommendation engine for the web service later on, and shows how to use the model in other settings too.

With collaborative filtering, getting recommendations for a new user is not as simple as predicting new entries with a previously trained model. Instead, we need to retrain the model including the new user's ratings, so that they can be compared with those of the other users in the dataset. That is, the model has to be retrained every time we have new user ratings (although a single model can, of course, serve many users). This makes the process expensive, and it is one reason why scalability is a problem (and Spark a solution!). Once the model is trained, we can reuse it to obtain top recommendations for a given user or an individual rating for a particular movie. These operations are less costly than training the model itself.
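A matrix-factorization model predicts a rating as the dot product of a user's and a movie's latent factor vectors, so it can only score users it was trained on. The toy sketch below uses made-up factor values (not output of the model above) and a hypothetical predict helper to show why the new user needs the model to be retrained first.

```python
# Hypothetical factors learned by a rank-2 model; user 0 was not in the training data
user_factors = {1: [0.9, 1.2], 2: [1.5, 0.1]}
product_factors = {10: [1.8, 1.1], 20: [0.4, 2.0]}

def predict(user, product):
    """Predicted rating = dot product of factors, or None if either ID has no factors."""
    if user not in user_factors or product not in product_factors:
        return None
    return sum(u * p for u, p in zip(user_factors[user], product_factors[product]))

print(predict(1, 10))  # about 2.94
print(predict(0, 10))  # None: the new user has no latent factors until the model is retrained
```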

We also want to recommend only movies that have a certain minimum number of ratings. For that, we need to count the number of ratings per movie.

In [16]:
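# Load the complete movies file
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse. Note: split(",") cuts apart titles that contain commas, because movies.csv wraps
# such titles in double quotes (hence truncated titles like '"Shawshank Redemption' below);
# csv.reader would parse the quoted field correctly.
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()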

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are {} movies in the complete dataset".format(complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [17]:
def get_counts_and_averages(ID_and_ratings_tuple):
    """Map (movieID, iterable of ratings) to (movieID, (number of ratings, average rating))."""
    movie_id, ratings = ID_and_ratings_tuple
    ratings = list(ratings)
    nratings = len(ratings)
    return movie_id, (nratings, sum(ratings) / nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
print(movie_ID_with_ratings_RDD.take(5))
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

[(1449, <pyspark.resultiterable.ResultIterable object at 0x7f12a417d278>), (828, <pyspark.resultiterable.ResultIterable object at 0x7f12a4172128>), (2024, <pyspark.resultiterable.ResultIterable object at 0x7f12a4172940>), (23, <pyspark.resultiterable.ResultIterable object at 0x7f12a41724e0>), (161, <pyspark.resultiterable.ResultIterable object at 0x7f12a4172be0>)]
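groupByKey sends every individual rating of a movie to one place before counting. For counts and averages, an alternative is to combine (count, sum) pairs per key, which RDD.reduceByKey can partially aggregate before the shuffle. The plain-Python sketch below shows the (count, sum) logic; the commented Spark version is an untested sketch of the same idea, not the code used above.

```python
def counts_and_averages(ratings):
    """Map (user, movie, rating) triples to {movie: (count, average)} via per-key (count, sum) pairs."""
    totals = {}
    for _, movie, rating in ratings:
        count, total = totals.get(movie, (0, 0.0))
        totals[movie] = (count + 1, total + rating)
    return {movie: (count, total / count) for movie, (count, total) in totals.items()}

# Spark sketch of the same idea (not run here):
#   complete_ratings_data.map(lambda x: (x[1], (1, x[2]))) \
#       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
#       .mapValues(lambda c: (c[0], c[1] / c[0]))

print(counts_and_averages([(1, 307, 3.5), (2, 307, 4.5), (1, 481, 3.5)]))
# {307: (2, 4.0), 481: (1, 3.5)}
```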


### Adding new user ratings

Now we need to rate some movies for the new user. We will put the ratings in a new RDD and use user ID 0, which is not assigned in the MovieLens dataset. Check the dataset's movies file for the ID-to-title mapping (so you know which movies you are actually rating).

In [18]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)

# ###################################################
# Keep the userID, but Replace movieID, rating, title
# ###################################################

# Find 10 movies you have watched in the past
# Put your OWN ratings

#Andrew_User1
# new_user_ratings = [
#       (0,65216,4), # Defiance (2008)
#       (0,65261,3), # Ponyo (Gake no ue no Ponyo) (2008)
#       (0,67087,5), # I Love You, Man (2009)
#       (0,112552,3), # Whiplash (2014)
#       (0,112852,4), # Guardians of the Galaxy (2014)
#       (0,114670,2), # Tusk (2014)
#       (0,114713,1), # Annabelle (2014)
#       (0,151455,5), # Eddie the Eagle (2016)
#       (0,177615,1), # Lady Bird (2017)
#       (0,185029,5) # A Quiet Place (2018)
#     ]

#Christine_User2
new_user_ratings = [
    (0,231,1), # Dumb & Dumber (Dumb and Dumber) (1994)
    (0,208,3), # Waterworld (1995)
    (0,216,2), # Billy Madison (1995)
    (0,2,3.5), # Jumanji (1995)
    (0,239,2.5), # Goofy Movie, A (1995)
    (0,34,4), # Babe (1995)
    (0,1247,2.5), # Graduate, The (1967)
    (0,141,4), # Birdcage, The (1996)
    (0,1259,3), # Stand by Me (1986)
    (0,47,4.5), # Seven (a.k.a. Se7en) (1995)
    ]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 231, 1), (0, 208, 3), (0, 216, 2), (0, 2, 3.5), (0, 239, 2.5), (0, 34, 4), (0, 1247, 2.5), (0, 141, 4), (0, 1259, 3), (0, 47, 4.5)]
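Before training, it can be worth checking the hand-entered ratings. The helper below is hypothetical (not part of the original notebook or of Spark): it flags a user ID that already exists, movie IDs missing from the movies file, and ratings outside MovieLens's half-star scale from 0.5 to 5.0. It works on plain Python collections; the commented lines sketch one assumed way to build the ID sets from the notebook's RDDs.

```python
def validate_new_ratings(new_ratings, user_id, known_movie_ids, existing_user_ids):
    """Hypothetical check: return a list of problems in (userID, movieID, rating) tuples."""
    problems = []
    if user_id in existing_user_ids:
        problems.append("user ID {} is already used in the dataset".format(user_id))
    for uid, movie_id, rating in new_ratings:
        if uid != user_id:
            problems.append("unexpected user ID {}".format(uid))
        if movie_id not in known_movie_ids:
            problems.append("unknown movie ID {}".format(movie_id))
        # MovieLens ratings use half-star steps from 0.5 to 5.0
        if not (0.5 <= rating <= 5.0 and rating * 2 == int(rating * 2)):
            problems.append("rating {} for movie {} is off the 0.5-5.0 half-star scale".format(rating, movie_id))
    return problems

# In the notebook, the sets could be built with, e.g.:
#   known_movie_ids = set(complete_movies_data.map(lambda x: x[0]).collect())
#   existing_user_ids = set(complete_ratings_data.map(lambda x: x[0]).distinct().collect())

print(validate_new_ratings([(0, 2, 3.5), (0, 34, 4.2)], 0, {2, 34}, {1, 2}))
# ['rating 4.2 for movie 34 is off the 0.5-5.0 half-star scale']
```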


Now we add them to the data we will use to train our recommender model. We use Spark's union() transformation for this.

In [19]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

# sample_70 = complete_ratings_data.sample(False, 0.7, 42)
# complete_data_with_new_ratings_RDD = sample_70.union(new_user_ratings_RDD)


Finally, we train the ALS model using the parameters we selected earlier with the small dataset.



In [20]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 127.887 seconds


### Getting top recommendations

Let's now get some recommendations! For that, we build an RDD with all the movies the new user hasn't rated yet, and use it together with the model to predict ratings.

In [21]:
# Get just the movie IDs, as a set: in Python 3, map() returns a one-shot iterator,
# which would be exhausted after the first membership test inside the filter below
new_user_ratings_ids = set(x[1] for x in new_user_ratings)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
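In Python 3, map() returns a one-shot iterator rather than a list. A membership test (`in`) consumes the iterator as it searches, so after the first lookup it is exhausted and every later `not in` check returns True, letting already-rated movies slip through a filter. The small plain-Python sketch below (with made-up IDs) reproduces this and shows that a set avoids it.

```python
new_user_ratings = [(0, 231, 1), (0, 208, 3), (0, 2, 3.5)]
all_movie_ids = [1, 2, 3, 208, 231]

ids_iter = map(lambda x: x[1], new_user_ratings)   # Python 3: a one-shot iterator
unrated_buggy = [m for m in all_movie_ids if m not in ids_iter]

ids_set = {x[1] for x in new_user_ratings}          # reusable, O(1) membership tests
unrated_fixed = [m for m in all_movie_ids if m not in ids_set]

print(unrated_buggy)  # [1, 2, 3, 208, 231]
print(unrated_fixed)  # [1, 3]
```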

Our recommendations are ready. Now we want to print the 15 movies with the highest predicted ratings. To do so, we join the predictions with the movies RDD to get the titles, and with the ratings counts so we can keep only movies with a minimum number of ratings. First, we do the joins and look at what the result looks like.

In [22]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles)
new_user_recommendations_rating_title_and_count_RDD_final = \
    new_user_recommendations_rating_title_and_count_RDD.join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD_final.take(3)

[(6216,
  ((3.5917633183742277, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((3.7323287718854887, 'Once a Thief (1965)'), 1)),
 (83916, ((1.511409816873222, 'Blues in the Night (1941)'), 9))]

So we need to flatten this a bit to get (Title, Rating, Ratings Count) tuples.

In [23]:
new_user_recommendations_rating_title_and_count_RDD_final = \
new_user_recommendations_rating_title_and_count_RDD_final.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
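Index chains such as r[1][0][1] are easy to get wrong. The equivalent function below uses tuple unpacking to name each field (Python 3 lambdas cannot unpack tuples in their parameters, hence a def). It is applied to two of the records shown above and produces the same (Title, Rating, Ratings Count) tuples; in Spark it could be passed to .map() in place of the lambda.

```python
def flatten(record):
    """(movieID, ((predicted rating, title), ratings count)) -> (title, predicted rating, count)."""
    movie_id, ((rating, title), count) = record
    return title, rating, count

joined = [
    (6216, ((3.5917633183742277, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'), 717)),
    (124320, ((3.7323287718854887, 'Once a Thief (1965)'), 1)),
]
print([flatten(r) for r in joined])
# [('Nowhere in Africa (Nirgendwo in Afrika) (2001)', 3.5917633183742277, 717), ('Once a Thief (1965)', 3.7323287718854887, 1)]
```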

Finally, get the highest-rated recommendations for the new user, filtering out movies with fewer than 25 ratings.

In [24]:
print(new_user_recommendations_rating_title_and_count_RDD_final.take(3))
top_movies = new_user_recommendations_rating_title_and_count_RDD_final.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 25 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

[('Nowhere in Africa (Nirgendwo in Afrika) (2001)', 3.5917633183742277, 717), ('Once a Thief (1965)', 3.7323287718854887, 1), ('Blues in the Night (1941)', 1.511409816873222, 9)]
TOP 15 recommended movies (with at least 25 ratings):
('"Very Potter Sequel', 4.203361392733084, 35)
('Anne of Green Gables: The Sequel (a.k.a. Anne of Avonlea) (1987)', 4.194682324929555, 342)
('Drishyam (2013)', 4.194313087843041, 37)
('Sense & Sensibility (2008)', 4.173311644082534, 69)
('North & South (2004)', 4.172073490468209, 389)
('Anne of Green Gables (1985)', 4.1311354217429335, 706)
('Cranford (2007)', 4.101020635084971, 35)
('Pride and Prejudice (1995)', 4.092407129306942, 2919)
('Murder on the Orient Express (2010)', 4.084905289576348, 29)
('I Can Only Imagine (2018)', 4.084778533479282, 30)
('Winter in Prostokvashino (1984)', 4.059710268014413, 67)
('Boys (2014)', 4.042659247790695, 96)
('Little Dorrit (2008)', 4.031781124575435, 55)
('Vacations in Prostokvashino (1980)', 4.025106026739121, 96)
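takeOrdered(15, key=lambda x: -x[1]) returns the 15 records with the largest predicted rating. The plain-Python sketch below does the same on a list with heapq.nlargest, applying the minimum-count filter first. It uses the three sample records printed above, with a smaller k and min_count so the effect of the filter is visible; the function name is hypothetical.

```python
import heapq

def top_recommendations(records, k=15, min_count=25):
    """Top-k (title, predicted rating, count) records by predicted rating, keeping count >= min_count."""
    eligible = (r for r in records if r[2] >= min_count)
    return heapq.nlargest(k, eligible, key=lambda r: r[1])

records = [
    ('Nowhere in Africa (Nirgendwo in Afrika) (2001)', 3.5917633183742277, 717),
    ('Once a Thief (1965)', 3.7323287718854887, 1),
    ('Blues in the Night (1941)', 1.511409816873222, 9),
]
# 'Once a Thief' has the highest prediction but only 1 rating, so it is filtered out
print(top_recommendations(records, k=2, min_count=5))
```

Ties in predicted rating may come back in a different order than Spark's takeOrdered, which does not matter for this use.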


Now get the highest-rated recommendations for the new user, filtering out movies with fewer than 100 ratings.

In [25]:
top_movies = new_user_recommendations_rating_title_and_count_RDD_final.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 100 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 100 ratings):
('Anne of Green Gables: The Sequel (a.k.a. Anne of Avonlea) (1987)', 4.194682324929555, 342)
('North & South (2004)', 4.172073490468209, 389)
('Anne of Green Gables (1985)', 4.1311354217429335, 706)
('Pride and Prejudice (1995)', 4.092407129306942, 2919)
('"Sound of Music', 3.9555028013733855, 17154)
("Schindler's List (1993)", 3.9424153655281424, 71516)
('"Shawshank Redemption', 3.927038664912665, 97999)
('Wild China (2008)', 3.9266979007310328, 105)
('Hidden Figures (2016)', 3.918849839976069, 2647)
('Emma (2009)', 3.918751948474767, 385)
('"Civil War', 3.913985064896245, 431)
("It's a Wonderful Life (1946)", 3.893595085997471, 17770)
('Piper (2016)', 3.8884303809434613, 1253)
('Jane Eyre (2006)', 3.8878025961775076, 327)
('Persuasion (2007)', 3.8786315236139512, 349)
