In [1]:
import os 
# Directory that holds ratings.csv and movies.csv. Set the ALDA_DATA_DIR
# environment variable to point elsewhere instead of editing this
# machine-specific default.
path = os.environ.get("ALDA_DATA_DIR", "/home/mr/Grive/01.ALDA/Project-Alda/data/")
from pyspark.mllib.recommendation import ALS
import math

In [2]:
# Load ratings.csv into (user id, movie id, rating) tuples taken from the first
# three columns. The first line of the file is treated as a header and dropped
# on the raw text, so every remaining row can be converted to numeric types
# once here (downstream cells then no longer depend on their own casts).
ratings_raw = sc.textFile(os.path.join(path, 'ratings.csv'))
header = ratings_raw.first()
ratings = ratings_raw.filter(lambda line: line != header) \
    .map(lambda line: line.split(",")) \
    .filter(lambda fields: len(fields) >= 3) \
    .map(lambda f: (int(f[0]), int(f[1]), float(f[2])))
# Require 3 fields above: the previous `len > 1` check let 2-field lines through
# and then failed on the [2] index.
print ("Total number of ratings : %d"%ratings.count())

Total number of ratings : 100004


In [3]:
def parse_movie_line(line):
    """Parse one movies.csv row into a ``(movie_id, title)`` tuple of strings.

    A plain ``line.split(",")`` truncates any title that itself contains a
    comma. Such titles are presumably wrapped in double quotes per standard CSV
    quoting -- verify against the data file. Instead, the id is taken up to the
    first comma and the title up to the last comma.

    Assumes the trailing column (presumably genres) never contains a comma --
    TODO confirm. A row with only two columns yields the second one as the title.
    """
    movie_id, rest = line.split(",", 1)
    title = rest.rsplit(",", 1)[0] if "," in rest else rest
    # Undo CSV quoting: strip the enclosing quotes and collapse doubled quotes.
    if len(title) >= 2 and title.startswith('"') and title.endswith('"'):
        title = title[1:-1].replace('""', '"')
    return (movie_id, title)


# Keep only lines containing a comma (same rule as before), parse them, then
# drop the parsed header row.
movies = sc.textFile(os.path.join(path, 'movies.csv')) \
    .filter(lambda line: "," in line) \
    .map(parse_movie_line)
header2 = movies.first()
movies = movies.filter(lambda line: line != header2)
print ("Total number of Movies : %d"%movies.count())

Total number of Movies : 9125


Split the ratings into training (60%), validation (20%) and test (20%) sets.
An ALS model is fit on the training set for each candidate rank.
For each model, the sum of squared errors (SSE) is computed on the validation set.
The rank with the lowest validation SSE is selected.

In [4]:
seed = 133
split_weights = [0.6, 0.2, 0.2]  # train / validation / test fractions
# Split the ratings once; passing `seed` pins the random sampling of the split.
trainData, validationData, testData = ratings.randomSplit(split_weights, seed=seed)
# Every split is reused by later cells, so persist them rather than recompute.
for split_rdd in (trainData, validationData):
    split_rdd.cache()
testData.cache()

PythonRDD[10] at RDD at PythonRDD.scala:43

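Candidate ranks: every rank from 2 through 30 (`range(2, 31)`).
ALS (Alternating Least Squares) factorizes the user–movie rating matrix into user and movie latent-factor matrices; the rank is the number of latent factors.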

In [6]:
# Sweep candidate ranks. For each rank, fit ALS on the training split and score
# it by the sum of squared errors (SSE) on the validation split.
candidate_ranks = range(2, 31)
# The validation query pairs and the ((user, movie), actual) lookup do not
# depend on the rank, so build and cache them once instead of per iteration.
validation_pairs = validationData.map(lambda x: (int(x[0]), int(x[1]))).cache()
validation_actual = validationData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).cache()
rank_sse = {}
for rank in candidate_ranks:
    model = ALS.train(trainData, rank, seed=seed)
    predictions = model.predictAll(validation_pairs).map(lambda r: ((r[0], r[1]), r[2]))
    # After the join each element is ((user, movie), (actual, predicted)).
    actual_pred = validation_actual.join(predictions)
    rank_sse[rank] = actual_pred.map(lambda r: (r[1][0] - r[1][1])**2).sum()
print (rank_sse)
# Pick the rank with the lowest validation SSE from the data instead of by eye.
best_rank = min(rank_sse, key=rank_sse.get)
print ("Lowest validation SSE at rank %d" % best_rank)

{2: 24326.618122816402, 3: 21777.453591149446, 4: 23567.3672340687, 5: 23020.866170513644, 6: 24893.35198115826, 7: 25921.861369149112, 8: 25635.63896171894, 9: 27363.741942835684, 10: 27560.814525813857, 11: 28785.363630246844, 12: 29060.02313750468, 13: 28030.165107063436, 14: 31196.12073286578, 15: 31047.152085549882, 16: 31642.323415871222, 17: 33910.78369945775, 18: 33015.62092946656, 19: 34003.96752356912, 20: 34404.08830447215, 21: 35351.32718963707, 22: 34736.52358675417, 23: 36733.60015864587, 24: 37947.36267098985, 25: 37413.59873166142, 26: 38526.46609801681, 27: 37853.36337572466, 28: 39858.51754487103, 29: 39610.81102243094, 30: 40219.98404160369}


The lowest validation SSE is obtained at rank = 3.
A model with rank 3 is therefore trained on the training set,
and its SSE is computed on the held-out test data.

In [7]:
# Refit with rank 3 (the lowest validation SSE in the sweep above) on the
# training split only, then report SSE on the held-out test split.
model = ALS.train(trainData, rank=3, seed=seed)
predictions = model.predictAll(testData.map(lambda x: (int(x[0]), int(x[1])))) \
    .map(lambda r: ((r[0], r[1]), r[2]))
# After the join each element is ((user, movie), (actual, predicted)).
actual_pred = testData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
sse = actual_pred.map(lambda r: (r[1][0] - r[1][1])**2).sum()
# Function-call print form: valid on Python 2 and 3, like the other cells
# (the bare `print '...'` statement is a SyntaxError on Python 3).
print ('For testing data the SSE is %s' % (sse))

For testing data the SSE is 21057.0116283


A new user (userId 0) with ten ratings is added to the ratings data.
We predict this user's ratings for every movie they have not rated, sort by predicted rating,
and take the top 5 movies.

In [8]:
# Add a new user (userId 0) with ten (user, movie, rating) entries, retrain on
# the full data set, and rank every movie that user has not rated.
add_user = sc.parallelize([(0,260,4), (0,1,3), (0,16,3), (0,25,4), (0,32,4), (0,335,1), (0,379,1), (0,296,3),
     (0,858,5), (0,50,4)])
new_data = ratings.union(add_user)
# Reuse the notebook-wide seed rather than repeating the literal 133.
model = ALS.train(new_data, rank=3, seed=seed)
# A set gives O(1) membership tests inside the filter below.
ids = set(add_user.map(lambda x: x[1]).collect())
to_predict = movies.filter(lambda x: int(x[0]) not in ids).map(lambda x: (0, int(x[0])))
# (movieId, predicted rating) for every unrated movie the model can score.
predictions = model.predictAll(to_predict).map(lambda x: (x.product, x.rating))
# (movieId, title) lookup used to attach titles to the recommendations.
temp = movies.map(lambda x: (int(x[0]), x[1]))
# takeOrdered returns only the 5 highest scores instead of sorting everything.
recommend = predictions.takeOrdered(5, key=lambda x: -x[1])
# NOTE(review): predictAll scores are not clamped. The saved output shows
# scores near 20, far above any rating the new user entered (max 5). This
# suggests the sparse new user is overfit; consider tuning lambda_/iterations.
print (recommend)

[(4630, 19.82755780913979), (6127, 18.234697739663915), (31694, 16.46869434991649), (279, 16.255157252569376), (6797, 16.132769072038457)]


In [9]:
# Attach titles to the recommended (movieId, predicted rating) pairs.
# RDD.join gives no ordering guarantee, so re-sort by predicted rating
# (descending) to list titles best-first.
output = sc.parallelize(recommend)
# After the join each element is (movieId, (predicted_rating, title)).
output = output.join(temp) \
    .sortBy(lambda x: x[1][0], ascending=False) \
    .map(lambda x: x[1][1])
output.collect()

[u'Shoot the Moon (1982)',
 u'Bride & Prejudice (2004)',
 u'My Family (1995)',
 u'No Holds Barred (1989)',
 u'Bugsy (1991)']