# Recommendation system based on PySpark

In this tutorial I am going to demonstrate how Spark MLlib can be used for a movie recommendation task.

First, let me start with the problem statement: the goal of any recommendation system is to analyze how users interact with an existing system and to recommend what they might buy, listen to, or watch in the future, so that we can make our users happier and, hopefully, increase the profitability of the system we develop.

There are several approaches to this task, according to [Wikipedia](https://en.wikipedia.org/wiki/Recommender_system):
* Collaborative filtering 
methods are based on collecting and analyzing a large amount of information on users’ behaviors, activities or preferences and predicting what users will like based on their similarity to other users. A key advantage of the collaborative filtering approach is that it does not rely on machine analyzable content and therefore it is capable of accurately recommending complex items such as movies without requiring an "understanding" of the item itself. Collaborative filtering is based on the assumption that people who agreed in the past will agree in the future, and that they will like similar kinds of items as they liked in the past.
* Content-based filtering
methods are based on a description of the item and a profile of the user’s preferences.
In a content-based recommender system, keywords are used to describe the items and a user profile is built to indicate the type of item this user likes. In other words, these algorithms try to recommend items that are similar to those that a user liked in the past (or is examining in the present). In particular, various candidate items are compared with items previously rated by the user and the best-matching items are recommended.
* Hybrid recommender systems
combine collaborative filtering and content-based filtering, which can be more effective in some cases. Hybrid approaches can be implemented in several ways: by making content-based and collaborative-based predictions separately and then combining them; by adding content-based capabilities to a collaborative-based approach (and vice versa); or by unifying the approaches into one model.
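
To make the collaborative filtering idea more concrete, here is a minimal pure-Python sketch of the user-based variant: a missing rating is predicted as a similarity-weighted average of other users' ratings. The users, movies, and ratings are hypothetical toy data, and this is not what Spark MLlib does internally (it uses matrix factorization, shown later).

```python
import math

# Hypothetical toy ratings: user -> {movie: rating}. Not taken from the Megogo data.
ratings = {
    "alice": {"A": 5.0, "B": 4.0, "D": 1.0},
    "bob": {"A": 4.0, "B": 5.0, "C": 4.0, "D": 1.0},
    "carol": {"A": 1.0, "B": 1.0, "C": 2.0, "D": 5.0},
}

def cosine_similarity(a, b):
    """Cosine similarity between two sparse rating dicts."""
    common = set(a) & set(b)
    dot = sum(a[m] * b[m] for m in common)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b)

def predict(ratings, user, movie):
    """Similarity-weighted average of the ratings other users gave to `movie`."""
    weighted_sum, weight_total = 0.0, 0.0
    for other, other_ratings in ratings.items():
        if other == user or movie not in other_ratings:
            continue
        weight = cosine_similarity(ratings[user], other_ratings)
        weighted_sum += weight * other_ratings[movie]
        weight_total += weight
    return weighted_sum / weight_total if weight_total else 0.0

# Alice has not rated movie "C"; Bob (similar taste) gave it 4, Carol (different taste) gave it 2.
score = predict(ratings, "alice", "C")
print(round(score, 2))
```

The prediction lands closer to Bob's rating than to Carol's, because Alice's past ratings agree with Bob's much more.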

In this work I will demonstrate the collaborative filtering approach using Spark MLlib.

# Dataset

In my case I would like to recommend movies to users, based on the dataset provided by [Megogo](https://megogo.net/ru) for the competition they hosted on [Kaggle](https://www.kaggle.com/c/megogochallenge).

I participated in this competition during mlcourse and took 10th place out of 42 teams.

Let's display the data so that you can understand what it looks like:

In [3]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import gc

%env JOBLIB_TEMP_FOLDER=/tmp 
#https://www.kaggle.com/getting-started/45288 - this helps some with 'no space left on device'

print(os.listdir("../input"))

env: JOBLIB_TEMP_FOLDER=/tmp
[]


In [11]:
!cd ../input && ls -la

total 24
drwxr-xr-x 6 root root 4096 Nov 25 05:15 .
drwxr-xr-x 1 root root 4096 Nov 25 05:15 ..
drwxr-xr-x 2 root root 4096 Nov 25 05:15 config
drwxr-xr-x 2 root root 4096 Nov  7 15:34 input
drwxr-xr-x 3 root root 4096 Oct 16 01:33 lib
drwxr-xr-x 3 root root 4096 Nov 25 05:15 working


In [7]:
full_df = pd.read_csv('../input/megogochallenge/train_data_full.csv')
full_df.head()

FileNotFoundError: File b'../input/megogochallenge/train_data_full.csv' does not exist

It's very important not only to recommend movies that a user would like to watch, but to recommend movies that the user will watch more than halfway through. This can be observed in the `watching_percentage` column.

Actually, that was the goal of the competition: to predict movies that will be watched more than halfway through.

In [None]:
full_df['watching_percentage'].hist()

We see that most movies are watched till the end.

## Metric

The competition uses MAP@10 as its evaluation metric; its variations are very common metrics for recommendation tasks. You can read more about it [here](http://sdsawtelle.github.io/blog/output/mean-average-precision-MAP-for-recommender-systems.html).

But in this tutorial I'll use RMSE, which is easier to understand.
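
For reference, both metrics fit in a few lines of plain Python. This is a sketch under assumptions: the function names are mine, the recommendation lists are assumed to contain no duplicates, and the average precision is normalized by `min(len(relevant), k)`, which is one common convention; the competition's exact definition may differ.

```python
import math

def rmse(y_true, y_pred):
    """Root-mean-square error between two equally long sequences."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))

def average_precision_at_k(recommended, relevant, k=10):
    """AP@k for one user: mean of precision values at the ranks where a hit occurs."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    hits, score = 0, 0.0
    for rank, item in enumerate(recommended[:k], start=1):
        if item in relevant:
            hits += 1
            score += hits / rank  # precision at this rank
    return score / min(len(relevant), k)

def map_at_k(all_recommended, all_relevant, k=10):
    """MAP@k: AP@k averaged over all users."""
    scores = [average_precision_at_k(rec, rel, k)
              for rec, rel in zip(all_recommended, all_relevant)]
    return sum(scores) / len(scores)

# Hypothetical example: two users, movie ids are made up.
recommended = [[1, 2, 3, 4], [7, 8, 9]]
relevant = [[1, 3], [9]]
print(map_at_k(recommended, relevant, k=10))
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))
```

Note the difference in what is measured: RMSE scores how close each predicted value is to the observed one, while MAP@k only cares about the order of the top-k recommended items.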

# PySpark implementation

Finally, let's move to PySpark. 
There are 2 ways to install it in Kaggle Kernels: 
* Run the magic command, as in the cell below
* Or go to Packages and enter pyspark (it will take some time, but this builds your own version of the Docker image, so you won't need to waste time reinstalling it when running later versions of the kernel)

In [None]:
!pip install pyspark

To install it locally or on a Jupyter server, follow [these](https://medium.freecodecamp.org/how-to-set-up-pyspark-for-your-jupyter-notebook-7399dd3cb389) instructions.

First, you have to install Java and Scala. Then set the environment variables to launch Spark with Python 3, and finally install pyspark.

Now let's import all the Spark modules we need. Pay attention to how the `sc` and `spark` variables are initialized.

In [None]:
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

sc = SparkContext('local') #https://stackoverflow.com/questions/30763951/spark-context-sc-not-defined
spark = SparkSession(sc)

Let's read the data into a Spark DataFrame:

In [None]:
data_schema = StructType([
    StructField('session_start_datetime',TimestampType(), False),
    StructField('user_id',IntegerType(), False),
    StructField('user_ip',IntegerType(), False),
    StructField('primary_video_id',IntegerType(), False),
    StructField('video_id',IntegerType(), False),
    StructField('vod_type',StringType(), False),
    StructField('session_duration',IntegerType(), False),
    StructField('device_type',StringType(), False),
    StructField('device_os',StringType(), False),
    StructField('player_position_min',LongType(), False),
    StructField('player_position_max',LongType(), False),
    StructField('time_cumsum_max',LongType(), False),
    StructField('video_duration',IntegerType(), False),
    StructField('watching_percentage',FloatType(), False)
])
final_stat = spark.read.csv(
    '../input/megogochallenge/train_data_full.csv', header=True, schema=data_schema
).cache()

Let's prepare the data for the model:

In [None]:
ratings = (final_stat
    .select(
        'user_id',
        'primary_video_id',
        'watching_percentage',
    )
).cache()

Making a `train_test_split`: 

In [None]:
(training, test) = ratings.randomSplit([0.8, 0.2])

Finally, build an [ALS](https://dl.acm.org/citation.cfm?id=1608614) model:

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=2, regParam=0.01, 
          userCol="user_id", itemCol="primary_video_id", ratingCol="watching_percentage",
          coldStartStrategy="drop",
          implicitPrefs=True)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watching_percentage",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Let's take a look at the parameters and try to find improvements.

The parameters of the ALS model in the PySpark implementation are the following:
* **numBlocks** is the number of blocks the users and items will be partitioned into in order to parallelize computation (in PySpark it is exposed as `numUserBlocks` and `numItemBlocks`).
* **rank** is the number of latent factors in the model.
* **maxIter** is the maximum number of iterations to run.
* **regParam** specifies the regularization parameter in ALS.
* **implicitPrefs** specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
* **alpha** is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
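
To show what `rank`, `regParam`, and `maxIter` control, here is a minimal NumPy sketch of explicit-feedback alternating least squares. It is a simplified illustration under my own assumptions (plain ridge regularization, a dense toy matrix, the hypothetical function names `als_explicit` and `masked_rmse`), not Spark's actual implementation, which is distributed and scales the regularization term differently.

```python
import numpy as np

def als_explicit(R, mask, rank=2, reg_param=0.1, max_iter=10, seed=0):
    """Factorize R ~ U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item latent factors
    reg = reg_param * np.eye(rank)
    for _ in range(max_iter):
        # Fix the item factors and solve a small ridge regression per user.
        for u in range(n_users):
            seen = mask[u]
            if seen.any():
                Vu = V[seen]
                U[u] = np.linalg.solve(Vu.T @ Vu + reg, Vu.T @ R[u, seen])
        # Fix the user factors and solve a small ridge regression per item.
        for i in range(n_items):
            seen = mask[:, i]
            if seen.any():
                Ui = U[seen]
                V[i] = np.linalg.solve(Ui.T @ Ui + reg, Ui.T @ R[seen, i])
    return U, V

def masked_rmse(R, mask, U, V):
    """RMSE over the observed entries only."""
    err = (R - U @ V.T)[mask]
    return float(np.sqrt(np.mean(err ** 2)))

# Hypothetical toy ratings; 0 marks an unobserved user/item pair.
R = np.array([
    [5.0, 4.0, 0.0, 1.0],
    [4.0, 0.0, 2.0, 1.0],
    [0.0, 1.0, 4.0, 5.0],
    [1.0, 2.0, 0.0, 5.0],
])
mask = R > 0

U0, V0 = als_explicit(R, mask, max_iter=0)   # random start, no training
U, V = als_explicit(R, mask, max_iter=10)
rmse_before = masked_rmse(R, mask, U0, V0)
rmse_after = masked_rmse(R, mask, U, V)
print(rmse_before, rmse_after)
```

Each iteration alternates between the two sets of factors, which is where the name comes from: with one side fixed, the other side reduces to independent least-squares problems of size `rank`.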

## Explicit or implicit? 

The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to movies.

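Our dataset has no explicit ratings: `watching_percentage` is itself derived from viewing behavior, and it sits alongside other session attributes like `device_type`, `video_duration`, `device_os`. 
But let's try to treat `watching_percentage` as an explicit rating and look at RMSE.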

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=2, regParam=0.01, 
          userCol="user_id", itemCol="primary_video_id", ratingCol="watching_percentage",
          coldStartStrategy="drop",
          implicitPrefs=False) #changed param!
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watching_percentage",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

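Wow! Our RMSE improved a lot!
Keep in mind, though, that with `implicitPrefs=True` the model predicts a preference score rather than the rating itself, so the two RMSE values are not directly comparable.
And remember, if the rating matrix is derived from another source of information (i.e. it is inferred from other signals), you can set `implicitPrefs` to `True` to get better results. 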
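
As a rough illustration of what the implicit variant optimizes, the sketch below follows the formulation of Hu, Koren and Volinsky, which Spark's implicit ALS is based on: each raw value is turned into a binary preference and a confidence weight controlled by `alpha`. The function name `implicit_targets` is hypothetical, and treating `watching_percentage` as a fraction between 0 and 1 is an assumption about the data.

```python
def implicit_targets(rating, alpha=1.0):
    """Map a raw interaction strength to (preference, confidence)."""
    preference = 1.0 if rating > 0 else 0.0   # did the user interact at all?
    confidence = 1.0 + alpha * abs(rating)    # how much we trust that observation
    return preference, confidence

# Assumed example: a session watched to 80% versus a movie never watched.
print(implicit_targets(0.8, alpha=10.0))
print(implicit_targets(0.0, alpha=10.0))
```

The model is then fit to the preference, weighted by the confidence, so its predictions estimate how strongly a user prefers an item rather than the raw `watching_percentage` value.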

Could we do more? 
Let me increase `rank`, the number of latent factors, a bit. 

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(rank=20, #10 was by default
          maxIter=2, regParam=0.01,
          userCol="user_id", itemCol="primary_video_id", ratingCol="watching_percentage",
          coldStartStrategy="drop",
          implicitPrefs=False)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watching_percentage",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Great! Our score improved a bit, but not dramatically. 

Here I should say that I am a bit limited by Kaggle Kernels resources (Spark uses a lot of RAM!), and of course it's better to experiment with parameters locally.

General recommendations: 
* increase `maxIter` and `rank`, checking them with cross-validation of course (this may be time- and RAM-consuming).
* don't forget about the regularization parameter
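
A parameter search along these lines can be sketched without Spark as a plain loop over all combinations. Everything here is illustrative: `grid_search` is a hypothetical helper, the grid values are arbitrary, and `fake_score` is a stand-in so the sketch runs anywhere. In the notebook, the scoring function would fit `ALS` with the given parameters on a training split and return the RMSE from the evaluator on a validation split.

```python
from itertools import product

def grid_search(fit_and_score, param_grid):
    """Try every parameter combination; return (best_params, best_score), lower is better."""
    names = list(param_grid)
    best_params, best_score = None, float("inf")
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        score = fit_and_score(**params)
        if score < best_score:
            best_params, best_score = params, score
    return best_params, best_score

# Arbitrary example grid over the ALS parameters discussed above.
param_grid = {"rank": [10, 20], "maxIter": [2, 5, 10], "regParam": [0.01, 0.1]}

def fake_score(rank, maxIter, regParam):
    """Stand-in for 'fit ALS and return validation RMSE'; NOT a real model."""
    return abs(rank - 20) + abs(maxIter - 5) + abs(regParam - 0.1)

best_params, best_score = grid_search(fake_score, param_grid)
print(best_params, best_score)
```

Spark also ships `ParamGridBuilder`, `CrossValidator`, and `TrainValidationSplit` in `pyspark.ml.tuning`, which do the same job natively; the grid above has 12 combinations, so every extra value multiplies the training time.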

Okay, now I want to output the recommendations themselves.

In [None]:
%%time
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.count()
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.count()

Converting Spark data to a familiar pandas DataFrame is easy with the `toPandas()` method: 

In [None]:
userRecs_df = userRecs.toPandas()
print(userRecs_df.shape)

movieRecs_df = movieRecs.toPandas()
print(movieRecs_df.shape)

In [None]:
userRecs_df.head()

Here we see the recommended movies and their scores for each user.

In [None]:
movieRecs_df.head()

Here we see the recommended users and their scores for each movie.
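
Each row of these frames holds a whole list of recommendations, with one `(item id, rating)` pair per entry. If one row per recommendation is more convenient, the nested structure can be flattened as in the sketch below. The `Rec` namedtuple is a stand-in for Spark's `Row` objects so the example runs without Spark, and the ids and scores are made up.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row; field names follow the itemCol used above plus `rating`.
Rec = namedtuple("Rec", ["primary_video_id", "rating"])

# Hypothetical rows shaped like (user_id, recommendations).
user_recs = [
    (1, [Rec(101, 0.93), Rec(205, 0.88)]),
    (2, [Rec(205, 0.75)]),
]

def flatten(user_recs):
    """Turn one row per user into one row per (user, movie, score, position)."""
    flat = []
    for user_id, recs in user_recs:
        for position, rec in enumerate(recs, start=1):
            flat.append((user_id, rec.primary_video_id, rec.rating, position))
    return flat

flat = flatten(user_recs)
for row in flat:
    print(row)
```

On the Spark side, the `explode` function from `pyspark.sql.functions` should give the same one-row-per-recommendation shape before the conversion to pandas.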

# Summary

In summary, collaborative filtering is one of the most popular approaches to building a recommendation system. 
The movie recommendation task became extremely popular after the [Netflix competition](https://en.wikipedia.org/wiki/Netflix_Prize) with its one-million-dollar prize, which pushed machine learning forward a lot in the recommender systems field. 

With PySpark, we can get great results on this task in just a few lines of code. 
In production, new problems appear; one of them is the **cold start** problem, when we have no historical information about a user but still have to recommend something.
But that's more than this tutorial intends to cover.

Hope this small intro was useful; feel free to criticize, discuss, and maybe upvote :) 

## References

* [Spark docs](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html)
* [Megogo challenge baselines](https://github.com/SantyagoSeaman/megogo_challenge_solutions)
* [More on ranking metrics](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics)
* [How Netflix's recommender system works](https://help.netflix.com/en/node/100639)



