
# Movie recommendation

Your task is to recommend movies to users.


In [None]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "5")\
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

## Dataset 

`MovieLens-25M`

In [None]:
DATA_PATH = '/workspace/data/ml-25m'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


# CSV support is built into Spark 2.0+, so the external
# "com.databricks.spark.csv" format name is no longer needed.
# DATA_PATH is absolute, so 'file://' + path yields a valid file:/// URI.
ratings_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('file://' + RATINGS_PATH)

movies_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('file://' + MOVIES_PATH)

In [None]:
ratings_df.take(5)

## Evaluation Protocol

Since we want to compare the quality of different recommendation algorithms, we first need to decide:
* How to split the data into `Train`/`Validation`/`Test`;
* Which metrics to use to evaluate quality.
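
To make the metrics concrete before the Spark implementation, here is a minimal pure-Python sketch for a single user. It assumes the metrics are precision@k, recall@k and hit rate, which is what the `precision_recall_hr` function in this notebook computes; the helper name `precision_recall_hit` and the toy movie ids are made up for illustration.

```python
def precision_recall_hit(recommended, relevant):
    """Per-user precision@k, recall@k and hit indicator.

    recommended: list of the k movie ids shown to the user.
    relevant: movie ids the user actually rated in the held-out period.
    """
    recommended = list(recommended)
    relevant = set(relevant)
    hits = sum(1 for movie_id in recommended if movie_id in relevant)
    precision = hits / len(recommended) if recommended else 0.0
    recall = hits / len(relevant) if relevant else 0.0
    hit = 1.0 if hits > 0 else 0.0
    return precision, recall, hit


# Toy example: 5 recommendations, 3 held-out movies, 2 of them recommended.
recs = [10, 20, 30, 40, 50]
held_out = {20, 50, 70}
p, r, h = precision_recall_hit(recs, held_out)
```

The dataset-level values are the averages of these per-user numbers. Users with zero hits must stay in the average, otherwise the hit rate is trivially 1.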

In [None]:
from pyspark.sql.window import Window

# Chronological split: the earliest 80% of ratings go to train,
# the next 10% to validation and the latest 10% to test.
# A window without partitionBy moves all rows into a single partition,
# so the rank is computed once and reused for all three splits.
rank_window = Window.orderBy('timestamp')
ranked_df = ratings_df \
    .withColumn('rank', F.percent_rank().over(rank_window)) \
    .cache()

train_df = ranked_df \
    .filter(F.col('rank') <= 0.8) \
    .drop('rank').cache()
train_users = train_df.select('userId').distinct()

# Keep only users seen in train: the model has no factors for the others.
validate_df = ranked_df \
    .filter((F.col('rank') > 0.8) & (F.col('rank') <= 0.9)) \
    .drop('rank') \
    .join(train_users, 'userId').cache()

test_df = ranked_df \
    .filter(F.col('rank') > 0.9) \
    .drop('rank') \
    .join(train_users, 'userId').cache()


In [None]:
from pyspark.sql import Row

def precision_recall_hr(predicted, df):
    # Flatten each user's recommendation array into (userId, movieId, rank) rows.
    predicted = predicted \
        .rdd \
        .flatMap(lambda rec: [Row(userId=rec['userId'], movieId=row['movieId'], rank=rank)
                              for rank, row in enumerate(rec['recommendations'])]) \
        .toDF()

    # Ground truth restricted to users that received recommendations.
    true = df \
        .join(predicted.select('userId').distinct(), 'userId') \
        .select('movieId', 'userId')

    predicted_sizes = predicted.groupBy('userId').agg(F.count('*').alias('n_predicted'))
    true_sizes = true.groupBy('userId').agg(F.count('*').alias('n_true'))
    intersection_sizes = predicted \
        .join(true, ['userId', 'movieId']) \
        .groupBy('userId') \
        .agg(F.count('*').alias('n_hits'))

    # The left join keeps users with zero hits (n_hits = 0). With an inner join
    # they would be dropped, inflating precision/recall and making hr always 1.
    per_user = true_sizes \
        .join(predicted_sizes, 'userId') \
        .join(intersection_sizes, 'userId', 'left') \
        .fillna(0, subset=['n_hits'])

    precision, recall, hr = per_user.agg(
        F.avg(F.col('n_hits') / F.col('n_predicted')),
        F.avg(F.col('n_hits') / F.col('n_true')),
        F.avg((F.col('n_hits') > 0).cast('double'))
    ).first()

    return precision, recall, hr

def k_metrics(model, df, k=5):
    return precision_recall_hr(model.recommendForAllUsers(k), df)

In [None]:
from scipy.spatial.distance import cosine


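def cosine_dist(selected_features, features):
    # scipy's `cosine` returns the cosine distance, 1 - cos(angle):
    # 0 for vectors pointing the same way, 1 for orthogonal ones.
    return float(cosine(selected_features, features))

# FloatType comes from the earlier `from pyspark.sql.types import *`.
cosine_dist_udf = F.udf(cosine_dist, FloatType())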

## Models

Now we can formulate the task in machine learning terms.

One formulation we will reduce our task to is **Matrix Completion**: the user-movie rating matrix is mostly empty, and the goal is to predict its missing entries. We will solve it with `ALS` (alternating least squares).
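
As a rough illustration of the alternating idea, here is a minimal pure-Python sketch for the rank-1 case, where every user and every movie gets a single scalar factor. It is a simplification under stated assumptions, not Spark's implementation: the function name `als_rank1`, the toy ratings and the plain L2 penalty are all chosen for illustration, and `reg` must be positive.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, n_iters=10):
    """Rank-1 ALS on a dict {(user, item): rating}; requires reg > 0."""
    u = [1.0] * n_users  # one scalar factor per user
    v = [1.0] * n_items  # one scalar factor per item

    def loss():
        # Squared error on observed entries plus an L2 penalty on the factors.
        fit = sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items())
        penalty = reg * (sum(x * x for x in u) + sum(x * x for x in v))
        return fit + penalty

    history = [loss()]
    for _ in range(n_iters):
        # Step 1: fix item factors, solve a scalar ridge regression per user.
        num, den = [0.0] * n_users, [reg] * n_users
        for (i, j), r in ratings.items():
            num[i] += r * v[j]
            den[i] += v[j] ** 2
        u = [a / b for a, b in zip(num, den)]

        # Step 2: fix user factors, solve the same problem per item.
        num, den = [0.0] * n_items, [reg] * n_items
        for (i, j), r in ratings.items():
            num[j] += r * u[i]
            den[j] += u[i] ** 2
        v = [a / b for a, b in zip(num, den)]

        history.append(loss())
    return u, v, history


# Toy data: 3 users, 3 movies, 6 of the 9 ratings observed.
toy_ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0,
               (1, 2): 1.0, (2, 1): 2.0, (2, 2): 1.0}
u, v, history = als_rank1(toy_ratings, n_users=3, n_items=3)
predicted_missing = u[0] * v[2]  # estimate for the unobserved cell (user 0, movie 2)
```

Each half-step minimizes the objective exactly over one block of factors, so the recorded loss never increases. With a higher rank the scalar division becomes a small linear system per user or movie.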

### [ALS](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)

In [None]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=20, regParam=.05, userCol='userId', itemCol='movieId', ratingCol='rating', 
          coldStartStrategy='drop', rank=10, implicitPrefs=False, nonnegative=True)
als_model = als.fit(train_df)

For movies of your choice, show the top-20 most similar movies.
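
Similarity here can be read as closeness of the learned item factor vectors. The sketch below shows that idea on plain Python lists, assuming cosine distance as the measure (the same one used by `cosine_dist` in this notebook); the names `cosine_distance`, `top_n_similar` and the toy factors are hypothetical.

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity of two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def top_n_similar(target_id, item_factors, n=20):
    """Ids of the n items whose factor vectors are closest to the target's."""
    target = item_factors[target_id]
    distances = [(cosine_distance(target, vec), item_id)
                 for item_id, vec in item_factors.items()
                 if item_id != target_id]
    return [item_id for _, item_id in sorted(distances)[:n]]


# Toy 2-dimensional item factors keyed by movie id.
toy_factors = {
    1: [1.0, 0.0],
    2: [0.9, 0.1],
    3: [0.0, 1.0],
    4: [0.5, 0.5],
}
nearest = top_n_similar(1, toy_factors, n=2)
```

The Spark version does the same thing with a join between the target movies and all item factors, followed by a per-target ranking.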

In [None]:
from pyspark.sql.functions import rand

films = movies_df.orderBy(rand()).limit(20).cache()
films.collect()

In [None]:
def closest_als(model, target, n=20):
    target_films_ids = target.select('movieId')
    movie_features = model.itemFactors.withColumnRenamed('id', 'movieId')
    window = Window().partitionBy('targetMovieId').orderBy(F.col('dist').asc())

    result = target_films_ids \
        .join(movie_features, 'movieId') \
        .withColumnRenamed('movieId', 'targetMovieId') \
        .withColumnRenamed('features', 'targetFeatures') \
        .join(movie_features, F.col('movieId') != F.col('targetMovieId')) \
        .withColumn('dist', cosine_dist_udf('targetFeatures', 'features')) \
        .withColumn('rank', F.row_number().over(window)) \
        .filter(F.col('rank') <= n) \
        .select('targetMovieId', 'movieId', 'rank') \
        .join(movies_df, 'movieId') \
        .join(target.withColumnRenamed('movieId', 'targetMovieId').withColumnRenamed('title', 'targetTitle'), 'targetMovieId') \
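        .select('targetMovieId', 'movieId', 'rank', 'title', 'targetTitle') \
        .orderBy('targetMovieId', 'rank')  # joins do not preserve order; the printing loop needs rows grouped by target

    return result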

collected_results = closest_als(als_model, films).collect()

In [None]:
prev_target = -1
for row in collected_results:
    if prev_target != row['targetMovieId']:
        prev_target = row['targetMovieId']
        print("-"*60)
        print("-"*60)
        print("Closest to", row["targetTitle"])
        print("-"*60)
    print(row["rank"], row['title'])
    

### Your formulation

The lecture covered several other ML formulations of the recommendation task. Choose one of them and implement the method.

In [None]:
######################################
######### YOUR CODE HERE #############
######################################

## Evaluation Results

Compare the implemented methods using the chosen metrics. Don't forget about hyperparameter optimization.
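
One simple way to organize hyperparameter optimization is an exhaustive grid search scored on the validation split. The sketch below is framework-agnostic: `grid_search`, `fit_and_score` and the toy objective are hypothetical names introduced for illustration, not part of Spark.

```python
from itertools import product


def grid_search(param_grid, fit_and_score):
    """Try every combination in param_grid and keep the highest score.

    param_grid: dict mapping a parameter name to a list of candidate values.
    fit_and_score: callable taking a params dict and returning a number
                   (higher is better), e.g. a validation metric.
    """
    names = sorted(param_grid)
    results = []
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        results.append((fit_and_score(params), params))
    best_score, best_params = max(results, key=lambda item: item[0])
    return best_params, best_score, results


# Toy objective standing in for "fit a model, return a validation metric".
def toy_score(params):
    return -(params['rank'] - 20) ** 2 - 100 * (params['regParam'] - 0.05) ** 2


best_params, best_score, all_results = grid_search(
    {'rank': [10, 20, 40], 'regParam': [0.01, 0.05, 0.1]},
    toy_score,
)
```

In this notebook, `fit_and_score` could fit `ALS(rank=params['rank'], regParam=params['regParam'], ...)` on `train_df` and return one of the values from `k_metrics(model, validate_df, 10)`. The `test_df` split is best left untouched until the final comparison.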

In [None]:
k_metrics(als_model, validate_df, 10)

In [None]:
k_metrics(als_model, validate_df, 5)

In [None]:
k_metrics(als_model, validate_df, 20)