# Collaborative filtering recommender system

## Dataset description

In this assignment you will use Apache Spark to build a simple movie recommendation system on the large MovieLens 20M dataset (https://grouplens.org/datasets/movielens/). The dataset contains 20 million ratings of 27000 movies given by 138000 users.

Dataset format: comma-separated values (https://en.wikipedia.org/wiki/CSV)

The first line is a header:
userId,movieId,rating,timestamp

_userId_ and _movieId_ are integers identifying users and movies, respectively

_rating_ is a floating point number from 0.5 to 5.0, in half-star increments

_timestamp_ is an integer but it won’t be used in the assignment :)
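
As a quick illustration of the record layout, the sketch below parses two rows with Python's standard `csv` module, without Spark. The function name `parse_ratings` is hypothetical, and the timestamps in the sample are placeholder values, not taken from the dataset.

```python
import csv
import io

# Two sample rows in the ratings.csv layout; the timestamps are placeholders.
sample = "userId,movieId,rating,timestamp\n1,2,3.5,0\n1,29,3.5,0\n"

def parse_ratings(text):
    """Return (userId, movieId, rating) tuples, dropping the timestamp."""
    reader = csv.DictReader(io.StringIO(text))
    return [(int(row["userId"]), int(row["movieId"]), float(row["rating"]))
            for row in reader]

parsed = parse_ratings(sample)
print(parsed)
```

Reading the header through `csv.DictReader` lets the columns be addressed by name, so the timestamp is dropped simply by not selecting it.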

## Reading dataset

Initialize the Spark session

In [1]:
from __future__ import division, print_function, unicode_literals

In [2]:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .enableHiveSupport() \
    .appName("recSys")\
    .master("local[4]")\
    .getOrCreate()
sc = spark_session.sparkContext

In [3]:
# Either the Spark session or the Spark context can now be used to read the CSV file.
raw_data = sc.textFile('ratings.csv')

In [4]:
# remove the header row and drop the timestamp column
raw_data_header = raw_data.take(1)[0]
print("header", raw_data_header)
movies_data = raw_data.filter(lambda line: line!=raw_data_header)\
                      .map(lambda line: line.split(","))\
                      .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
print(movies_data.take(3))

header userId,movieId,rating,timestamp
[(1, 2, 3.5), (1, 29, 3.5), (1, 32, 3.5)]


In this assignment you will use the collaborative filtering approach to make predictions. To find latent vector representations for users and items, we suggest using the explicit ALS method. Refer to the Spark MLlib documentation for Python API details: https://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html

Split the dataset into three folds: on each iteration one fold is used for testing and the other two folds are used for training. Wrap the necessary values in the _Rating_ class.

In [5]:
from pyspark.mllib.recommendation import Rating
import math
fold_count = 3

In [8]:
folds = movies_data.randomSplit([1.0/fold_count for i in range(fold_count)], seed=0)

Recommendation: to make your solution more efficient, prepare the RDDs for all the folds before running the training process.
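
One way to read this recommendation is to build every (train, test) pair once, before the training loop starts. The sketch below shows the fold rotation in plain Python, with lists standing in for RDDs. The helper name `make_fold_pairs` and the toy folds are illustrative assumptions; with Spark, the training side could presumably be built with `sc.union` and kept in memory with `.cache()`.

```python
def make_fold_pairs(folds):
    """Pair each fold (as the test set) with the other folds combined (as the training set)."""
    pairs = []
    for test_index, test_fold in enumerate(folds):
        train = [row
                 for i, fold in enumerate(folds) if i != test_index
                 for row in fold]
        pairs.append((train, test_fold))
    return pairs

# Toy folds of (userId, movieId, rating) tuples; the values are illustrative only.
toy_folds = [[(1, 2, 3.5)], [(1, 29, 3.5)], [(1, 32, 3.5)]]
fold_pairs = make_fold_pairs(toy_folds)

for train, test in fold_pairs:
    print(len(train), len(test))
```

Each record appears in exactly one test set and in the training set of the other two rotations.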

## Training ALS model

Now it is time to choose the training parameters. It is recommended to set the rank to 20,
the regularization term lambda to 0.01 and the number of iterations to 5.
Feel free to experiment and choose your own parameters to see how they influence the final score.
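
For orientation, the three parameters can be related to the usual regularized matrix-factorization objective. The form below is the textbook version, offered as a sketch and not as Spark's exact loss. The symbols $x_u$, $y_i$, $k$ and $K$ are notation introduced here: $x_u, y_i \in \mathbb{R}^k$ are the latent vectors of user $u$ and movie $i$, $k$ is the rank, and $K$ is the set of observed (user, movie) pairs.

```latex
\min_{X, Y} \; \sum_{(u, i) \in K} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_{u} \lVert x_u \rVert^2 + \sum_{i} \lVert y_i \rVert^2 \right)
```

ALS alternates between holding the movie vectors fixed to solve for the user vectors and the reverse. Each step is a least-squares problem, and the iteration count sets how many alternations are run. Spark's documentation describes scaling the regularization by the number of ratings per user or movie, so the effective penalty may differ from this plain form.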

In [11]:
rank = 20
iterations = 10
lambda_ = 0.3

Train the explicit ALS model on two of the three folds and evaluate its performance on the remaining test fold. Performance should be evaluated using the classic RMSE metric.
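
To make the metric concrete, here is a minimal plain-Python sketch of RMSE over ratings keyed by (userId, movieId). The function name `rmse` and the toy values are assumptions for illustration. Only keys present on both sides are scored, which mirrors an inner join between true ratings and predictions.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error over the (user, movie) keys present in both dicts."""
    common = set(actual) & set(predicted)
    if not common:
        raise ValueError("no overlapping (user, movie) pairs")
    squared_errors = [(actual[key] - predicted[key]) ** 2 for key in common]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Toy data: the pair (2, 2) has no prediction and is left out of the score.
actual = {(1, 2): 3.5, (1, 29): 3.5, (2, 2): 4.0}
predicted = {(1, 2): 3.0, (1, 29): 4.0}

print(rmse(actual, predicted))
```

Pairs without a prediction drop out silently, so it can be worth checking how many test ratings were actually scored.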

In [12]:
scores = []

In [9]:
from pyspark.mllib.recommendation import ALS

In [12]:
for rank in [5, 10, 15, 20, 25, 30]:
    for iterations in [5, 10, 15]:
        for lambda_ in [0.01, 0.03, 0.1, 0.3]:
            scores = []
            
            for test_fold_index in range(fold_count):

                testRDD = folds[test_fold_index]
                indices = list(range(fold_count))
                indices.remove(test_fold_index)

                # keep only the (userId, movieId) pairs to predict
                testSet = testRDD.map(lambda r: (r[0], r[1]))
                trainRDD = sc.union([folds[i] for i in indices])

                model = ALS.train(ratings=trainRDD, rank=rank, iterations=iterations, lambda_=lambda_)

                # get prediction
                predictions = model.predictAll(testSet).map(lambda r: ((r[0], r[1]), r[2]))
                # join with testSet
                rates_and_preds = testRDD.map(lambda r: ((r[0], r[1]), r[2]))\
                                     .join(predictions)

                # calculate RMSE
                error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
                scores.append(error)
                
            print("rank: %d, iterations: %d, lambda_: %f, RMSE: %f" % (rank, iterations, lambda_, sum(scores) / len(scores)))

rank: 5, iterations: 5, lambda_: 0.010000, RMSE: 0.825655
rank: 5, iterations: 5, lambda_: 0.030000, RMSE: 0.824144
rank: 5, iterations: 5, lambda_: 0.100000, RMSE: 0.819327
rank: 5, iterations: 5, lambda_: 0.300000, RMSE: 0.887157
rank: 5, iterations: 10, lambda_: 0.010000, RMSE: 0.818208
rank: 5, iterations: 10, lambda_: 0.030000, RMSE: 0.816474
rank: 5, iterations: 10, lambda_: 0.100000, RMSE: 0.816076
rank: 5, iterations: 10, lambda_: 0.300000, RMSE: 0.906710
rank: 5, iterations: 15, lambda_: 0.010000, RMSE: 0.814075
rank: 5, iterations: 15, lambda_: 0.030000, RMSE: 0.811254
rank: 5, iterations: 15, lambda_: 0.100000, RMSE: 0.815826
rank: 5, iterations: 15, lambda_: 0.300000, RMSE: 0.907917
rank: 10, iterations: 5, lambda_: 0.010000, RMSE: 0.825239
rank: 10, iterations: 5, lambda_: 0.030000, RMSE: 0.817640
rank: 10, iterations: 5, lambda_: 0.100000, RMSE: 0.817744
rank: 10, iterations: 5, lambda_: 0.300000, RMSE: 0.884754
rank: 10, iterations: 10, lambda_: 0.010000, RMSE: 0.816148
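
The printed grid can also be compared programmatically. The sketch below copies a handful of the rows above by hand into a dictionary (the name `results` is hypothetical) and picks the configuration with the lowest mean RMSE among them. It covers only the rows listed, not the full grid.

```python
# (rank, iterations, lambda_) -> mean RMSE, copied from a few of the printed rows.
results = {
    (5, 5, 0.01): 0.825655,
    (5, 10, 0.1): 0.816076,
    (5, 15, 0.03): 0.811254,
    (5, 15, 0.3): 0.907917,
    (10, 5, 0.03): 0.817640,
}

# The key with the smallest RMSE is the best configuration among those listed.
best_params = min(results, key=results.get)
print(best_params, results[best_params])
```

Collecting the scores in a dictionary inside the grid loop, instead of only printing them, would make this comparison possible over the whole search.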


Average the RMSE scores across all three folds and output the result to stdout.
You could refer to publicly available benchmarks (e.g. http://mymedialite.net/examples/datasets.html)
to find out what score to expect.

In [15]:
score = sum(scores) / len(scores)

In [16]:
score
# 0.9471727528537421
# 0.945164679413495
# 0.9435918668544166
# 0.9427975749365988

0.9409765230362396