# Collaborative filtering in PySpark:

spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries of the user-item rating matrix. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. ALS is a matrix factorization approach to building a recommender.
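To make the idea of alternating least squares concrete, here is a minimal pure-Python sketch with rank-1 factors. It is not Spark's implementation (Spark's ALS is distributed and handles regularization differently); the toy ratings and the names `ratings`, `user_f`, `item_f`, `reg`, and `solve` are made up for illustration.

```python
# Observed ratings as (user, item) -> rating; (2, "B") is missing on purpose.
ratings = {(1, "A"): 5.0, (1, "B"): 3.0, (2, "A"): 4.0}

users = sorted({u for u, _ in ratings})
items = sorted({i for _, i in ratings})
reg = 0.01  # L2 regularization strength (plays a role similar to regParam)

# Rank-1 factors: one number per user and per item, initialised to 1.0.
user_f = {u: 1.0 for u in users}
item_f = {i: 1.0 for i in items}


def solve(fixed, pairs):
    """Closed-form regularized least squares for a single rank-1 factor."""
    num = sum(r * fixed[k] for k, r in pairs)
    den = reg + sum(fixed[k] ** 2 for k, _ in pairs)
    return num / den


for _ in range(20):
    # Fix the item factors and solve for each user factor.
    for u in users:
        user_f[u] = solve(item_f, [(i, r) for (uu, i), r in ratings.items() if uu == u])
    # Fix the user factors and solve for each item factor.
    for i in items:
        item_f[i] = solve(user_f, [(u, r) for (u, ii), r in ratings.items() if ii == i])

# Predict the missing entry as the product of the learned factors.
print(round(user_f[2] * item_f["B"], 2))
```

Each half-step is an ordinary least squares problem because one side of the factorization is held fixed, which is what makes the alternation cheap.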

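spark.ml ALS expects the ratings as a DataFrame with a user ID column, an item ID column, and a rating column, all numeric; user and item IDs must fit within the integer value range.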



# Project:


The dataset we use is the [MovieLens dataset](https://grouplens.org/datasets/movielens/) (the 25M version).

More dataset examples can be found at https://gist.github.com/entaroadun/1653794

### Dataset details:

About 25 million ratings of 62,000 movies by 162,000 users.

**'ratings.csv':**

All ratings are contained in the file 'ratings.csv'. 

Each line of this file represents one rating of one movie by one user, and has the following format:

    userId,movieId,rating,timestamp



**'movies.csv':**

Movie information is contained in the file 'movies.csv'. Each line of this file represents one movie, and has the following format:

    movieId,title,genres
    
**'genome-scores.csv':**

Each movie has a value for *every* tag in the genome. The tag genome encodes how strongly movies exhibit particular properties represented by tags (atmospheric, thought-provoking, realistic, etc.). The file 'genome-scores.csv' contains movie-tag relevance data in the following format:

    movieId,tagId,relevance

### Objective: 

Split the ratings into training and test sets, then apply a **collaborative filtering algorithm** to predict the ratings in the test set.

<!--(Content based algorithm can be applied using genome-scores.csv as features)-->

<!-- explicit vs implicit collaborative filtering. Example for implicit CF see https://towardsdatascience.com/large-scale-jobs-recommendation-engine-using-implicit-data-in-pyspark-ccf8df5d910e -->

### Note:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MLlibRecommenderSys').getOrCreate()

In [2]:
from pyspark.ml.recommendation import ALS

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
data = spark.read.csv('./ml-25m/ratings.csv', header=True, inferSchema=True)

In [5]:
data.head(2)

[Row(userId=1, movieId=296, rating=5.0, timestamp=1147880044),
 Row(userId=1, movieId=306, rating=3.5, timestamp=1147868817)]

In [6]:
data.describe().show()

+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         25000095|          25000095|          25000095|            25000095|
|   mean|81189.28115381162|21387.981943268616| 3.533854451353085|1.2156014431215513E9|
| stddev|46791.71589745776| 39198.86210105973|1.0607439611423535| 2.268758080595386E8|
|    min|                1|                 1|               0.5|           789652009|
|    max|           162541|            209171|               5.0|          1574327703|
+-------+-----------------+------------------+------------------+--------------------+



In [7]:
# train/test split
train, test = data.randomSplit([0.8, 0.2])

In [8]:
# define a recommendation model instance using ALS 
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

In [9]:
# fit the model on the training data
als_model = als.fit(train)

In [10]:
# predict ratings for the (user, movie) pairs in the test set
test_result = als_model.transform(test)

In [11]:
test_result.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 32855|    148|   4.0|1029309135|  2.993903|
|151614|    148|   1.0| 878170956| 2.8919578|
|131173|    148|   2.0| 842275770| 2.7699504|
| 14831|    148|   3.0| 944148276| 3.0077996|
| 98520|    148|   4.0|1034547175|  2.691712|
|145182|    148|   3.0| 944952722| 2.8496423|
| 62058|    148|   3.0| 832006144| 3.1205623|
| 80974|    148|   3.5|1138041704| 2.7939594|
| 29095|    148|   3.0| 944947868| 2.1337345|
| 41703|    148|   2.0|1311022737|  3.111855|
| 64994|    148|   1.0|1055542421| 2.4774272|
| 84667|    148|   5.0| 832207176| 3.7432034|
|146419|    148|   2.0| 942665835| 3.1447525|
|132310|    148|   3.0| 836248537|  3.123418|
| 75209|    148|   2.0|1361853682|  2.142588|
|138552|    148|   4.0| 829756906|  3.260183|
| 70733|    148|   1.0| 837770520| 3.5192626|
| 88277|    148|   2.0| 839943770| 2.6598005|
| 74794|    148|   3.0| 989050056|

In [12]:
reg_eval = RegressionEvaluator(metricName='rmse', labelCol="rating", predictionCol="prediction")

In [13]:
rmse = reg_eval.evaluate(test_result)

In [14]:
print(f'Computed RMSE is {rmse}')

Computed RMSE is nan


## Dealing with NaNs:

NaN predictions at ALS test time can occur for two reasons: (1) in production, new users or items have no rating history; (2) with simple random splits, as in Spark's CrossValidator or TrainValidationSplit, it is very common to encounter users and/or items in the evaluation set that are not in the training set. A single NaN prediction is enough to make the RMSE itself NaN.
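The second case is easy to reproduce on a toy example. The sketch below is plain Python rather than Spark; the ratings and the helper name `cold_start_rows` are hypothetical and only illustrate how a split can leave a user without any training data.

```python
# Toy ratings as (userId, movieId, rating); user 3 has a single rating.
ratings = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (2, 30, 2.0), (3, 20, 4.5)]


def cold_start_rows(train, test):
    """Return test rows whose user or movie never appears in the training rows."""
    seen_users = {u for u, _, _ in train}
    seen_movies = {m for _, m, _ in train}
    return [r for r in test if r[0] not in seen_users or r[1] not in seen_movies]


# Suppose the split happens to put user 3's only rating into the test set.
train = ratings[:4]
test = ratings[4:]
print(cold_start_rows(train, test))  # [(3, 20, 4.5)]
```

The model has no factor vector for user 3, so it cannot produce a prediction for that row.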

By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model. This can be useful **in a production system**, since it indicates a new user or item, and so the system can make a decision on some fallback to use as the prediction.
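One possible fallback, sketched here in plain Python, is to substitute the mean training rating whenever the model returns NaN. The choice of fallback and the names `with_fallback` and `global_mean` are assumptions for illustration, not something Spark provides.

```python
import math


def with_fallback(prediction, fallback):
    """Return the model prediction, or the fallback when the prediction is NaN."""
    return fallback if math.isnan(prediction) else prediction


# Hypothetical fallback: the mean rating over the training data.
train_ratings = [4.0, 3.5, 5.0, 2.0]
global_mean = sum(train_ratings) / len(train_ratings)

predictions = [3.2, float("nan"), 4.1]
print([with_fallback(p, global_mean) for p in predictions])  # [3.2, 3.625, 4.1]
```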

However, this is undesirable during cross-validation. Spark allows users to set the coldStartStrategy parameter to "drop" when defining the ALS instance, which drops any rows in the DataFrame of predictions that contain NaN values. The evaluation metric is then computed over the non-NaN data and is valid.

Alternatively, we can simply drop the rows with NaN predictions and then compute the RMSE.

In [15]:
rmse = reg_eval.evaluate(test_result.na.drop())

In [16]:
print(f'Computed RMSE after dropping NaNs is {rmse}')

Computed RMSE after dropping NaNs is 0.8016113051718181
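The difference between the two RMSE results comes from how NaN propagates through arithmetic: one NaN squared error makes the whole mean NaN. The following plain-Python sketch (with made-up numbers and a hypothetical `rmse` helper, independent of Spark's evaluator) shows the effect.

```python
import math


def rmse(labels, predictions, drop_nan=False):
    """Root mean squared error; optionally skip pairs whose prediction is NaN."""
    pairs = list(zip(labels, predictions))
    if drop_nan:
        pairs = [(y, p) for y, p in pairs if not math.isnan(p)]
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))


labels = [4.0, 3.0, 5.0]
predictions = [3.0, float("nan"), 4.0]

print(rmse(labels, predictions))                 # nan
print(rmse(labels, predictions, drop_nan=True))  # 1.0
```

Note that the metric after dropping only covers users and items seen during training, so it says nothing about cold-start quality.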
