## File download

In [1]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [2]:
#download locations
import os
datasets_path = 'datasets'
#datasets_path='..'
complete_dataset_path = os.path.join(datasets_path,'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path,'ml-latest-small.zip')

In [6]:
#procces to download
import urllib
small_f=urllib.urlretrieve(small_dataset_url,small_dataset_path)
complete_f=urllib.urlretrieve(complete_dataset_url,complete_dataset_path)

In [7]:
#extract them into individual folders
import zipfile
with zipfile.ZipFile(small_dataset_path,'r') as z:
    z.extractall(datasets_path)
with zipfile.ZipFile(complete_dataset_path,'r') as z:
    z.extractall(datasets_path)

## Loading and parsing data

In [3]:
#loading and parsin datasets
#each line in the ratings dataset(ratings.csv) is formatted as:userId,movieId,rating,timestamp
#ecah line in the movies (movies.csv) dataset is formatted as:movieId,title,genres, where genres has format: Genre1|Genre2..
#tags.csv has the format: userId,movieId,tag,timestamp
#links.csv has the format:movieId,imdbId,tmdbId
#Parsing the movies and ratings files yields two RDDs:
###1. For each line in the ratings dataset, we create a tuple of (UserID,MovieID,Rating). We do not need the timestamp
###2. For each line in the movies dataset, we create a tuple of (MovieID,Title)
#Llet's load the raw ratings data. We need to filter out the header
#small_ratings_file = '/home/cloudera/Documents/ML_SPARK/ml-latest-small/ratings.csv'
#small_ratings_raw_data = sc.textFile('file:///home/cloudera/Documents/ML_SPARK/ml-latest-small/ratings.csv')
small_ratings_file = os.path.join(datasets_path,'ml-latest-small','ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [4]:
#parse the raw data into a new RDD
small_ratings_data = small_ratings_raw_data.filter(lambda line:line!=small_ratings_raw_data_header)\
    .map(lambda line:line.split(",")).map(lambda tokens:(tokens[0],tokens[1],tokens[2])).cache()

In [5]:
#checking userId,movieId,rating tuples
small_ratings_data.take(3)

[(u'1', u'31', u'2.5'), (u'1', u'1029', u'3.0'), (u'1', u'1061', u'3.0')]

In [6]:
#the same with movies.csv file (MovieId,Title)
small_movies_file = os.path.join(datasets_path,'ml-latest-small','movies.csv')
small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
small_movies_data = small_movies_raw_data.filter(lambda line:line!=small_movies_raw_data_header)\
    .map(lambda line:line.split(",")).map(lambda tokens:(tokens[0],tokens[1])).cache()
small_movies_data.take(3)

[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

## Collaborative filtering

In [7]:
#Collaborative filtering - Selecting Alternating Least Squares (ALS) parameters using the small dataset
#In order to determine the best ALS parameters, we will use the small dataset
training_RDD,validation_RDD,test_RDD = small_ratings_data.randomSplit([6,2,2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0],x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0],x[1]))

In [8]:
#training
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4,8,12]
errors = [0,0,0]
err = 0
tolerance = 0.02
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD,rank,seed=seed,iterations=iterations,lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r:((r[0],r[1]),r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]),int(r[1])),float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank,error)
    if error < min_error:
        min_error = error
        best_rank = rank
print 'The best model was trained with the rank %s' % best_rank

For rank 4 the RMSE is 0.955539621074
For rank 8 the RMSE is 0.961337186578
For rank 12 the RMSE is 0.955331367921
The best model was trained with the rank 12


In [9]:
#compare real values with predictions
rates_and_preds.take(3)

[((63, 2987), (4.5, 3.230462074482902)),
 ((353, 4387), (2.5, 2.5613337280152444)),
 ((472, 2014), (3.0, 2.819901745338644))]

In [10]:
#let's test the selected model
model = ALS.train(training_RDD,best_rank,seed=seed,iterations=iterations,lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0],r[1]),r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]),int(r[1])),float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.952936492902


## Using the complete dataset to build the final model

In [11]:
complete_ratings_file = os.path.join(datasets_path,'ml-latest','ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
sc.setCheckpointDir('/home/vcp/Documents/ML_SPARK/Recomm_system/checkpoint/')
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!= complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens:(int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
print 'There are %s recommendations in the complete dataset' % (complete_ratings_data.count())

There are 24404096 recommendations in the complete dataset


In [None]:
#train the complete dataset
training_RDD,test_RDD = complete_ratings_data.randomSplit([7,3],seed=0L)
complete_model = ALS.train(training_RDD,best_rank,seed=seed,iterations=iterations,lambda_=regularization_parameter)

In [33]:
#train the complete dataset
training_RDD,test_RDD = complete_ratings_data.randomSplit([7,3],seed=0L)
complete_model = ALS.train(training_RDD,best_rank,seed=seed,iterations=iterations,lambda_=regularization_parameter)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o629.partitions

In [12]:
#test on the testing set
test_for_predict_RDD= test_RDD.map(lambda x: (x[0],x[1]))
predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0],r[1]),r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]),int(r[1])),float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.817353031097


In [13]:
#Big impreovement by only using the dataset with more data

In [16]:
#making recommendations
#Let's first load the movies complete file for later use
complete_movies_raw_data = sc.textFile('file:///home/cloudera/Documents/ML_SPARK/ml-latest/movies.csv')
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
print 'There are %s movies in the complete dataset' % (complete_movies_titles.count())

There are 40110 movies in the complete dataset


In [82]:
#as we want to give recommendations of movies with a certain minimum number of ratings, we need to count the number of
#ratings per movie
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings,float(sum(float(x) for x in ID_and_ratings_tuple[1]))/nratings)
movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1],x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD= movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD= movie_ID_with_avg_ratings_RDD.map(lambda x: (int(x[0]),x[1][0]))

In [18]:
#Adding new user ratings
#Now we need to rate some movies for the new user. We will put them in a new RDD and we will use the user ID 0,
#that is not assigned in the MovieLens dataset.
new_user_ID = 0
#the format of each line is (userID,movieID,rating)
new_user_ratings = [
    (0,260,4), #Star Wars
    (0,1,3),   #Toy Story
    (0,16,3),  #Casino
    (0,25,4),  #Leaving Las Vegas
    (0,32,4),  #Twelve Monkeys
    (0,335,1), #Flinstones, the
    (0,379,1), #Timecop
    (0,858,5), #Godfather, The
    (0,50,4)   #Usual Suspects, The
]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 858, 5), (0, 50, 4)]


In [20]:
#Now we add them to the data we will use to train our recommender model. We use Spark's union() for this
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [23]:
#And finally we train the ALS model using all the parameters we selected before (when using the small dataset)
from time import time
t0=time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD,best_rank,seed=seed,iterations=iterations,
                             lambda_=regularization_parameter)
tt= time() - t0
print 'New model train in %s seconds' % round(tt,3)

New model train in 465.71 seconds


In [24]:
#It took a while. We would need to repeat that every time a user adds new ratings. Ideally we would do this in batches,
#and not every single rating that ocmes into the system for every user

In [25]:
#Getting top recommendations
#Let's now get some recommendations! For that we will get an RDD with all the movies the new user hasn't rated yet. We
#will pu them together with the model to predict ratings

In [26]:
new_user_ratings_ids = map(lambda x: x[1],new_user_ratings)#get just movie IDs. Keep just those not on the ID list
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids)\
                              .map(lambda x: (new_user_ID,x[0])))
#use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [84]:
#we have our recommendations ready. Now ew can print out the 25 movies with the highest predicted ratings. And join
#them with the movies RDD to get the titles, and ratings count in order to get movies with a minimum of counts.
#transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD= new_user_recommendations_RDD.map(lambda x: (x.product,x.rating))
new_user_recommendations_rating_title_and_count_RDD=new_user_recommendations_rating_RDD\
    .join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(67704, ((1.5516317518367981, u'Too Many Girls (1940)'), 2)),
 (30721, ((2.54679722756578, u'Hell Is for Heroes (1962)'), 54)),
 (92163, ((2.911432418448211, u'Fados (2007)'), 5))]

In [85]:
#we need to flat this down a bit in order to have (Title,Rating,Ratings Count)
new_user_recommendations_rating_title_and_count_RDD=new_user_recommendations_rating_title_and_count_RDD\
    .map(lambda r: (r[1][0][1],r[1][0][0],r[1][1]))

In [87]:
#finally, get the highest rated recommendations for the new user, filtering out movies with less than 25 ratings
top_movies=new_user_recommendations_rating_title_and_count_RDD.filter(lambda r:r[2]>=25)\
    .takeOrdered(25,key=lambda x:-x[1])
print ('TOP recommended movies (with more than 25 reviews):\n%s') % '\n'.join(map(str,top_movies))

TOP recommended movies (with more than 25 reviews):
(u'Beastie Boys: Sabotage (1994)', 4.230875480408205, 28)
(u"Long Night's Journey Into Day (2000)", 4.180721740547556, 35)
(u'Pulp Fiction (1994)', 4.16112621133422, 83523)
(u'Heimat - A Chronicle of Germany (Heimat - Eine deutsche Chronik) (1984)', 4.091140734620533, 31)
(u'"Godfather: Part II', 4.05359763667845, 34508)
(u'Frozen Planet (2011)', 4.011833420838773, 239)
(u'The War (2007)', 3.979016636774192, 34)
(u'O.J.: Made in America (2016)', 3.930796489509352, 41)
(u'Long Way Round (2004)', 3.928982437959138, 29)
(u'Wanderers (2014)', 3.8950620320889637, 41)
(u'Apocalypse Now (1979)', 3.8912439666328598, 26465)
(u'Milius (2013)', 3.8507362893279593, 29)
(u'Band of Brothers (2001)', 3.8487608892220133, 7833)
(u'Goodfellas (1990)', 3.8418881491963535, 32025)
(u'Pinchcliffe Grand Prix (Fl\xe5klypa Grand Prix) (1975)', 3.831509606489127, 28)
(u'"Classe am\xe9ricaine', 3.8211027927815797, 32)
(u'Bill Hicks: Sane Man (1989)', 3.82039377

In [88]:
#Getting individual ratings
#Another useful usecase is getting the predicted rating for a particular movie for a given user
my_movie = sc.parallelize([(0,500)]) #Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=500, rating=1.9548165387003076)]

In [90]:
#persisting the model
from pyspark.mllib.recommendation import MatrixFactorizationModel
model_path = "file:///home/cloudera/Documents/ML_SPARK/Recomm_system/movie_lens_als"
model.save(sc,model_path)
#same_model=MatrixFactorizationModel.load(sc,model_path) --> to load the model