# Collaborative filtering recommender system
## Dataset description
In this assignment, you will use Apache Spark to build a simple movie recommendation system on the large MovieLens 20M dataset (https://grouplens.org/datasets/movielens/). The dataset contains about 20 million ratings of roughly 27,000 movies by 138,000 users.

Dataset format: comma-separated values (https://en.wikipedia.org/wiki/CSV)

The first line is a header: `userId,movieId,rating,timestamp`

* `userId` and `movieId` are integers identifying users and movies, respectively
* `rating` is a floating-point number from 0.5 to 5.0 in half-star increments
* `timestamp` is an integer, but it is not used in this assignment
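Each line of the file therefore maps to a (user, movie, rating) triple once the header is skipped and the timestamp discarded. A minimal plain-Python sketch of that mapping, using the standard `csv` module on a small made-up sample; in the notebook Spark's CSV reader does this work, and the helper name `parse_ratings` is hypothetical:

```python
import csv
import io

# Made-up sample in the documented format: header, then userId,movieId,rating,timestamp
SAMPLE = """userId,movieId,rating,timestamp
1,10,4.0,1000000000
1,20,3.5,1000000100
2,10,5.0,1000000200
"""

def parse_ratings(text):
    """Return (userId, movieId, rating) tuples; the header names the fields, timestamp is dropped."""
    reader = csv.DictReader(io.StringIO(text))
    return [(int(row['userId']), int(row['movieId']), float(row['rating']))
            for row in reader]

ratings = parse_ratings(SAMPLE)
print(ratings)  # [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0)]
```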

The dataset is located at `/data/movielens`.

The evaluation metric for this assignment is RMSE (root-mean-square error). To receive full credit, you must achieve an RMSE below 0.9 on the test data.
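RMSE is the square root of the mean squared difference between true and predicted ratings, sqrt(mean((r - r_hat)^2)), so it is expressed in rating units (stars) and penalizes large errors more than small ones. A minimal plain-Python sketch of the metric; the function name `rmse` is illustrative, and in the notebook `RegressionEvaluator` with `metricName='rmse'` computes the same quantity:

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences of ratings."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# Errors of 0.5, 0.0 and 1.0 stars
print(rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0]))
```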

```python
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session; master('local') runs Spark with a single worker thread
spark = SparkSession.builder.enableHiveSupport().master('local').getOrCreate()
```

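```python
# Load the smaller ratings_100k.csv file: the first line supplies column names,
# and inferSchema lets Spark detect the integer and double column types
df = spark.read \
    .option('header', True) \
    .option('inferSchema', True) \
    .format('csv') \
    .load('/data/movielens/ratings_100k.csv')

# df.printSchema()  # uncomment to inspect the inferred column types
```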

Split the dataset into three folds. In each iteration, one fold is used for testing and the other two for training. Wrap the required values (user, movie, rating) in the `Rating` class (`pyspark.mllib.recommendation.Rating`).
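The split can be pictured without Spark: each rating is randomly assigned to one of three folds, and each fold takes one turn as the test set. A minimal plain-Python sketch, assuming the ratings are already (user, movie, rating) triples; the `Rating` namedtuple here is a local stand-in with the same fields as `pyspark.mllib.recommendation.Rating`, and `three_fold_split` and `train_test_pairs` are hypothetical helpers:

```python
import random
from collections import namedtuple

# Local stand-in with the same field names as pyspark.mllib.recommendation.Rating
Rating = namedtuple('Rating', ['user', 'product', 'rating'])

def three_fold_split(triples, seed=13, k=3):
    """Randomly assign each (user, movie, rating) triple to one of k folds."""
    rng = random.Random(seed)
    folds = [[] for _ in range(k)]
    for user, movie, score in triples:
        folds[rng.randrange(k)].append(Rating(user, movie, score))
    return folds

def train_test_pairs(folds):
    """Yield (train, test): every fold is the test set once, the rest form the training set."""
    for i, test in enumerate(folds):
        train = [r for j, fold in enumerate(folds) if j != i for r in fold]
        yield train, test

# Made-up data: 10 users x 10 movies
data = [(u, m, float(1 + (u + m) % 5)) for u in range(10) for m in range(10)]
for train, test in train_test_pairs(three_fold_split(data)):
    print(len(train), len(test))
```

Random assignment gives folds of roughly, not exactly, equal size; Spark's `randomSplit([1.0, 1.0, 1.0], seed)` on an RDD or DataFrame behaves the same way.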

Now choose the training parameters. The recommended settings are rank 20, regularization parameter lambda 0.01, and 5 iterations (`rank`, `regParam`, and `maxIter` in `pyspark.ml`; `rank`, `lambda_`, and `iterations` in `pyspark.mllib`'s `ALS.train`). Feel free to experiment with your own values to see how they affect the final score.
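To make the three parameters concrete: the rank is the length of each user and movie factor vector, lambda weights the L2 penalty on those vectors, and the iteration count is how many times ALS alternates between solving for all user factors (movie factors fixed) and all movie factors (user factors fixed). The NumPy sketch below is a toy single-machine illustration, not Spark's implementation; the function `als` is hypothetical, and it scales the penalty by each user's or movie's rating count, following the description in the Spark collaborative-filtering docs.

```python
import numpy as np

def als(ratings, n_users, n_items, rank=20, reg=0.01, iterations=5, seed=13):
    """Toy explicit-feedback ALS on (user, item, rating) triples with 0-based ids.

    Each iteration solves one ridge regression per user (item factors fixed),
    then one per item (user factors fixed). Illustrative sketch only.
    """
    rng = np.random.default_rng(seed)
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factor vectors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factor vectors
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    def solve(fixed, rows):
        # Minimize sum (r - f . v)^2 + reg * len(rows) * |v|^2 over one factor vector v
        idx = [j for j, _ in rows]
        target = np.array([r for _, r in rows])
        F = fixed[idx]
        A = F.T @ F + reg * len(rows) * np.eye(rank)
        return np.linalg.solve(A, F.T @ target)

    for _ in range(iterations):
        for u, rows in enumerate(by_user):
            if rows:
                X[u] = solve(Y, rows)
        for i, rows in enumerate(by_item):
            if rows:
                Y[i] = solve(X, rows)
    return X, Y

# Made-up toy data: 4 users x 5 movies, every cell rated, following a rank-1 pattern
a = [1.0, 1.5, 2.0, 1.2]
b = [1.0, 2.0, 1.5, 1.8, 1.1]
toy = [(u, i, a[u] * b[i]) for u in range(4) for i in range(5)]
X, Y = als(toy, n_users=4, n_items=5, rank=2, reg=0.01, iterations=20)
train_rmse = np.sqrt(np.mean([(r - X[u] @ Y[i]) ** 2 for u, i, r in toy]))
print(train_rmse)
```

A larger rank gives the model more capacity, while a larger lambda shrinks the factors and counters overfitting; both trade off against each other, which is why the assignment invites experimenting with them.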

Train the explicit-feedback ALS model on two of the three folds and evaluate its performance on the remaining test fold using the standard RMSE metric.

Average the RMSE scores over all three folds and print the result to stdout. You can refer to publicly available benchmarks (e.g., http://mymedialite.net/examples/datasets.html) to see what score to expect.
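The train, evaluate, and average steps have the same shape whatever model is plugged in. A minimal plain-Python sketch, assuming the folds are already lists of (user, movie, rating) triples; `fit_mean_baseline` is a hypothetical stand-in that always predicts the mean training rating, used only so the example runs without Spark. Substituting an ALS trainer gives the procedure described above, and `pyspark.ml`'s `CrossValidator` runs the same loop, storing the per-parameter-map average in `avgMetrics`.

```python
import math

def rmse(pairs):
    """RMSE over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

def fit_mean_baseline(train):
    """Hypothetical stand-in for ALS: always predict the mean training rating."""
    mean = sum(r for _, _, r in train) / len(train)
    return lambda user, movie: mean

def cross_validated_rmse(folds, fit):
    """Hold out each fold once, train on the others, and average the test RMSEs."""
    scores = []
    for i, test in enumerate(folds):
        train = [t for j, fold in enumerate(folds) if j != i for t in fold]
        predict = fit(train)
        scores.append(rmse([(r, predict(u, m)) for u, m, r in test]))
    return sum(scores) / len(scores), scores

# Made-up folds of (user, movie, rating) triples
folds = [[(1, 1, 4.0), (2, 2, 2.0)],
         [(1, 2, 3.0), (2, 1, 5.0)],
         [(3, 1, 1.0), (3, 2, 3.0)]]
average, per_fold = cross_validated_rmse(folds, fit_mean_baseline)
print(per_fold, average)
```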

## pyspark.mllib RDD API
https://spark.apache.org/docs/2.2.0/mllib-collaborative-filtering.html

## pyspark.ml DataFrame API
https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html

```python
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
```

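```python
class DropNaRegressionEvaluator(RegressionEvaluator):
    """RegressionEvaluator that ignores rows whose prediction is NaN.

    ALS predicts NaN for users or movies that do not occur in the training
    folds; without this filter a single such row makes the RMSE NaN.
    """
    def evaluate(self, dataset, params=None):
        return RegressionEvaluator.evaluate(
            self, dataset.dropna(subset=[self.getPredictionCol()]), params)
```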
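
Why the custom evaluator is needed: when a user or movie appears only in the held-out fold, ALS has no factor vector for it and predicts NaN, and one NaN turns the whole RMSE into NaN. The plain-Python sketch below shows the effect and the row-dropping fix; `rmse_ignoring_nan` is a hypothetical helper. Since Spark 2.2, `ALS` also accepts `coldStartStrategy='drop'`, which removes NaN predictions before evaluation and makes such a subclass unnecessary.

```python
import math

def rmse_ignoring_nan(pairs):
    """RMSE over (actual, predicted) pairs, skipping NaN predictions (cold-start rows)."""
    kept = [(a, p) for a, p in pairs if not math.isnan(p)]
    return math.sqrt(sum((a - p) ** 2 for a, p in kept) / len(kept))

# Made-up predictions: the second row belongs to a user unseen during training
pairs = [(4.0, 3.5), (3.0, float('nan')), (5.0, 4.5)]
plain = math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))
print(math.isnan(plain))         # True: the NaN poisons the plain RMSE
print(rmse_ignoring_nan(pairs))  # 0.5
```

Dropping these rows means the reported RMSE covers only users and movies that were seen during training.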

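```python
# Explicit-feedback ALS (implicitPrefs defaults to False); the seed makes runs reproducible
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', seed=13)

# One-point grid holding the recommended parameters: rank 20, 5 iterations, lambda 0.01
grid = ParamGridBuilder() \
    .addGrid(als.rank, [20]) \
    .addGrid(als.maxIter, [5]) \
    .addGrid(als.regParam, [0.01]) \
    .build()

evaluator = DropNaRegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# 3-fold cross-validation: each fold is held out once while ALS trains on the other two
cv = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluator,
                    numFolds=3, seed=13) \
    .fit(df)

# avgMetrics[i] is the RMSE averaged over the three folds for the i-th parameter map
print(cv.avgMetrics[0])
```

```
1.28693504266
```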
