In [1]:
import math
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS


# create spark session
# use local[7] on ec2 instance
spark = ps.sql.SparkSession.builder \
          .master('local[4]')  \
          .appName('rpg_rec') \
          .getOrCreate() \

In [2]:
# load in the ratings data
df = spark.read.csv('../data/ratings.csv',
                       header=False,
                       sep='|',
                       inferSchema=True)

In [3]:
# convert to rdd...
data_rdd = df.rdd

# tarin/test sets...
training_RDD, validation_RDD, test_RDD = data_rdd.randomSplit([6, 2, 2])
# training_RDD, test_RDD = data_rdd.randomSplit([7, 3])
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [4]:
# run a recommender...
# Parameters
seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [a for a in xrange(4,30,2)]
errors = [0 for a in ranks]
err = 0
tolerance = 0.02

In [11]:
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    last = ''
    try:
        last = 'model'
        model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
        last = 'predicions'
        predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        last = 'rates & predicions'
        rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        last = 'errors'
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors[err] = error
        err += 1
        print 'For rank %s the RMSE is %s' % (rank, error)
        if error < min_error:
            min_error = error
            best_rank = rank
    except:
        print 'error at {} with rank {}'.format(last, rank)

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.901505880985
For rank 6 the RMSE is 0.84211614967
error at errors with rank 8
error at errors with rank 10
error at errors with rank 12
error at errors with rank 14
error at predicions with rank 16
error at errors with rank 18
error at errors with rank 20
error at errors with rank 22
error at errors with rank 24
error at errors with rank 26
error at errors with rank 28
The best model was trained with rank 6


In [6]:
rates_and_preds.take(3)

[((125213, 31), (1.0, 0.18715964942219884)),
 ((327227, 105), (1.0, 0.9991674961583612)),
 ((158755, 0), (2.0, 0.9009769803647689))]

In [7]:
training_RDD, test_RDD = data_rdd.randomSplit([7, 3], seed=0L)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [8]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 2.23686618525


In [10]:
rates_and_preds.take(12)

[((340380, 113), (1.0, -0.14438680609851456)),
 ((1811, 113), (1.0, 1.6106886265138605)),
 ((428735, 72), (1.0, 0.3867859856321298)),
 ((70083, 141), (4.0, 0.5893066692050223)),
 ((180289, 24), (1.0, 0.9525949638509973)),
 ((25064, 105), (1.0, 0.37817178898650666)),
 ((294251, 18), (1.0, 1.4180347217366158)),
 ((110899, 19), (1.0, -0.10075682933664337)),
 ((375340, 105), (1.0, 0.4547428648449171)),
 ((592892, 141), (1.0, 0.23472907686081446)),
 ((76393, 113), (2.0, 0.5539527336643408)),
 ((236911, 105), (1.0, 0.28169569828579627))]