## Рекомендательная модель MovieLens

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

In [2]:
# создаем сессию spark
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    # если вертуальной машине нельзя добавить памяти, можно использовать меньше
    .config('spark_driver.memory', '4g')
    # можно явно количество ядер, которые будет использовать Spark
    # либо поставить звездочку для всех доступных виртуальной машине
    .master('local[*]')
    .getOrCreate()
    )

In [3]:
# считываем данные из CSV
# и преобразуем время проставления оценки из целого числа в дату со временем
import os
import pyspark.sql.functions as sql_func

movies = (
    spark
    .read
    .csv(os.path.join('data/movies.csv'), header=True, inferSchema=True)
    .withColumn('genres_list', sql_func.split('genres', '\|'))
    .select('movieId', 'title', 'genres_list')
    .cache()
    )

In [4]:
# взглянем внешне на датафрейм
movies.limit(10).toPandas()

Unnamed: 0,movieId,title,genres_list
0,1,Toy Story (1995),"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,Jumanji (1995),"[Adventure, Children, Fantasy]"
2,3,Grumpier Old Men (1995),"[Comedy, Romance]"
3,4,Waiting to Exhale (1995),"[Comedy, Drama, Romance]"
4,5,Father of the Bride Part II (1995),[Comedy]
5,6,Heat (1995),"[Action, Crime, Thriller]"
6,7,Sabrina (1995),"[Comedy, Romance]"
7,8,Tom and Huck (1995),"[Adventure, Children]"
8,9,Sudden Death (1995),[Action]
9,10,GoldenEye (1995),"[Action, Adventure, Thriller]"


In [5]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres_list: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [6]:
# посмотрим на общее распределение тегов
# считываем csv никак не преобразую
raw_tags = (
    spark
    .read
    .csv(os.path.join('data/tags.csv'), header=True, inferSchema=True)
    .cache()
    )

In [7]:
# взглянем внешне на датафрейм
raw_tags.limit(10).toPandas()

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200
5,2,89774,Tom Hardy,1445715205
6,2,106782,drugs,1445715054
7,2,106782,Leonardo DiCaprio,1445715051
8,2,106782,Martin Scorsese,1445715056
9,7,48516,way too long,1169687325


In [8]:
# протегированно чуть больше половина фильмов
movie_tag_count = raw_tags.count()
tag_count = raw_tags.select('tag').distinct().count()
movie_count = raw_tags.select('movieId').distinct().count()

print('ФИЛЬМОВ ВСЕГО:', movies.select('movieId').distinct().count())
print('ФИЛЬМОВ С ТЕГОМ:', movie_count)
print('РАЗЛИЧНЫХ ТЕГОВ', tag_count)
print('ВСЕГО СООТВЕТСТВИЯ ФИЛЬМ_ТЕГ:', movie_tag_count)
print('ДОЛЯ НЕНУЛЕВЫХ ЭЛЕМЕНТОВ В МАТРИЦЕ ФИЛЬМ_ТЕГ', 
     movie_tag_count / movie_count / tag_count)

ФИЛЬМОВ ВСЕГО: 9742
ФИЛЬМОВ С ТЕГОМ: 1572
РАЗЛИЧНЫХ ТЕГОВ 1589
ВСЕГО СООТВЕТСТВИЯ ФИЛЬМ_ТЕГ: 3683
ДОЛЯ НЕНУЛЕВЫХ ЭЛЕМЕНТОВ В МАТРИЦЕ ФИЛЬМ_ТЕГ 0.0014744338062090358


In [9]:
# некоторые теги различаются только регистром
print('РАЗЛИЧНЫХ ТЕГОВ БЕЗ УЧЕТА РЕГИСТРА:', 
     raw_tags.select(sql_func.upper(sql_func.col('tag'))).distinct().count())

РАЗЛИЧНЫХ ТЕГОВ БЕЗ УЧЕТА РЕГИСТРА: 1475


In [10]:
# не будем интересоваться, какой именно пользователь поставил тег и когда это произошло
tags = (
    raw_tags
    # теги могут различаться только регистром, поэтому привед их всех к верхниму
    .select(sql_func.col('movieId'),
    sql_func.upper(sql_func.col('tag')).alias('tag')
    )
    .groupBy('movieId')
    .agg(sql_func.collect_list('tag').alias('tags_list')) # собирает все теги в виде списка
    .join(movies, 'movieId', 'right')                     # right чтобы у всех фильмов были теги
    .cache()
)

In [11]:
tags.limit(10).toPandas()

Unnamed: 0,movieId,tags_list,title,genres_list
0,1,"[PIXAR, PIXAR, FUN]",Toy Story (1995),"[Adventure, Animation, Children, Comedy, Fantasy]"
1,2,"[GAME, FANTASY, MAGIC BOARD GAME, ROBIN WILLIAMS]",Jumanji (1995),"[Adventure, Children, Fantasy]"
2,3,"[MOLDY, OLD]",Grumpier Old Men (1995),"[Comedy, Romance]"
3,4,,Waiting to Exhale (1995),"[Comedy, Drama, Romance]"
4,5,"[PREGNANCY, REMAKE]",Father of the Bride Part II (1995),[Comedy]
5,6,,Heat (1995),"[Action, Crime, Thriller]"
6,7,[REMAKE],Sabrina (1995),"[Comedy, Romance]"
7,8,,Tom and Huck (1995),"[Adventure, Children]"
8,9,,Sudden Death (1995),[Action]
9,10,,GoldenEye (1995),"[Action, Adventure, Thriller]"


In [49]:
# вариант написания кода в синтаксесе SQL
raw_tags.createOrReplaceTempView('raw_tags') # регистрация таблицы для доступа в SQL запросах

spark.sql('''
SELECT * FROM raw_tags
''').limit(10).toPandas()


Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200
5,2,89774,Tom Hardy,1445715205
6,2,106782,drugs,1445715054
7,2,106782,Leonardo DiCaprio,1445715051
8,2,106782,Martin Scorsese,1445715056
9,7,48516,way too long,1169687325


In [13]:
# еще вариант написания кода в синтаксесе SQL
spark.sql('''
SELECT 
COLLECT_LIST (tag),
movieId FROM raw_tags
GROUP BY movieId
''').limit(10).toPandas()

Unnamed: 0,collect_list(tag),movieId
0,[hula hoop],471
1,"[dance, music]",1088
2,[aliens],1580
3,[lawyers],1645
4,"[adultery, Africa]",1959
5,[Stephen King],2122
6,[spoof],3175
7,[In Netflix queue],6466
8,[cancer],6620
9,[Nick and Nora Charles],7833


In [14]:
# объединим теги и жанры в единое пространство
from pyspark.sql.types import ArrayType, StringType

# B Spark нет некотрых полезных функций, но можно создать свои
#  в частности, мы хотим привести все жанры также в верхниму регистру
list_concat = sql_func.udf(
    lambda one_list, another_list:
        [str.upper(elem) for elem in one_list] + (another_list if another_list else []), # первый список - это список жанров
    returnType=ArrayType(StringType()) # возврат строк (тип указан явно)
)

In [15]:
# объединение жанров и тегов в один столбец (полный мешок слов)
content_features = (
    tags
    .select('movieId', 'title', 
           list_concat('genres_list', 'tags_list').alias('content_features'))
    .cache()
)

In [16]:
content_features.limit(10).toPandas()

Unnamed: 0,movieId,title,content_features
0,1,Toy Story (1995),"[ADVENTURE, ANIMATION, CHILDREN, COMEDY, FANTA..."
1,2,Jumanji (1995),"[ADVENTURE, CHILDREN, FANTASY, GAME, FANTASY, ..."
2,3,Grumpier Old Men (1995),"[COMEDY, ROMANCE, MOLDY, OLD]"
3,4,Waiting to Exhale (1995),"[COMEDY, DRAMA, ROMANCE]"
4,5,Father of the Bride Part II (1995),"[COMEDY, PREGNANCY, REMAKE]"
5,6,Heat (1995),"[ACTION, CRIME, THRILLER]"
6,7,Sabrina (1995),"[COMEDY, ROMANCE, REMAKE]"
7,8,Tom and Huck (1995),"[ADVENTURE, CHILDREN]"
8,9,Sudden Death (1995),[ACTION]
9,10,GoldenEye (1995),"[ACTION, ADVENTURE, THRILLER]"


In [17]:
# например Гарри Поттер и философский камень
content_features.where('movieId==4896').toPandas()

Unnamed: 0,movieId,title,content_features
0,4896,Harry Potter and the Sorcerer's Stone (a.k.a. ...,"[ADVENTURE, CHILDREN, FANTASY, MAGIC, WIZARDS,..."


In [18]:
# мешок слов (bag-of-words)
from pyspark.sql.functions import explode, count, desc

(
content_features
    .where('movieId==4896')
    .select(explode('content_features').alias('words'))
    .groupBy('words')
    .agg(count('words').alias('freq'))
    .orderBy(desc('freq'))
    .toPandas()
)

Unnamed: 0,words,freq
0,MAGIC,2
1,ADVENTURE,1
2,CHILDREN,1
3,WIZARDS,1
4,HARRY POTTER,1
5,FANTASY,1
6,EVERYTHING YOU WANT IS HERE,1
7,HUMOROUS,1
8,ALAN RICKMAN,1


In [19]:
# теперь фичи есть для всех фильмов
content_features.count()

9742

In [20]:
# посчитаем частоты встречаемости для тегов для всех фильмов
from pyspark.ml.feature import HashingTF

term_frequencies = HashingTF(
    # от каждого тега будет вычислен хэш
    # и по факту мы будем считать частоты бакетов хэшей, а не самих тегов
    numFeatures = 512, inputCol='content_features', 
    outputCol='term_frequencies').transform(content_features).cache()

In [50]:
# пример частот встречаемости бакетов (хранится в виде разряженного вектора)
term_frequencies.where('movieId==4896').first().term_frequencies

SparseVector(512, {6: 1.0, 99: 1.0, 113: 1.0, 172: 2.0, 241: 1.0, 309: 1.0, 344: 1.0, 402: 1.0, 404: 1.0})

In [22]:
# теперь сделаем поправку на частоту тегов в целом, чтобы убрать неиформативные теги 
# это второй шаг TF-IDF (term frequency, inverted document frequency)
from pyspark.ml.feature import IDF # IDF = ln(N+1 / n+1) N - общее кол-во фильмов, n - кол-во фильмов где есть этот тег

idf_model = IDF(inputCol='term_frequencies', outputCol='tf_idf',
                minDocFreq=2).fit(term_frequencies) # частота 2, чтобы выкинуть теги которые есть только у одного фильма

tf_idf = (idf_model.transform(term_frequencies)
         .select('movieId', 'title', 'tf_idf')
         .cache()
         )

In [23]:
# сами инвертированные частоты термов
idf_model.idf[:20]

array([7.10486282, 6.00625053, 0.        , 5.96542853, 6.88171926,
       8.08569207, 2.03753218, 0.        , 0.        , 7.79801   ,
       7.39254489, 7.79801   , 0.        , 7.23839421, 6.88171926,
       6.88171926, 7.57486644, 7.79801   , 6.98707978, 0.        ])

In [24]:
# результат TF-IDF преобразования для выбранного фильма
tf_idf.where('movieId==4896').first().tf_idf

SparseVector(512, {6: 2.0375, 99: 2.525, 113: 7.3925, 172: 14.2097, 241: 7.3925, 309: 6.2399, 344: 0.0, 402: 2.6755, 404: 6.3511})

In [51]:
#сохранение json файла
#tf_idf.coalesce(1).write.mode('Overwrite').json(os.path.join('data/tf_idf.json'))

### Самостоятельная работа
1. Получить гистограмму количества тегов у фильма
2. Получить гистограмму количества тегов у пользователя
3. Получить количества тегов по месяцам

In [53]:
raw_tags.createOrReplaceTempView('raw_tags')
y = spark.sql('''
    Select Distinct 
        Percentile(tag_count, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9))
    From(
        Select userId, 
        
    )
''')

## 2. Построим профиль пользователя по истории действий пользователя

In [26]:
ratings = (
    spark
    .read
    .csv(os.path.join('data/ratings.csv'), header=True, inferSchema=True)
    # если используется меньше памяти, то здесь можно взять не все данные, а небольшую выборку
    # даже при fraction=0.01  качественная картина не меняется
    .sample(withReplacement=False, fraction=1.0, seed = 0)
    .withColumn('rating_datatime', sql_func.from_unixtime('timestamp'))
    .drop('timestamp')
    .cache()
    )

In [27]:
# файл с фильмами небольшой, так что его можно читать полностью
# даже если памяти доступно немного
movies_genres = (
    spark
    .read
    .csv(os.path.join('data/movies.csv'), header=True, inferSchema=True)
    # парсим информацию о жанрах
    .withColumn('genres_array', sql_func.split('genres', '\|'))
    .select('movieId', sql_func.explode('genres_array').alias('genre')) # explode превращает строку с одинм списком 
    .cache()                                                            # во мнжество строк
    )

In [28]:
# получим соответствие жанров фильмам: много жанров - один фильм
movies_genres.toPandas()

Unnamed: 0,movieId,genre
0,1,Adventure
1,1,Animation
2,1,Children
3,1,Comedy
4,1,Fantasy
...,...,...
22079,193583,Fantasy
22080,193585,Drama
22081,193587,Action
22082,193587,Animation


In [29]:
# у нас есть фильмы без оценок
print("ФИЛЬМОВ С ЖАНРАМИ: ", movies_genres.select ('movieID').distinct().count())
print("ФИЛЬМОВ С ОЦЕНКАМИ: ", ratings.select ('movieID').distinct().count())

ФИЛЬМОВ С ЖАНРАМИ:  9742
ФИЛЬМОВ С ОЦЕНКАМИ:  9724


In [30]:
#  создаем профиль пользователя (жанровые предпочтения):
# набор средних оценок фильмов одного жанра
user_profiles = (
    ratings
    .join(movies_genres, 'movieId')
    .groupBy('userId', 'genre')
    .agg(sql_func.avg('rating').alias('genry_rating')) # средняя оценка фильмов в жанрах, которые предпочитает пользователь
    .cache()
)

In [57]:
# посмотрим как выглядит профиль одного из пользователей
(
    user_profiles
    .where('userId==15')
    .orderBy(sql_func.desc('genry_rating'))
    .show()
)

+------+---------+------------------+
|userId|    genre|      genry_rating|
+------+---------+------------------+
|    15|      War|               4.1|
|    15|  Romance|3.8846153846153846|
|    15|   Horror|3.8181818181818183|
|    15|    Drama| 3.740740740740741|
|    15|   Sci-Fi|3.5847457627118646|
|    15|    Crime|3.5789473684210527|
|    15| Thriller|3.4318181818181817|
|    15|  Mystery|3.4166666666666665|
|    15|   Comedy| 3.357142857142857|
|    15|Adventure|3.3421052631578947|
|    15|     IMAX|3.3055555555555554|
|    15|   Action|3.2033898305084745|
|    15|Animation|2.9545454545454546|
|    15|  Fantasy|           2.90625|
|    15|  Musical|               2.7|
|    15| Children|2.6904761904761907|
|    15|  Western|               2.5|
+------+---------+------------------+



In [32]:
# предсказываем  оценку фильма как среднее по средним оценкам жанров данного пользователя
predictions = (
    ratings
    .join(movies_genres, 'movieId', 'left')
    .join(user_profiles, ['userId', 'genre'], 'left')
    .groupBy('userId', 'movieId', 'rating')
    .agg(sql_func.avg('genry_rating').alias('prediction'))
)



In [33]:
predictions.show()

+------+-------+------+------------------+
|userId|movieId|rating|        prediction|
+------+-------+------+------------------+
|     1|    441|   4.0|  4.27710843373494|
|     3|   2080|   0.5|             0.625|
|     4|    708|   4.0| 3.444462864721485|
|     4|   3489|   1.0|3.6163327749080927|
|     6|    248|   5.0|  3.37007874015748|
|    10|   2571|   0.5| 2.858974358974359|
|    10|   8961|   2.5| 3.564055493981995|
|    11|    529|   5.0| 4.166666666666667|
|    11|   1518|   4.0|3.5998641304347827|
|    11|   1784|   5.0| 3.938034188034188|
|    12|   1721|   5.0| 4.566964285714286|
|    16|    111|   4.5|  3.75742337164751|
|    18|    593|   4.5|3.6758940056598917|
|    18|   1206|   4.5| 3.815771630183201|
|    18|   7147|   4.5|3.7401814546309957|
|    18|  84392|   4.5|3.8740591432745712|
|    19|    745|   4.0|2.7579300238613964|
|    19|   1580|   2.0|2.6419778083295142|
|    19|   1689|   2.0|  2.69659581584869|
|    19|   2081|   4.0| 2.758146477406793|
+------+---

### Задание слушателям
Самостоятельно найти RMSE

In [64]:
from pyspark.sql import DataFrame
import numpy as np

RMSE = np.sqrt(
    predictions
    .select(
        sql_func(pow(predictions.prediction - predictions.rating, 2)
        .alias('sq_error'))
    .agg(sql_func.avg('sq_error'))
    .first()[0])
    

SyntaxError: unexpected EOF while parsing (<ipython-input-64-ce07abeb2e4e>, line 11)

## 3. CONTENT-BASED МОДЕЛЬ (слайды)

In [34]:
tf_idf.show()

+-------+--------------------+--------------------+
|movieId|               title|              tf_idf|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|(512,[6,99,229,24...|
|      2|      Jumanji (1995)|(512,[3,4,6,99,34...|
|      3|Grumpier Old Men ...|(512,[104,229,312...|
|      4|Waiting to Exhale...|(512,[229,343,434...|
|      5|Father of the Bri...|(512,[69,229,391]...|
|      6|         Heat (1995)|(512,[263,387,456...|
|      7|      Sabrina (1995)|(512,[229,343,391...|
|      8| Tom and Huck (1995)|(512,[6,402],[2.0...|
|      9| Sudden Death (1995)|(512,[387],[1.670...|
|     10|    GoldenEye (1995)|(512,[6,263,387],...|
|     11|American Presiden...|(512,[70,229,343,...|
|     12|Dracula: Dead and...|(512,[229,462],[0...|
|     13|        Balto (1995)|(512,[6,337,402],...|
|     14|        Nixon (1995)|(512,[70,378,434]...|
|     15|Cutthroat Island ...|(512,[6,343,387],...|
|     16|       Casino (1995)|(512,[194,434,456...|
|     17|Sen

In [35]:
# не будем использовать модели из SPark, а используем модели из Sklearn
# для распределения нагрузки под управлением spark создаем пользовательскую функцию

from sklearn.linear_model import ElasticNet        # линейная модель с регуляризацией
from pyspark.sql.types import FloatType, ArrayType

def sklearn_lr(spark_x: list, spark_y: list) -> list:
    '''
    spark_x: СПИСОК pyspark.ml.linalg.SparseVector - фичи для регрессии
    spark_y: СПИСОК ЗНАЧЕНИЙ ЦЕЛЕВОЙ ПЕРЕМЕННОЙ РЕГРЕССИИ
    return: СПИСОК КОЭФФИЦИЕНТОВ РЕГРЕСИИ
    '''
    # переводим данные из формата Spark в удобный для Sklearn
    X = np.array([vector.toArray() for vector in spark_x])
    y = np.array(spark_y).reshape(-1, 1)
    # применяем модель ElasticNet из Sklearn 
    lr = ElasticNet().fit(X, y)
    # возвращаем в ответе плотный вектор коэффициентов регрессии
    return [lr.sparse_coef_.todense().tolist()[0], lr.intercept_.tolist()]

# определяем Spark UDF, которая обучает регрессию на своих аргументах
reg_udf = sql_func.udf(sklearn_lr, returnType=ArrayType(ArrayType(FloatType())))

In [67]:
# разбиваем полученные данные на обучающую и тестовую выборки

%time
X_train, X_test = ratings.join(tf_idf, 'movieId').randomSplit([0.8, 0.2], seed=42)
X_train.cache()
X_test.cache()


#строим для каждого пользователя свою модель регрессии
model_coef = (
    X_train
    .groupBy('userId')
    .agg(
        sql_func.collect_list('tf_idf').alias('X'),
        sql_func.collect_list('rating').alias('y')
    )
    .withColumn('model_coef', reg_udf('X', 'y')) # к каждой строчке применить функцию пользовательскую
    .cache()
)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.96 µs


In [37]:
# сохраняем полцученные коэффициенты регрессии на диск
# model_coef.write.mode('Overwrite').parquet(os.path.join('data/model_coef.parquet'))

In [38]:
%%time
model_coef.count()

CPU times: user 20.8 ms, sys: 13.3 ms, total: 34.1 ms
Wall time: 13.3 s


610

In [39]:
# ввиду применение регуляризации, среди коэффициентов много нулей
model_coef.show()

+------+--------------------+--------------------+--------------------+
|userId|                   X|                   y|          model_coef|
+------+--------------------+--------------------+--------------------+
|   148|[(512,[88,90,187,...|[4.0, 3.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|   463|[(512,[0,3,4,9,15...|[4.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|   471|[(512,[6,99,229,2...|[5.0, 4.0, 3.0, 4...|[[0.0, 0.0, 0.0, ...|
|   496|[(512,[6,187,229,...|[1.0, 5.0, 4.5, 4...|[[0.0, 0.0, 0.0, ...|
|   243|[(512,[6,263,387]...|[5.0, 4.0, 4.0, 5...|[[0.0, 0.0, 0.0, ...|
|   392|[(512,[324,343,43...|[3.0, 1.0, 2.0, 3...|[[0.0, 0.0, 0.0, ...|
|   540|[(512,[263,385,41...|[4.5, 3.5, 5.0, 4...|[[0.0, 0.0, 0.0, ...|
|    31|[(512,[69,229,391...|[3.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|   516|[(512,[6,29,35,66...|[5.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|    85|[(512,[6,434],[2....|[5.0, 5.0, 4.0, 4...|[[0.0, 0.0, 0.0, ...|
|   137|[(512,[10,52,143,...|[4.0, 5.0, 5.0, 4...|[[0.0, 0.0, 0.

In [40]:
# перемножение векторного представления фильма и векторного представления пользователя
from pyspark.ml.linalg import SparseVector

def lr_apply (x: SparseVector, lr_coef: list) -> float:
    '''
    param x ВЕКТОР ФИЧ ДЛЯ РЕГРЕССИИ
    param lr_coef
    return: ПРЕДСКАЗАННОЕ МОДЕЛЬЮ РЕГРЕССИИ ЗНАЧЕНИЕ
    '''
    return float(np.array(x).dot(np.array(lr_coef[0])) + lr_coef[1][0])

lr_apply_udf = sql_func.udf(lr_apply, returnType=FloatType())

In [41]:
# функция предсказание
from pyspark.sql import DataFrame

def get_prediction(data: DataFrame) ->DataFrame:
    return (
        data
        .join(model_coef, 'userId')
        .select('userId', 'rating', 'movieId', 'tf_idf', lr_apply_udf('tf_idf', 'model_coef').alias('prediction'))
        .cache()
    )

In [42]:
%%time
# предсказание на трэйне
train_prediction = get_prediction(X_train)

CPU times: user 13.8 ms, sys: 4.5 ms, total: 18.3 ms
Wall time: 120 ms


In [43]:
# сохраняем предсказания на диск

# train_prediction.write.mode('Overwrite').parquet(os.path.join('data/train_prediction.parquet'))

In [44]:
train_prediction.show()

+------+------+-------+--------------------+----------+
|userId|rating|movieId|              tf_idf|prediction|
+------+------+-------+--------------------+----------+
|     1|   4.0|      1|(512,[6,99,229,24...| 4.3707867|
|    17|   4.5|      1|(512,[6,99,229,24...| 4.1896553|
|    18|   3.5|      1|(512,[6,99,229,24...| 3.7066014|
|    19|   4.0|      1|(512,[6,99,229,24...| 2.6095765|
|    21|   3.5|      1|(512,[6,99,229,24...|  3.298365|
|    27|   3.0|      1|(512,[6,99,229,24...| 3.5607476|
|    31|   5.0|      1|(512,[6,99,229,24...|  3.871795|
|    32|   3.0|      1|(512,[6,99,229,24...| 3.7560976|
|    33|   3.0|      1|(512,[6,99,229,24...|    3.8125|
|    40|   5.0|      1|(512,[6,99,229,24...|    3.7125|
|    44|   3.0|      1|(512,[6,99,229,24...| 3.2521224|
|    45|   4.0|      1|(512,[6,99,229,24...|  3.863344|
|    50|   3.0|      1|(512,[6,99,229,24...| 2.7935605|
|    54|   3.0|      1|(512,[6,99,229,24...| 3.0833333|
|    57|   5.0|      1|(512,[6,99,229,24...| 3.4

In [45]:
# оценка модели при помощи RMSE
def evaluate_prediction(prediction: DataFrame) -> float:
    return np.sqrt(
        prediction
        .selectExpr('''
            CASE
                WHEN prediction > 5 THEN 5
                WHEN prediction < 0.5 THEN 0.5
                ELSE prediction
            END AS prediction
        ''', 'rating')
        .select(
            sql_func.pow(sql_func.col('rating') - sql_func.col('prediction'), 2)
            .alias('squared_error')
        )
        .agg(sql_func.avg('squared_error'))
        .first()[0]
    )

In [46]:
# оценка на трейне
evaluate_prediction(train_prediction)

0.9261553578265541

In [47]:
%%time
# предсказание на тесте
test_prediction = get_prediction(X_test)

CPU times: user 8.71 ms, sys: 4.06 ms, total: 12.8 ms
Wall time: 107 ms


In [48]:
# оценка на тесте
evaluate_prediction(test_prediction)

0.9378773706354082

## 4. Item-to-Item рекомендация (слайд)

In [73]:
from scipy.spatial.distance import euclidean

# определим функцию расстояния
distance = sql_func.udf(
    lambda x1, x2: euclidean(
        x1.toArray(),
        x2.toArray()
        ), # тут может потребоваться tolist() для некоторых расстояний
    returnType=FloatType()
)

In [74]:
# находим матрицу расстояний (ленивые вычисления, это только граф, т.к. помрет на большой матрице :)
distance_matrix = (
    tf_idf.alias('one')
    .crossJoin(tf_idf.alias('two'))
    .select(
        'one.movieId',
        'one.title',
        'two.movieId',
        'two.title',
        distance('one.tf_idf', 'two.tf_idf').alias('distance')
    )
)

In [76]:
# находим 10 ближайших соседей Гарри Потера
(
distance_matrix
    .where(sql_func.col('one.movieId')==4896)
    .orderBy('distance')
    .select('two.movieId', 'two.title', 'distance')
    .limit(10)
    .toPandas()
)

Unnamed: 0,movieId,title,distance
0,4896,Harry Potter and the Sorcerer's Stone (a.k.a. ...,0.0
1,5816,Harry Potter and the Chamber of Secrets (2002),13.840505
2,8368,Harry Potter and the Prisoner of Azkaban (2004),14.439393
3,40815,Harry Potter and the Goblet of Fire (2005),14.531631
4,8943,Being Julia (2004),16.071344
5,117887,Paddington (2014),19.0154
6,3967,Billy Elliot (2000),19.195896
7,329,Star Trek: Generations (1994),19.225018
8,2694,Big Daddy (1999),19.239136
9,104218,Grown Ups 2 (2013),19.239136


#### САМОСТОЯТЕЛЬНАЯ РАБОТА (СЛАЙД)

In [79]:
movies.where("title LIKE '%Trainspotting%'").show()

+-------+--------------------+--------------------+
|movieId|               title|         genres_list|
+-------+--------------------+--------------------+
|    778|Trainspotting (1996)|[Comedy, Crime, D...|
| 168266|T2: Trainspotting...|      [Crime, Drama]|
+-------+--------------------+--------------------+



In [80]:
(
distance_matrix
    .where(sql_func.col('one.movieId')==778)
    .orderBy('distance')
    .select('two.movieId', 'two.title', 'distance')
    .limit(10)
    .toPandas()
)

Unnamed: 0,movieId,title,distance
0,778,Trainspotting (1996),0.0
1,104863,What If (2013),11.870906
2,4623,Major League (1989),12.076021
3,5139,"Bad News Bears, The (1976)",12.076021
4,956,Penny Serenade (1941),12.220367
5,1302,Field of Dreams (1989),12.634316
6,5644,"Pride of the Yankees, The (1942)",13.579036
7,4410,Something Wild (1986),13.627515
8,166,"Doom Generation, The (1995)",13.627515
9,478,Jimmy Hollywood (1994),13.627515


In [81]:
(
distance_matrix
    .where(sql_func.col('one.movieId')==104863)
    .orderBy('distance')
    .select('two.movieId', 'two.title', 'distance')
    .limit(10)
    .toPandas()
)

Unnamed: 0,movieId,title,distance
0,104863,What If (2013),0.0
1,372,Reality Bites (1994),7.238394
2,542,Son in Law (1993),7.238394
3,1883,Bulworth (1998),7.238394
4,1175,Delicatessen (1991),7.238394
5,94,Beautiful Girls (1996),7.238394
6,803,Walking and Talking (1996),7.238394
7,195,Something to Talk About (1995),7.238394
8,1457,Fools Rush In (1997),7.238394
9,1784,As Good as It Gets (1997),7.238394


In [94]:
from scipy.spatial.distance import minkowski

# определим функцию расстояния
distance_cos = sql_func.udf(
    lambda x1, x2: minkowski(
        x1.toArray(),
        x2.toArray()
        ), # тут может потребоваться tolist() для некоторых расстояний
    returnType=FloatType()
)

In [95]:
distance_matrix_cos = (
    tf_idf.alias('one')
    .crossJoin(tf_idf.alias('two'))
    .select(
        'one.movieId',
        'one.title',
        'two.movieId',
        'two.title',
        distance_cos('one.tf_idf', 'two.tf_idf').alias('distance_cos')
    )
)

In [97]:
(
distance_matrix_cos
    .where(sql_func.col('one.movieId')==778)
    .orderBy('distance_cos')
    .select('two.movieId', 'two.title', 'distance_cos')
    .limit(10)
    .toPandas()
)

Py4JJavaError: An error occurred while calling o1347.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 120.0 failed 1 times, most recent failure: Lost task 0.0 in stage 120.0 (TID 3698, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:89)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1433)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1430)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1035)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1017)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1426)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at sun.reflect.GeneratedMethodAccessor130.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:89)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1433)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1430)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
