In [1]:
from pprint import pprint
from collections import defaultdict
import itertools
import sys
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel

In [3]:
# findspark.init() locates a Spark installation and puts pyspark on sys.path.
# NOTE(review): the first cell already does `from pyspark.mllib.recommendation import ...`
# before this cell runs. If pyspark is only importable after findspark.init(),
# that earlier import fails on a fresh kernel -- confirm, and run this first if so.
import findspark
findspark.init()

In [4]:
import pyspark

# Reuse the running SparkContext when this cell is executed again; building a
# second context in the same process raises an error. The app name is only
# applied when a new context actually has to be created.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("recommender"))

In [6]:
def to_string(line):
    """Split a tab-separated record and return its first three fields.

    Each of the three fields is passed through ``str``. Any additional
    fields are ignored; a record with fewer than three fields raises
    ``IndexError``.
    """
    fields = line.split("\t")
    first, second, third = fields[0], fields[1], fields[2]
    return (str(first), str(second), str(third))

def float_tostring(line):
    """Return the first three items of an indexable record, each converted with ``str``."""
    first, second, third = line[0], line[1], line[2]
    return (str(first), str(second), str(third))

def take5(tup_list):
    """Return a slice holding at most the first five items of ``tup_list``."""
    return tup_list[:5]

def sort_tup_list(list_tup):
    """Sort ``list_tup`` in place, largest second element first, and return it.

    The list passed in is the one that is mutated and returned; entries
    with equal second elements keep their original relative order.
    """
    def negated_second(entry):
        # Negating the value turns the ascending sort into a descending one.
        return -entry[1]

    list_tup.sort(key=negated_second)
    return list_tup


# Read each triplet file and pull a bounded number of lines back to the
# driver as plain Python lists (take() returns a list, not an RDD).
hidden_lines = sc.textFile("EvalData/year1_valid_triplets_hidden.txt")
music_data = hidden_lines.take(5000000)

visible_lines = sc.textFile("EvalData/year1_valid_triplets_visible.txt")
test_sc = visible_lines.take(100000)

In [7]:
# The previous cell left plain Python lists in these names; turn them back
# into RDDs. The isinstance guard keeps the cell re-runnable: on a second run
# the names already hold RDDs and must not be handed to parallelize() again.
if isinstance(music_data, list):
    music_data = sc.parallelize(music_data)
if isinstance(test_sc, list):
    test_sc = sc.parallelize(test_sc)

In [9]:
# Parse every raw line into a (user_id, song_id, count) tuple of strings.
# Lambdas index into their argument instead of using tuple parameters,
# which were removed from the language in Python 3 (PEP 3113).
ratings = music_data.map(to_string)

# Give each distinct user id an integer index. The mapping is consumed twice
# (join + reverse lookup), so it is cached rather than recomputed per use.
userstoint = ratings.map(lambda rec: rec[0]).distinct().zipWithIndex().cache()
reversemappingofusers = userstoint.map(lambda pair: (pair[1], pair[0]))

# Key by user id and join -> (user_id, ((song_id, count), user_index)).
ratingswithuniqueuserid = ratings.map(lambda rec: (rec[0], (rec[1], rec[2]))).join(userstoint)

# Flatten to (user_index, song_id, count).
newratings = ratingswithuniqueuserid.map(lambda kv: (kv[1][1], kv[1][0][0], kv[1][0][1]))

# Same treatment for song ids; also consumed twice, so cached.
songstoint = ratings.map(lambda rec: rec[1]).distinct().zipWithIndex().cache()

# Key by song id and join -> (song_id, ((user_index, count), song_index)).
ratingswithuniquesongid = newratings.map(lambda rec: (rec[1], (rec[0], rec[2]))).join(songstoint)

# Flatten to (user_index, song_index, count).
newratings = ratingswithuniquesongid.map(lambda kv: (kv[1][0][0], kv[1][1], kv[1][0][1]))

reversemappingofsongs = songstoint.map(lambda pair: (pair[1], pair[0]))

# Stringify all three fields, then keep only rows whose count is all digits.
newratings = newratings.map(float_tostring)
newratings = newratings.filter(lambda rec: rec[2].isdigit())

In [15]:
# Split with weights 6:2:2 into training / validation / test sets.
# The seed is a plain int: the `0L` long literal is a syntax error on Python 3.
training_RDD, validation_RDD, test_RDD = newratings.randomSplit([6, 2, 2], seed=0)

# predictAll() is called with (user, song) pairs only, so drop the count.
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [16]:
# Settings for the ALS runs below.
seed = 5  # plain int; the `5L` long literal is a syntax error on Python 3
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
# One RMSE slot per candidate rank, sized from `ranks` so that editing the
# rank list cannot run past the end of the list.
errors = [0] * len(ranks)
err = 0
tolerance = 0.02  # not read anywhere in this cell

min_error = float('inf')
best_rank = -1
best_iteration = -1  # not read anywhere in this cell

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Key predictions by (user, song) so they can be joined with the actuals.
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # The validation tuples hold strings, so cast them to match the numeric
    # prediction keys; the join yields ((user, song), (actual, predicted)).
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Root-mean-square error over the joined pairs.
    error = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    # Parenthesised single-argument print is valid on both Python 2 and 3.
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 10.0300958753
For rank 8 the RMSE is 8.73563038913
For rank 12 the RMSE is 8.2602847062
The best model was trained with rank 12


In [17]:
# Peek at two joined rows, shaped ((user, song), (actual, predicted)).
# rates_and_preds here is whatever the final iteration of the rank loop left behind.
rates_and_preds.take(2)

[((825, 961), (1.0, 1.2534019986313527)),
 ((3141, 38629), (1.0, 0.2514054467539409))]

In [18]:
# Inspect one held-out record; its three fields are still strings at this point.
test_RDD.take(1)

[('6313', '3029', '1')]

In [24]:
# Disabled experiment kept as a bare string literal: nothing in it executes,
# and the cell simply evaluates to (and echoes) the string itself.
# The disabled code would have rebuilt id mappings for the test split while
# joining against the existing `userstoint` / `songstoint` mappings.
"""
test_ratings=test_RDD.map(to_string)
test_userstoint = test_ratings.map(lambda (a,b,c): a).distinct().zipWithIndex()
test_reversemappingofusers = test_userstoint.map(lambda (a,b) : (b,a))
test_ratingswithuniqueuserid = test_ratings.map(lambda (a,b,c) : (a,(b,c))).join(userstoint)
test_newratings = test_ratingswithuniqueuserid.map(lambda  (userid,((songid, count), usertoint)) : (usertoint, songid, count))
test_songstoint = test_ratings.map(lambda (usertoint, songid, count): songid).distinct().zipWithIndex()
test_ratingswithuniquesongid = test_newratings.map(lambda (usertoint,songid,count) : (songid,(usertoint,count))).join(songstoint)
test_newratings = test_ratingswithuniquesongid.map(lambda  (songid,((usertoint, count), songtoint)) : (usertoint, songtoint, count))
test_reversemappingofsongs = test_songstoint.map(lambda (a,b) : (b,a))
test_newratings=test_newratings.map(float_tostring)
test_newratings=test_newratings.filter(lambda (a,b,c): c.isdigit())
"""

'\ntest_ratings=test_RDD.map(to_string)\ntest_userstoint = test_ratings.map(lambda (a,b,c): a).distinct().zipWithIndex()\ntest_reversemappingofusers = test_userstoint.map(lambda (a,b) : (b,a))\ntest_ratingswithuniqueuserid = test_ratings.map(lambda (a,b,c) : (a,(b,c))).join(userstoint)\ntest_newratings = test_ratingswithuniqueuserid.map(lambda  (userid,((songid, count), usertoint)) : (usertoint, songid, count))\ntest_songstoint = test_ratings.map(lambda (usertoint, songid, count): songid).distinct().zipWithIndex()\ntest_ratingswithuniquesongid = test_newratings.map(lambda (usertoint,songid,count) : (songid,(usertoint,count))).join(songstoint)\ntest_newratings = test_ratingswithuniquesongid.map(lambda  (songid,((usertoint, count), songtoint)) : (usertoint, songtoint, count))\ntest_reversemappingofsongs = test_songstoint.map(lambda (a,b) : (b,a))\ntest_newratings=test_newratings.map(float_tostring)\ntest_newratings=test_newratings.filter(lambda (a,b,c): c.isdigit())\n'

In [23]:
# Retrain on the training split with the rank chosen on the validation split,
# then score the model on the held-out test split.
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                  lambda_=regularization_parameter)
# Key predictions by (user, song) so they can be joined with the actuals.
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# The test tuples hold strings, so cast them to match the numeric prediction
# keys; the join yields ((user, song), (actual, predicted)).
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# Parenthesised single-argument print is valid on both Python 2 and 3.
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 8.52239654355
