<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Movie-recommendation" data-toc-modified-id="Movie-recommendation-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Movie recommendation</a></span><ul class="toc-item"><li><span><a href="#Dataset" data-toc-modified-id="Dataset-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Dataset</a></span></li><li><span><a href="#Evaluation-Protocol" data-toc-modified-id="Evaluation-Protocol-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Evaluation Protocol</a></span></li><li><span><a href="#Models" data-toc-modified-id="Models-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Models</a></span><ul class="toc-item"><li><span><a href="#ALS" data-toc-modified-id="ALS-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback" target="_blank">ALS</a></a></span></li><li><span><a href="#Ваша-формулировка" data-toc-modified-id="Ваша-формулировка-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Ваша формулировка</a></span></li></ul></li><li><span><a href="#Evaluation-Results" data-toc-modified-id="Evaluation-Results-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Evaluation Results</a></span></li></ul></li></ul></div>

# Movie recommendation

Ваша задача - рекомендация фильмов для пользователей


In [1]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "25g") \
    .config("spark.driver.memory", "25g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

## Dataset 

`MovieLens-25M`

In [7]:
!ls /workspace/ml-25m

genome-scores.csv  links.csv   ratings.csv  tags.csv
genome-tags.csv    movies.csv  README.txt


In [2]:
DATA_PATH = '/workspace/ml-25m'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


ratings_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + RATINGS_PATH)

In [7]:
old = ratings_df

In [8]:
ratings_df = old.limit(10000000)

In [4]:
movies_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + MOVIES_PATH)

In [11]:
ratings_df.take(1)

[Row(userId=1, movieId=296, rating=5.0, timestamp=1147880044)]

In [12]:
movies_df.take(1)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')]

## Evaluation Protocol

Так как мы хотим оценивать качество разных алгоритмов рекомендаций, то в первую очередь нам нужно определить
* Как разбить данные на `Train`/`Validation`/`Test`;
* Какие метрики использовать для оценки качества.

In [5]:
from metrics import get_ate
from processing import split_by_col, partitioned_split_by_cols

### Protocol 1 --- split by timestamp

In [6]:
train_df, test_df = split_by_col(ratings_df, 'timestamp', [0.85, 0.15])

In [7]:
new_test_users_num = test_df.select('userId').distinct().subtract(train_df.select('userId').distinct()).count()
print("Number of users in test but not in train:", new_test_users_num)

Number of users in test but not in train: 17993


In [8]:
print("Number of users in train:", train_df.select('userId').distinct().count())
print("Number of users in test:", test_df.select('userId').distinct().count())

Number of users in train: 144548
Number of users in test: 24525


In [9]:
print("Number in train:", train_df.count())
print("Number in test:", test_df.count())

Number in train: 21250080
Number in test: 3750014


In [7]:
good_users = test_df.select('userId').distinct().intersect(train_df.select('userId').distinct()).collect()
good_users = [user['userId'] for user in good_users]
test_df = test_df\
    .filter(F.col('userId').isin(good_users))


### Protocol 2 --- split each user by timestamp and unite

In [6]:
train_df, test_df = partitioned_split_by_cols(ratings_df, 'userId', 'timestamp', [0.85, 0.15])

In [20]:
new_test_users_num = test_df.select('userId').distinct().subtract(train_df.select('userId').distinct()).count()
print("Number of users in test but not in train:", new_test_users_num)

Number of users in test but not in train: 0


In [21]:
print("Number of users in train:", train_df.select('userId').distinct().count())
print("Number of users in test:", test_df.select('userId').distinct().count())

Number of users in train: 162541
Number of users in test: 159964


In [14]:
print("Number in train:", train_df.count())
print("Number in test:", test_df.count())

Number in train: 21264618
Number in test: 3598611


### Protocols consensus

В первом способе получилось слишком много новых пользователей в тесте и валидейте, так что использовать в так виде или оставлять только "общих" пользователей кажется бессмысленным. Во втором способе этой проблемы нет. Теоретическая проблема: заглядывание в будущее. Но мы по сути заглядываем в будущее для оценок фильмов. А фильмы это фиксированный объект, со временем не меняется. Так что кажется, что ничего страшного в этом нет.

Далее используется разделение датасета вторым способом.

Считаем, что стоит рекомендовать посмотреть, если рейтинг поставленный >=3 (в оценке).

In [8]:
test_df = test_df.filter(F.col('rating') >= 3.)

### Protocols consensus

При рекомендации фильмов обычно число k в top-k не очень большое, так что, кажется, порядок не так важен. Интереса ради используется одна метрика порядка (MAP). Но основными метриками будут Precision@k, Recall@k. HR@k не используется, так как не очень интересно, сколько раз удалось пользователю посоветовать ХОТЬ ОДИН хороший фильм. Хочется скорее узнать среднее число удачных советов.

In [12]:
from pyspark.sql import Window

def MAP(recommended_df, true_df):
    users_num = true_df.select('userId').distinct().count()
    window = Window.partitionBy('userId').orderBy(F.col('rating').desc()).rowsBetween(-sys.maxsize, 0)
    recommended_df = recommended_df\
        .withColumn('one', F.lit(1))\
        .withColumn('rec_weight', F.lit(1.) / F.sum(F.col('one')).over(window))\
        .drop('one')\
        .withColumnRenamed('userId', 'user')\
        .withColumnRenamed('movieId', 'movie')
    
    window = Window.partitionBy('userId')
    true_df = true_df.withColumn('rel_count', F.count(F.col('movieId')).over(window))
    result = true_df\
        .join(recommended_df, [true_df['userId'] == recommended_df['user'], true_df['movieId'] == recommended_df['movie']])\
        .select('userId', 'movieId', 'rel_count', 'rec_weight')\
        .withColumn('total_weight', F.sum(F.col('rec_weight')).over(window))\
        .select('userId', 'rel_count', 'total_weight')\
        .distinct()\
        .withColumn('total_weight', F.col('total_weight') / F.col('rel_count'))\
        .select('total_weight')\
        .groupBy().agg(F.sum('total_weight').alias('sum')).take(1)[0]['sum']
    return result / users_num

In [13]:
from pyspark.sql import Window

def precision_at_k(recommended_df, true_df):
    users_num = true_df.select('userId').distinct().count()
    window = Window.partitionBy('userId')
    recommended_df = recommended_df.withColumn('rec_count', F.count(F.col('movieId')).over(window))\
        .withColumnRenamed('userId', 'user')\
        .withColumnRenamed('movieId', 'movie')
    result = true_df\
        .join(recommended_df, [true_df['userId'] == recommended_df['user'], true_df['movieId'] == recommended_df['movie']])\
        .select('userId', 'rec_count')\
        .withColumn('total', F.count(F.col('userId')).over(window) / F.col('rec_count'))\
        .distinct()\
        .select('total')\
        .groupBy().agg(F.sum('total').alias('sum')).take(1)[0]['sum']
    return result / users_num


In [14]:
def recall_at_k(recommended_df, true_df):
    users_num = true_df.select('userId').distinct().count()
    window = Window.partitionBy('userId')
    true_df = true_df.withColumn('rel_count', F.count(F.col('movieId')).over(window))\
        .withColumnRenamed('userId', 'user')\
        .withColumnRenamed('movieId', 'movie')

    result = recommended_df\
        .join(true_df, [true_df['user'] == recommended_df['userId'], true_df['movie'] == recommended_df['movieId']])\
        .select('userId', 'rel_count')\
        .withColumn('total', F.count(F.col('userId')).over(window) / F.col('rel_count'))\
        .distinct()\
        .select('total')\
        .groupBy().agg(F.sum('total').alias('sum')).take(1)[0]['sum']
    return result / users_num


## Models

Теперь мы можем перейти к формулировке задачи в терминах машинного обучения.

Одна из формулировок, к которой мы сведем нашу задачу - **Matrix Completetion**. Данную задачу будем решать с помощью `ALS`

### [ALS](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)

In [12]:
from pyspark.ml.recommendation import ALS
base_als = ALS(maxIter=10, rank=10, regParam=.1, userCol='userId', itemCol='movieId', ratingCol='rating', 
               coldStartStrategy='drop', implicitPrefs=False)
base_als_model = base_als.fit(train_df)

In [19]:
base_als_vectors = base_als_model.itemFactors
base_als_vectors.take(2)

[Row(id=10, features=[-0.6285361051559448, 0.2037687748670578, 0.6077628135681152, -0.52348393201828, 0.18761347234249115, -0.054997917264699936, 0.058772288262844086, -0.9275143146514893, 0.32570794224739075, 0.507000744342804]),
 Row(id=20, features=[-0.44506630301475525, 0.4190051555633545, 0.45511385798454285, -0.5662184357643127, 0.4383411109447479, 0.2585639953613281, 0.12199082225561142, -0.8230255246162415, -0.13716013729572296, 0.20074614882469177])]

Покажите для выбранных вами фильмов топ-20 наиболее похожих фильмов

In [20]:
def recommendations_to_titles(data):
    tmp = movies_df.select('movieId', 'title')\
        .join(data, movies_df['movieId'] == data['baseId'])\
        .drop('movieId')\
        .withColumnRenamed('title', 'baseTitle')
    return movies_df.select('movieId', 'title')\
        .join(tmp, movies_df['movieId'] == tmp['recommendationId'])\
        .drop('movieId')\
        .withColumnRenamed('title', 'recommendationTitle')\
        .sort(F.col('score').asc())

In [21]:
from pyspark.sql.functions import udf, array
from scipy.spatial.distance import cosine


def cosine_sim(array):
        return float(cosine(array[0], array[1]))


def find_similar_movie(vectors_df, movieIds, topK):
    base_df = vectors_df\
        .filter(F.col('id').isin(movieIds))\
        .withColumnRenamed('id', 'baseId')\
        .withColumnRenamed('features', 'baseFeatures')
    
    dot_udf = udf(cosine_sim, FloatType())

    
    result = vectors_df\
        .join(base_df, base_df['baseId'] != vectors_df['id'])\
        .withColumn('score', dot_udf(array('features', 'baseFeatures')))\
        .sort(F.col('score').asc())\
        .limit(topK)\
        .drop('features', 'baseFeatures')\
        .withColumnRenamed('id', 'recommendationId')
    return result

In [22]:
test_movies = recommendations_to_titles(find_similar_movie(base_als_vectors, [260], 20))
test_movies.take(10)

[Row(recommendationTitle='Star Wars: Episode V - The Empire Strikes Back (1980)', baseTitle='Star Wars: Episode IV - A New Hope (1977)', recommendationId=1196, baseId=260, score=0.0037523782812058926),
 Row(recommendationTitle='Star Wars: Episode VI - Return of the Jedi (1983)', baseTitle='Star Wars: Episode IV - A New Hope (1977)', recommendationId=1210, baseId=260, score=0.014959782361984253),
 Row(recommendationTitle='Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)', baseTitle='Star Wars: Episode IV - A New Hope (1977)', recommendationId=1198, baseId=260, score=0.018733611330389977),
 Row(recommendationTitle='Indiana Jones and the Last Crusade (1989)', baseTitle='Star Wars: Episode IV - A New Hope (1977)', recommendationId=1291, baseId=260, score=0.02520429901778698),
 Row(recommendationTitle='Star Trek II: The Wrath of Khan (1982)', baseTitle='Star Wars: Episode IV - A New Hope (1977)', recommendationId=1374, baseId=260, score=0.027708830311894417),
 

Предсказываем по 10 фильмов для каждого юзера из тестового датасета.

Так как нет функционала для указания айдишников, которые не надо выдавать в результате (фильмы, которые уже в трейне юзер посмотрел выдавать бессмысленно), то предсказываю по 30, а затем фильтрую и сокращаю.

In [13]:
base_als_result = base_als_model.recommendForUserSubset(test_df.select('userId').distinct(), 30)\
    .withColumn('rec', F.explode(F.col('recommendations'))).drop('recommendations')\
    .withColumn('movieId', F.col('rec')['movieId'])\
    .withColumn('rating', F.col('rec')['rating']).drop('rec')
base_als_result.take(2)

[Row(userId=9427, movieId=151989, rating=6.758526802062988),
 Row(userId=9427, movieId=101862, rating=6.072198867797852)]

In [14]:
window = Window.partitionBy('userId').orderBy(F.col('rating').desc()).rowsBetween(-sys.maxsize, 0)

tmp = base_als_result.select('userId', 'movieId')\
    .subtract(train_df.select('userId', 'movieId'))\
    .withColumnRenamed('userId', 'user')\
    .withColumnRenamed('movieId', 'movie')
tmp = base_als_result\
    .join(tmp, [tmp['user'] == base_als_result['userId'], tmp['movie'] == base_als_result['movieId']])\
    .withColumn('one', F.lit(1))\
    .withColumn('number', F.sum(F.col('one')).over(window))\
    .filter(F.col('number') <= 10)\
    .select('userId', 'movieId', 'rating')
tmp.take(2)

[Row(userId=9427, movieId=151989, rating=6.758526802062988),
 Row(userId=9427, movieId=101862, rating=6.072198867797852)]

In [15]:
base_als_result = tmp.filter(F.col('rating') >= 3)

In [16]:
precision_at_k(base_als_result, test_df)

0.0029593094944512957

In [17]:
all_metrics = {}
base_als_metrics = {
    'precision@k': precision_at_k(base_als_result, test_df),
    'recall@k': recall_at_k(base_als_result, test_df),
    'MAP': MAP(base_als_result, test_df)
}

all_metrics['base ALS'] = base_als_metrics

In [18]:
all_metrics

{'base ALS': {'MAP': 7.877653718243697e-05,
  'precision@k': 0.0029593094944512957,
  'recall@k': 0.0003514820684581914}}

### Ваша формулировка

На лекции было еще несколько ML формулировок задачи рекомендаций. Выберете одну из них и реализуйте метод.

In [9]:
GRAPH_PATH = os.path.join(DATA_PATH, 'rating_graph.edgelist')
NODE2VEC_EMB_PATH = '/workspace/node2vec/result/ratings.emd'


In [10]:
import networkx as nx

graph =nx.Graph()
for line in train_df.collect():
    graph.add_edge('u' + str(line['userId']), 'm' + str(line['movieId']), weight=line['rating'])

In [None]:
import networkx as nx
from node2vec import Node2Vec

node2vec = Node2Vec(graph, dimensions=10, walk_length=4, num_walks=1, workers=16)
model = node2vec.fit(window=5, min_count=1, batch_words=1)

Computing transition probabilities:   0%|          | 10/183267 [02:42<1216:19:53, 23.89s/it]

## Evaluation Results

Сравните реализованные методы с помощью выбранных метрик. Не забывайте про оптимизацию гиперпараметров.

In [None]:
######################################
######### YOUR CODE HERE #############
######################################