In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

# все любят картиночки:)

In [2]:
# docker контейнер уже должен содержать в себе скачанные данные
# скачаны отсюда: https://grouplens.org/datasets/movielens/
DATA_DIR = "/data/ml-latest"

In [3]:
# смотрим, какие файлы у нас есть
!ls {DATA_DIR}

README.txt	   genome-tags.csv  movies.csv	 tags.csv     tf_idf.parquet
genome-scores.csv  links.csv	    ratings.csv  tf_idf.json


In [4]:
# обратим внимание на файл с оценками
!head {DATA_DIR}/ratings.csv

userId,movieId,rating,timestamp
1,110,1.0,1425941529
1,147,4.5,1425942435
1,858,5.0,1425941523
1,1221,5.0,1425941546
1,1246,5.0,1425941556
1,1968,4.0,1425942148
1,2762,4.5,1425941300
1,2918,5.0,1425941593
1,2959,4.0,1425941601


In [5]:
# для подготовки данных будем использвоать Apache Spark
# (популярный фреймворк распределённых вычислений)
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    # если виртуальной машине нельзя добавить памяти, можно использовать меньше
    .config("spark.driver.memory", "8g")
    # можно явно количество ядер, которые будет использовать Spark
    # либо поставить звёздочку для всех доступных виртуальной машине
    .master("local[*]")
    .getOrCreate()
)

In [6]:
# считываем данные из CSV
# и преобразуем время проставления оценки из целого числа в дату со временем
import os
import pyspark.sql.functions as sql_func

ratings = (
    spark
    .read
    .csv(
        os.path.join(DATA_DIR, "ratings.csv"),
        header=True,
        inferSchema=True
    )
    # если используется меньше памяти,
    # то здесь можно взять не все данные, а а небольшую выборку
    # даже при fraction=0.01 качественная картина не меняется
    .sample(withReplacement=False, fraction=0.1, seed=0)
    .withColumn("rating_datetime", sql_func.from_unixtime("timestamp"))
    .drop("timestamp")
    .cache()
)

In [7]:
import os
import pyspark.sql.functions as sql_func

# поскольку в Parquet схема данных хранится внутри самого файла, читать их очень просто
tf_idf = spark.read.parquet(os.path.join(DATA_DIR, "tf_idf.parquet")).cache()
tf_idf.show()

+-------+--------------------+--------------------+
|movieId|               title|              tf_idf|
+-------+--------------------+--------------------+
|     35|   Carrington (1995)|(1024,[8,74,189,2...|
|    503| New Age, The (1994)|(1024,[434,769,82...|
|    583|Dear Diary (Caro ...|(1024,[434,741,84...|
|    594|Snow White and th...|(1024,[29,52,60,8...|
|    610|  Heavy Metal (1981)|(1024,[32,93,112,...|
|    614|       Loaded (1994)|(1024,[263,434],[...|
|    761| Phantom, The (1996)|(1024,[43,169,196...|
|    880|Island of Dr. Mor...|(1024,[44,81,219,...|
|   1369|I Can't Sleep (J'...|(1024,[263,434],[...|
|   1519|Broken English (1...|(1024,[434,829],[...|
|   1589|     Cop Land (1997)|(1024,[37,61,164,...|
|   1815|         Eden (1997)|(1024,[434,829],[...|
|   1881|Quest for Camelot...|(1024,[57,165,337...|
|   2080|Lady and the Tram...|(1024,[29,37,83,1...|
|   2324|Life Is Beautiful...|(1024,[3,31,32,45...|
|   2444|24 7: Twenty Four...|(1024,[122,221,43...|
|   2445|At 

In [8]:
# оцениваем размеры данных
print("всего пользователей:", ratings.select("userId").distinct().count())
print("всего фильмов:", ratings.select("movieId").distinct().count())
print("всего оценок:", ratings.count())

всего пользователей: 229897
всего фильмов: 26166
всего оценок: 2599745


In [9]:
# достаточно хорошим baseline является предсказывать среднюю оценку
mean_rating = ratings.agg(sql_func.avg("rating")).first()[0]
print("средняя оценка:", mean_rating)

средняя оценка: 3.5280369805500156


In [10]:
# функция, с помощью которой мы будем вычислять RMSE на обучающей выборке
from pyspark.sql import DataFrame
import numpy as np

def simple_evaluate(predictions_df: DataFrame) -> float:
    return np.sqrt(
            ratings
            .join(
                predictions_df,
                ["movieId", "userId"]
            ).select(
                sql_func.pow(
                    ratings.rating - predictions_df.prediction,
                    2
                ).alias("squared_error")
            )
            .agg(sql_func.avg("squared_error"))
            .first()[0]
    )

In [11]:
# рекомендуем любому пользователю любой фильм случайно
mean_predictions = ratings.withColumn("prediction", sql_func.lit(mean_rating))
print("ошибка предсказания:", simple_evaluate(mean_predictions))

ошибка предсказания: 1.065588580528197


In [12]:
# посмотрим на распределение средних оценок разных фильмов
movie_ratings = (
    ratings
    .groupBy("movieId")
    .agg(sql_func.avg("rating").alias("avg_movie_rating"))
    .cache()
)

In [13]:
# у разных пользователей разные распределения оценок
# кто-то более придирчив, а кто-то всем ставит пятёрки
user_ratings = (
    ratings
    .groupBy("userId")
    .agg(sql_func.avg("rating").alias("avg_user_rating"))
    .cache()
)

In [14]:
# а можем и не полусумму, а подобрать коэффициенты
# с помощью линейной регрессии
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

data = (
    VectorAssembler(
        inputCols = ["avg_movie_rating", "avg_user_rating", "tf_idf"],
        outputCol = "features"
    ).transform(
        ratings
        .join(movie_ratings, "movieId")
        .join(user_ratings, "userId")
        .join(tf_idf, "movieId")
    )
    .withColumnRenamed("rating", "label")
    .select("movieId", "userId", "label", "features")
    .cache()
)


In [15]:
# разбиваем полученные данные на обучающую и тестовую выборки
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
train_data.cache()
test_data.cache()
# строим для каждого пользователя свою модель регрессии

DataFrame[movieId: int, userId: int, label: double, features: vector]

In [16]:
ratings.head()

Row(userId=1, movieId=91500, rating=2.5, rating_datetime='2015-03-09 23:10:47')

In [17]:
train_data.head()

Row(movieId=1, userId=47, label=5.0, features=SparseVector(1026, {0: 3.8815, 1: 4.0952, 5: 13.2529, 10: 4.8414, 29: 4.7023, 31: 33.8657, 40: 58.6335, 57: 5.1996, 74: 5.6896, 75: 5.3348, 83: 9.0288, 87: 5.0426, 90: 4.7193, 96: 5.0562, 101: 16.23, 113: 5.615, 115: 67.6634, 128: 15.8459, 139: 5.7842, 141: 4.4545, 176: 92.6263, 193: 6.0145, 200: 5.486, 211: 6.0983, 221: 5.2358, 228: 175.2795, 231: 5.1235, 235: 6.1687, 251: 23.3928, 257: 83.2934, 261: 174.8958, 274: 5.2777, 280: 6.2004, 296: 169.7406, 304: 4.4927, 330: 4.205, 332: 4.8779, 339: 214.2467, 341: 4.8924, 352: 31.5644, 354: 6.5135, 355: 138.3196, 356: 5.5568, 358: 36.8316, 380: 3.7539, 384: 5.9794, 388: 4.0237, 389: 2.014, 390: 56.0306, 393: 5.4549, 396: 5.9047, 401: 17.0881, 412: 5.3439, 416: 5.3485, 420: 6.3636, 429: 5.5181, 433: 5.7291, 436: 0.8282, 450: 4.1156, 452: 5.5739, 455: 55.5811, 481: 4.8982, 492: 104.1344, 501: 4.8195, 508: 6.1278, 520: 38.0981, 524: 4.7669, 527: 15.2101, 539: 5.9794, 540: 4.9309, 565: 5.18, 567: 5.1

In [18]:
test_data.head()

Row(movieId=1, userId=80, label=5.0, features=SparseVector(1026, {0: 3.8815, 1: 3.8571, 5: 13.2529, 10: 4.8414, 29: 4.7023, 31: 33.8657, 40: 58.6335, 57: 5.1996, 74: 5.6896, 75: 5.3348, 83: 9.0288, 87: 5.0426, 90: 4.7193, 96: 5.0562, 101: 16.23, 113: 5.615, 115: 67.6634, 128: 15.8459, 139: 5.7842, 141: 4.4545, 176: 92.6263, 193: 6.0145, 200: 5.486, 211: 6.0983, 221: 5.2358, 228: 175.2795, 231: 5.1235, 235: 6.1687, 251: 23.3928, 257: 83.2934, 261: 174.8958, 274: 5.2777, 280: 6.2004, 296: 169.7406, 304: 4.4927, 330: 4.205, 332: 4.8779, 339: 214.2467, 341: 4.8924, 352: 31.5644, 354: 6.5135, 355: 138.3196, 356: 5.5568, 358: 36.8316, 380: 3.7539, 384: 5.9794, 388: 4.0237, 389: 2.014, 390: 56.0306, 393: 5.4549, 396: 5.9047, 401: 17.0881, 412: 5.3439, 416: 5.3485, 420: 6.3636, 429: 5.5181, 433: 5.7291, 436: 0.8282, 450: 4.1156, 452: 5.5739, 455: 55.5811, 481: 4.8982, 492: 104.1344, 501: 4.8195, 508: 6.1278, 520: 38.0981, 524: 4.7669, 527: 15.2101, 539: 5.9794, 540: 4.9309, 565: 5.18, 567: 5.1

In [19]:
linear_model = LinearRegression().fit(train_data)
stacked_prediction = (
    ratings
    .join(linear_model.transform(train_data), ["movieId", "userId"])
    .select("movieId", "userId", "prediction")
)
print("ошибка предсказания:", simple_evaluate(stacked_prediction))

ошибка предсказания: 0.8343401416880049


In [20]:
test_prediction = (
    ratings
    .join(linear_model.transform(test_data), ["movieId", "userId"])
    .select("movieId", "userId", "prediction")
)
print("ошибка предсказания:", simple_evaluate(test_prediction))

ошибка предсказания: 0.8368112287753542


In [21]:
# получаем некоторую формулу для предсказания оценки, которую можно использовать для рекомендаций
print(
    "[на сколько пользователь оценит фильм] = {} + {} * [средняя оценка этого фильма] + {} * [средняя оценка из поставленных этим пользователем]"
    .format(
        round(linear_model.intercept, 2),
        round(linear_model.coefficients[0], 2),
        round(linear_model.coefficients[1], 2)
    )
)

[на сколько пользователь оценит фильм] = -2.44 + 0.83 * [средняя оценка этого фильма] + 0.87 * [средняя оценка из поставленных этим пользователем]


Эта формула является частным случаем рекомендтельной архитектуры, когда мы для заданного пользователя $u$ получаем рекомендации в два этапа:

1. составляем список объектов $i$, которые в принципе могут заинтересовать пользователя $u$
1. ранжируем этот список объектов по некоторому правилу

В частности, правило может основываться на присвоении каждому объекту из списка некоторой релевантности как функции от свойств пользователя и самого объекта: $r\left(u,i\right)$

В простейшем примере, рассмотренном в этой тетради $r\left(u,i\right)=ar_i+br_u+c$ - линейная функция от двух переменных:

1. $r_i$ - одного свойства объекта (популярности фильма в виде средней оценки) и
1. $r_u$ - одного свойства пользователя ("разборчивости" в виде среднего от распределения всех оценок в истории этого пользователя)