# Чередование наименьших квадратов на примере MovieLens (PySpark)

Факторизация матрицы с помощью [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (метод наименьших квадратов) – алгоритм коллаборативной фильтрации.

В данном ноутбуке приведен пример использования алгоритма ALS для предсказания оценки фильма. Обычно используется для больших датасетов. Для примера воспользуемся небольшим набором данных, чтобы улучишть производительность

**P.S.**: Для запуска примера необходимо окружение PySpark. [Инструкция по установке](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup)

In [23]:
# установим корневой путь и импортируем необходимые бибилиотеки и датасет
import sys
sys.path.append("../")
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from reco_utils.common.timer import Timer
from reco_utils.dataset import movielens
from reco_utils.common.notebook_utils import is_jupyter
from reco_utils.dataset.spark_splitters import spark_random_split
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from reco_utils.common.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))


System version: 3.8.6 (v3.8.6:db455296be, Sep 23 2020, 13:31:39) 
[Clang 6.0 (clang-600.0.57)]
Spark version: 3.0.1


In [24]:
# выставим стандартные параметры:

# топ k элементов для рекомендации
TOP_K = 10

# размер датасета
MOVIELENS_DATA_SIZE = '100k'

### Настроим контекст для Spark

Настройки ниже хорошо подходят для работы на локальной машине, их стоит изменить при запуске эксперимента на кластере. Выберем один большой исполнитель с большим количеством потоков и укажем объем памяти 

In [25]:
spark = start_or_get_spark("ALS PySpark", memory="16g")

### Загрузим MovieLens датасет

In [26]:
schema = StructType(
    (
        StructField("UserId", IntegerType()),
        StructField("MovieId", IntegerType()),
        StructField("Rating", FloatType()),
        StructField("Timestamp", LongType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data.show()

100%|██████████| 4.81k/4.81k [00:09<00:00, 517KB/s]  
+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



### Разделим датасет на train и test

In [27]:
train, test = spark_random_split(data, ratio=0.75, seed=2415)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 74963
N test 25037


### Натренируем ALS модель на train датасете. Получим TOP_K рекоммендаций для test датасета

Для предсказания рейтингов будем использовать оценки юзеров из train датасета. Гиперпараметры модели возьмем [отсюда](http://mymedialite.net/examples/datasets.html)

In [28]:
header = {
    "userCol": "UserId",
    "itemCol": "MovieId",
    "ratingCol": "Rating",
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [29]:
with Timer() as train_time:
    model = als.fit(train)

print("{} секунд заняло обучение модели".format(train_time.interval))

2.7273268020003343 секунд заняло обучение модели


Т.к. нет смысла работать с уже оцененными юзером фильмы, необходимо их удалить из списка

Поэтому сначала построим список всех фильмов для всех пользователей, а затем удалим оцененные фильмы конкретным пользователем, которые уже есть в train датасете

In [30]:
with Timer() as test_time:

    # объединим таблицы фильмов и пользователей
    users = train.select('UserId').distinct()
    items = train.select('MovieId').distinct()
    user_item = users.crossJoin(items)

    # сформируем список
    dfs_pred = model.transform(user_item)

    # удалим просмотренные
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred['UserId'] == train['UserId']) & (dfs_pred['MovieId'] == train['MovieId']),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
        .select('pred.' + 'UserId', 'pred.' + 'MovieId', 'pred.' + "prediction")

    # посчитаем затраченое время
    top_all.cache().count()

print("{} секунд заняло предсказание".format(test_time.interval))

19.570216943000105 секунд заняло предсказание


In [20]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587|  3.479028|
|     1|    869|  2.895633|
|     1|   1208| 3.0098994|
|     1|   1357| 1.6365523|
|     2|     80| 2.8625576|
|     2|    472| 3.0787597|
|     2|    582| 3.7146451|
|     2|    838| 1.7649673|
|     2|    975| 3.0761478|
|     2|   1260| 2.2481644|
|     2|   1325| 2.0101647|
|     2|   1381| 3.6680305|
|     2|   1530| 2.7652826|
|     3|     22|  3.518471|
|     3|     57| 3.4261537|
|     3|     89| 3.4682543|
|     3|    367|  2.986559|
|     3|   1091| 2.3534873|
|     3|   1167|   3.13494|
|     3|   1499|  3.535588|
+------+-------+----------+
only showing top 20 rows



### Оценим работу ALS модели

In [31]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user="UserId", col_item="MovieId", 
                                    col_rating="Rating", col_prediction="prediction", 
                                    relevancy_method="top_k")

In [32]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.003964
NDCG:	0.038735
Precision@K:	0.042418
Recall@K:	0.014965


### Оценим предсказание оценки

In [33]:
prediction = model.transform(test)
prediction.cache().show()

+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   332|    148|   5.0|887938486| 3.5596893|
|   236|    148|   4.0|890117028| 2.4714663|
|   178|    148|   4.0|882824325| 3.6450977|
|   328|    148|   3.0|885048638|  3.253691|
|   919|    148|   3.0|875289417| 2.4988792|
|    54|    148|   3.0|880937490| 3.1066952|
|   120|    148|   3.0|889490499| 3.0998797|
|    92|    148|   2.0|877383934| 2.7356203|
|   486|    148|   2.0|879874903| 2.0614953|
|   552|    148|   3.0|879222452| 3.1767642|
|   834|    148|   4.0|890862563| 3.6788952|
|    59|    148|   3.0|888203175|  2.887632|
|   757|    148|   4.0|888444948| 3.0533493|
|   434|    148|   3.0|886724797| 3.1537614|
|   391|    148|   3.0|877400062| 2.1381872|
|   438|    148|   5.0|879868443| 3.7995524|
|   532|    148|   5.0|888817717|  3.897446|
|   821|    148|   3.0|874792650| 3.2801647|
|   793|    148|   4.0|875104498| 2.9910336|
|   938|  

In [34]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user="UserId", col_item="MovieId", 
                                    col_rating="Rating", col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	0.970564
MAE:	0.753342
Explained variance:	0.256275
R squared:	0.252496


In [35]:
# освободим ресурсы
spark.stop()