# A Movie Recommendation Service
### Source: https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1

#### Create a SparkContext configured for local mode

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

#### File download
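Small: 100,000 ratings and 2,488 tag applications applied to 8,570 movies by 706 users. Last updated 4/2015.   
Full: 21,000,000 ratings and 470,000 tag applications applied to 27,000 movies by 230,000 users. Last updated 4/2015.

These figures describe the 4/2015 release. The archives downloaded here are newer, so the counts printed later in this notebook are higher.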

In [2]:
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'

#### Download location(s)


In [3]:
import os
datasets_path = os.path.join('/home/jovyan', 'work')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')

#### Getting file(s)


In [4]:
import urllib.request
small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)
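
Re-running the cell above downloads both archives again, and the complete archive is large. A minimal sketch of a guard that skips the download when the file is already on disk follows. `download_if_missing` and its `fetch` parameter are hypothetical names introduced here; they are not part of the original notebook.

```python
import os
import urllib.request


def download_if_missing(url, path, fetch=urllib.request.urlretrieve):
    """Download url to path unless path already exists.

    Returns True if a download happened, False if the file was already there.
    `fetch` is injectable so the function can be exercised without network access.
    """
    if os.path.exists(path):
        return False
    fetch(url, path)
    return True
```

Under these assumptions, the cell could call `download_if_missing(small_dataset_url, small_dataset_path)` and the equivalent for the complete dataset.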

#### Extracting file(s)


In [5]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)
    
with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

## Loading and parsing datasets
Now we are ready to read in each of the files and create an RDD consisting of parsed lines. 

Each line in the ratings dataset (ratings.csv) is formatted as: 
+ userId,movieId,rating,timestamp 

Each line in the movies (movies.csv) dataset is formatted as:
+ movieId,title,genres 

The format of these files is uniform and simple, so we can use Python split() to parse their lines once they are loaded into RDDs. Parsing the movies and ratings files yields two RDDs: 
+ For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). We drop the timestamp because we do not need it for this recommender.
+ For each line in the movies dataset, we create a tuple of (MovieID, Title). We drop the genres because we do not use them for this recommender.
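
As a concrete illustration of the parsing described above, the following sketch applies the same split-and-convert logic to a single ratings line outside Spark. `parse_rating_line` is a hypothetical helper name, and the sample line is made up to match the documented `userId,movieId,rating,timestamp` layout.

```python
def parse_rating_line(line):
    """Turn 'userId,movieId,rating,timestamp' into (user_id, movie_id, rating)."""
    tokens = line.split(",")
    # The fourth field (timestamp) is intentionally dropped.
    return int(tokens[0]), int(tokens[1]), float(tokens[2])


print(parse_rating_line("1,31,2.5,1260759144"))  # (1, 31, 2.5)
```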

#### ratings.csv


In [6]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
# Parse
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print('There are {} ratings in the small dataset'.format(small_ratings_data.count()))
small_ratings_data.take(3)

There are 100836 ratings in the small dataset


[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]

#### movies.csv


In [7]:
# Load the small movie file
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
# Parse
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]
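
A plain `split(",")` works for the three titles shown above, but movies.csv wraps titles that contain commas in double quotes, so splitting on every comma cuts such titles short. One possible alternative, sketched here with the standard-library `csv` module, parses each line with CSV quoting rules. `parse_movie_line` is a hypothetical helper, and the sample line is an assumed example of a quoted title.

```python
import csv


def parse_movie_line(line):
    """Parse 'movieId,title,genres' into (movie_id, title), honoring CSV quoting."""
    fields = next(csv.reader([line]))
    return int(fields[0]), fields[1]


line = '318,"Shawshank Redemption, The (1994)",Crime|Drama'
print(line.split(",")[1])         # prints: "Shawshank Redemption
print(parse_movie_line(line)[1])  # prints: Shawshank Redemption, The (1994)
```

In the notebook, such a function could replace the two chained `map` calls when building the movies RDD. Note that this sketch returns the movie ID as an `int` rather than a string.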

## Collaborative Filtering
In collaborative filtering we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B's opinion on a different issue x than the opinion of a randomly chosen user.

First, people rate different items (such as videos, images, or games). Then the system predicts a user's rating for an item they have not rated yet. These predictions are built on the existing ratings of other users whose ratings are similar to those of the active user.

Spark's MLlib machine learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS). The implementation in MLlib has the following parameters:

+ numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
+ rank is the number of latent factors in the model.
+ iterations is the number of iterations to run.
+ lambda specifies the regularization parameter in ALS.
+ implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
+ alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
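
For orientation, the explicit-feedback problem that ALS addresses can be written as a regularized least-squares objective. This is the textbook form, not a transcription of MLlib's source. The symbols are chosen here for illustration: `r_{ui}` is an observed rating, `x_u` and `y_i` are the user and item factor vectors of length `rank`, `\mathcal{K}` is the set of observed (user, item) pairs, and `\lambda` is the regularization parameter.

```latex
\min_{X,\,Y} \;
\sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2
\; + \; \lambda \left( \sum_{u} \lVert x_u \rVert^2 + \sum_{i} \lVert y_i \rVert^2 \right)
```

ALS alternates between fixing the item factors and solving for the user factors, then fixing the user factors and solving for the item factors. Each half-step is an ordinary least-squares problem, and `iterations` controls how many alternations run. According to the MLlib documentation, the library additionally scales the regularization term by the number of ratings each user or item has.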

#### Selecting ALS parameters using the small dataset
In order to determine the best ALS parameters, we will use the small dataset. We need first to split it into train, validation, and test datasets.

In [8]:
# The source tutorial uses seed=0L, a Python 2 long literal.
# In Python 3, write it as seed=0.
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
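
`randomSplit` treats the weights as relative proportions, so `[6, 2, 2]` amounts to roughly 60% / 20% / 20% of the records. The pure-Python sketch below illustrates the idea on a list; it is not Spark's implementation, and `weighted_split` is a hypothetical helper.

```python
import random


def weighted_split(items, weights, seed=0):
    """Assign each item to one of len(weights) buckets with probability
    proportional to its weight. Illustrative only; Spark's sampling differs in detail."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. about [0.6, 0.8, 1.0] for weights [6, 2, 2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    buckets = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for idx, bound in enumerate(bounds):
            # The last bucket catches any draw left over by rounding.
            if draw < bound or idx == len(bounds) - 1:
                buckets[idx].append(item)
                break
    return buckets


train, validation, test = weighted_split(range(10000), [6, 2, 2])
print(len(train), len(validation), len(test))  # sizes are approximate, not exact
```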

#### Training phase


In [9]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []

min_error = float('inf')
best_rank = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Key predictions and actual ratings by (userID, movieID) so they can be joined
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # RMSE over the (actual, predicted) pairs
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4
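
The error computed in the loop is the root-mean-square error (RMSE) over (actual, predicted) rating pairs. Stripped of the RDD plumbing, the calculation looks like the sketch below. `rmse` is a hypothetical helper and the sample pairs are invented.

```python
import math


def rmse(pairs):
    """Root-mean-square error over a sequence of (actual, predicted) pairs."""
    squared = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared) / len(squared))


print(rmse([(4.0, 3.5), (3.0, 3.0), (5.0, 4.0)]))
```

A lower RMSE means the predicted ratings sit closer to the held-out ratings, which is why the rank with the smallest value is kept.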


In [10]:
predictions.take(3)

[((372, 1084), 3.42419871162954),
 ((4, 1084), 3.866749726695713),
 ((402, 1084), 3.4099577968422152)]

## Using the complete dataset to build the final model
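With the ALS parameters selected on the small dataset, we train the final model on the complete dataset. This needs considerably more memory and time, which can be a limitation on a small virtual machine.

We first load the complete ratings file and then split it into training and test datasets.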

In [11]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are {} ratings in the complete dataset".format(complete_ratings_data.count()))

There are 27753444 ratings in the complete dataset


##### Load the complete movies file for later use

In [12]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are {} movies in the complete dataset".format(complete_movies_data.count()))

There are 58098 movies in the complete dataset


In [13]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, \
                           iterations=iterations, lambda_=regularization_parameter)

### Now we test on our testing set.


In [14]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8318265262101795


## How to make recommendations
Although our aim is to build an online movie recommender, now that we know how to get a recommender model ready we can try it out by producing some movie recommendations. This will help us code the recommendation engine later when building the web service, and it shows how to use the model in other settings.

When using collaborative filtering, getting recommendations is not as simple as predicting for the new entries using a previously generated model. Instead, we need to retrain the model with the new user's preferences included, so that they can be compared with those of other users in the dataset. That is, the recommender must be retrained every time we have new user ratings (although a single model can of course serve multiple users). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution). Once the model is trained, we can reuse it to obtain top recommendations for a given user or an individual rating for a particular movie. These operations are less costly than training the model itself.

We also want to recommend only movies with a certain minimum number of ratings. For that, we need to count the number of ratings per movie.

In [15]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
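
To see what `groupByKey` followed by `get_counts_and_averages` produces, here is the same computation on a handful of invented `(movieID, rating)` pairs, using a plain dictionary in place of an RDD. `counts_and_averages` is a hypothetical name.

```python
from collections import defaultdict


def counts_and_averages(movie_rating_pairs):
    """Map each movie ID to (number_of_ratings, average_rating)."""
    grouped = defaultdict(list)
    for movie_id, rating in movie_rating_pairs:
        grouped[movie_id].append(rating)  # plays the role of groupByKey()
    return {
        movie_id: (len(ratings), sum(ratings) / len(ratings))
        for movie_id, ratings in grouped.items()
    }


sample = [(1, 4.0), (1, 3.0), (2, 5.0)]
print(counts_and_averages(sample))  # {1: (2, 3.5), 2: (1, 5.0)}
```

Only the count is used later for filtering; the average is computed alongside it and is available if needed.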

## Custom Ratings

### Scenario 1: Add 10 ratings and show movies with more than 25 ratings (User 1)

In [17]:
new_user_ID = 0
# The format of each line is (userID, movieID, rating)
# new_user_ratings = [
#     (0,260,4), # Star Wars (1977)
#     (0,1,3), # Toy Story (1995)
#     (0,16,3), # Casino (1995)
#     (0,25,4), # Leaving Las Vegas (1995)
#     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
#     (0,335,1), # Flintstones, The (1994)
#     (0,379,1), # Timecop (1994)
#     (0,296,3), # Pulp Fiction (1994)
#     (0,858,5), # Godfather, The (1972)
#     (0,50,4) # Usual Suspects, The (1995)
#    ]

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,1213,5), # Goodfellas
     (0,48516,5), # Departed
     (0,72641,4), # The Blindside 
     (0,99114,4), # Django 
     (0,33672,4), # Lords of Dogtown
     (0,45720,3), # devil wears prada
     (0,127198,3), # dope
     (0,5786,4), # paid in full
     (0,180045,5), # molly's game
     (0,589,3) # terminator 2 judgement day
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 1213, 5), (0, 48516, 5), (0, 72641, 4), (0, 99114, 4), (0, 33672, 4), (0, 45720, 3), (0, 127198, 3), (0, 5786, 4), (0, 180045, 5), (0, 589, 3)]


#### Get the top 15 recommendations with at least 25 ratings in Scenario 1

In [18]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [19]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 147.645 seconds


In [22]:
new_user_ratings_ids = {x[1] for x in new_user_ratings}  # set of rated movie IDs (a Python 3 map() iterator would be exhausted after one membership test)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
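
One detail to watch under Python 3 when filtering by a collection of IDs: `map()` returns a one-shot iterator, and an `in` test consumes it, so only the first membership check sees all the IDs. Materializing the IDs as a `set` avoids this. The snippet below demonstrates the difference on a shortened copy of the ratings list.

```python
ratings = [(0, 1213, 5), (0, 48516, 5), (0, 589, 3)]

# One-shot iterator: the first `in` test walks through (and exhausts) it.
ids_iter = map(lambda x: x[1], ratings)
first_check = 589 in ids_iter
second_check = 589 in ids_iter  # iterator is already exhausted
print(first_check, second_check)  # True False

# A set can be tested any number of times and gives constant-time lookups.
ids_set = {movie_id for _, movie_id, _ in ratings}
print(589 in ids_set, 589 in ids_set)  # True True
```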

In [23]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((3.9604661984522522, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((3.927817304466629, 'Once a Thief (1965)'), 1)),
 (83916, ((3.4360328306690775, 'Blues in the Night (1941)'), 9))]

In [24]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
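
The two chained joins leave each record as `(movieID, ((rating, title), count))`, which the `map` above flattens to `(title, rating, count)`. The sketch below applies the same reshaping to one of the records printed earlier, with the rating rounded for readability. `flatten_joined_record` is a hypothetical helper name.

```python
def flatten_joined_record(record):
    """(movie_id, ((rating, title), count)) -> (title, rating, count)."""
    movie_id, ((rating, title), count) = record
    return title, rating, count


record = (6216, ((3.96, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'), 717))
print(flatten_joined_record(record))
# ('Nowhere in Africa (Nirgendwo in Afrika) (2001)', 3.96, 717)
```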

In [25]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print ('TOP 15 recommended movies (with more than 25 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with more than 25 reviews):
('Planet Earth II (2016)', 4.766092073155562, 853)
('Band of Brothers (2001)', 4.717674059739451, 984)
('Planet Earth (2006)', 4.702251685027321, 1384)
('Cosmos', 4.677837032205204, 157)
('The Reichenbach Fall (2012)', 4.6245618382733085, 48)
('Black Mirror: White Christmas (2014)', 4.600097088824938, 1074)
('The Blue Planet (2001)', 4.596713716687987, 421)
('"Shawshank Redemption', 4.5922129274593, 97999)
('Blue Planet II (2017)', 4.579969345998066, 349)
('The Godfather Trilogy: 1972-1990 (1992)', 4.550977031924532, 421)
('Cosmos: A Spacetime Odissey', 4.540579321087449, 37)
('Life (2009)', 4.540095801945341, 166)
('Over the Garden Wall (2013)', 4.537629877726175, 377)
('Frozen Planet (2011)', 4.527251705193223, 402)
('Human (2015)', 4.521760013486599, 68)
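
`takeOrdered(15, key=lambda x: -x[1])` returns the 15 records with the smallest key, and negating the rating turns that into "highest rating first". The same filter-then-top-N pattern in plain Python is sketched below with `heapq.nlargest`, on a few rows copied from the output above with ratings rounded. `top_n` and `min_count` are hypothetical names.

```python
import heapq


def top_n(records, n, min_count):
    """Keep (title, rating, count) records with count >= min_count and
    return the n highest-rated ones, best first."""
    eligible = (r for r in records if r[2] >= min_count)
    return heapq.nlargest(n, eligible, key=lambda r: r[1])


rows = [
    ('Planet Earth II (2016)', 4.766, 853),
    ('The Reichenbach Fall (2012)', 4.625, 48),
    ('Cosmos: A Spacetime Odissey', 4.541, 37),
    ('Life (2009)', 4.540, 166),
]
print(top_n(rows, 2, min_count=100))
# [('Planet Earth II (2016)', 4.766, 853), ('Life (2009)', 4.54, 166)]
```

Raising the threshold from 25 to 100 is what removes titles with few ratings from the list while leaving the predicted ratings themselves unchanged.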


### Scenario 2: Add 10 ratings and show movies with more than 100 ratings (User 1)

In [26]:
new_user_ID = 0
# The format of each line is (userID, movieID, rating)
# new_user_ratings = [
#     (0,260,4), # Star Wars (1977)
#     (0,1,3), # Toy Story (1995)
#     (0,16,3), # Casino (1995)
#     (0,25,4), # Leaving Las Vegas (1995)
#     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
#     (0,335,1), # Flintstones, The (1994)
#     (0,379,1), # Timecop (1994)
#     (0,296,3), # Pulp Fiction (1994)
#     (0,858,5), # Godfather, The (1972)
#     (0,50,4) # Usual Suspects, The (1995)
#    ]
new_user_ratings = [
     (0,1213,5), # Goodfellas
     (0,48516,5), # Departed
     (0,72641,4), # The Blindside 
     (0,99114,4), # Django 
     (0,33672,4), # Lords of Dogtown
     (0,45720,3), # devil wears prada
     (0,127198,3), # dope
     (0,5786,4), # paid in full
     (0,180045,5), # molly's game
     (0,589,3) # terminator 2 judgement day
    ]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 1213, 5), (0, 48516, 5), (0, 72641, 4), (0, 99114, 4), (0, 33672, 4), (0, 45720, 3), (0, 127198, 3), (0, 5786, 4), (0, 180045, 5), (0, 589, 3)]


In [27]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [28]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 128.308 seconds


In [29]:
new_user_ratings_ids = {x[1] for x in new_user_ratings}  # set of rated movie IDs (a Python 3 map() iterator would be exhausted after one membership test)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [30]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((3.9604661984522522, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((3.927817304466629, 'Once a Thief (1965)'), 1)),
 (83916, ((3.4360328306690775, 'Blues in the Night (1941)'), 9))]

In [31]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [32]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 100 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 100 reviews):
('Planet Earth II (2016)', 4.766092073155562, 853)
('Band of Brothers (2001)', 4.717674059739451, 984)
('Planet Earth (2006)', 4.702251685027321, 1384)
('Cosmos', 4.677837032205204, 157)
('Black Mirror: White Christmas (2014)', 4.600097088824938, 1074)
('The Blue Planet (2001)', 4.596713716687987, 421)
('"Shawshank Redemption', 4.5922129274593, 97999)
('Blue Planet II (2017)', 4.579969345998066, 349)
('The Godfather Trilogy: 1972-1990 (1992)', 4.550977031924532, 421)
('Life (2009)', 4.540095801945341, 166)
('Over the Garden Wall (2013)', 4.537629877726175, 377)
('Frozen Planet (2011)', 4.527251705193223, 402)
('"Usual Suspects', 4.509894734615401, 62180)
('"Godfather', 4.508941513287041, 60904)
('Fight Club (1999)', 4.505961745022873, 65678)


### Scenario 3: Add 10 ratings and show movies with more than 25 ratings (User 2)

In [33]:
new_user_ratings = [
     (0,3275,3), # Boondock Saints
     (0,3246,3), # Malcolm X
     (0,78499,5), # Toy Story 3 
     (0,90620,5), # Mulan 
     (0,114240,4), # Aladdin
     (0,493,3), # Menace II Society
     (0,5989,4), # Catch Me If you Can
     (0,106696,4), # Frozen
     (0,4489,5), # Coming to America
     (0,3263,4) # White Men Can't Jump
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 3275, 3), (0, 3246, 3), (0, 78499, 5), (0, 90620, 5), (0, 114240, 4), (0, 493, 3), (0, 5989, 4), (0, 106696, 4), (0, 4489, 5), (0, 3263, 4)]


In [34]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [35]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 202.805 seconds


In [36]:
new_user_ratings_ids = {x[1] for x in new_user_ratings}  # set of rated movie IDs (a Python 3 map() iterator would be exhausted after one membership test)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [40]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((4.024020082071413, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((4.272413557115556, 'Once a Thief (1965)'), 1)),
 (83916, ((1.2548010966669985, 'Blues in the Night (1941)'), 9))]

In [41]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [42]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 25 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 25 reviews):
('Runaway Brain (1995) ', 5.2279616498404735, 30)
('"Scarlet Pimpernel', 5.021209610460179, 26)
('Vacations in Prostokvashino (1980)', 4.980444320828486, 96)
('Anne of Green Gables: The Sequel (a.k.a. Anne of Avonlea) (1987)', 4.974950925382629, 342)
('Winter in Prostokvashino (1984)', 4.94247508468436, 67)
('Drishyam (2013)', 4.917112026346761, 37)
('Connections (1978)', 4.912249277821132, 49)
('On the Trail of the Bremen Town Musicians (1973)', 4.886093283576544, 45)
('Anne of Green Gables (1985)', 4.865124225687161, 706)
('Junior and Karlson (1968)', 4.831446823860796, 64)
('Bobik Visiting Barbos (1977)', 4.831267136648936, 31)
('Jeff Dunham: Arguing with Myself (2006)', 4.827342993754868, 59)
('Cosmos: A Spacetime Odissey', 4.817053171244368, 37)
('Adventures of Mowgli (1973)', 4.781372506125603, 41)
('Between the Folds (2008)', 4.771722039146336, 61)


### Scenario 4: Add 10 ratings and show movies with more than 100 ratings (User 2)

In [43]:
new_user_ratings = [
     (0,3275,3), # Boondock Saints
     (0,3246,3), # Malcolm X
     (0,78499,5), # Toy Story 3 
     (0,90620,5), # Mulan 
     (0,114240,4), # Aladdin
     (0,493,3), # Menace II Society
     (0,5989,4), # Catch Me If you Can
     (0,106696,4), # Frozen
     (0,4489,5), # Coming to America
     (0,3263,4) # White Men Can't Jump
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 3275, 3), (0, 3246, 3), (0, 78499, 5), (0, 90620, 5), (0, 114240, 4), (0, 493, 3), (0, 5989, 4), (0, 106696, 4), (0, 4489, 5), (0, 3263, 4)]


In [44]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [45]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 203.036 seconds


In [46]:
new_user_ratings_ids = {x[1] for x in new_user_ratings}  # set of rated movie IDs (a Python 3 map() iterator would be exhausted after one membership test)
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [47]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((4.024020082071413, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((4.272413557115556, 'Once a Thief (1965)'), 1)),
 (83916, ((1.2548010966669985, 'Blues in the Night (1941)'), 9))]

In [48]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [49]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 100 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 100 reviews):
('Anne of Green Gables: The Sequel (a.k.a. Anne of Avonlea) (1987)', 4.974950925382629, 342)
('Anne of Green Gables (1985)', 4.865124225687161, 706)
('North & South (2004)', 4.677353878911141, 389)
('"Sound of Music', 4.6724442402768105, 17154)
('Winnie Pooh (1969)', 4.635798602051356, 130)
('Pride and Prejudice (1995)', 4.628448444432022, 2919)
("Can't Change the Meeting Place (1979)", 4.617300794574756, 119)
('Office Romance (1977)', 4.592592729832818, 209)
('Fireproof (2008)', 4.592533227584184, 202)
("Mr. Holland's Opus (1995)", 4.590043712266695, 23359)
('Facing the Giants (2006)', 4.587816338341661, 164)
('"Shawshank Redemption', 4.586534513982329, 97999)
('The Adventures of Sherlock Holmes and Dr. Watson: The Hound of the Baskervilles (1981)', 4.566738483064668, 193)
('Hidden Figures (2016)', 4.563692733674656, 2647)
('Doctor Who: A Christmas Carol (2010)', 4.554402097885682, 271)
