# Задача построения модели подбора рекомендаций фильмов

В данном ноутбуке решается задача построения рекомендательной системы, которая на основе построения эмбедингов пользователей и эмбедингов фильмов могла бы рекомендовать наиболее релевантные новые фильмы пользователям.

Я буду решать эту задачу при помощи pyspark - фреймворка для распределенной обработки больших данных.

### Установка необходимых фреймворков и загрузка данных

In [None]:
#Java JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Downloading Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
#Unzipping the hadoop file
!tar -xvf spark-3.4.0-bin-hadoop3.tgz

In [2]:
# скачиваем датасет
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

--2023-05-10 20:31:39--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip’


2023-05-10 20:31:45 (50.0 MB/s) - ‘ml-latest.zip’ saved [277113433/277113433]



In [3]:
# разархивируем
!unzip ml-latest.zip

Archive:  ml-latest.zip
   creating: ml-latest/
  inflating: ml-latest/links.csv     
  inflating: ml-latest/tags.csv      
  inflating: ml-latest/genome-tags.csv  
  inflating: ml-latest/ratings.csv   
  inflating: ml-latest/README.txt    
  inflating: ml-latest/genome-scores.csv  
  inflating: ml-latest/movies.csv    


In [4]:
# устанавливаем findspark
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [6]:
# инициализируем сессию
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [7]:
# задаем путь к файлам
ratings_file ='/content/ml-latest/ratings.csv'
movies_file = '/content/ml-latest/movies.csv'
links_file = '/content/ml-latest/links.csv'

In [8]:
def readFiles(filename):
  data = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').\
                load(filename,header=True)
  return data

In [9]:
# считываем данные
ratings = readFiles(ratings_file)
movies = readFiles(movies_file)
links = readFiles(links_file)

In [10]:
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



### Подготовка данных для обучения, разбиение

Минимальный набор данных для построения рекомендательной системы - таблица взаимодействий (интеракций) пользователь-контент. Система логирования пользовательских действий сохраняет:
- пару пользователь-контент
- меру взаимодействия (например, рейтинг)
- временную метку, когда произошла интеракция.

Рекомендательные системы строятся вокруг этой таблицы. Такие данные можно представить в виде т.н. матрицы user-item matrix, где каждая строка - это пользователь, каждый столбец - контент. А в пересечении строки и столбца находится мера взаимодействия (в данном случае рейтинг). Для обучения модели нам понадобятся только эти данные: id пользователей и фильмов и рейтинги. Поэтому за основу мы возьмем таблицу ratings и удалим из неё столбец timestamp.

Матрица будет разреженной.

In [11]:
data = ratings.drop("timestamp")

Разбиваем данные на train и test в пропорции 80/20. Это часто встречающаяся пропорция для разделения. Такой выбор основывается на мысли о том, что отсутствие достаточного количества данных как в обучающем, так и в проверочном наборе приведет к тому, что модель будет трудно изучить/обучить или определить, действительно ли эта модель работает хорошо или нет.

In [12]:
train, test = data.randomSplit([0.8, 0.2])

### Метод и обоснование

Таблица взаимодействий пользователь-контент представляет собой матрицу, а к матрице можно применить алгоритмы матричной факторизации. После этого
с помощью матричного разложения мы получаем две матрицы более низкой размерности - одна описывает пользователей (факторы пользователей), вторая контент (факторы контента).
Чтобы получить рекомендации нужно вектор факторов пользователя (соответствующую строку в матрице) умножить на факторы контента и отсортировать полученные значения. Для матричного разложения можно использовать SVD или более продвинутый метод ALS-разложения. Мы используем второй метод, реализованный в pyspark.

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [14]:
# инициализируем параметры модели и обучаем её:
# nonnegative отвечает за необходимость получения неотрицательных значений в результате обучения. По умолчанию False.
# True применяется, когда переменные в физическом смысле не могут быть отрицательными, как рейтинг в нашем случае.
# coldStartStrategy = “drop” применяется чтобы игнорировать строки df где предсказания содержат NaN
model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, coldStartStrategy="drop").fit(train)

### Метрика и обоснование

Качество рекомендательной модели можно измерить как метриками классификации, так и метриками регрессии. При обучении ALS мы оптимизируем метрику, которая “приближает” произведение матриц факторов пользователей и контента к исходной матрице. В такой постановке можно использовать классические метрике RMSE, MAE для определения того, насколько хорошо “угадываем” уже существующие оценки. Также можно использовать метрики Precision и Recall.
Recall – доля релевантных объектов, показанных пользователю, относительно всех релевантных объектов.
Precision – доля релевантных пользователю объектов относительно тех, которые ему показали.
Я буду использовать RMSE.

In [15]:
# инициализируем RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

In [16]:
# вычисляем предсказания и считаем RMSE на тестовой выборке
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("New RMSE: ", evaluator.evaluate(model.transform(test)))

New RMSE:  0.8247412247250272


In [22]:
# пример рекомендаций:
for_an_user = predictions.where(predictions.userId==234926).join(movies, "movieId").join(links, "movieId").select("userId","title","tmdbId","genres","prediction")
for_an_user.show(5)

+------+--------------------+------+--------------------+----------+
|userId|               title|tmdbId|              genres|prediction|
+------+--------------------+------+--------------------+----------+
|234926|    Assassins (1995)|  9691|Action|Crime|Thri...|  2.581965|
|234926|Seven (a.k.a. Se7...|   807|    Mystery|Thriller|  3.585339|
|234926|    City Hall (1996)| 11062|      Drama|Thriller| 3.0170746|
|234926|Awfully Big Adven...| 22279|               Drama| 2.8612847|
|234926|        Congo (1995)| 10329|Action|Adventure|...| 2.1707146|
+------+--------------------+------+--------------------+----------+
only showing top 5 rows

