<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Movie-recommendation" data-toc-modified-id="Movie-recommendation-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Movie recommendation</a></span><ul class="toc-item"><li><span><a href="#Dataset" data-toc-modified-id="Dataset-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Dataset</a></span></li><li><span><a href="#Evaluation-Protocol" data-toc-modified-id="Evaluation-Protocol-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Evaluation Protocol</a></span></li><li><span><a href="#Models" data-toc-modified-id="Models-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Models</a></span><ul class="toc-item"><li><span><a href="#ALS" data-toc-modified-id="ALS-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback" target="_blank">ALS</a></a></span></li><li><span><a href="#Ваша-формулировка" data-toc-modified-id="Ваша-формулировка-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Your formulation</a></span></li></ul></li><li><span><a href="#Evaluation-Results" data-toc-modified-id="Evaluation-Results-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Evaluation Results</a></span></li></ul></li></ul></div>

# Movie recommendation

Your task is to recommend movies to users.


In [1]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "25g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [2]:
!pip3.5 install hyperopt

Successfully installed cloudpickle-1.4.1 future-0.18.2 hyperopt-0.2.4 networkx-2.4 tqdm-4.46.1


In [29]:
from pyspark.sql.window import Window
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.recommendation import ALS
import pandas as pd
from hyperopt import fmin, STATUS_OK, Trials, tpe, hp
from functools import reduce
from scipy import spatial

## Dataset 

`MovieLens-25M`

In [4]:
DATA_PATH = '/workspace/MLBD/data/ml-25m'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

In [5]:
!pwd

/workspace/MLBD/recsys/notebooks


In [6]:
!ls /workspace/MLBD/data/ml-25m

genome-scores.csv  links.csv	      movies.csv   README.txt  tmdb.json
genome-tags.csv    ml-25m-README.htm  ratings.csv  tags.csv


In [7]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


ratings_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + RATINGS_PATH)

movies_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + MOVIES_PATH)

In [8]:
ratings_df.printSchema()
print(ratings_df.count())

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

25000095


In [9]:
movies_df.printSchema()
print(movies_df.count())

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

62423


## Evaluation Protocol

Since we want to compare the quality of different recommendation algorithms, we first need to decide:
* how to split the data into `Train`/`Validation`/`Test`;
* which metrics to use to measure quality.

In [10]:
def split_df(df, train_part, validate_part, test_part):
    n_parts = train_part + validate_part + test_part
    tmp = df.withColumn('tile', F.ntile(n_parts).over(
            Window.orderBy('timestamp') \
                .partitionBy('userId')))

    train = tmp \
        .filter(F.col('tile') <= train_part) \
        .drop('tile') \
        .cache()
    
    validate = tmp \
        .filter(F.col('tile') > train_part) \
        .filter(F.col('tile') <= train_part + validate_part) \
        .drop('tile') \
        .cache()
    
    test = tmp \
        .filter(F.col('tile') > train_part + validate_part) \
        .drop('tile') \
        .cache()
    
    return train, validate, test

train_ratings, validate_ratings, test_ratings = split_df(ratings_df, 8, 1, 1)

In [11]:
train_ratings.count(), validate_ratings.count(), test_ratings.count()

(20123411, 2445623, 2431061)
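`split_df` is a per-user temporal split: each user's ratings are ordered by `timestamp`, cut into `train_part + validate_part + test_part` tiles, and the oldest tiles go to training. The three counts above sum to 25,000,095, the full ratings count, so no rows are lost. The plain-Python sketch below mimics this for a single user, assuming Spark's `ntile` follows the usual SQL NTILE rule (rows spread as evenly as possible, earlier buckets get the extra rows); `ntile` and `split_user_history` are hypothetical helpers, not Spark APIs.

```python
def ntile(n_rows, n_buckets):
    """1-based bucket ids for n_rows ordered rows, SQL NTILE style.

    When n_rows is not divisible by n_buckets, the first
    (n_rows % n_buckets) buckets receive one extra row.
    """
    base, extra = divmod(n_rows, n_buckets)
    tiles = []
    for bucket in range(1, n_buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        tiles.extend([bucket] * size)
    return tiles


def split_user_history(timestamps, train_part=8, validate_part=1, test_part=1):
    """Split one user's ratings (given by their timestamps) into index lists."""
    # Order the user's ratings from oldest to newest, like Window.orderBy('timestamp').
    order = sorted(range(len(timestamps)), key=lambda i: timestamps[i])
    tiles = ntile(len(order), train_part + validate_part + test_part)
    train, validate, test = [], [], []
    for idx, tile in zip(order, tiles):
        if tile <= train_part:
            train.append(idx)
        elif tile <= train_part + validate_part:
            validate.append(idx)
        else:
            test.append(idx)
    return train, validate, test


# A user with 23 ratings: tiles 1-3 hold 3 rows each, tiles 4-10 hold 2 rows each.
train, validate, test = split_user_history(list(range(23)))
print(len(train), validate, test)  # 19 [19, 20] [21, 22]
```

Because the cut is made inside each user's history, every user appears in all three parts, and a user's validation and test ratings are never older than their training ratings (ties in `timestamp` aside).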

In [12]:
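def calculate_metrics(model, map_fn, df, k_options=(1, 5, 10, 20)):
    """Ranking metrics of an ALS model's top-k recommendations against held-out ratings.

    model     - fitted ALSModel
    map_fn    - extracts a movie id from one element of the `recommendations` array
    df        - held-out ratings; every movie a user rated there counts as relevant
    k_options - cutoffs k for P@k and NDCG@k
    """
    users = df \
        .select('userId') \
        .distinct()
    
    # Ground truth: the set of held-out movies of each user.
    targets = df \
        .groupby('userId') \
        .agg(F.collect_set('movieId').alias('labels'))
    
    # Top-max(k) recommendations for the users of df that the model has factors for.
    predictions = model.recommendForUserSubset(users, max(k_options))

    # RankingMetrics expects an RDD of (ranked predicted ids, relevant ids) pairs.
    results = targets \
        .join(predictions, 'userId') \
        .select('recommendations', 'labels') \
        .rdd \
        .map(lambda row: (list(map(map_fn, row[0])), row[1])) \
        .cache()
    
    ranking_metrics = RankingMetrics(results)
    
    metrics_values = {"MAP": ranking_metrics.meanAveragePrecision}
    
    for k in k_options:
        metrics_values["P@" + str(k)] = ranking_metrics.precisionAt(k)
        metrics_values["NDCG@" + str(k)] = ranking_metrics.ndcgAt(k)
    return metrics_values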
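`RankingMetrics` treats each user's held-out movies as binary relevance labels, whatever star rating the user gave them. To make the reported numbers easier to read, here is a plain-Python sketch of P@k, average precision (averaged over users to get MAP) and NDCG@k, written to follow the definitions in the Spark MLlib evaluation-metrics guide; the helper names are hypothetical and the handling of empty label sets is an assumption.

```python
import math


def precision_at(predicted, relevant, k):
    """Share of the top-k predictions that are relevant (always divides by k)."""
    relevant = set(relevant)
    return sum(1 for p in predicted[:k] if p in relevant) / k


def average_precision(predicted, relevant):
    """Sum of precision at each relevant hit, divided by the number of relevant items."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    hits, score = 0, 0.0
    for i, p in enumerate(predicted):
        if p in relevant:
            hits += 1
            score += hits / (i + 1)
    return score / len(relevant)


def ndcg_at(predicted, relevant, k):
    """Binary-relevance NDCG@k: DCG of the top-k list over the best achievable DCG."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    dcg = sum(1.0 / math.log2(i + 2) for i, p in enumerate(predicted[:k]) if p in relevant)
    idcg = sum(1.0 / math.log2(i + 2) for i in range(min(k, len(relevant))))
    return dcg / idcg


predicted, relevant = [1, 2, 3, 4], {2, 4, 5}
print(precision_at(predicted, relevant, 2))    # 0.5
print(average_precision(predicted, relevant))  # (1/2 + 2/4) / 3
print(ndcg_at(predicted, relevant, 4))
```

Note that P@k divides by k even when a user has fewer than k relevant movies, so P@20 is bounded above by (number of held-out movies) / 20 for users with short histories.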

In [13]:
def get_ate(groups, control_name) -> pd.DataFrame:
    """Get the Average Treatment Effect of each model relative to a control.

    groups - dict mapping model name to a dict of <metric_name>: <metric_value>
    control_name - name of the baseline (control) model

    Returns a pd.DataFrame whose rows correspond to metrics and whose columns hold,
    for every non-control model, the relative change vs. the control in percent.
    """
    columns = ['metric']
    control_metrics = groups[control_name]
    metrics = list(sorted(control_metrics.keys()))
    
    df = {'metric': metrics}
    
    for name, treatment in groups.items():
        if name == control_name:
            continue
        ate = []
        for metric_name in metrics:
            ate.append((treatment[metric_name] / control_metrics[metric_name] - 1.0) * 100)

        columns.append('{} ate %'.format(name))
        df['{} ate %'.format(name)] = ate
            
    return pd.DataFrame(df, columns=columns)
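Despite its name, `get_ate` reports a relative lift rather than a difference of means: for each metric $m$ and each treatment model it computes the formula below. The worked numbers are hypothetical, chosen only to show the arithmetic; note that the ratio is undefined (Python raises `ZeroDivisionError`) when the control metric is exactly zero.

$$
\mathrm{ATE}_m = \left(\frac{m_{\text{treatment}}}{m_{\text{control}}} - 1\right) \times 100\%,
\qquad
\text{e.g. } \left(\frac{0.015}{0.010} - 1\right) \times 100\% = 50\%.
$$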

In [14]:
def show_metrics(metrics) -> pd.DataFrame:
    columns = ['metric']
    metrics_names = list(reduce(set.union, map(dict.keys, metrics.values()), set()))
    metrics_names.sort()
    
    df = {'metric': metrics_names}
    
    for name, values in metrics.items():
        columns.append(name)
        buffer = []
        for metric_name in metrics_names:
            buffer.append(values[metric_name])
        df[name] = buffer
            
    return pd.DataFrame(df, columns=columns)

In [15]:
all_metrics = {}

## Models

Now we can state the problem in machine learning terms.

One formulation we can reduce our task to is **Matrix Completion**. We will solve it with `ALS`.

### [ALS](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)
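Matrix completion with ALS fits low-rank user factors $U$ and item factors $V$ so that $U V^\top$ matches the observed ratings, minimizing squared error on observed entries plus an L2 penalty. With $V$ fixed, each user's factors are an independent ridge regression (and vice versa), so ALS alternates between the two closed-form updates. The NumPy sketch below shows that idea on a dense toy matrix; it is not Spark's blocked, distributed implementation, the function names are hypothetical, and unlike Spark (which, per the linked guide, scales `regParam` by each user's or item's number of ratings) it uses a plain penalty.

```python
import numpy as np


def als_explicit(R, mask, rank=2, reg=0.1, n_iters=10, seed=0):
    """Toy explicit-feedback ALS: fit R ~ U @ V.T using only entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(n_iters):
        # Fix V: solve a ridge regression for each user over that user's observed items.
        for u in range(n_users):
            Vo = V[mask[u]]
            U[u] = np.linalg.solve(Vo.T @ Vo + reg * eye, Vo.T @ R[u, mask[u]])
        # Fix U: solve a ridge regression for each item over the users who rated it.
        for i in range(n_items):
            Uo = U[mask[:, i]]
            V[i] = np.linalg.solve(Uo.T @ Uo + reg * eye, Uo.T @ R[mask[:, i], i])
    return U, V


def als_loss(R, mask, U, V, reg):
    """Regularized squared error on observed entries (the objective each update decreases)."""
    err = (R - U @ V.T)[mask]
    return float(err @ err + reg * (np.sum(U * U) + np.sum(V * V)))


R = np.array([[5.0, 1.0, 3.0], [1.0, 4.0, 3.0], [6.0, 5.0, 6.0], [11.0, 6.0, 9.0]])
mask = np.ones_like(R, dtype=bool)
mask[3, 1] = False  # pretend this rating is unknown
U, V = als_explicit(R, mask, rank=2, reg=0.05, n_iters=50)
print(np.round(U @ V.T, 2))  # the hidden entry R[3, 1] is filled in from the factors
print(als_loss(R, mask, U, V, 0.05))
```

Each half-step solves its subproblem exactly, so the regularized loss never increases from one sweep to the next; this is also why the per-user (per-item) solves can run in parallel, which Spark exploits.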

In [16]:
%%time

baseline_parameters = {
        'rank': 10,
        'maxIter': 10,
        'regParam': 0.1,
        'implicitPrefs': False,
        'alpha': 1.0,
        'nonnegative': False,

        'numUserBlocks': 10,
        'numItemBlocks': 10,
        'userCol': 'userId',
        'itemCol': 'movieId',
        'ratingCol': 'rating',
        'seed': 239566,
        'coldStartStrategy': 'nan',
    }

als = ALS(**baseline_parameters)
baseline_als_model = als.fit(train_ratings)

CPU times: user 18.5 ms, sys: 9.22 ms, total: 27.7 ms
Wall time: 24 s


In [17]:
%%time
all_metrics['baseline'] = calculate_metrics(
        baseline_als_model,
        lambda rec: rec.movieId,
        test_ratings)

CPU times: user 264 ms, sys: 69.5 ms, total: 334 ms
Wall time: 1min 36s


### Hyperparameter tuning

In [125]:
subsample_users = train_ratings \
        .select('userId') \
        .distinct() \
        .sample(False, 0.2) \
        .cache()

train_sampled = train_ratings.join(subsample_users, 'userId', how='leftsemi')
validate_sampled = validate_ratings.join(subsample_users, 'userId', how='leftsemi')

In [126]:
train_sampled.count(), validate_sampled.count()

(4117254, 500625)

In [127]:
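from hyperopt import STATUS_FAIL

def objective(space):
    estimator = ALS(**space)
    print('SPACE:', space)
    success = False
    attempts = 0
    model = None
    # Spark fits occasionally fail (e.g. out of memory): retry once before giving up.
    while not success and attempts < 2:
        try:
            model = estimator.fit(train_sampled)
            success = True
        except Exception as e:
            attempts += 1
            print(e)
            print('Try again')
    if model is None:
        # Both attempts failed: report a failed trial instead of crashing fmin.
        return {'status': STATUS_FAIL}
    metrics = calculate_metrics(model, lambda rec: rec.movieId, validate_sampled)
    # hyperopt minimizes the loss, so negate MAP in order to maximize it.
    return {"loss": -metrics['MAP'], 'status': STATUS_OK}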

In [128]:
best_parameters = {**baseline_parameters}

In [129]:
%%time

alpha_choice = [0.5, 1.0, 1.5]
implicit_prefs_choice = [True, False]

space = {**best_parameters}
space['alpha'] = hp.choice('alpha', alpha_choice)
space['implicitPrefs'] = hp.choice('implicitPrefs', implicit_prefs_choice)

trials = Trials()
best = fmin(fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=20,
            trials=trials)
best_parameters['alpha'] = alpha_choice[best['alpha']]
best_parameters['implicitPrefs'] = implicit_prefs_choice[best['implicitPrefs']]

SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': False, 'rank': 10, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.0, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': False, 'rank': 10, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.0, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': False, 'rank': 10, 'maxIter': 10}
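The log above shows the same configuration (`alpha` 1.0, `implicitPrefs` True) being evaluated twice. This is expected: the `alpha` × `implicitPrefs` space has only 3 × 2 = 6 combinations (the later spaces have 4 × 2 = 8 and 3 × 4 = 12), so 20 TPE evaluations must revisit some of them. For spaces this small, an exhaustive grid that fits each combination once is one possible alternative. A minimal sketch with a hypothetical `grid_search` helper, assuming an objective that returns hyperopt-style result dicts (hyperopt's `STATUS_OK` is the string `'ok'`):

```python
from itertools import product


def grid_search(objective, base_params, choices):
    """Evaluate every combination in `choices` exactly once; return (best_params, best_loss)."""
    names = list(choices)
    best_params, best_loss = None, float('inf')
    for values in product(*(choices[name] for name in names)):
        params = {**base_params, **dict(zip(names, values))}
        result = objective(params)
        if result.get('status') != 'ok':
            continue  # skip failed fits
        if result['loss'] < best_loss:
            best_params, best_loss = params, result['loss']
    return best_params, best_loss


# Toy objective standing in for the ALS one: best at a == 2, b == True.
calls = []

def toy_objective(params):
    calls.append(params)
    return {'loss': (params['a'] - 2) ** 2 + (0 if params['b'] else 1), 'status': 'ok'}

best, loss = grid_search(toy_objective, {'c': 5}, {'a': [1, 2, 3], 'b': [True, False]})
print(best, loss, len(calls))  # {'c': 5, 'a': 2, 'b': True} 0 6
```

In the notebook this would be called as `grid_search(objective, best_parameters, {'alpha': alpha_choice, 'implicitPrefs': implicit_prefs_choice})`, costing 6 fits instead of 20; TPE becomes more attractive once the joint space is much larger than the evaluation budget.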

In [130]:
%%time

rank_choice = [8, 10, 12, 15]
nonnegative_choice = [True, False]

space = {**best_parameters}
space['rank'] = hp.choice('rank', rank_choice)
space['nonnegative'] = hp.choice('nonnegative', nonnegative_choice)

trials = Trials()
best = fmin(fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=20,
            trials=trials)
best_parameters['rank'] = rank_choice[best['rank']]
best_parameters['nonnegative'] = nonnegative_choice[best['nonnegative']]

SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': True, 'rank': 12, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': True, 'rank': 12, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': False, 'rank': 15, 'maxIter': 10}

In [131]:
%%time

max_iter_choice = [5, 10, 15]
reg_param_choice = [0.01, 0.1, 0.5, 1.0]

space = {**best_parameters}
space['maxIter'] = hp.choice('maxIter', max_iter_choice)
space['regParam'] = hp.choice('regParam', reg_param_choice)

trials = Trials()
best = fmin(fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=20,
            trials=trials)
best_parameters['maxIter'] = max_iter_choice[best['maxIter']]
best_parameters['regParam'] = reg_param_choice[best['regParam']]

SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.01, 'nonnegative': False, 'rank': 12, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.5, 'nonnegative': False, 'rank': 12, 'maxIter': 10}
SPACE: {'itemCol': 'movieId', 'numItemBlocks': 10, 'alpha': 1.5, 'ratingCol': 'rating', 'numUserBlocks': 10, 'implicitPrefs': True, 'seed': 239566, 'coldStartStrategy': 'nan', 'userCol': 'userId', 'regParam': 0.1, 'nonnegative': False, 'rank': 12, 'maxIter': 10}

In [132]:
best_parameters

{'alpha': 1.5,
 'coldStartStrategy': 'nan',
 'implicitPrefs': True,
 'itemCol': 'movieId',
 'maxIter': 15,
 'nonnegative': False,
 'numItemBlocks': 10,
 'numUserBlocks': 10,
 'rank': 12,
 'ratingCol': 'rating',
 'regParam': 0.01,
 'seed': 239566,
 'userCol': 'userId'}
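The tuned configuration switched to `implicitPrefs=True` with `alpha=1.5`. In that mode, according to the linked Spark guide, ALS no longer regresses on the star values; it treats them as the strength of observed user actions, related to a confidence level. The sketch below shows the mapping from the Hu, Koren and Volinsky formulation that the Spark guide cites; `implicit_targets` is a hypothetical helper and Spark's internal details may differ.

```python
def implicit_targets(ratings, alpha):
    """Turn raw ratings into (preference, confidence) pairs.

    preference p = 1 if the rating is positive, else 0
    confidence c = 1 + alpha * |rating|
    """
    return [(1.0 if r > 0 else 0.0, 1.0 + alpha * abs(r)) for r in ratings]


# MovieLens ratings range from 0.5 to 5.0 stars, so every observed rating becomes preference 1;
# the star value only changes how strongly the model trusts that observation.
print(implicit_targets([0.5, 3.0, 5.0], alpha=1.5))  # [(1.0, 1.75), (1.0, 5.5), (1.0, 8.5)]
```

One plausible reading of the large gain in the results below: `calculate_metrics` counts every held-out movie as relevant regardless of its rating, which matches the implicit objective ("did the user rate it?") better than the explicit one ("how many stars?").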

In [133]:
%%time
tuned_als = ALS(**best_parameters)
tuned_als_model = tuned_als.fit(train_ratings)
all_metrics['als-tuned'] = calculate_metrics(
        tuned_als_model,
        lambda rec: rec.movieId,
        test_ratings)

CPU times: user 208 ms, sys: 114 ms, total: 321 ms
Wall time: 2min 2s


### Recommendation examples

In [134]:
def nearest_movies(movie_id, model, limit=20):
    movie_factors = model.itemFactors

    selected_movie_factors = movie_factors \
        .filter(F.col('id') == movie_id) \
        .select(F.col('id').alias('requestId'), F.col('features').alias('movieFeatures'))
    
    cosine_dist = F.udf(lambda x, y: float(spatial.distance.cosine(x, y)), FloatType())
    
    out = selected_movie_factors \
        .crossJoin(movie_factors) \
        .withColumn('dist', cosine_dist('movieFeatures', 'features')) \
        .select('requestId', F.col('id').alias('recommendationId'), 'dist') \
        .join(movies_df, F.col('recommendationId') == movies_df['movieId']) \
        .filter(F.col('recommendationId') != movie_id) \
        .sort(F.col('dist')) \
        .limit(limit) \
        .select('movieId', 'dist', 'title', 'genres')
    return out
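`nearest_movies` ranks movies by the cosine distance between ALS item factors, computed with a Python UDF over a cross join. Since the item-factor table is small (one `rank`-dimensional vector per movie), one alternative is to collect it to the driver and compute all distances at once with NumPy. A sketch with a hypothetical `nearest_by_cosine` helper, assuming the factors have already been collected into an id array and a 2-D float array:

```python
import numpy as np


def nearest_by_cosine(item_ids, factors, query_id, limit=10):
    """Return (item_id, cosine_distance) pairs closest to query_id, excluding query_id itself."""
    item_ids = np.asarray(item_ids)
    factors = np.asarray(factors, dtype=float)
    # Raises IndexError if query_id has no factors (e.g. a movie absent from training).
    query = factors[item_ids == query_id][0]
    # Cosine distance = 1 - cosine similarity, the same definition as scipy.spatial.distance.cosine.
    norms = np.linalg.norm(factors, axis=1) * np.linalg.norm(query)
    dist = 1.0 - factors @ query / norms
    order = np.argsort(dist)
    result = [(int(item_ids[j]), float(dist[j])) for j in order if item_ids[j] != query_id]
    return result[:limit]


ids = [10, 20, 30, 40]
vecs = [[1.0, 0.0], [1.0, 0.1], [0.0, 1.0], [-1.0, 0.0]]
print(nearest_by_cosine(ids, vecs, 10, limit=3))
```

In the notebook the inputs could come from `pdf = baseline_als_model.itemFactors.toPandas()`, passing `pdf['id'].values` and `np.stack(pdf['features'].values)`; the titles can then be looked up in `movies_df`.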

#### Movies similar to Iron Man (2008)

In [135]:
res = nearest_movies(59315, baseline_als_model)
pd.DataFrame(res.select('title', 'genres', 'dist').collect(), 
             columns=['title', 'genres', 'dist'])
   

| | title | genres | dist |
|---|---|---|---|
| 0 | X-Men: First Class (2011) | Action\|Adventure\|Sci-Fi\|Thriller\|War | 0.006534 |
| 1 | X-Men: Days of Future Past (2014) | Action\|Adventure\|Sci-Fi | 0.008488 |
| 2 | Game Changers (2017) | Documentary | 0.008959 |
| 3 | Guardians of the Galaxy 2 (2017) | Action\|Adventure\|Sci-Fi | 0.009209 |
| 4 | Star Trek (2009) | Action\|Adventure\|Sci-Fi\|IMAX | 0.010212 |
| 5 | Guardians of the Galaxy (2014) | Action\|Adventure\|Sci-Fi | 0.011228 |
| 6 | Avengers, The (2012) | Action\|Adventure\|Sci-Fi\|IMAX | 0.011908 |
| 7 | Doctor Strange (2016) | Action\|Adventure\|Sci-Fi | 0.012976 |
| 8 | Ant-Man (2015) | Action\|Adventure\|Sci-Fi | 0.013271 |
| 9 | Marvel One-Shot: All Hail the King (2014) | Action\|Comedy\|Drama\|Fantasy\|Thriller | 0.013656 |


#### Movies similar to Bruce Almighty (2003)

In [136]:
res = nearest_movies(6373, baseline_als_model)
pd.DataFrame(res.select('title', 'genres', 'dist').collect(), 
             columns=['title', 'genres', 'dist'])

| | title | genres | dist |
|---|---|---|---|
| 0 | Yes Man (2008) | Comedy | 0.007936 |
| 1 | Whole Nine Yards, The (2000) | Comedy\|Crime | 0.008836 |
| 2 | Having You (2013) | Comedy\|Drama | 0.009344 |
| 3 | Nothing to Lose (1997) | Action\|Adventure\|Comedy\|Crime | 0.01004 |
| 4 | Solstice (2008) | Drama\|Horror\|Thriller | 0.010344 |
| 5 | Fun with Dick and Jane (2005) | Comedy\|Crime | 0.010578 |
| 6 | Ursul (2011) | Comedy\|Drama | 0.010825 |
| 7 | Necessary Roughness (1991) | Comedy | 0.01194 |
| 8 | Il cosmo sul comò (2008) | Comedy | 0.011948 |
| 9 | Domani mi sposo (1984) | (no genres listed) | 0.011948 |


### Your formulation

The lecture covered several other ML formulations of the recommendation problem. Choose one of them and implement the method.

In [141]:
!pip3.5 install torch



## Evaluation Results

Compare the implemented methods using the chosen metrics. Don't forget about hyperparameter optimization.

In [138]:
show_metrics(all_metrics)

| | metric | baseline | als-tuned |
|---|---|---|---|
| 0 | MAP | 1e-06 | 0.010767 |
| 1 | NDCG@1 | 3.1e-05 | 0.013394 |
| 2 | NDCG@10 | 1.8e-05 | 0.022062 |
| 3 | NDCG@20 | 1.6e-05 | 0.032147 |
| 4 | NDCG@5 | 2e-05 | 0.016712 |
| 5 | P@1 | 3.1e-05 | 0.013394 |
| 6 | P@10 | 1.6e-05 | 0.015339 |
| 7 | P@20 | 1.3e-05 | 0.015999 |
| 8 | P@5 | 1.8e-05 | 0.014745 |


In [139]:
get_ate(all_metrics, "baseline")

| | metric | als-tuned ate % |
|---|---|---|
| 0 | MAP | 1003302.0 |
| 1 | NDCG@1 | 43440.0 |
| 2 | NDCG@10 | 124653.4 |
| 3 | NDCG@20 | 205015.5 |
| 4 | NDCG@5 | 83469.76 |
| 5 | P@1 | 43440.0 |
| 6 | P@10 | 95792.31 |
| 7 | P@20 | 126751.2 |
| 8 | P@5 | 79786.67 |
