In [None]:
# Dataset: Movielens
# Environment: Apache Spark, Python, AWS EC2 m4.large 
# Movie recommendation engine built using Collaborative filtering with explicit feedback


# Collaborative filtering is commonly used for recommender systems. 
# These techniques aim to fill in the missing entries of a user-item association matrix. 
# spark.mllib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. 
# spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors. 
# The implementation in spark.mllib has the following parameters:

# numBlocks is the number of blocks used to parallelize computation.
# rank is the number of latent factors in the model.
# iterations is the number of iterations of ALS to run. 
# ALS typically converges to a reasonable solution in 20 iterations or less.
# lambda specifies the regularization parameter in ALS (passed as `lambda_` in the PySpark ALS.train call used below).


In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession,SQLContext
import os

In [2]:
# Reuse the running SparkContext when this cell is executed again instead of
# trying to construct a second one in the same kernel.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [9]:
# Load both CSV files as RDDs of raw text lines.
ratings_raw_data = sc.textFile("ml-20m/ratings.csv")
movies_raw_data = sc.textFile("ml-20m/movies.csv")

# The first line of each file is its header. Read it from the *raw* RDDs
# (the parsed ratings_data / movies_data RDDs are only built in a later cell)
# and keep it as a plain string so it can be filtered out when parsing.
ratings_data_header = ratings_raw_data.take(1)[0]
movies_data_header = movies_raw_data.take(1)[0]

In [20]:
# Show the first line of the raw ratings file; as the cell output shows, it is
# the CSV header row.
ratings_raw_data.first()


u'userId,movieId,rating,timestamp'

In [15]:
# Parse the ratings file: skip the header line, split every remaining line on
# commas and keep the first three fields as a tuple of strings. The result is
# cached because several later cells reuse it.
ratings_data = (
    ratings_raw_data
    .filter(lambda raw_line: raw_line != ratings_data_header)
    .map(lambda raw_line: raw_line.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)
    
def _parse_movie_line(line):
    """Split one data line of movies.csv into a (movieId, title, genres) tuple.

    A plain ``line.split(",")`` cuts a title at its first comma whenever the
    title itself contains one, which also shifts the third field. The first
    and the last comma of the line are therefore used as column boundaries.

    Assumes a three-column layout in which neither the id nor the last column
    contains a comma, and that titles containing commas are wrapped in double
    quotes -- TODO confirm against the dataset README.
    """
    movie_id, remainder = line.split(",", 1)
    title, genres = remainder.rsplit(",", 1)
    if len(title) >= 2 and title.startswith('"') and title.endswith('"'):
        # Remove the CSV quoting and collapse doubled quotes inside the title.
        title = title[1:-1].replace('""', '"')
    return (movie_id, title, genres)


# Parse the movies file: skip the header line and turn every remaining line
# into a (movieId, title, genres) tuple of strings; cached for reuse.
movies_data = (
    movies_raw_data
    .filter(lambda line: line != movies_data_header)
    .map(_parse_movie_line)
    .cache()
)

In [16]:
# Randomly split the parsed ratings into training, validation and test RDDs
# using the weights 6:2:2 and a fixed seed so the split is repeatable.
training_RDD, validation_RDD, test_RDD = ratings_data.randomSplit([6, 2, 2], seed=0)


def _user_movie_pair(record):
    """Return the (user, movie) part of a (user, movie, rating) record."""
    return (record[0], record[1])


# Inputs for predictAll(): the validation and test records without the rating.
validation_for_predict_RDD = validation_RDD.map(_user_movie_pair)
test_for_predict_RDD = test_RDD.map(_user_movie_pair)


In [17]:
# Run an action on the parsed RDD to confirm the header was filtered out: the
# first record should be a tuple of three strings rather than the column names.
ratings_data.first()


(u'1', u'2', u'3.5')

In [19]:
from pyspark.mllib.recommendation import ALS
import math

#import Alternating least squares from pyspark mllib 

# Hyperparameter values / ranges for the rank search.
seed = 5
iterations = 8
ranks = [3, 6, 9]
regularization_parameter = 0.1
# One RMSE slot per candidate rank, sized from `ranks` so that editing the
# list of ranks cannot overrun the results list.
errors = [0] * len(ranks)
err = 0
tolerance = 0.02  # not read anywhere in this cell

min_error = float('inf')
best_rank = -1
best_iteration = -1  # not updated by the loop below

# The actual validation ratings, keyed by (user, movie), are the same for every
# rank, so the (lazy) RDD is defined once outside the loop.
validation_actuals_RDD = validation_RDD.map(
    lambda x: ((int(x[0]), int(x[1])), float(x[2])))

for err, rank in enumerate(ranks):
    # Train a model with this rank on the training set.
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Predict the validation pairs and key the result by (user, movie).
    predictions = model.predictAll(validation_for_predict_RDD).map(
        lambda x: ((x[0], x[1]), x[2]))
    # Pair every actual rating with its prediction: ((user, movie), (actual, predicted)).
    rates_and_preds = validation_actuals_RDD.join(predictions)
    # Root mean squared error over the joined pairs.
    error = math.sqrt(rates_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())
    errors[err] = error

    print('For rank %s the RMSE is %s' % (rank, error))
    # Remember the rank with the lowest validation RMSE seen so far.
    if error < min_error:
        min_error = error
        best_rank = rank

print('model with the best rank is %s' % best_rank)



For rank 3 the RMSE is 0.827672311062
For rank 6 the RMSE is 0.814785432209
For rank 9 the RMSE is 0.816497886269
model with the best rank is 6


In [8]:
# Inspect a few predictions; each record is ((user, product), predicted rating),
# the shape produced by the map() applied to predictAll()'s output.
predictions.take(5)


[((18624, 1233), 4.304325179879104),
 ((18624, 3617), 2.295641218397508),
 ((18624, 103249), 2.9205323565716235),
 ((18624, 4545), 2.469724193933608),
 ((18624, 91306), 3.8818960894869847)]

In [9]:
# Compare actual and predicted ratings side by side; each record is
# ((user, movie), (actual rating, predicted rating)).
rates_and_preds.take(5)


[((13690, 1208), (3.0, 3.1904670577022287)),
 ((106110, 2420), (3.5, 3.727345381968274)),
 ((62497, 1653), (5.0, 4.092835245815293)),
 ((75178, 1296), (4.0, 4.105249382340389)),
 ((117371, 911), (5.0, 3.8284228092195365))]

In [10]:
# Fit a model on the training set using the rank that scored best on the
# validation set, then measure it on the held-out test set.
model = ALS.train(
    training_RDD,
    best_rank,
    seed=seed,
    iterations=iterations,
    lambda_=regularization_parameter,
)

# Predictions for the test pairs, keyed by (user, movie).
predictions = model.predictAll(test_for_predict_RDD).map(
    lambda pred: ((pred[0], pred[1]), pred[2]))

# Actual test ratings joined with their predictions:
# ((user, movie), (actual rating, predicted rating)).
rates_and_preds = test_RDD.map(
    lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2]))
).join(predictions)

# Root mean squared error over the joined pairs.
Error = math.sqrt(
    rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1])**2).mean()
)

print('testing data RMSE is %s' % (Error))



For testing data the RMSE is 0.815253154176


In [21]:

# (movieID, title) pairs. The ID is converted to int because this RDD is later
# joined with ALS predictions, whose product IDs are integers; a string key
# such as u'16' would never match the integer key 16 and the join would come
# back empty.
complete_movies_titles = movies_data.map(lambda x: (int(x[0]), x[1]))

print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 27278 movies in the complete dataset


In [22]:
def counts_and_averages(ID_ratings_tuple):
    """Summarise the ratings collected for one key.

    ID_ratings_tuple is a pair (key, ratings) where ratings is a sized
    iterable of values accepted by float(). Returns
    (key, (number of ratings, average rating)).
    """
    key = ID_ratings_tuple[0]
    ratings = ID_ratings_tuple[1]
    nratings = len(ratings)
    total = 0
    for value in ratings:
        total += float(value)
    return key, (nratings, float(total) / nratings)

# Group every rating value under its movie ID (the ID is converted with float()).
movie_ID_with_ratings_RDD = (
    ratings_data
    .map(lambda rating: (float(rating[1]), rating[2]))
    .groupByKey()
)
# (movieID, (number of ratings, average rating)) for every movie.
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(counts_and_averages)
# (movieID, number of ratings) for every movie.
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(
    lambda summary: (summary[0], summary[1][0]))

In [23]:
# Sample five records; each one is (movieID, (number of ratings, average rating)).
movie_ID_with_avg_ratings_RDD.take(5)

[(122880.0, (1, 3.0)),
 (118784.0, (1, 3.5)),
 (27824.0, (6, 3.5833333333333335)),
 (16.0, (17394, 3.7874554444061173)),
 (122480.0, (2, 1.5))]

In [24]:
# ID given to the new user whose recommendations are computed below.
new_user_ID = 0

# Ratings supplied by the new user, one (userID, movieID, rating) tuple each.
new_user_ratings = [
    (0, 260, 4),
    (0, 1, 3),
    (0, 16, 3),
    (0, 25, 4),
    (0, 32, 4),
    (0, 335, 1),
]
# Distribute the ratings as an RDD and echo up to ten of them.
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [25]:
# Append the new user's ratings to the original ratings data.
# Note: ratings_data holds tuples of strings while new_user_ratings holds
# tuples of ints, so the resulting RDD mixes both element types.
complete_data_with_new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)


In [27]:
from time import time

# Retrain on the ratings extended with the new user's entries, reusing the
# rank and the other hyperparameters from the earlier search, and measure how
# long the training call takes.
start_time = time()
new_ratings_model = ALS.train(
    complete_data_with_new_ratings_RDD,
    best_rank,
    seed=seed,
    iterations=iterations,
    lambda_=regularization_parameter,
)
total_time = time() - start_time

print("New model trained in %s seconds" % round(total_time, 3))

New model trained in 291.234 seconds


In [29]:
# IDs of the movies the new user has already rated.
new_user_ratings_ids = [rating[1] for rating in new_user_ratings]

# Candidate (userID, movieID) pairs for the new user: every movie in the
# catalogue that the user has not rated yet. The candidates come from
# movies_data (one record per movie, ID in field 0) rather than from the
# ratings RDD, whose field 0 is a user ID and which has one record per rating.
# The ID is converted to int so it can be compared with the integer IDs in
# new_user_ratings_ids.
new_user_unrated_movies_RDD = (
    movies_data
    .map(lambda movie: int(movie[0]))
    .filter(lambda movie_id: movie_id not in new_user_ratings_ids)
    .map(lambda movie_id: (new_user_ID, movie_id))
)

# Predict the new user's rating for every candidate movie.
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [31]:
# Reduce every prediction to a (movieID, predicted rating) pair.
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(
    lambda prediction: (prediction.product, prediction.rating))

# Attach the title and then the rating count by joining on the movie ID;
# records become (movieID, ((predicted rating, title), rating count)).
new_user_recommendations_rating_title_and_count_RDD = (
    new_user_recommendations_rating_RDD
    .join(complete_movies_titles)
    .join(movie_rating_counts_RDD)
)


new_user_recommendations_rating_title_and_count_RDD.take(5)


In [32]:
# Flatten the joined records to (title, predicted rating, rating count).
# The result is bound to the same name as the input, so this cell is meant to
# be run exactly once after the join cell.
new_user_recommendations_rating_title_and_count_RDD = (
    new_user_recommendations_rating_title_and_count_RDD
    .map(lambda record: (record[1][0][1], record[1][0][0], record[1][1]))
)

In [34]:
# Pick the 25 highest predicted ratings among movies that have at least 50
# ratings. The filter must look at the rating count, i.e. the third field of
# the flattened (title, predicted rating, rating count) records; filtering
# the (movieID, rating) pairs on their first field would compare the movie ID
# with 50 instead.
top_movies = (
    new_user_recommendations_rating_title_and_count_RDD
    .filter(lambda record: record[2] >= 50)
    .takeOrdered(25, key=lambda record: -record[1])
)

print('TOP recommended movies (with more than 50 reviews):\n%s' %
      '\n'.join(map(str, top_movies)))



In [24]:
# Predict ratings for a couple of individual (user, movie) pairs.
# final_model is only created by the save/load cell further down, so it does
# not exist yet when the notebook is run top to bottom; score with
# new_ratings_model, which is the model that cell saves and reloads.
my_movie = sc.parallelize([(8, 1208), (7, 500)])
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.collect()

[Rating(user=8, product=1208, rating=3.5836688392131775),
 Rating(user=7, product=500, rating=3.348774159532742)]

In [23]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

# Persist the retrained model and read it straight back from the same location.
# NOTE(review): the leading '~' is presumably not expanded to the home
# directory by Spark -- confirm where the files actually end up.
model_path = '~/freecom'
new_ratings_model.save(sc, model_path)
final_model = MatrixFactorizationModel.load(sc, model_path)