In [1]:
import csv
import math
import os
import shutil

import findspark
findspark.init()
import pyspark
from pyspark.mllib.recommendation import ALS

In [2]:
# Reuse the active SparkContext if there is one. Calling SparkContext() a second time
# (e.g. re-running this cell) fails with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

In [3]:
# Read ratings.csv as an RDD of raw text lines. The first line is the CSV header;
# it is captured here so the parsing cell can filter it out.
s_file = os.path.join(*['ml-latest-small', 'ratings.csv'])
s_ratings_raw_data = sc.textFile(name=s_file)
s_ratings_first_rows = s_ratings_raw_data.take(num=1)
s_ratings_raw_data_header = s_ratings_first_rows[0]

In [4]:
# Drop the header line, split each CSV row on commas and keep the first three
# fields (user id, movie id, rating) as strings. Cached because it is reused
# by the peek below and by the train/validation/test split.
s_ratings_RDD = (
    s_ratings_raw_data
    .filter(lambda row: row != s_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)

In [5]:
# Peek at the first two parsed rating tuples.
s_ratings_RDD.take(num=2)

[('1', '31', '2.5'), ('1', '1029', '3.0')]

In [6]:
# Next, load and parse the movies CSV.

In [7]:
# Read movies.csv as an RDD of raw text lines and capture its header line,
# which the parsing cell filters out.
s_movies_file = os.path.join(*['ml-latest-small', 'movies.csv'])
s_movies_raw_data = sc.textFile(name=s_movies_file)
s_movies_first_rows = s_movies_raw_data.take(num=1)
s_movies_raw_data_header = s_movies_first_rows[0]

In [8]:
# Parse movies into (movie id, title) pairs, dropping the header line.
# CSV fields may contain commas when they are double-quoted (e.g. a title like
# "Title, The (1995)"). A plain str.split(",") would cut such a title apart and
# keep only its first fragment. csv.reader honours CSV quoting; mapPartitions
# hands it each partition's iterator of lines.
# Ids are kept as strings, as before.
s_movies_RDD = (
    s_movies_raw_data
    .filter(lambda line: line != s_movies_raw_data_header)
    .mapPartitions(csv.reader)
    .map(lambda tokens: (tokens[0], tokens[1]))
    .cache()
)

In [9]:
# Peek at the first two parsed (movie id, title) pairs.
s_movies_RDD.take(num=2)

[('1', 'Toy Story (1995)'), ('2', 'Jumanji (1995)')]

In [10]:
# With the data loaded into RDDs, we can now apply collaborative filtering using Alternating Least Squares (ALS).

In [11]:
# Split the ratings into training / validation / test sets in a 6:2:2 ratio
# (randomSplit normalises the weights). Without a fixed seed every
# Restart & Run All produces a different split, and therefore a different RMSE.
split_seed = 5
training_RDD, validation_RDD, testing_RDD = s_ratings_RDD.randomSplit([6, 2, 2], seed=split_seed)

# ALS predictAll takes (user, product) pairs, so drop the rating field.
val_predict_RDD = validation_RDD.map(lambda r: (r[0], r[1]))
test_predict_RDD = testing_RDD.map(lambda r: (r[0], r[1]))

In [12]:
# ALS hyperparameters.
# The seed used to be commented out because `5L` is Python 2 long-literal syntax
# (a SyntaxError in Python 3). Plain `5` is the same value.
seed = 5
rank = 8
iterations = 10
regularization_parameter = 0.1

# Passing the seed makes ALS's random initialisation reproducible between runs.
model = ALS.train(training_RDD, rank=rank, iterations=iterations,
                  lambda_=regularization_parameter, seed=seed)

In [13]:
# Evaluate on the validation set:
# 1. Predict a rating for every (user, movie) pair.
# 2. Join the predictions with the true ratings on that key.
# 3. Compute the root-mean-squared error.
# Predicted keys come back as integers, so the validation keys are cast with int()
# (and ratings with float()) for the join to match.
# RDD.join is an inner join: pairs without a prediction (e.g. ids unseen in
# training) are left out of the RMSE.
predictions = model.predictAll(val_predict_RDD).map(lambda x: ((x[0], x[1]), x[2]))
score = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(score.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# Interpolate the value into the message. Passing it as a second print() argument
# printed the literal "%s" as well.
print('RMSE is %s' % error)

RMSE is %s 0.9466826998590147


In [14]:
# Peek at two ((user, movie), predicted rating) entries.
predictions.take(num=2)

[((547, 45208), 1.6924119862277949), ((311, 45208), 1.5228682494760033)]

In [16]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

# Persist the trained model, then load it back to confirm the round trip works.
model_path = os.path.join('models', 'movie_lens_als')

# model.save will not overwrite an existing path, so re-running this cell would fail.
# Remove any previous copy first.
# NOTE(review): this assumes model_path resolves to the local filesystem. If Spark's
# default FS is HDFS/S3, delete through the Hadoop FileSystem API instead.
if os.path.isdir(model_path):
    shutil.rmtree(model_path)

model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)