<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Movie-recommendation" data-toc-modified-id="Movie-recommendation-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Movie recommendation</a></span><ul class="toc-item"><li><span><a href="#Dataset" data-toc-modified-id="Dataset-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Dataset</a></span></li><li><span><a href="#Evaluation-Protocol" data-toc-modified-id="Evaluation-Protocol-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Evaluation Protocol</a></span></li><li><span><a href="#Models" data-toc-modified-id="Models-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Models</a></span><ul class="toc-item"><li><span><a href="#ALS" data-toc-modified-id="ALS-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback" target="_blank">ALS</a></a></span></li><li><span><a href="#Ваша-формулировка" data-toc-modified-id="Ваша-формулировка-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Ваша формулировка</a></span></li></ul></li><li><span><a href="#Evaluation-Results" data-toc-modified-id="Evaluation-Results-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Evaluation Results</a></span></li></ul></li></ul></div>

# Movie recommendation

Ваша задача - рекомендация фильмов для пользователей


In [1]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "25g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [2]:
from typing import *

## Dataset 

`MovieLens-25M`

In [3]:
DATA_PATH = '/home/egor/MLBD/data/ml-25m'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


ratings_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + RATINGS_PATH)

In [5]:
ratings_df.describe()

DataFrame[summary: string, userId: string, movieId: string, rating: string, timestamp: string]

In [6]:
ratings_df = ratings_df.cache()

In [7]:
dataset_size = ratings_df.count()
dataset_size

25000095

In [8]:
n_users = ratings_df \
    .select('userId') \
    .distinct() \
    .count()
n_users

162541

In [9]:
ratings_df.select(F.min(F.col('timestamp')), F.max(F.col('timestamp'))).collect()

[Row(min(timestamp)=789652009, max(timestamp)=1574327703)]

## Evaluation Protocol

Так как мы хотим оценивать качество разных алгоритмов рекомендаций, то в первую очередь нам нужно определить
* Как разбить данные на `Train`/`Validation`/`Test`;
* Какие метрики использовать для оценки качества.

### Выбор методики разбиения датасета

Будем рассматривать две опции:
* Разбить все оценки во времени на три группы (train/valid/test)
* Разбить все оценки на сессии (серия оценок от одного пользователя), каждую из них разбить во времени

Второй способ хуже тем, что в нем мы заглядываем в будущее для предсказания оценок некоторых пользователей. Тем не менее, он решает проблему холодного старта (в первом способе некоторые пользователи встречаются в тестовой выборке, но при этом отсутствуют в тренировочной), из-за которой нам придется сократить тестовую выборку.

Значит, чтобы выбрать методику разбиения, попробуем разбить датасет первым способом и поймем, какое количество оценок и пользователей придется выкинуть из-за холодного старта. Затем решим, насколько это количество критично и стоит ли переходить к сессиям.

In [10]:
from pyspark.sql import Window

def split_by_col(df: pyspark.sql.dataframe.DataFrame, split_col: str, parts_fractions: List[float]):
    """
    df - DataFrame
    split_col - total order column
    parts_fractions - fractions of resulting parts
    """
    
    split_window = Window().orderBy(split_col)
    df = df.withColumn('fraction', F.percent_rank().over(split_window))
    
    parts_fractions = [0.] + [sum(parts_fractions[:(i + 1)]) for i in range(len(parts_fractions))]
    parts_fractions[-1] += 1e-9 # to allow weak inequality for the last part
    parts = [
        df.filter((low_fraction <= F.col('fraction')) & (F.col('fraction') < high_fraction)).drop('fraction')
        for low_fraction, high_fraction in zip(parts_fractions[:-1], parts_fractions[1:])
    ]
    
    return parts

In [11]:
train_df, valid_df, test_df = split_by_col(ratings_df, 'timestamp', [0.8, 0.1, 0.1])

Найдем границу, отделяющую тренировочную выборку:

In [12]:
train_time_separator = train_df.select(F.max(F.col('timestamp'))).collect()

In [13]:
train_time_separator = train_time_separator[0][0]

In [14]:
train_time_separator

1466837397

Посчитаем долю пользователей и рейтингов, которые нам придется выкинуть в валидационной и тестовой выборках

In [15]:
missed_info = ratings_df \
    .groupBy('userId') \
    .agg(F.min(F.col('timestamp')).alias('minTimestamp'), F.count('movieId').alias('movieCount')) \
    .select('userId', 'minTimestamp', 'movieCount') \
    .filter(F.col('minTimestamp') > train_time_separator)

In [16]:
missed_info.show()
missed_info.count()

+------+------------+----------+
|userId|minTimestamp|movieCount|
+------+------------+----------+
|   471|  1499822567|        39|
|   833|  1467556963|       158|
|  1088|  1501273541|        36|
|  1238|  1495751304|       150|
|  3794|  1484584684|       336|
|  4900|  1526377075|       337|
|  6357|  1485048811|       241|
|  7754|  1485014125|        42|
|  7833|  1546799783|        33|
|  9900|  1482079056|        63|
| 10206|  1497044862|       110|
| 10623|  1521403955|      1629|
| 10817|  1554696855|        24|
| 11141|  1509496887|        83|
| 11317|  1549388861|        22|
| 11458|  1472091641|        72|
| 12940|  1504635908|        32|
| 13832|  1494019772|        69|
| 13840|  1542844561|       102|
| 14450|  1532874678|       304|
+------+------------+----------+
only showing top 20 rows



24658

In [17]:
missed_info = missed_info \
    .select(F.count('userId').alias('missedUsers'), F.sum('movieCount').alias('missedRatings')) \
    .collect()

In [18]:
missed_info

[Row(missedUsers=24658, missedRatings=4235184)]

In [19]:
missed_info[0]['missedRatings'] / int(0.2 * dataset_size)

0.8470335812723911

In [20]:
non_train_users = valid_df \
    .union(test_df) \
    .select('userId') \
    .distinct() \
    .count()

In [21]:
missed_info[0]['missedUsers'] / non_train_users

0.7939083679448791

#### Вывод

Если мы будем делить датасет во времени, то мы потеряем 85% рейтингов в тестовой и валидационной выборках и почти 80% пользователей. Это очень существенное изменение размера датасета, поэтому разобьем датасет по сессиям, а уже их по времени.

In [22]:
def partitioned_split_by_col(
    df: pyspark.sql.dataframe.DataFrame, split_col: str, partition_col: str, parts_fractions: List[float]
):
    """
    df - DataFrame
    split_col - total order column
    parts_fractions - fractions of resulting parts
    """
    
    split_window = Window().orderBy(split_col).partitionBy(partition_col)
    df = df.withColumn('fraction', F.percent_rank().over(split_window))
    
    parts_fractions = [0.] + [sum(parts_fractions[:(i + 1)]) for i in range(len(parts_fractions))]
    parts_fractions[-1] += 1e-9 # to allow weak inequality for the last part
    parts = [
        df.filter((low_fraction <= F.col('fraction')) & (F.col('fraction') < high_fraction)).drop('fraction')
        for low_fraction, high_fraction in zip(parts_fractions[:-1], parts_fractions[1:])
    ]
    
    return parts

In [23]:
train_df, valid_df, test_df = partitioned_split_by_col(ratings_df, 'timestamp', 'userId', [0.8, 0.1, 0.1])

In [24]:
train_df.count() / dataset_size, valid_df.count() / dataset_size, test_df.count() / dataset_size

(0.8006953973574901, 0.09911926334679928, 0.10018533929571068)

In [25]:
train_df.select('userId').distinct().count(), \
valid_df.select('userId').distinct().count(), \
test_df.select('userId').distinct().count()

(162541, 158503, 158843)

### Выбор метрик для оценки качества

Чтобы понять, какие метрики адекватно использовать, надо сперва понять, сколько фильмов посмотрели люди в тестовой выборке

In [26]:
test_counts = test_df \
    .groupBy('userId') \
    .agg(F.count('movieId').alias('movieCount'))

In [27]:
test_counts.select(F.min('movieCount'), F.mean('movieCount'), F.max('movieCount')).collect()

[Row(min(movieCount)=1, avg(movieCount)=15.768041399369189, max(movieCount)=3191)]

In [28]:
count_window = Window().orderBy('movieCount')
test_counts_percentile = test_counts.withColumn('fraction', F.percent_rank().over(count_window))

test_counts_percentile.filter(F.col('fraction') <= 0.25).select(F.max('movieCount')).collect(), \
test_counts_percentile.filter(F.col('fraction') <= 0.5).select(F.max('movieCount')).collect(), \
test_counts_percentile.filter(F.col('fraction') <= 0.75).select(F.max('movieCount')).collect(), \
test_counts_percentile.filter(F.col('fraction') <= 0.8).select(F.max('movieCount')).collect(), \

([Row(max(movieCount)=4)],
 [Row(max(movieCount)=7)],
 [Row(max(movieCount)=17)],
 [Row(max(movieCount)=21)])

Для 80% пользователей достаточно рекомендовать около 20 фильмов

Будем использовать [Precision, NDCG] @ [1, 5, 10, 20] и MAP

In [284]:
from pyspark.mllib.evaluation import RankingMetrics

KS = [1, 5, 10, 20]

def get_metrics(predictions_df, gt_df):
    labels = gt_df \
        .groupBy('userId') \
        .agg(F.collect_set('movieId').alias('labels'))
    
    ranking_metrics = RankingMetrics(
        labels.join(predictions_df, on='userId') \
        .select('predictions', 'labels') \
        .rdd
    )
    
    metrics = {}
    
    for k in KS:
        metrics[f'Precision@{k}'] = ranking_metrics.precisionAt(k)
        metrics[f'NDCG@{k}'] = ranking_metrics.ndcgAt(k)
    metrics['MAP'] = ranking_metrics.meanAveragePrecision
    
    return metrics

## Models

Теперь мы можем перейти к формулировке задачи в терминах машинного обучения.

Одна из формулировок, к которой мы сведем нашу задачу - **Matrix Completetion**. Данную задачу будем решать с помощью `ALS`

### [ALS](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)

In [29]:
from pyspark.ml.recommendation import ALS

In [30]:
als = ALS(maxIter=5, rank=100, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [31]:
model = als.fit(train_df)

Покажите для выбранных вами фильмов топ-20 наиболее похожих фильмов

In [32]:
item_vectors = model.itemFactors \
    .select(F.col('id').alias('movieId'), F.col('features').alias('vector'))

In [33]:
movies_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + MOVIES_PATH)

In [34]:
movies_df.describe()

DataFrame[summary: string, movieId: string, title: string, genres: string]

In [35]:
movie_ids = [356, 318, 296, 364, 380, 377, 344, 367, 293, 231]

picked_movies = movies_df \
    .filter(F.col('movieId').isin(movie_ids)) \
    .join(item_vectors, on='movieId')

picked_movies.show()

+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|              vector|
+-------+--------------------+--------------------+--------------------+
|    380|    True Lies (1994)|Action|Adventure|...|[0.064716086, -0....|
|    231|Dumb & Dumber (Du...|    Adventure|Comedy|[0.040393606, -0....|
|    293|Léon: The Profess...|Action|Crime|Dram...|[-0.060172215, -0...|
|    344|Ace Ventura: Pet ...|              Comedy|[0.07797792, -0.0...|
|    364|Lion King, The (1...|Adventure|Animati...|[0.24805997, -0.0...|
|    296| Pulp Fiction (1994)|Comedy|Crime|Dram...|[-0.097767875, -0...|
|    356| Forrest Gump (1994)|Comedy|Drama|Roma...|[0.07929398, -0.0...|
|    367|    Mask, The (1994)|Action|Comedy|Cri...|[-0.009585199, -0...|
|    377|        Speed (1994)|Action|Romance|Th...|[0.03229766, -0.0...|
|    318|Shawshank Redempt...|         Crime|Drama|[-0.07772393, -0....|
+-------+--------------------+--------------------+

In [36]:
from scipy.spatial.distance import cosine

In [38]:
def cos_dist(v, u):
    return float(cosine(v, u))
    
cos_udf = F.udf(cos_dist, FloatType())

Чтобы не делать вывод слишком большим, посмотрим на 10 (а не 20) близких фильмов для 10 популярных фильмов

In [163]:
SIMILAR = 10

movie_vectors = movies_df \
    .join(item_vectors, on='movieId')

for movie_id in movie_ids:
    picked_movie = picked_movies \
        .filter(F.col('movieId') == movie_id)
    
    print(f"Movies similar to {picked_movie.select('title').collect()[0]['title']}")
    
    picked_movie = picked_movie \
        .select(F.col('movieId').alias('pickedId'), F.col('vector').alias('pickedVector'))
     
    movie_similarity = movie_vectors \
        .join(picked_movie, on=F.col('movieId') != F.col('pickedId')) \
        .withColumn('dist', cos_udf(F.col('vector'), F.col('pickedVector'))) \
        .select('title', 'dist')
        
    similar_window = Window().orderBy('dist')
    
    similar_movies = movie_similarity \
        .withColumn('rank', F.rank().over(similar_window)) \
        .filter(F.col('rank') <= SIMILAR) \
        .select('title', 'dist')
    
    for row in similar_movies.collect():
        print(f"{row['dist']:.3f} | {row['title']}")
    
    print()

Movies similar to Forrest Gump (1994)
0.040 | Green Mile, The (1999)
0.050 | Scent of a Woman (1992)
0.053 | Rain Man (1988)
0.055 | Shawshank Redemption, The (1994)
0.056 | Few Good Men, A (1992)
0.056 | Cast Away (2000)
0.060 | Good Will Hunting (1997)
0.063 | As Good as It Gets (1997)
0.064 | Dances with Wolves (1990)
0.065 | Regarding Henry (1991)

Movies similar to Shawshank Redemption, The (1994)
0.030 | Schindler's List (1993)
0.030 | Rain Man (1988)
0.032 | Bronx Tale, A (1993)
0.033 | As Good as It Gets (1997)
0.035 | Good Will Hunting (1997)
0.037 | Glory (1989)
0.040 | Philadelphia (1993)
0.041 | Good Morning, Vietnam (1987)
0.041 | Usual Suspects, The (1995)
0.041 | Lean on Me (1989)

Movies similar to Pulp Fiction (1994)
0.026 | Reservoir Dogs (1992)
0.039 | True Romance (1993)
0.039 | Trainspotting (1996)
0.052 | Shallow Grave (1994)
0.057 | High Noon (2000)
0.058 | Clerks (1994)
0.059 | Macross: Flash Back 2012 (1987)
0.059 | Macross Zero (2002)
0.059 | Macross Frontier:

### Ваша формулировка

На лекции было еще несколько ML формулировок задачи рекомендаций. Выберете одну из них и реализуйте метод.

In [217]:
rank_window = Window().partitionBy('userId').orderBy('timestamp')

ratings_df_ranked = train_df \
    .withColumn('rank', F.rank().over(rank_window)) \
    .select('userId', 'movieId', 'rank')

In [218]:
movie_pairs = ratings_df_ranked \
    .select(
        F.col('userId').alias('userIdInit'), 
        F.col('movieId').alias('movieIdInit'), 
        F.col('rank').alias('rankInit')
    ) \
    .join(ratings_df_ranked, on=[F.col('userIdInit') == F.col('userId'), F.col('rankInit') < F.col('rank')]) \
    .withColumn('deltaRank', 1 / (F.col('rank') - F.col('rankInit'))) \
    .select(F.col('movieIdInit').alias('movie1'), F.col('movieId').alias('movie2'), 'deltaRank')

In [219]:
transition_weights = movie_pairs \
    .groupBy('movie1', 'movie2') \
    .agg(F.sum('deltaRank').alias('weight'))

In [220]:
transition_weights.show()

+------+------+------------------+
|movie1|movie2|            weight|
+------+------+------------------+
|   260|   912| 466.1012564665204|
| 58559| 88810|  53.9652675456479|
|  1198|  1136| 1050.577466623817|
|  1198| 68954| 633.5856414520265|
|  2571| 79132| 3631.446557388445|
|  1213|   593|1233.1577751746715|
|  1136|  2176|19.164468015686655|
|  1089|  1086| 56.99029650317705|
|  1217|   912|42.964930673210574|
|   608|  3000| 146.1083700873866|
|   608|  1250| 186.4604159665512|
|  3000| 63082| 57.08382250426676|
|116897|    32|10.220482535321636|
|  3462|  1199|22.004069106008863|
|  1250|  1207| 140.0087397216013|
|  1276|  1080| 112.9396771255049|
|109487| 44191|290.10415203679185|
|  3949|   953| 52.37575286743941|
|  1199| 77455|15.089828307796683|
| 78499|112556|134.64502293663443|
+------+------+------------------+
only showing top 20 rows



In [221]:
# Where does American Pie lead us?

transition_weights \
    .filter(F.col('movie1') == 2706) \
    .join(movies_df, on=F.col('movie2') == F.col('movieId')) \
    .select('movieId', 'title', 'weight') \
    .sort(F.col('weight').desc()) \
    .show()

+-------+--------------------+------------------+
|movieId|               title|            weight|
+-------+--------------------+------------------+
|   1917|   Armageddon (1998)| 1127.759743527475|
|   2710|Blair Witch Proje...|1015.6232804455163|
|   4718|American Pie 2 (2...| 992.4110949053967|
|   1517|Austin Powers: In...| 821.4880284492772|
|   2683|Austin Powers: Th...| 688.9186545850187|
|   2355|Bug's Life, A (1998)| 632.7361547343305|
|   1208|Apocalypse Now (1...| 587.9580275797273|
|   3114|  Toy Story 2 (1999)| 533.0524642263117|
|      2|      Jumanji (1995)| 516.5229523065678|
|   1682|Truman Show, The ...| 499.3032730629892|
|   1206|Clockwork Orange,...|481.63108806513225|
|   2716|Ghostbusters (a.k...|462.56122445125084|
|   2997|Being John Malkov...| 445.2078232850714|
|   1101|      Top Gun (1986)|443.51080322393676|
|   2959|   Fight Club (1999)|432.34083465151133|
|    223|       Clerks (1994)|424.17002996188955|
|   2918|Ferris Bueller's ...| 383.2630078221996|


In [243]:
class SequentialRulesModel:
    
    def __init__(self, ratings):
        last_rating_window = Window().partitionBy('userId').orderBy(F.col('timestamp').desc())

        self.last_user_ratings = ratings \
            .withColumn('timestampOrder', F.row_number().over(last_rating_window)) \
            .filter(F.col('timestampOrder') == 1) \
            .select('userId', 'movieId') \
            .cache()
        
        transition_weights = self.compute_transition_weights(ratings)
        
        top_transition_window = Window().partitionBy('movie1').orderBy(F.col('weight').desc())
        
        self.ranked_transition_weights = transition_weights \
            .withColumn('rank', F.row_number().over(top_transition_window)) \
            .drop('weight') \
            .cache()
    
    @staticmethod
    def compute_transition_weights(ratings):
        rank_window = Window().partitionBy('userId').orderBy('timestamp')

        ratings_ranked = ratings \
            .withColumn('rank', F.rank().over(rank_window)) \
            .select('userId', 'movieId', 'rank')
        
        movie_pairs = ratings_df_ranked \
            .select(
                F.col('userId').alias('userIdInit'), 
                F.col('movieId').alias('movieIdInit'), 
                F.col('rank').alias('rankInit')
            ) \
            .join(ratings_df_ranked, on=[F.col('userIdInit') == F.col('userId'), F.col('rankInit') < F.col('rank')]) \
            .withColumn('deltaRank', 1 / (F.col('rank') - F.col('rankInit'))) \
            .select(F.col('movieIdInit').alias('movie1'), F.col('movieId').alias('movie2'), 'deltaRank')
        
        return movie_pairs \
            .groupBy('movie1', 'movie2') \
            .agg(F.sum('deltaRank').alias('weight'))
        
    def recommend(self, k, users):
        top_k_transitions = self.ranked_transition_weights \
            .filter(F.col('rank') <= k) \
            .drop('rank')
        
        return self.last_user_ratings \
            .join(users, on='userId') \
            .join(top_k_transitions, on=F.col('movieId') == F.col('movie1')) \
            .select('userId', F.col('movie2').alias('recommendedMovieid'))

In [257]:
sequential_model = SequentialRulesModel(train_df.sample(0.01))

## Evaluation Results

Сравните реализованные методы с помощью выбранных метрик. Не забывайте про оптимизацию гиперпараметров.

In [230]:
valid_users = valid_df.select('userId').distinct()
test_users = test_df.select('userId').distinct()

In [300]:
def get_als_predictions(model, user_set):
    als_predictions = model.recommendForUserSubset(user_set, max(KS)).cache()    
    def extract_predictions(recommendations):
        return [rec.movieId for rec in recommendations]

    extract_predictions_udf = F.udf(extract_predictions, ArrayType(IntegerType()))

    als_predictions = als_predictions \
        .withColumn('predictions', extract_predictions_udf(F.col('recommendations'))) \
        .drop('recommendations')
    
    return als_predictions

In [301]:
def get_srm_predictions(model, user_set):
    srm_predictions = model.recommend(max(KS), user_set).cache()

    srm_predictions = srm_predictions \
        .groupBy('userId') \
        .agg(F.collect_set('recommendedMovieId').alias('predictions'))
    
    return srm_predictions

In [294]:
srm_predictions = get_srm_predictions(sequential_model, valid_users)
get_metrics(srm_predictions, valid_df)

{'Precision@1': 0.02582970415978375,
 'NDCG@1': 0.02582970415978375,
 'Precision@5': 0.026330279821795054,
 'NDCG@5': 0.027141763746588003,
 'Precision@10': 0.025913550583170644,
 'NDCG@10': 0.029724978636808033,
 'Precision@20': 0.025832207038093805,
 'NDCG@20': 0.037010902602625434,
 'MAP': 0.009752179444347512}

In [295]:
als_predictions = get_als_predictions(model, valid_users)
get_metrics(als_predictions, valid_df)

{'Precision@1': 2.523611540475576e-05,
 'NDCG@1': 2.5236115404755766e-05,
 'Precision@5': 1.7665280783329033e-05,
 'NDCG@5': 2.097586715394493e-05,
 'Precision@10': 1.0094446161902298e-05,
 'NDCG@10': 1.5447205721427435e-05,
 'Precision@20': 1.2618057702377883e-05,
 'NDCG@20': 2.4613230085202504e-05,
 'MAP': 4.766596847810347e-06}

In [298]:
srm_predictions = get_srm_predictions(sequential_model, test_users)
get_metrics(srm_predictions, test_df)

{'Precision@1': 0.01951578868508783,
 'NDCG@1': 0.01951578868508783,
 'Precision@5': 0.019390927593052724,
 'NDCG@5': 0.020010767693939967,
 'Precision@10': 0.019399667869495176,
 'NDCG@10': 0.02208205044223997,
 'Precision@20': 0.019334115796176762,
 'NDCG@20': 0.02735393132782173,
 'MAP': 0.006738204974498579}

In [302]:
als_predictions = get_als_predictions(model, test_users)
get_metrics(als_predictions, test_df)

{'Precision@1': 0.0,
 'NDCG@1': 0.0,
 'Precision@5': 6.295524511624691e-06,
 'NDCG@5': 6.214627880677473e-06,
 'Precision@10': 6.295524511624688e-06,
 'NDCG@10': 6.697675422241314e-06,
 'Precision@20': 8.184181865112098e-06,
 'NDCG@20': 1.2323547312328835e-05,
 'MAP': 1.692921712081672e-06}