
# Movie recommendation

Your task is to recommend movies to users.


In [1]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "6g") \
    .config('spark.driver.memory', '6g') \
    .getOrCreate()

sc = spark.sparkContext
sc.setCheckpointDir('chpd')
sqlContext = SQLContext(sc)

## Dataset 

`MovieLens-25M` (note that `DATA_PATH` below points to a directory named `ml-10M100K`)

In [2]:
DATA_PATH = '/workspace/data/ml-10M100K'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


ratings_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + RATINGS_PATH)

## Evaluation Protocol

Since we want to compare the quality of different recommendation algorithms, we first need to decide:
* How to split the data into `Train`/`Validation`/`Test`;
* Which metrics to use to measure quality.

In [4]:
from pyspark.sql.window import Window


timestamp_rank_window = Window().orderBy('timestamp')


TRAIN_SHARE = .6
VAL_SHARE = .2
TEST_SHARE = .2


ranked_ratings_df = ratings_df \
    .withColumn('timestamp_rank', F.percent_rank().over(timestamp_rank_window))
train_df = ranked_ratings_df \
    .filter(F.col('timestamp_rank') <= TRAIN_SHARE) \
    .drop('timestamp_rank')
val_df = ranked_ratings_df \
    .filter((TRAIN_SHARE < F.col('timestamp_rank')) & (F.col('timestamp_rank') <= TRAIN_SHARE + VAL_SHARE)) \
    .drop('timestamp_rank')
test_df = ranked_ratings_df \
    .filter(F.col('timestamp_rank') > TRAIN_SHARE + VAL_SHARE) \
    .drop('timestamp_rank')

# The data is split by a single global time boundary to avoid looking into the future.
# A drawback of such a split may be a systematic difference between the validation and
# test sets: interactions that fall into the validation set are closer in time to the
# training set.
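
The global time split above can be illustrated without Spark. The following is a minimal sketch, assuming ratings are plain `(userId, movieId, rating, timestamp)` tuples; the helper names `percent_rank` and `time_split` and the toy data are hypothetical and only mirror the `F.percent_rank()` logic used in the cell.

```python
def percent_rank(values):
    """Mimic SQL PERCENT_RANK: (rank - 1) / (n - 1); ties share the lowest rank."""
    n = len(values)
    if n < 2:
        return [0.0] * n
    first_pos = {}
    for pos, value in enumerate(sorted(values)):
        first_pos.setdefault(value, pos)  # zero-based rank of the first occurrence
    return [first_pos[value] / (n - 1) for value in values]


def time_split(rows, train_share=0.6, val_share=0.2):
    """Split (user, movie, rating, timestamp) tuples by global time order."""
    ranks = percent_rank([row[3] for row in rows])
    train, val, test = [], [], []
    for row, rank in zip(rows, ranks):
        if rank <= train_share:
            train.append(row)
        elif rank <= train_share + val_share:
            val.append(row)
        else:
            test.append(row)
    return train, val, test


# Toy data (hypothetical): twelve ratings with timestamps 0..11.
toy_rows = [(i % 3, i, 4.0, i) for i in range(12)]
toy_train, toy_val, toy_test = time_split(toy_rows)
```

Every training timestamp precedes every validation timestamp, which in turn precedes every test timestamp, so no model sees interactions from the future of its evaluation set.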

In [5]:
def get_users(df):
    return df \
        .select('userId') \
        .distinct()


train_users = get_users(train_df)
val_users = get_users(val_df)
test_users = get_users(test_df)

common_users = train_users \
    .join(val_users, on='userId') \
    .join(test_users, on='userId')

train_df = train_df.join(common_users, on='userId').cache()
val_df = val_df.join(common_users, on='userId').cache()
test_df = test_df.join(common_users, on='userId').cache()

# Users that are not present in all three splits are dropped to skip computations
# that do not affect the metrics.

## Models

We can now formulate the task in machine learning terms.

One formulation we reduce the task to is **Matrix Completion**. We solve it with `ALS`.

### [ALS](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)

In [6]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10, regParam=.1, userCol='userId', itemCol='movieId', ratingCol='rating', 
          coldStartStrategy='drop', rank=10, implicitPrefs=False, nonnegative=True)
als_model = als.fit(train_df)
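
To make the alternating updates behind ALS concrete, here is a minimal pure-Python sketch restricted to rank 1, where each least-squares step has a closed form. The function `als_rank1` and the toy ratings are hypothetical; the sketch uses a plain L2 penalty and does not try to mirror Spark's implementation details.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, n_iter=20):
    """Rank-1 ALS on observed (user, item, rating) triples.

    Each factor is a single number, so every least-squares update is
    factor = sum(r * other) / (reg + sum(other ** 2)) over observed entries.
    """
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(n_iter):
        # Fix the item factors and solve for every user factor.
        for u in range(n_users):
            obs = [(i, r) for uu, i, r in ratings if uu == u]
            num = sum(r * item_f[i] for i, r in obs)
            den = reg + sum(item_f[i] ** 2 for i, _ in obs)
            user_f[u] = num / den
        # Fix the user factors and solve for every item factor.
        for i in range(n_items):
            obs = [(u, r) for u, ii, r in ratings if ii == i]
            num = sum(r * user_f[u] for u, r in obs)
            den = reg + sum(user_f[u] ** 2 for u, _ in obs)
            item_f[i] = num / den
    return user_f, item_f


# Toy rank-1 matrix [[1, 2, 3], [2, 4, 6]] with the entry (1, 2) held out.
toy_ratings = [(0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (1, 0, 2.0), (1, 1, 4.0)]
toy_user_f, toy_item_f = als_rank1(toy_ratings, 2, 3, reg=0.01, n_iter=50)
toy_prediction = toy_user_f[1] * toy_item_f[2]  # estimate of the held-out entry
```

The held-out cell is filled in from the product of the learned user and item factors, which is the matrix completion idea in its smallest form.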

Show the top-20 most similar movies for a few movies of your choice.

In [23]:
movies_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + MOVIES_PATH) \
    .drop('genres')

In [35]:
# The schema gets its own name so that it can be passed to createDataFrame below.
selected_movies_schema = StructType([StructField('movieId', IntegerType())])
selected_movies = sqlContext.createDataFrame(
    [[589], [296], [5952], [858], [4306], [1], [2628], [329], [4896], [2948]], 
    schema=selected_movies_schema
)
selected_movies \
    .join(movies_df, on='movieId') \
    .collect()

[Row(movieId=589, title='Terminator 2: Judgment Day (1991)'),
 Row(movieId=296, title='Pulp Fiction (1994)'),
 Row(movieId=5952, title='Lord of the Rings: The Two Towers'),
 Row(movieId=858, title='Godfather'),
 Row(movieId=4306, title='Shrek (2001)'),
 Row(movieId=1, title='Toy Story (1995)'),
 Row(movieId=2628, title='Star Wars: Episode I - The Phantom Menace (1999)'),
 Row(movieId=329, title='Star Trek: Generations (1994)'),
 Row(movieId=4896, title="Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001)"),
 Row(movieId=2948, title='From Russia with Love (1963)')]

In [36]:
movie_factors = als_model.itemFactors \
    .withColumnRenamed('id', 'movieId')

selected_movies_factors = selected_movies \
    .join(movie_factors, on='movieId') \
    .withColumnRenamed('movieId', 'selectedId') \
    .withColumnRenamed('features', 'selectedFeatures')

In [37]:
from scipy.spatial.distance import cosine


def cosine_dist(selected_features, features):
    return float(cosine(selected_features, features))

cosine_dist_udf = F.udf(cosine_dist, FloatType())

In [38]:
N_NEIGHBOURS = 20

movie_dists = selected_movies_factors \
    .join(movie_factors, on=F.col('movieId') != F.col('selectedId')) \
    .withColumn('d', cosine_dist_udf('selectedFeatures', 'features')) \
    .drop('selectedFeatures') \
    .drop('features')
    
dist_window = Window().partitionBy('selectedId').orderBy('d')

neighbours = movie_dists \
    .withColumn('rank', F.rank().over(dist_window)) \
    .filter(F.col('rank') <= N_NEIGHBOURS) \
    .withColumnRenamed('movieId', 'neighbourId') \
    .drop('rank')

# Spark's ALS implementation does not provide a way to look up similar items,
# so the nearest items by cosine distance are selected.
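
The neighbour lookup above boils down to ranking item factor vectors by cosine distance. A minimal sketch without Spark or SciPy follows; `cosine_distance`, `nearest_items` and the toy factors are hypothetical names and values used only for illustration.

```python
import math


def cosine_distance(a, b):
    """1 - cos(angle between a and b); 0 means the same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest_items(query_id, factors, n):
    """Return the n item ids closest to query_id, excluding the query itself."""
    query = factors[query_id]
    dists = [
        (cosine_distance(query, vec), item_id)
        for item_id, vec in factors.items()
        if item_id != query_id
    ]
    return [item_id for _, item_id in sorted(dists)[:n]]


toy_factors = {
    'a': [1.0, 0.0],
    'b': [2.0, 0.1],   # nearly the same direction as 'a', different length
    'c': [0.0, 1.0],   # orthogonal to 'a'
    'd': [-1.0, 0.0],  # opposite to 'a'
}
toy_neighbours = nearest_items('a', toy_factors, 2)
```

Unlike this sketch, which cuts the list at exactly `n` items, the notebook filters on `F.rank()`, so movies tied at the same distance share a rank and more than `N_NEIGHBOURS` rows may be returned per selected movie.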

In [39]:
neighbours \
    .join(movies_df, on=F.col('selectedId') == F.col('movieId')) \
    .drop('movieId') \
    .withColumnRenamed('title', 'selectedTitle') \
    .join(movies_df, on=F.col('neighbourId') == F.col('movieId')) \
    .drop('movieId') \
    .drop('selectedId') \
    .drop('neighbourId') \
    .collect()

[Row(d=0.0030002568382769823, selectedTitle='Godfather', title="One Flew Over the Cuckoo's Nest (1975)"),
 Row(d=0.005089139100164175, selectedTitle='Godfather', title='Godfather: Part II'),
 Row(d=0.007284280378371477, selectedTitle='Godfather', title='Halfmoon (Paul Bowles - Halbmond) (1995)'),
 Row(d=0.007284280378371477, selectedTitle='Godfather', title='Inside (1996)'),
 Row(d=0.008123008534312248, selectedTitle='Godfather', title='Deer Hunter'),
 Row(d=0.008655307814478874, selectedTitle='Godfather', title='Cool Hand Luke (1967)'),
 Row(d=0.008879559114575386, selectedTitle='Godfather', title='Soldier'),
 Row(d=0.009235410019755363, selectedTitle='Godfather', title='Sherlock Holmes Faces Death (1943)'),
 Row(d=0.011764594353735447, selectedTitle='Godfather', title='Goodfellas (1990)'),
 Row(d=0.011827447451651096, selectedTitle='Godfather', title='Swingers (1996)'),
 Row(d=0.013080274686217308, selectedTitle='Godfather', title='Apocalypse Now (1979)'),
 Row(d=0.01330265961587429,

### Your formulation

Several other ML formulations of the recommendation task were covered in the lecture. Choose one of them and implement the method.

In [26]:
from itertools import chain


SAMPLING_RATIO = .003


def pref_id(pref):
    def cast(identifier):
        return pref + str(identifier)
    return F.udf(cast, StringType())


def make_walks(data, walk_len, min_rating):
    edges = data \
        .filter(F.col('rating') >= min_rating) \
        .drop('rating') \
        .drop('timestamp')
    walks = edges \
        .sample(SAMPLING_RATIO) \
        .withColumnRenamed('userId', 'u0') \
        .withColumnRenamed('movieId', 'm0')
    for i in range(1, walk_len):
        col_prev_movie = 'm' + str(i - 1)
        col_cur_movie = 'm' + str(i)
        col_cur_user = 'u' + str(i)
        walks = walks \
            .join(edges.sample(SAMPLING_RATIO), on=
                  F.col(col_prev_movie) == F.col('movieId')) \
            .withColumnRenamed('userId', col_cur_user) \
            .drop('movieId') \
            .join(edges.sample(SAMPLING_RATIO), on=
                  F.col(col_cur_user) == F.col('userId')) \
            .withColumnRenamed('movieId', col_cur_movie) \
            .drop('userId')
        
    for i in range(walk_len):
        c_cur_movie = 'm' + str(i)
        c_cur_user = 'u' + str(i)
        walks = walks \
            .withColumn(c_cur_movie, pref_id('m')(c_cur_movie)) \
            .withColumn(c_cur_user, pref_id('u')(c_cur_user))
    
    node_columns = list(chain(*(('u' + str(i), 'm' + str(i)) for i in range(walk_len))))
    walks = walks \
        .withColumn('walks', F.array(*node_columns)) \
        .drop(*node_columns)
    return walks.cache()

# Training DeepWalk requires random walks over the graph. The walks are generated by
# successively appending edges from a random subset that is resampled at every iteration.
# The share of edges used was tuned by hand to get a suitable number of walks.
# If some of the other parameters change, it has to be tuned again.
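
For comparison, a conventional single-machine random walk over the same bipartite user-movie graph can be sketched as follows. This is not the join-based procedure used in `make_walks`; the helpers `build_adjacency` and `random_walk` and the toy edges are hypothetical, and only the `u`/`m` node prefixes follow the notebook.

```python
import random
from collections import defaultdict


def build_adjacency(edges):
    """edges: iterable of (userId, movieId) pairs that passed the rating filter."""
    movies_of = defaultdict(list)
    users_of = defaultdict(list)
    for user, movie in edges:
        movies_of[user].append(movie)
        users_of[movie].append(user)
    return movies_of, users_of


def random_walk(start_user, movies_of, users_of, walk_len, rng):
    """Alternate user -> movie -> user ... for walk_len (user, movie) pairs."""
    walk = []
    user = start_user
    for _ in range(walk_len):
        movie = rng.choice(movies_of[user])      # step from the user to a rated movie
        walk += ['u' + str(user), 'm' + str(movie)]
        user = rng.choice(users_of[movie])       # step from the movie to one of its raters
    return walk


toy_edges = [(1, 10), (1, 11), (2, 10), (2, 12), (3, 12)]
toy_movies_of, toy_users_of = build_adjacency(toy_edges)
toy_walk = random_walk(1, toy_movies_of, toy_users_of, 3, random.Random(0))
```

Each walk is a list of prefixed node identifiers, which is the same shape as the `walks` array column that `make_walks` produces for Word2Vec.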

In [42]:
from pyspark.ml.feature import Word2Vec


def unpref_id(pref):
    def cast(identifier):
        return int(identifier[1:]) if identifier[0] == pref else -1
    return F.udf(cast, IntegerType())


def select_vectors(id_col_name, vec_col_name, pref, vec_model):
    return vec_model.getVectors() \
        .withColumn(id_col_name, unpref_id(pref)('word')) \
        .filter(F.col(id_col_name) >= 0) \
        .drop('word') \
        .withColumnRenamed('vector', vec_col_name)
        


class DeepWalkModel():
    def __init__(self, word2vec_model, occured_interactions):
        self.user_vecs = select_vectors('userId', 'user_vector', 'u', word2vec_model)
        self.movie_vecs = select_vectors('movieId', 'movie_vector', 'm', word2vec_model)
        self.occured_interactions = occured_interactions
    
    def recommend(self, n):
        user_window = Window().partitionBy('userId').orderBy('d')
        
        return self.user_vecs \
            .crossJoin(self.movie_vecs) \
            .join(self.occured_interactions, on=['userId', 'movieId'], how='left') \
            .filter(F.isnull('interacted')) \
            .drop('interacted') \
            .withColumn('d', cosine_dist_udf('user_vector', 'movie_vector')) \
            .withColumn('rank', F.rank().over(user_window)) \
            .filter(F.col('rank') <= n) \
            .drop('d')
    
    
class DeepWalk():
    def __init__(self, vectorSize=100, stepSize=0.025, 
                 maxIter=1, windowSize=5, walkLen=20, minRating=3.):
        self.vectorSize = vectorSize
        self.stepSize=stepSize
        self.maxIter = maxIter
        self.windowSize = windowSize
        self.walkLen = walkLen
        self.minRating = minRating
        
    def fit(self, data, walks=None):
        w2v = Word2Vec(vectorSize=self.vectorSize, minCount=1, 
                       stepSize=self.stepSize, maxIter=self.maxIter, 
                       inputCol='walks', outputCol='deepWalkEmbedding', 
                       windowSize=self.windowSize)
        if walks is None:
            walks = make_walks(data, self.walkLen, self.minRating)
        w2v_model = w2v.fit(walks)
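        # Note: the strict '>' here differs from the '>=' used in make_walks, so
        # ratings equal to minRating are part of the graph but are not excluded
        # from the recommendations.
        occured_interactions = data \
            .filter(F.col('rating') > self.minRating) \
            .select('userId', 'movieId') \
            .withColumn('interacted', F.lit(True))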
        return DeepWalkModel(w2v_model, occured_interactions)
    
# Because it is not obvious how to group movie ratings into sessions, the link
# prediction formulation was chosen. Since edge weights are ignored, negative ratings
# are discarded. DeepWalk was chosen because it is convenient to implement with
# Word2Vec in Spark.
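
The reason walks can be fed to Word2Vec is that, in its skip-gram form, it learns from (center, context) pairs of tokens that occur close together in a sequence. A minimal sketch of how such pairs arise from one walk is shown below; `skipgram_pairs` is a hypothetical helper with a fixed window, whereas real implementations may vary the effective window.

```python
def skipgram_pairs(walk, window_size):
    """List (center, context) pairs for nodes at most window_size steps apart."""
    pairs = []
    for pos, center in enumerate(walk):
        lo = max(0, pos - window_size)
        hi = min(len(walk), pos + window_size + 1)
        for ctx_pos in range(lo, hi):
            if ctx_pos != pos:
                pairs.append((center, walk[ctx_pos]))
    return pairs


toy_walk_nodes = ['u1', 'm10', 'u2', 'm12']
toy_pairs_w1 = skipgram_pairs(toy_walk_nodes, 1)  # only user-movie pairs
toy_pairs_w2 = skipgram_pairs(toy_walk_nodes, 2)  # also user-user and movie-movie pairs
```

With a window of 1 only directly connected user and movie nodes are paired; larger windows also pair users with users and movies with movies, which is what `windowSize` controls in the `DeepWalk` class.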

## Evaluation Results

Compare the implemented methods using the chosen metrics. Do not forget about hyperparameter optimization.

In [13]:
def get_users(df):
    return df \
        .select('userId') \
        .distinct()


def get_relevant_recs(recs, gt):
    return recs.join(gt, on=(recs.userId == gt.userId) & (recs.movieId == gt.movieId)) \
        .drop(gt.movieId) \
        .drop(gt.userId)

In [14]:
def precision_at_k(recs, gt, k):
    relevant_recs = get_relevant_recs(recs, gt) \
        .filter(F.col('rank') <= k)
    all_users = get_users(recs)
    return relevant_recs \
        .groupBy(F.col('userId')) \
        .agg(F.count('movieId').alias('relevant')) \
        .withColumn('prec', F.col('relevant') / k) \
        .drop('relevant') \
        .join(all_users, on='userId', how='right') \
        .fillna(0) \
        .select(F.avg('prec').alias('prec')) \
        .collect()[0].prec

In [15]:
def hr(recs, gt):
    relevant_recs = get_relevant_recs(recs, gt)
    return get_users(relevant_recs).count() / get_users(recs).count()
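
The two metrics defined above can be checked on a toy example without Spark. The following is a minimal sketch, assuming recommendations are ranked lists per user and the ground truth is a set of held-out items per user; the `_py` function names and the toy data are hypothetical.

```python
def precision_at_k_py(recs, relevant, k):
    """recs: {user: ranked list of items}; relevant: {user: set of held-out items}."""
    per_user = [
        len(set(items[:k]) & relevant.get(user, set())) / k
        for user, items in recs.items()
    ]
    # Users without any relevant recommendation contribute a precision of 0.
    return sum(per_user) / len(per_user)


def hit_rate_py(recs, relevant):
    """Share of users with at least one relevant item among their recommendations."""
    hits = sum(
        1 for user, items in recs.items()
        if set(items) & relevant.get(user, set())
    )
    return hits / len(recs)


toy_recs = {1: [10, 11, 12], 2: [13, 14, 15]}
toy_relevant = {1: {10, 12, 99}, 2: {20}}
toy_prec_3 = precision_at_k_py(toy_recs, toy_relevant, 3)  # (2/3 + 0) / 2
toy_hr = hit_rate_py(toy_recs, toy_relevant)               # 1 of 2 users has a hit
```

As in the Spark versions, both metrics are averaged over all users that received recommendations, and any held-out interaction counts as relevant regardless of its rating value.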

In [16]:
from pyspark.sql import Row


def flat_recs(recs):
    return ([
        (recs.userId, movieId, i)
        for i, (movieId, _) in enumerate(recs.recommendations, 1)
    ])


def recommend(model, n):
    return model.recommendForAllUsers(n) \
        .rdd \
        .flatMap(flat_recs) \
        .map(lambda r: Row(userId=r[0], movieId=r[1], rank=r[2])) \
        .toDF() \
        .repartition(4)

In [18]:
als_recommendations = recommend(als_model, 10)

precision_at_k(als_recommendations, val_df, 10), hr(als_recommendations, val_df)

(0.0280346820809249, 0.2153179190751445)

In [19]:
!pip3.5 install hyperopt

Collecting hyperopt
  Downloading hyperopt-0.2.3-py3-none-any.whl (1.9 MB)
Collecting networkx==2.2
  Downloading networkx-2.2.zip (1.7 MB)
Collecting tqdm
  Downloading tqdm-4.45.0-py2.py3-none-any.whl (60 kB)
Collecting cloudpickle
  Downloading cloudpickle-1.3.0-py2.py3-none-any.whl (26 kB)
Collecting future
  Downloading future-0.18.2.tar.gz (829 kB)
Installing collected packages: networkx, tqdm, cloudpickle, future, hyperopt
    Running setup.py install for networkx ... done
    Running setup.py install for future ... done
Successfully installed cloudpickle-1.3.0 future-0.18.2 hyperopt-0.2.3 networkx-2.2 tqd

In [20]:
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK


def als_obj(config):
    model = ALS(**config).fit(train_df)
    recommendations = recommend(model, 10)
    prec_10 = precision_at_k(recommendations, val_df, 10)
    return {'loss': 1 - prec_10, 'status': STATUS_OK}

# Precision@10 is the optimized metric; to maximize it, its complement
# (1 - precision@10) is minimized.


static_params = {
    'userCol': 'userId', 
    'itemCol': 'movieId', 
    'ratingCol': 'rating', 
    'coldStartStrategy': 'drop', 
    'implicitPrefs': False, 
    'nonnegative': True
}

In [21]:
max_iter_options = [10, 20, 50]
rank_options = [20, 40, 100]

space = {
    **static_params,
    'regParam': 0.1,
    
    'maxIter': hp.choice('maxIter', max_iter_options), 
    'rank': hp.choice('rank', rank_options) 
}

trials = Trials()
best_rank_iter = fmin(fn=als_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=6,
            trials=trials)

print(best_rank_iter)

max_iter = max_iter_options[best_rank_iter['maxIter']]
rank = rank_options[best_rank_iter['rank']]

100%|██████████| 6/6 [06:24<00:00, 64.13s/trial, best loss: 0.9413294797687861]
{'rank': 2, 'maxIter': 2}


In [22]:
reg_param_options = [.01, .05, .1, 0.2]

space = {
    **static_params,
    'maxIter': max_iter, 
    'rank': rank,
    
    'regParam': hp.choice('reg_param', reg_param_options)
}

trials = Trials()
best_reg_param = fmin(fn=als_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=4,
            trials=trials)

reg_param = reg_param_options[best_reg_param['reg_param']]
print(reg_param)

100%|██████████| 4/4 [16:11<00:00, 242.89s/trial, best loss: 0.9244219653179191]
0.05
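
The tuning above is staged: `rank` and `maxIter` are searched first, and `regParam` is then searched with the winners held fixed. Because `hp.choice` makes `fmin` report the index of the chosen option, the notebook maps the result back through the option lists. The staging itself can be sketched in plain Python as below; `sequential_search` and `toy_loss` are hypothetical, the sketch tunes strictly one parameter per stage, and it evaluates every option exhaustively instead of sampling with TPE as hyperopt does.

```python
def sequential_search(objective, stages, defaults):
    """Tune one parameter at a time, keeping earlier winners fixed.

    stages: list of (parameter name, candidate values) pairs, in tuning order.
    """
    best = dict(defaults)
    for name, options in stages:
        # Score every candidate with all other parameters at their current best.
        scored = [(objective({**best, name: value}), value) for value in options]
        best[name] = min(scored)[1]
    return best


def toy_loss(cfg):
    """Hypothetical loss with its minimum at rank=40, regParam=0.05."""
    return abs(cfg['rank'] - 40) / 100 + abs(cfg['regParam'] - 0.05)


toy_best = sequential_search(
    toy_loss,
    [('rank', [20, 40, 100]), ('regParam', [0.01, 0.05, 0.1, 0.2])],
    {'rank': 20, 'regParam': 0.1},
)
```

Staged search needs far fewer model fits than a full grid, at the cost of possibly missing combinations in which the parameters interact.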


In [38]:
graph_walks = make_walks(train_df, 20, 3.).limit(300000)

def deep_walk_obj(config):
    model = DeepWalk(**config).fit(train_df, graph_walks)
    prec_10 = precision_at_k(model.recommend(10), val_df, 10)
    return {'loss': 1 - prec_10, 'status': STATUS_OK}

In [43]:
vector_size_options = [20, 40, 100]

space = {
    'vectorSize': hp.choice('vectorSize', vector_size_options),
    'stepSize': .025,
    'maxIter': 1, 
    'windowSize': 5
}

trials = Trials()
best_vector_size = fmin(fn=deep_walk_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=3,
            trials=trials)

dw_vector_size = vector_size_options[best_vector_size['vectorSize']]
print(dw_vector_size)

100%|██████████| 3/3 [07:47<00:00, 155.72s/trial, best loss: 0.9448366013071895]
40


In [45]:
step_size_options = [.005, .025, .1]

space = {
    'stepSize': hp.choice('stepSize', step_size_options), 
    'vectorSize': dw_vector_size, 
    'maxIter': 1, 
    'windowSize': 5
}

trials = Trials()
best_step_size = fmin(fn=deep_walk_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=3,
            trials=trials)

dw_step_size = step_size_options[best_step_size['stepSize']]
print(dw_step_size)

100%|██████████| 3/3 [08:27<00:00, 169.27s/trial, best loss: 0.9445751633986929]
0.005


In [46]:
window_size_options = [5, 10]

space = {
    'windowSize': hp.choice('windowSize', window_size_options), 
    'stepSize': dw_step_size, 
    'vectorSize': dw_vector_size, 
    'maxIter': 1
}

trials = Trials()
best_window_size = fmin(fn=deep_walk_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=2,
            trials=trials)

dw_window_size = window_size_options[best_window_size['windowSize']]
print(dw_window_size)

100%|██████████| 2/2 [08:00<00:00, 240.45s/trial, best loss: 0.9441830065359477]
10


In [47]:
max_iter_options = [1, 3]

space = {
    'maxIter': hp.choice('maxIter', max_iter_options),
    'windowSize': dw_window_size,
    'stepSize': dw_step_size, 
    'vectorSize': dw_vector_size
}

trials = Trials()
best_max_iter = fmin(fn=deep_walk_obj,
            space=space,
            algo=tpe.suggest,
            max_evals=2,
            trials=trials)

dw_max_iter = max_iter_options[best_max_iter['maxIter']]
print(dw_max_iter)

100%|██████████| 2/2 [18:35<00:00, 557.83s/trial, best loss: 0.9474509803921568]
3


In [23]:
als_opt_model = ALS(maxIter=max_iter, regParam=reg_param, rank=rank, userCol='userId', 
                    itemCol='movieId', ratingCol='rating', coldStartStrategy='drop', 
                    implicitPrefs=False, nonnegative=True).fit(train_df)
als_opt_recommendations = recommend(als_opt_model, 10)
precision_at_k(als_opt_recommendations, test_df, 10), hr(als_opt_recommendations, test_df)

(0.025794797687861293, 0.17991329479768786)

In [50]:
dw_opt_model = DeepWalk(maxIter=dw_max_iter, windowSize=dw_window_size,
    stepSize=dw_step_size, vectorSize=dw_vector_size).fit(train_df, graph_walks)
dw_opt_recommendations = dw_opt_model.recommend(10)
precision_at_k(dw_opt_recommendations, test_df, 10), hr(dw_opt_recommendations, test_df)

(0.01816993464052286, 0.14248366013071895)