In [23]:
import os
import urllib
import zipfile

# URLs of the dataset provided by Movielens 
full_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

datasets_path = os.path.join('../', 'datasets')

full_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

# download zip files
urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
urllib.request.urlretrieve(full_dataset_url, full_dataset_path)

# unzip files once downloaded
with zipfile.ZipFile(small_dataset_path, "r") as zip:
    zip.extractall(datasets_path)
    
with zipfile.ZipFile(full_dataset_path, "r") as zip:
    zip.extractall(datasets_path)

In [10]:
# ratings.csv : userId,movieId,rating,timestamp
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

# remove the header and the timestamp
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

# example of 3 first elements and RDD object small_ratings_data
small_ratings_data.take(3)

[('1', '1', '5.0'), ('1', '2', '3.0'), ('1', '10', '3.0')]

In [11]:
# movies.csv : movieId,title,genres
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

# remove the header and the genres
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

# example of 3 first elements and RDD object small_movies_data
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [14]:
# split small_ratings_data into 60% training set, 20% validation set and 20% test set
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [30]:
from pyspark.mllib.recommendation import ALS
import math

'''
  Here we will try to find the best latent factor over cross validation
  called k in the paper by computing the mean square (the Root Mean Square Error AKA RMSE)
  error for k in 4, 8, 12
'''

seed = 5
iterations = 10
# specifies the regularization parameter in ALS.
regularization_parameter = 0.1
# is the number of latent factors in the model.
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0

min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    
    # ALS.train will train and generate our model
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    
    # predict ratings for the validation model using the validation dataset
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    
    # compute the error (validation_dataset - predicted_validation_dataset)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
    errors[err] = error
    err += 1
    
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

# print the best latent factor (rank) which generate a minimum RMSE
print('The best model was trained with rank equals to {}'.format(best_rank))

For rank 4 the RMSE is 0.9290115466719586
For rank 8 the RMSE is 0.9411651453706049
For rank 12 the RMSE is 0.9428971812389256
The best model was trained with rank 4


In [19]:
predictions.take(3)

[((640, 667), -1.1788836125393145),
 ((186, 667), 1.4015626015793647),
 ((143, 44828), 2.6238408968758153)]

In [20]:
rates_and_preds.take(3)

[((244, 35836), (3.5, 3.7881972952871883)),
 ((616, 2318), (5.0, 3.7703969474117467)),
 ((99, 41569), (3.5, 2.6824938834166203))]

In [31]:
# now we will train our model using the best_rank (best latent factor)
# and then test using the test dataset
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

# now we will test our model using test dataset
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))

# compute the error test_dataset_rating - test_predicted_rating
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# print the RMSE
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9316026750566204


In [40]:
# Now we are going to use the best_rank for the full dataset
# Load the full dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parsing the file
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

# .. number of ratings .. too big 21.622.187 
print("There are {} ratings in the complete dataset".format(complete_ratings_data.count()))

There are 21622187 ratings in the complete dataset


In [44]:
# split the dataset into training set and test set
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

# ALS.train and generate a model
complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

# test the model by computing the RMSE
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# RMSE
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8335530555506335


In [45]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print("There are {} movies in the complete dataset".format(complete_movies_titles.count()))

There are 30106 movies in the complete dataset


In [46]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [47]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [48]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [49]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in {} min".format(round(tt,3) / 60))

New model trained in 465.774 seconds


In [50]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] 
                                            not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() 
# to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(3696, ((2.3061985693484486, 'Night of the Creeps (1986)'), 423)),
 (107408,
  ((3.8216587094229553,
    'Only Old Men Are Going to Battle (V boy idut odni stariki) (1973)'),
   5)),
 (83972, ((2.09797982652341, '"Europeans'), 1))]

In [51]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [52]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: 
                                                                r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Death on the Staircase (SoupÃ§ons) (2004)', 4.080518301552445, 34)
('"Century of the Self', 4.076642179063931, 66)
('"O Auto da Compadecida (Dog\'s Will', 3.9863361764320615, 36)
('Crooks in Clover (a.k.a. Monsieur Gangster) (Les tontons flingueurs) (1963)', 3.9834876900605636, 28)
('"Decalogue', 3.952630869008665, 426)
('"Human Condition III', 3.929565089862571, 52)
('Harakiri (Seppuku) (1962)', 3.9120941929989073, 406)
('Ikiru (1952)', 3.8996826496944266, 1155)
('Connections (1978)', 3.8854512164609787, 37)
('Shadows of Our Forgotten Ancestors (Tini zabutykh predkiv) (1964)', 3.881926185370361, 44)
('The Jinx: The Life and Deaths of Robert Durst (2015)', 3.864469324189585, 83)
('Come and See (Idi i smotri) (1985)', 3.864152197027231, 442)
('All Watched Over by Machines of Loving Grace (2011)', 3.862488475325229, 79)
('"Man Escaped', 3.857478037727134, 307)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (196