[Building a Movie Recommendation Service with Apache Spark & Flask/Python](https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw)

In [1]:
import os
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('20170214_Spark_Recommender_Python').setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.version

'2.1.0'

In [2]:
# load the raw ratings data
datasets_path = '/home/fred/data'
small_ratings_file = os.path.join(datasets_path, 'amazon', 'ratings_Baby.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
# small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0] # ain't no header in my data

In [3]:
# this method has collisions (e.g. using hash there are 64391 but just taking the last 8 digits there are 64368)
def str2int(s):
    return abs(hash(s)) % (10 ** 8)

# parse the raw data into a new RDD
small_ratings_data = (small_ratings_raw_data
    #.filter(lambda line: line != small_ratings_raw_data_header)
    .map(lambda line: line.split(","))
    .map(lambda tokens: (str2int(tokens[0]), str2int(tokens[1]), tokens[2])).cache())

In [4]:
# for illustrative purposes, take the first few lines of our RDD to see the result
small_ratings_data.take(3)

[(32140752, 92008734, '5.0'),
 (99567681, 82257884, '5.0'),
 (15071229, 80643384, '4.0')]

In [5]:
# split it into train, validation, and test datasets
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
training_for_predict_RDD = training_RDD.map(lambda x: (x[0], x[1]))
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [6]:
# proceed with the training phase
from pyspark.mllib.recommendation import ALS
import math
import itertools

seed = 5
iterations = 10
regularization_parameters = [0.3, 0.6]
ranks = [18, 22]
tolerance = 0.02

def rmse(data_RDD, data_for_predict_RDD):
    predictions = model.predictAll(data_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    #predictions.take(3) # [Managed Memory Leak Msg Should Only Be a Warning](https://issues.apache.org/jira/browse/SPARK-14168)
    rates_and_preds = data_RDD.map(lambda r: ((r[0], r[1]), float(r[2]))).join(predictions)
    #rates_and_preds.take(5)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    return (error, rates_and_preds)

min_error = float('inf')
best = -1
for rp, rank in itertools.product(regularization_parameters, ranks):
    model = ALS.train(training_RDD,
                      rank, # number of latent factors in the model
                      seed=seed,
                      iterations=iterations,
                      lambda_=rp) # lambda_ specifies the regularization parameter in ALS
    
    terror = rmse(training_RDD, training_for_predict_RDD)[0]
    verror = rmse(validation_RDD, validation_for_predict_RDD)[0]
    print('For rp/rank {} the training RMSE is {} and validation RMSE is {}'.format((rp, rank), terror, verror))
    if verror < min_error:
        min_error = verror
        best = (rp, rank)

print('The best model was trained with rp/rank {}'.format(best))

For rp/rank (0.3, 18) the training RMSE is 0.4071467269658386 and validation RMSE is 2.1518842574090336
For rp/rank (0.3, 22) the training RMSE is 0.40581488045020164 and validation RMSE is 2.1221218722563933
For rp/rank (0.6, 18) the training RMSE is 0.7295444804815372 and validation RMSE is 2.1107169701492996
For rp/rank (0.6, 22) the training RMSE is 0.7300551267537654 and validation RMSE is 2.075655216948862
The best model was trained with rp/rank (0.6, 22)


In [7]:
model = ALS.train(training_RDD, best[1], seed=seed, iterations=iterations, lambda_=best[0])
(error, rates_and_preds) = rmse(test_RDD, test_for_predict_RDD)
print('For testing data the RMSE is {}'.format(error)) # 2.0735

For testing data the RMSE is 2.0735024116449736


In [8]:
rates_and_preds.take(10)

[((11139009, 52261773), (5.0, 1.2892196214442693)),
 ((55849902, 14036664), (5.0, 1.226805107805887)),
 ((94035557, 49751529), (1.0, 2.0351829362849623)),
 ((83221295, 41831799), (5.0, 3.1790315566727143)),
 ((77215656, 1873142), (5.0, 3.6003108064750045)),
 ((86965400, 48183970), (5.0, 3.535596767059843)),
 ((50675945, 27475733), (5.0, -0.5908317753357553)),
 ((79618307, 40660303), (4.0, 3.478527167949731)),
 ((46292038, 25687912), (5.0, 3.0671195044435495)),
 ((16987148, 18698726), (4.0, 0.8906650441849951))]

In [9]:
(terror, rates_and_preds) = rmse(training_RDD, training_for_predict_RDD)
print('For training data the RMSE is {}'.format(terror))

For training data the RMSE is 0.7300551267537654


In [10]:
rates_and_preds.take(10)

[((56503762, 17061952), (5.0, 4.348035443906253)),
 ((58026780, 29894974), (5.0, 4.392538989692055)),
 ((94728161, 32828469), (5.0, 2.8982636681644807)),
 ((26470646, 49417676), (1.0, 0.8467392719615738)),
 ((144989, 7223697), (5.0, 4.398401924016896)),
 ((55306509, 72726565), (5.0, 4.402950752519727)),
 ((97616838, 96379596), (4.0, 3.5059729346672937)),
 ((79595097, 29276793), (5.0, 4.098479102578497)),
 ((9701402, 64975656), (2.0, 1.6877662435124414)),
 ((2103967, 27619211), (1.0, 0.8360267995153563))]

"That is, the recommender needs to be trained every time we have new user ratings (although a single model can be used by multiple users of course!)."

"Due to its very nature, collaborative filtering is a costly procedure since requires updating its model when new user preferences arrive. Therefore, having a distributed computation engine such as Spark to perform model computation is a must in any real-world recommendation engine like the one we have built here."