# 3. Рекомендательная система видеоконтента с implicit feedback – Spark ML

`Автор`: Елена Сидорова

`e-mail`: e_sidorova_94@mail.ru

## Описание задачи

В нашем распоряжении имеется уже предобработанный и очищенный датасет с фактами покупок абонентами телепередач от компании E-Contenta. По доступным нам данным, нужно предсказать вероятность покупки других передач этими, а, возможно, и другими абонентами.

In [160]:
!hdfs dfs -ls /labs/slaba03/

Found 4 items
-rw-r--r--   3 hdfs hdfs   91066524 2022-01-06 18:46 /labs/slaba03/laba03_items.csv
-rw-r--r--   3 hdfs hdfs   29965581 2022-01-06 18:46 /labs/slaba03/laba03_test.csv
-rw-r--r--   3 hdfs hdfs   74949368 2022-01-06 18:46 /labs/slaba03/laba03_train.csv
-rw-r--r--   3 hdfs hdfs  871302535 2022-01-06 18:46 /labs/slaba03/laba03_views_programmes.csv


В `laba03_train.csv` содержатся факты покупки (колонка purchase) пользователями (колонка user_id) телепередач (колонка item_id).

`laba03_items.csv` — дополнительные данные по items. В данном файле много лишней или ненужной информации, так что задача её фильтрации и отбора ложится на вас. Поля в файле, на которых хотелось бы остановиться:

`item_id` — primary key. Соответствует item_id в предыдущем файле.

`content_type` — тип телепередачи (1 — платная, 0 — бесплатная). Вас интересуют платные передачи.

`title` — название передачи, текстовое поле.

`year` — год выпуска передачи, число.


`genres` — поле с жанрами передачи, разделёнными через запятую.

`laba03_test.csv` — тестовый датасет без указанного целевого признака purchase, который нам и предстоит предсказать.

Дополнительный файл `laba03_views_programmes.csv` по просмотрам передач с полями:

`ts_start` — время начала просмотра.

`ts_end` — время окончания просмотра.

`item_type` — тип просматриваемого контента:

`live` — просмотр "вживую", в момент показа контента в эфире.

`pvr` — просмотр в записи, после показа контента в эфире.

Задание проходит в формате соревнования. Для нас оно начинается, когда мы успешно пройдём минимальный порог — AUC должен составить не менее 0.79. 

## Решение

In [1]:
from IPython.display import IFrame, Image

#### Импорт Spark, настройка спарк-сессии

In [105]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [106]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
import json

conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .appName("sparkdf-ees")
         .getOrCreate())

#### Данные

In [4]:
schema = StructType(fields=[
    StructField("user_id", IntegerType()),
    StructField("item_id", IntegerType()),
    StructField("purchase", IntegerType())
])

In [5]:
# датасет с разметкой user_id - передача - факт покупки
df_train = spark.read.csv("/labs/slaba03/laba03_train.csv", header=True, schema = schema)

In [6]:
df_train.count()

5032624

In [6]:
#df_train.toPandas().to_csv('df_train.csv')

In [7]:
df_train.show()

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|       0|
|   1654|  89249|       0|
|   1654|  99982|       0|
|   1654|  89901|       0|
|   1654| 100504|       0|
|   1654|  66187|       0|
|   1654|  84350|       0|
|   1654|  92854|       0|
|   1654|  72811|       0|
|   1654|  86876|       0|
|   1654| 102657|       0|
|   1654| 100482|       0|
|   1654|  89677|       0|
|   1654|  99419|       0|
|   1654|  66603|       0|
|   1654|   7363|       0|
|   1654|   1320|       0|
|   1654|  88892|       0|
|   1654|  66671|       0|
|   1654|  75925|       0|
+-------+-------+--------+
only showing top 20 rows



In [8]:
df_train.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- purchase: integer (nullable = true)



In [9]:
df_train.summary().show()

+-------+-----------------+------------------+--------------------+
|summary|          user_id|           item_id|            purchase|
+-------+-----------------+------------------+--------------------+
|  count|          5032624|           5032624|             5032624|
|   mean|869680.9464782189| 66869.30485865823|0.002166662957534...|
| stddev|60601.09821562329|35242.282055382406|0.046496977952916414|
|    min|             1654|               326|                   0|
|    25%|           846231|             65667|                   0|
|    50%|           885247|             79853|                   0|
|    75%|           908588|             93606|                   0|
|    max|           941450|            104165|                   1|
+-------+-----------------+------------------+--------------------+



In [6]:
# тестовый датасет
df_test = spark.read.csv("/labs/slaba03/laba03_test.csv", header=True,schema=schema)

In [8]:
df_test.count()

2156840

In [11]:
df_test.show()

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  94814|    null|
|   1654|  93629|    null|
|   1654|   9980|    null|
|   1654|  95099|    null|
|   1654|  11265|    null|
|   1654|  88896|    null|
|   1654|  67740|    null|
|   1654|  74271|    null|
|   1654|  99871|    null|
|   1654|  78570|    null|
|   1654|  71942|    null|
|   1654|  74367|    null|
|   1654|  98628|    null|
|   1654|  95887|    null|
|   1654|  77795|    null|
|   1654|  75152|    null|
|   1654|  74905|    null|
|   1654|   9068|    null|
|   1654|  72954|    null|
|   1654| 102431|    null|
+-------+-------+--------+
only showing top 20 rows



In [12]:
df_test.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- purchase: integer (nullable = true)



In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

In [8]:
schema_i = StructType(fields=[
    StructField("item_id", IntegerType()),
    StructField("channel_id", IntegerType()),
    StructField("datetime_availability_start", TimestampType()),
    StructField("datetime_availability_stop", TimestampType()),
    StructField("datetime_show_start", TimestampType()),
    StructField("datetime_show_stop", TimestampType()),
    StructField("content_type", IntegerType()),
    StructField("title", StringType()),
    StructField("year", IntegerType()),
    StructField("genres", StringType()),
    StructField("region_id", IntegerType())
])

In [9]:
# датасет с фичами по программам
df_items = spark.read.option("header", "true").option("delimiter", "\t").csv("/labs/slaba03/laba03_items.csv")

In [12]:
df_items.show(10, truncate = False, vertical = True)

-RECORD 0-------------------------------------------------------------------------------------------------------------
 item_id                     | 65667                                                                                  
 channel_id                  | null                                                                                   
 datetime_availability_start | 1970-01-01T00:00:00Z                                                                   
 datetime_availability_stop  | 2018-01-01T00:00:00Z                                                                   
 datetime_show_start         | null                                                                                   
 datetime_show_stop          | null                                                                                   
 content_type                | 1                                                                                      
 title                       | на пробах только 

In [176]:
df_items.printSchema()

root
 |-- item_id: string (nullable = true)
 |-- channel_id: string (nullable = true)
 |-- datetime_availability_start: string (nullable = true)
 |-- datetime_availability_stop: string (nullable = true)
 |-- datetime_show_start: string (nullable = true)
 |-- datetime_show_stop: string (nullable = true)
 |-- content_type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- region_id: string (nullable = true)



In [10]:
# датасет с просмотрами программ
df_views = spark.read.option("header", "true").option("delimiter", ",").csv("/labs/slaba03/laba03_views_programmes.csv")

In [17]:
df_views.show(5, truncate = False)

+-------+-------+----------+----------+---------+
|user_id|item_id|ts_start  |ts_end    |item_type|
+-------+-------+----------+----------+---------+
|0      |7101053|1491409931|1491411600|live     |
|0      |7101054|1491412481|1491451571|live     |
|0      |7101054|1491411640|1491412481|live     |
|0      |6184414|1486191290|1486191640|live     |
|257    |4436877|1490628499|1490630256|live     |
+-------+-------+----------+----------+---------+
only showing top 5 rows



In [18]:
df_views.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- ts_start: string (nullable = true)
 |-- ts_end: string (nullable = true)
 |-- item_type: string (nullable = true)



Сгенерируем выборку дополнительных фичей.

1. Средняя конверсия `item`'а в продажу

In [11]:
import pyspark.sql.functions as f

In [14]:
df_train.rdd.getNumPartitions()

3

In [12]:
df_train = df_train.coalesce(6).cache()

In [13]:
df_test = df_test.coalesce(6).cache()

In [14]:
df_conv = df_train.groupBy("item_id")\
                .agg(f.count("item_id").alias("countit"),f.sum("purchase").alias("purch_sum"))\
                .orderBy("item_id", ascending=False)

In [15]:
df_conv = df_conv.withColumn('conv', df_conv.purch_sum / df_conv.countit)

In [186]:
df_conv.show()

+-------+-------+---------+--------------------+
|item_id|countit|purch_sum|                conv|
+-------+-------+---------+--------------------+
| 104165|   1369|        1|7.304601899196494E-4|
| 104152|   1365|        1|7.326007326007326E-4|
| 104114|   1366|        1|7.320644216691069E-4|
| 104016|   1389|        1|7.199424046076314E-4|
| 103931|   1366|        0|                 0.0|
| 103876|   1375|        1|7.272727272727272E-4|
| 103812|   1326|        1|7.541478129713424E-4|
| 103762|   1370|        1|  7.2992700729927E-4|
| 103589|   1363|        2|0.001467351430667645|
| 103580|   1360|        1|7.352941176470588E-4|
| 103377|   1346|        0|                 0.0|
| 103371|   1373|        2|0.001456664238892...|
| 103299|   1377|        0|                 0.0|
| 103235|   1350|        0|                 0.0|
| 103144|   1361|        1|7.347538574577516E-4|
| 103119|   1370|        1|  7.2992700729927E-4|
| 102966|   1346|        1|7.429420505200594E-4|
| 102809|   1333|   

In [187]:
df_conv.describe().show()

+-------+------------------+------------------+------------------+-------------------+
|summary|           item_id|           countit|         purch_sum|               conv|
+-------+------------------+------------------+------------------+-------------------+
|  count|              3704|              3704|              3704|               3704|
|   mean| 66877.31425485961|1358.6997840172787|2.9438444924406046|0.00216599307522978|
| stddev|35242.702380266725|19.600259577023216| 6.102958550267081|0.00448293201612351|
|    min|               326|              1275|                 0|                0.0|
|    max|            104165|              1421|                98|0.07153284671532846|
+-------+------------------+------------------+------------------+-------------------+



2. Средняя склонность клиента к покупке

In [16]:
df_client = df_train.groupBy("user_id")\
                .agg(f.count("user_id").alias("countit"),f.sum("purchase").alias("purch_sum"))\
                .orderBy("user_id", ascending=False)

In [17]:
df_client = df_client.withColumn('conv', df_client.purch_sum / df_client.countit)

In [191]:
df_client.show()

+-------+-------+---------+--------------------+
|user_id|countit|purch_sum|                conv|
+-------+-------+---------+--------------------+
| 941450|   2579|        2|7.754943776657619E-4|
| 941448|   2619|        0|                 0.0|
| 940969|   2582|        1|3.872966692486445E-4|
| 940600|   2594|        0|                 0.0|
| 940532|   2584|        1|3.869969040247678E-4|
| 940231|   2618|        0|                 0.0|
| 939947|   2612|        1|3.828483920367534...|
| 939937|   2585|        0|                 0.0|
| 939879|   2572|        0|                 0.0|
| 939856|   2599|        0|                 0.0|
| 939709|   2606|        1|3.837298541826554E-4|
| 939678|   2578|       10|0.003878975950349108|
| 939527|   2625|        1|3.809523809523809...|
| 939464|   2576|        8|0.003105590062111801|
| 939459|   2566|        1|3.897116134060795E-4|
| 939341|   2599|        0|                 0.0|
| 939337|   2587|        2|7.730962504831851E-4|
| 939238|   2589|   

In [192]:
df_client.describe().show()

+-------+-----------------+-----------------+-----------------+--------------------+
|summary|          user_id|          countit|        purch_sum|                conv|
+-------+-----------------+-----------------+-----------------+--------------------+
|  count|             1941|             1941|             1941|                1941|
|   mean|869672.3745492015|2592.799587841319|5.617722823286965|0.002164949883315817|
| stddev|60648.36081128864|28.04299938064805|15.18600057864742|0.005807849230483885|
|    min|             1654|             2495|                0|                 0.0|
|    max|           941450|             2690|              490| 0.18617021276595744|
+-------+-----------------+-----------------+-----------------+--------------------+



Объединим эту информацию с нашими данными:

In [18]:
df_train1 = df_train.join(df_client.select(["user_id", "conv"]).withColumnRenamed("conv", "conv_user"), on = 'user_id', how="left")\
                    .join(df_conv.select(["item_id", "conv"]).withColumnRenamed("conv", "conv_product"), on = 'item_id', how="left")

In [25]:
df_train1.count()

5032624

In [19]:
df_test1 = df_test.join(df_client.select(["user_id", "conv"]).withColumnRenamed("conv", "conv_user"), on = 'user_id', how="left")\
                    .join(df_conv.select(["item_id", "conv"]).withColumnRenamed("conv", "conv_product"), on = 'item_id', how="left")

In [26]:
df_test1.count()

2156840

In [None]:
#df_test1.sort(col('user_id'), col('item_id')).show()

In [29]:
df_train1.show()

+-------+-------+--------+--------------------+--------------------+
|item_id|user_id|purchase|           conv_user|        conv_product|
+-------+-------+--------+--------------------+--------------------+
|   8389| 746713|       0|                 0.0|0.005979073243647235|
|   8389| 883098|       0|0.001948558067030...|0.005979073243647235|
|   8389| 903491|       0|0.001161440185830...|0.005979073243647235|
|   8389| 903826|       0|0.001544401544401...|0.005979073243647235|
|   8389| 916566|       0|3.840245775729646...|0.005979073243647235|
|   8389| 899224|       0|  7.8003120124805E-4|0.005979073243647235|
|   8389| 932980|       0|3.858024691358024...|0.005979073243647235|
|   8389| 939459|       0|3.897116134060795E-4|0.005979073243647235|
|   8389| 797350|       0|                 0.0|0.005979073243647235|
|   8389| 848246|       0|0.005838847800700...|0.005979073243647235|
|   8389| 902359|       0|0.002337358784573432|0.005979073243647235|
|   8389| 857661|       0| 0.00115

In [196]:
df_test1.show()

+-------+-------+--------+--------------------+--------------------+
|item_id|user_id|purchase|           conv_user|        conv_product|
+-------+-------+--------+--------------------+--------------------+
|   8389| 761341|    null|3.875968992248062E-4|0.005979073243647235|
|   8389| 776188|    null|0.001152516327314637|0.005979073243647235|
|   8389| 846231|    null|0.001923816852635629|0.005979073243647235|
|   8389| 822709|    null|3.789314134141720...|0.005979073243647235|
|   8389| 824008|    null|3.821169277799006...|0.005979073243647235|
|   8389| 890476|    null|                 0.0|0.005979073243647235|
|   8389| 899993|    null|3.889537145079735...|0.005979073243647235|
|   8389| 937345|    null|3.850596842510589E-4|0.005979073243647235|
|   8389| 566758|    null|3.766478342749529E-4|0.005979073243647235|
|   8389| 816426|    null|                 0.0|0.005979073243647235|
|   8389| 892290|    null|7.719027402547279E-4|0.005979073243647235|
|   8389| 886063|    null|0.002698

Обработаем текстовую информацию по фильмам.

In [20]:
items = df_items.filter(df_items.content_type==1).select(["item_id", "title", "genres"]).distinct()

In [198]:
items.show()

+-------+--------------------+--------------------+
|item_id|               title|              genres|
+-------+--------------------+--------------------+
|   9720|       черные паруса|                null|
|  73303|волки и овцы: бе-...|Семейные,Приключе...|
|  74318|      падение олимпа|Триллеры,Боевики,...|
|  77673|             везуха!|Русские мультфиль...|
|  78263|           наизнанку|Драмы,Криминал,За...|
|  79033|         день дурака|Приключения,Комед...|
|  93159|железный цех: обз...|Развлекательные,В...|
|  94240|    тринадцатый рейс|Русские мультфиль...|
|  94699|     утонуть на суше|Комедии,Драмы,Мел...|
|  95813|             адвокат|  Эротика,Зарубежные|
|  95848|        fuck vip lsd|  Эротика,Зарубежные|
|  98670|           екатерина|Драмы,Исторически...|
|  99471|черная роза – эмб...|Артхаус,Комедии,С...|
| 100149|    внешнее сходство|Триллеры,Драмы,За...|
| 100453|     играй до смерти|Ужасы,Триллеры,За...|
|   2343|           екатерина|Драмы,Мелодрамы,Наши|
|   7038|   

Преобразуем названия и жанры в слова

In [21]:
from pyspark.ml.feature import Tokenizer, HashingTF

In [22]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer

In [23]:
tokenizer_title = Tokenizer(inputCol="title", outputCol="titlew")

In [24]:
data_title = tokenizer_title.transform(items)

In [32]:
data_title.show()

+-------+--------------------+--------------------+--------------------+
|item_id|               title|              genres|              titlew|
+-------+--------------------+--------------------+--------------------+
|  95813|             адвокат|  Эротика,Зарубежные|           [адвокат]|
|  95848|        fuck vip lsd|  Эротика,Зарубежные|    [fuck, vip, lsd]|
|  98670|           екатерина|Драмы,Исторически...|         [екатерина]|
|  99471|черная роза – эмб...|Артхаус,Комедии,С...|[черная, роза, –,...|
| 100149|    внешнее сходство|Триллеры,Драмы,За...| [внешнее, сходство]|
| 100453|     играй до смерти|Ужасы,Триллеры,За...| [играй, до, смерти]|
|   9720|       черные паруса|                null|    [черные, паруса]|
|  73303|волки и овцы: бе-...|Семейные,Приключе...|[волки, и, овцы:,...|
|  74318|      падение олимпа|Триллеры,Боевики,...|   [падение, олимпа]|
|  77673|             везуха!|Русские мультфиль...|           [везуха!]|
|  78263|           наизнанку|Драмы,Криминал,За...|

Очистим данные от лишних слов, преобразуем тексты в бинарные вектора, используя hashing trick:

In [25]:
stop_words = StopWordsRemover.loadDefaultStopWords("russian")

In [26]:
swr = StopWordsRemover(inputCol=tokenizer_title.getOutputCol(), outputCol="words_filtered", stopWords=stop_words)

In [27]:
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="word_vector", vocabSize=20000)

In [28]:
from pyspark.ml import Pipeline

In [29]:
preprocessing_title = Pipeline(stages=[
    tokenizer_title,
    swr,
    count_vectorizer
])

In [30]:
preprocessing_model = preprocessing_title.fit(items)

In [31]:
preprocessed_dataset = preprocessing_model.transform(items)

In [40]:
preprocessed_dataset.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|item_id|               title|              genres|              titlew|      words_filtered|         word_vector|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  94240|    тринадцатый рейс|Русские мультфиль...| [тринадцатый, рейс]| [тринадцатый, рейс]|(4207,[594,856],[...|
|  94699|     утонуть на суше|Комедии,Драмы,Мел...| [утонуть, на, суше]|     [утонуть, суше]|(4207,[896,1413],...|
|  95813|             адвокат|  Эротика,Зарубежные|           [адвокат]|           [адвокат]|  (4207,[970],[1.0])|
|  95848|        fuck vip lsd|  Эротика,Зарубежные|    [fuck, vip, lsd]|    [fuck, vip, lsd]|(4207,[322,369,21...|
|  98670|           екатерина|Драмы,Исторически...|         [екатерина]|         [екатерина]|  (4207,[254],[1.0])|
|  99471|черная роза – эмб...|Артхаус,Комедии,С...|[черная, роза, –,...|[черная,

In [41]:
preprocessed_dataset.count()

3704

Выделим названия в отдельные кластеры. В соответствии с данными используем common sense правило k=(n/2)^0.5

In [32]:
from pyspark.ml.clustering import KMeans

In [33]:
kmeans = KMeans(featuresCol="word_vector", k=27, seed=5757)

In [34]:
kmeans_model = kmeans.fit(preprocessed_dataset)

In [35]:
clustering = kmeans_model.transform(preprocessed_dataset)

In [46]:
clustering[clustering.columns[2:8] + ["prediction"]].take(200)

[Row(genres=None, titlew=['черные', 'паруса'], words_filtered=['черные', 'паруса'], word_vector=SparseVector(4207, {277: 1.0, 519: 1.0}), prediction=0, prediction=0),
 Row(genres='Семейные,Приключения,Комедии', titlew=['волки', 'и', 'овцы:', 'бе-е-е-зумное', 'превращение', '(сурдоперевод)'], words_filtered=['волки', 'овцы:', 'бе-е-е-зумное', 'превращение', '(сурдоперевод)'], word_vector=SparseVector(4207, {0: 1.0, 320: 1.0, 997: 1.0, 1114: 1.0, 1519: 1.0}), prediction=8, prediction=8),
 Row(genres='Триллеры,Боевики,Зарубежные', titlew=['падение', 'олимпа'], words_filtered=['падение', 'олимпа'], word_vector=SparseVector(4207, {84: 1.0, 1074: 1.0}), prediction=0, prediction=0),
 Row(genres='Русские мультфильмы,Приключения,Русские,Сериалы,Для всей семьи,Для детей', titlew=['везуха!'], words_filtered=['везуха!'], word_vector=SparseVector(4207, {1892: 1.0}), prediction=0, prediction=0),
 Row(genres='Драмы,Криминал,Зарубежные', titlew=['наизнанку'], words_filtered=['наизнанку'], word_vector=

In [36]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [37]:
evaluator = ClusteringEvaluator(featuresCol="word_vector")

In [46]:
evaluator.evaluate(clustering)

-0.07593249793506252

In [38]:
clustering.filter(clustering.prediction == 4)[["title"]].show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 title | навигатор игрового мира (выпуск 95) 
-RECORD 1------------------------------------
 title | навигатор игрового мира (выпуск 96) 
-RECORD 2------------------------------------
 title | навигатор игрового мира (выпуск 97) 
-RECORD 3------------------------------------
 title | навигатор игрового мира (выпуск 88) 
-RECORD 4------------------------------------
 title | навигатор игрового мира (выпуск 36) 
only showing top 5 rows



In [51]:
clustering.filter(clustering.prediction==4).show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|item_id|               title|              genres|              titlew|      words_filtered|         word_vector|prediction|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  93131|навигатор игровог...|Развлекательные,В...|[навигатор, игров...|[навигатор, игров...|(4207,[40,70,74,7...|         4|
|  87940|навигатор игровог...|Развлекательные,В...|[навигатор, игров...|[навигатор, игров...|(4207,[40,70,74,7...|         4|
|  92706|навигатор игровог...|Развлекательные,В...|[навигатор, игров...|[навигатор, игров...|(4207,[40,70,74,7...|         4|
|  94252|навигатор игровог...|Развлекательные,В...|[навигатор, игров...|[навигатор, игров...|(4207,[40,70,74,7...|         4|
|  94872|навигатор игровог...|Развлекательные,В...|[навигатор, игров...|[навигатор, игров...|(4207,[40,70,74,7...|    

Сделаем аналогичную процедуру с жанрами:

In [52]:
df_items.show()

+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+--------------------+---------+
|item_id|channel_id|datetime_availability_start|datetime_availability_stop|datetime_show_start|datetime_show_stop|content_type|               title|  year|              genres|region_id|
+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+--------------------+---------+
|  65667|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|на пробах только ...|2013.0|             Эротика|     null|
|  65669|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|скуби ду: эротиче...|2011.0|             Эротика|     null|
|  65668|      null|       1970-01-01T00:00:00Z|      2018-01-01T

In [39]:
genres = df_items.filter(df_items.content_type==1).select(["item_id", "genres"]).distinct()

In [54]:
genres.show()

+-------+--------------------+
|item_id|              genres|
+-------+--------------------+
|   7693|Мультсериалы,Детс...|
|   9139|Ужасы,Триллеры,Бо...|
|  11040|             Комедия|
|  72394|Русские мультфиль...|
|  72907|Мультфильмы,Комед...|
|  73326|Детективы,Русские...|
|  73368|Ужасы,Для взрослы...|
|  73685|Русские мультфиль...|
|  74890|Ужасы,Триллеры,За...|
|  76221|Документальные,Би...|
|  78070|Западные мультфил...|
|  78117|Русские мультфиль...|
|  79406|Драмы,Фантастика,...|
|  83082|Русские мультфиль...|
|  88651|Приключения,Комед...|
|  88823|Мультфильмы,Полно...|
|  88922|    Мультфильмы,Наши|
|  92460|Драмы,Мелодрамы,З...|
|  95063|Для детей,Зарубежные|
|  95387|Военные,Советское...|
+-------+--------------------+
only showing top 20 rows



Преобразуем жанры в слова

In [40]:
tokenizer_g = Tokenizer(inputCol="genres", outputCol="genresw")

In [41]:
data_g = tokenizer_g.transform(genres)

In [57]:
data_g.show()

+-------+--------------------+--------------------+
|item_id|              genres|             genresw|
+-------+--------------------+--------------------+
|  95063|Для детей,Зарубежные|[для, детей,заруб...|
|  95387|Военные,Советское...|[военные,советско...|
|  98681|Комедии,Драмы,Мел...|[комедии,драмы,ме...|
| 100381|             Комедия|           [комедия]|
| 102099|Драмы,Исторически...|[драмы,историческ...|
| 102279|     Комедии,Русские|   [комедии,русские]|
|   7693|Мультсериалы,Детс...|[мультсериалы,дет...|
|   9139|Ужасы,Триллеры,Бо...|[ужасы,триллеры,б...|
|  11040|             Комедия|           [комедия]|
|  72394|Русские мультфиль...|[русские, мультфи...|
|  72907|Мультфильмы,Комед...|[мультфильмы,коме...|
|  73326|Детективы,Русские...|[детективы,русски...|
|  73368|Ужасы,Для взрослы...|[ужасы,для, взрос...|
|  73685|Русские мультфиль...|[русские, мультфи...|
|  74890|Ужасы,Триллеры,За...|[ужасы,триллеры,з...|
|  76221|Документальные,Би...|[документальные,б...|
|  78070|Зап

In [None]:
data_g.select('genres').distinct().collect()

In [42]:
word_counts = (
    data_g.withColumn("genres_u", F.explode(F.split(F.col("genres"), ",")))
    .groupBy("genres_u")
    .count()
    .sort("count", ascending=False)
)

In [59]:
word_counts.select('genres_u').show()

+-------------------+
|           genres_u|
+-------------------+
|         Зарубежные|
|              Драмы|
|            Комедии|
|           Триллеры|
|            Русские|
|            Боевики|
|               Наши|
|          Мелодрамы|
|        Приключения|
|          Для детей|
|              Ужасы|
|          Детективы|
|         Фантастика|
|           Криминал|
|        Мультфильмы|
|     Для всей семьи|
|            Военные|
|Русские мультфильмы|
|     Полнометражные|
|            Детские|
+-------------------+
only showing top 20 rows



In [43]:
genres_list =word_counts.select('genres_u').collect()

In [61]:
genres_list 

[Row(genres_u='Зарубежные'),
 Row(genres_u='Драмы'),
 Row(genres_u='Комедии'),
 Row(genres_u='Триллеры'),
 Row(genres_u='Русские'),
 Row(genres_u='Боевики'),
 Row(genres_u='Наши'),
 Row(genres_u='Мелодрамы'),
 Row(genres_u='Приключения'),
 Row(genres_u='Для детей'),
 Row(genres_u='Ужасы'),
 Row(genres_u='Детективы'),
 Row(genres_u='Фантастика'),
 Row(genres_u='Криминал'),
 Row(genres_u='Мультфильмы'),
 Row(genres_u='Для всей семьи'),
 Row(genres_u='Военные'),
 Row(genres_u='Русские мультфильмы'),
 Row(genres_u='Полнометражные'),
 Row(genres_u='Детские'),
 Row(genres_u='Эротика'),
 Row(genres_u='Советские'),
 Row(genres_u='Союзмультфильм'),
 Row(genres_u='Западные мультфильмы'),
 Row(genres_u='Советское кино'),
 Row(genres_u='Сериалы'),
 Row(genres_u='Исторические'),
 Row(genres_u='Документальные'),
 Row(genres_u='Про животных'),
 Row(genres_u='Фэнтези'),
 Row(genres_u='Мистические'),
 Row(genres_u='Семейные'),
 Row(genres_u='Мультсериалы'),
 Row(genres_u='Познавательные'),
 Row(genres_

In [52]:
data_g.withColumn("genres_u", F.explode(F.split(F.col("genres"), ","))).show()

+-------+--------------------+--------------------+-------------------+
|item_id|              genres|             genresw|           genres_u|
+-------+--------------------+--------------------+-------------------+
|   7693|Мультсериалы,Детс...|[мультсериалы,дет...|       Мультсериалы|
|   7693|Мультсериалы,Детс...|[мультсериалы,дет...|            Детские|
|   7693|Мультсериалы,Детс...|[мультсериалы,дет...|     Союзмультфильм|
|   7693|Мультсериалы,Детс...|[мультсериалы,дет...|               Наши|
|   9139|Ужасы,Триллеры,Бо...|[ужасы,триллеры,б...|              Ужасы|
|   9139|Ужасы,Триллеры,Бо...|[ужасы,триллеры,б...|           Триллеры|
|   9139|Ужасы,Триллеры,Бо...|[ужасы,триллеры,б...|            Боевики|
|   9139|Ужасы,Триллеры,Бо...|[ужасы,триллеры,б...|         Зарубежные|
|  11040|             Комедия|           [комедия]|            Комедия|
|  72394|Русские мультфиль...|[русские, мультфи...|Русские мультфильмы|
|  72394|Русские мультфиль...|[русские, мультфи...|            Р

In [44]:
item_binary = data_g.withColumn('genres', F.explode(F.split(F.col("genres"), ","))).groupBy("item_id").pivot("genres").agg(f.lit(1)).na.fill(0)

In [64]:
item_binary.show(truncate = False, vertical = True)

-RECORD 0----------------------
 item_id              | 691    
  сказка              | 0      
 Анимация             | 0      
 Аниме                | 0      
 Арт-хаус             | 0      
 Артхаус              | 0      
 Биография            | 0      
 Боевик               | 0      
 Боевики              | 0      
 Вестерн              | 0      
 Видеоигры            | 0      
 Военные              | 0      
 Военный              | 0      
 Детективы            | 0      
 Детские              | 1      
 Детские песни        | 0      
 Для взрослых         | 0      
 Для всей семьи       | 0      
 Для детей            | 0      
 Для самых маленьких  | 0      
 Документальные       | 0      
 Документальный       | 0      
 Драма                | 0      
 Драмы                | 0      
 Западные мультфильмы | 0      
 Зарубежные           | 0      
 Игры                 | 0      
 Исторические         | 0      
 Исторический         | 0      
 Комедии              | 0      
 Комедия

Получили для каждой item_id разметку по жанру. 

Можем формировать финальный срез данных

In [None]:
df_items.show(1)

отфильтруем тренировочную выборку по типу контента == 1 (платный)

In [47]:
df_train1.join(df_items.select(["item_id", "content_type"]), on = 'item_id', how="left").show()

+-------+-------+--------+--------------------+--------------------+------------+
|item_id|user_id|purchase|           conv_user|        conv_product|content_type|
+-------+-------+--------+--------------------+--------------------+------------+
|   8389| 754230|       0|0.027575641516660282|0.005979073243647235|           1|
|   8389| 780033|       0|7.757951900698216E-4|0.005979073243647235|           1|
|   8389| 798454|       0|3.840245775729646...|0.005979073243647235|           1|
|   8389| 825061|       0|0.001931247585940...|0.005979073243647235|           1|
|   8389| 833685|       0|0.007500986971969996|0.005979073243647235|           1|
|   8389| 851486|       0|                 0.0|0.005979073243647235|           1|
|   8389| 867850|       0|3.829950210647261...|0.005979073243647235|           1|
|   8389| 870928|       0|7.674597083653108E-4|0.005979073243647235|           1|
|   8389| 879401|       0|0.004283489096573208|0.005979073243647235|           1|
|   8389| 901457

In [45]:
df_train1 = df_train1.join(df_items.select(["item_id", "content_type"]), on = 'item_id', how="left")

In [46]:
df_test1 = df_test1.join(df_items.select(["item_id", "content_type"]), on = 'item_id', how="left")

In [47]:
df_train1 = df_train1.join(df_items.select(["item_id", "year"]), on = 'item_id', how="left")

In [48]:
df_test1 = df_test1.join(df_items.select(["item_id", "year"]), on = 'item_id', how="left")

добавим кластеры по названию:

In [49]:
df_train1 = df_train1.join(clustering.select(["item_id", "prediction"]).withColumnRenamed('prediction', 'cluster_name'), on = 'item_id', how="left")

In [50]:
df_test1 = df_test1.join(clustering.select(["item_id", "prediction"]).withColumnRenamed('prediction', 'cluster_name'), on = 'item_id', how="left")

добавим матрицу бинарных значений по жанру:

In [51]:
df_train1 = df_train1.join(item_binary, on = 'item_id', how="left")

In [52]:
df_test1 = df_test1.join(item_binary, on = 'item_id', how="left")

In [62]:
df_train1.count()

5032624

In [63]:
df_test1.count()

2156840

In [74]:
df_train1.show(1, truncate = False, vertical = True)

-RECORD 0-------------------------------------
 item_id              | 8389                  
 user_id              | 920599                
 purchase             | 0                     
 conv_user            | 0.0015564202334630351 
 conv_product         | 0.005979073243647235  
 content_type         | 1                     
 year                 | 1981.0                
 cluster_name         | 8                     
  сказка              | 0                     
 Анимация             | 0                     
 Аниме                | 0                     
 Арт-хаус             | 0                     
 Артхаус              | 0                     
 Биография            | 0                     
 Боевик               | 0                     
 Боевики              | 0                     
 Вестерн              | 0                     
 Видеоигры            | 0                     
 Военные              | 0                     
 Военный              | 0                     
 Детективы   

In [75]:
df_test1.show(1, truncate = False, vertical = True)

-RECORD 0-------------------------------------
 item_id              | 8389                  
 user_id              | 822709                
 purchase             | null                  
 conv_user            | 3.7893141341417203E-4 
 conv_product         | 0.005979073243647235  
 content_type         | 1                     
 year                 | 1981.0                
 cluster_name         | 8                     
  сказка              | 0                     
 Анимация             | 0                     
 Аниме                | 0                     
 Арт-хаус             | 0                     
 Артхаус              | 0                     
 Биография            | 0                     
 Боевик               | 0                     
 Боевики              | 0                     
 Вестерн              | 0                     
 Видеоигры            | 0                     
 Военные              | 0                     
 Военный              | 0                     
 Детективы   

In [53]:
from pyspark.sql.functions import col

In [54]:
from pyspark.sql.types import IntegerType,BooleanType,DateType

In [55]:
df_train1 = df_train1.withColumn("year", df_train1['year'].cast(IntegerType()))

In [56]:
df_test1 = df_test1.withColumn("year", df_test1['year'].cast(IntegerType()))

In [81]:
df_train1.printSchema()

root
 |-- item_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- purchase: integer (nullable = true)
 |-- conv_user: double (nullable = true)
 |-- conv_product: double (nullable = true)
 |-- content_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- cluster_name: integer (nullable = true)
 |--  сказка: integer (nullable = true)
 |-- Анимация: integer (nullable = true)
 |-- Аниме: integer (nullable = true)
 |-- Арт-хаус: integer (nullable = true)
 |-- Артхаус: integer (nullable = true)
 |-- Биография: integer (nullable = true)
 |-- Боевик: integer (nullable = true)
 |-- Боевики: integer (nullable = true)
 |-- Вестерн: integer (nullable = true)
 |-- Видеоигры: integer (nullable = true)
 |-- Военные: integer (nullable = true)
 |-- Военный: integer (nullable = true)
 |-- Детективы: integer (nullable = true)
 |-- Детские: integer (nullable = true)
 |-- Детские песни: integer (nullable = true)
 |-- Для взрослых: integer (nullable = true)
 |-- Дл

Обучим модель. 

Сформируем вектор фичей

In [57]:
from pyspark.ml.feature import VectorAssembler

In [61]:
df_train1.schema.names

['item_id',
 'user_id',
 'purchase',
 'conv_user',
 'conv_product',
 'content_type',
 'year',
 'cluster_name',
 ' сказка',
 'Анимация',
 'Аниме',
 'Арт-хаус',
 'Артхаус',
 'Биография',
 'Боевик',
 'Боевики',
 'Вестерн',
 'Видеоигры',
 'Военные',
 'Военный',
 'Детективы',
 'Детские',
 'Детские песни',
 'Для взрослых',
 'Для всей семьи',
 'Для детей',
 'Для самых маленьких',
 'Документальные',
 'Документальный',
 'Драма',
 'Драмы',
 'Западные мультфильмы',
 'Зарубежные',
 'Игры',
 'Исторические',
 'Исторический',
 'Комедии',
 'Комедия',
 'Короткометражки',
 'Короткометражные',
 'Криминал',
 'Кулинария',
 'Мелодрама',
 'Мелодрамы',
 'Мистические',
 'Музыкальные',
 'Музыкальный',
 'Мультсериалы',
 'Мультфильм',
 'Мультфильмы',
 'Мультфильмы в 3D',
 'Мюзиклы',
 'Научная фантастика',
 'Наши',
 'О здоровье',
 'Охота и рыбалка',
 'Передачи',
 'Познавательные',
 'Полнометражные',
 'Приключение',
 'Приключения',
 'Про животных',
 'Прочие',
 'Развивающие',
 'Развлекательные',
 'Реалити-шоу',
 'Ро

In [60]:
cols = ['item_id',
 'conv_user',
 'conv_product',
 'year',
 'cluster_name',
 ' сказка',
 'Анимация',
 'Аниме',
 'Арт-хаус',
 'Артхаус',
 'Биография',
 'Боевик',
 'Боевики',
 'Вестерн',
 'Видеоигры',
 'Военные',
 'Военный',
 'Детективы',
 'Детские',
 'Детские песни',
 'Для взрослых',
 'Для всей семьи',
 'Для детей',
 'Для самых маленьких',
 'Документальные',
 'Документальный',
 'Драма',
 'Драмы',
 'Западные мультфильмы',
 'Зарубежные',
 'Игры',
 'Исторические',
 'Исторический',
 'Комедии',
 'Комедия',
 'Короткометражки',
 'Короткометражные',
 'Криминал',
 'Кулинария',
 'Мелодрама',
 'Мелодрамы',
 'Мистические',
 'Музыкальные',
 'Музыкальный',
 'Мультсериалы',
 'Мультфильм',
 'Мультфильмы',
 'Мультфильмы в 3D',
 'Мюзиклы',
 'Научная фантастика',
 'Наши',
 'О здоровье',
 'Охота и рыбалка',
 'Передачи',
 'Познавательные',
 'Полнометражные',
 'Приключение',
 'Приключения',
 'Про животных',
 'Прочие',
 'Развивающие',
 'Развлекательные',
 'Реалити-шоу',
 'Романтические',
 'Русские',
 'Русские мультфильмы',
 'Семейные',
 'Семейный',
 'Сериалы',
 'Сказки',
 'Советские',
 'Советское кино',
 'Союзмультфильм',
 'Спорт',
 'Спортивные',
 'Триллер',
 'Триллеры',
 'Ужасы',
 'Фантастика',
 'Фантастические',
 'Фильмы',
 'Фильмы в 3D',
 'Фильмы-спектакли',
 'Фэнтези',
 'Хочу всё знать',
 'Экранизации',
 'Эротика',
 'Юмористические']

In [61]:
vectorAssembler = VectorAssembler(inputCols=cols, outputCol="features", handleInvalid="skip")

In [62]:
from pyspark.ml.classification import GBTClassifier

In [63]:
classifier = GBTClassifier(featuresCol="features", labelCol="purchase")

In [64]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [74]:
vectorAssembler

VectorAssembler_7b73b581b75c

In [69]:
#df_train1 = df_train1.coalesce(8).cache()

In [65]:
df_train2 = df_train1.fillna(-1).coalesce(8).cache()

In [None]:
#df_train2.select(user_id, itrm_id, purchase).toPandas().to_csv(‘../lab03.csv’, sep=“,”, index=True)

In [113]:
#df_train2.show()

In [75]:
df_train2.count()

5032624

In [None]:
df_train2.write().overwrite().save("/user/elena.sidorova/df_train")

In [66]:
model = pipeline.fit(df_train2)

In [67]:
model.write().overwrite().save("/user/elena.sidorova/models")

In [92]:
#from pyspark.ml.pipeline import PipelineModel
#model = PipelineModel.load("/user/elena.sidorova/models")

Приступим к обучению моделей

In [68]:
predictions = model.transform(df_train2)

In [80]:
predictions_test = model.transform(df_test1.fillna(-1))

In [70]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [71]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="purchase", metricName='areaUnderROC')

Оценим точность модели на тренировочной выборке:

In [72]:
evaluator.evaluate(predictions)

0.9347227648871

In [79]:
predictions_test.show(1, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------
 item_id              | 8389                                                                                                       
 user_id              | 761341                                                                                                     
 purchase             | null                                                                                                       
 conv_user            | 3.875968992248062E-4                                                                                       
 conv_product         | 0.005979073243647235                                                                                       
 content_type         | 1                                                                                                          
 year                 | 1981                                                

In [81]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [82]:
secondelement=udf(lambda v:float(v[1]),FloatType())

In [83]:
final = predictions_test.select("user_id", "item_id", secondelement("probability"))

In [84]:
final1 = predictions_test.select(predictions_test["user_id"], predictions_test["item_id"], secondelement(predictions_test["probability"]))

In [85]:
final1 = final1.withColumnRenamed("<lambda>(probability)","purchase")

In [86]:
final1.show()

+-------+-------+-----------+
|user_id|item_id|   purchase|
+-------+-------+-----------+
| 822709|   8389|0.044133298|
| 824008|   8389|0.044133298|
| 890476|   8389|0.044133298|
| 899993|   8389|0.044133298|
| 937345|   8389|0.044133298|
| 566758|   8389|0.044133298|
| 816426|   8389|0.044133298|
| 892290|   8389|0.044647682|
| 816244|   8389|0.046038166|
| 899758|   8389|0.044133298|
| 824997|   8389|0.044647682|
| 837780|   8389|0.045287456|
| 872536|   8389|0.044133298|
| 900450|   8389|0.045287456|
| 918664|   8389|0.044133298|
| 833749|   8389|0.044133298|
| 869294|   8389|0.044458497|
| 894833|   8389|0.044647682|
| 927261|   8389| 0.04623191|
| 830257|   8389|0.056885265|
+-------+-------+-----------+
only showing top 20 rows



In [87]:
final1 = final1.sort(col("user_id"),col("item_id"))

In [None]:
2156840

In [86]:
final1.count()

2156840

In [None]:
df_train.sort(col("user_id"),col("item_id")).show()

In [89]:
final1.show()

+-------+-------+-----------+
|user_id|item_id|   purchase|
+-------+-------+-----------+
|   1654|    336|0.043980245|
|   1654|    678|0.043980245|
|   1654|    691|0.043980245|
|   1654|    696|0.044042222|
|   1654|    763|0.043980245|
|   1654|    795| 0.04623191|
|   1654|    861|0.043980245|
|   1654|   1137| 0.04423483|
|   1654|   1159|0.044042222|
|   1654|   1428|0.043980245|
|   1654|   1685|0.044042222|
|   1654|   1686|0.043980245|
|   1654|   1704|0.044042222|
|   1654|   2093|0.043980245|
|   1654|   2343|0.043980245|
|   1654|   2451|0.043980245|
|   1654|   2469| 0.04697062|
|   1654|   2603|0.043980245|
|   1654|   2609|0.043980245|
|   1654|   2621|0.044042222|
+-------+-------+-----------+
only showing top 20 rows



Сохраним результат:

In [88]:
final1.select("user_id", "item_id", "purchase").toPandas().to_csv("lab03(01).csv", sep=",", index=True)

In [114]:
#!cat ./lab03.csv

**Итоговый результат соревнвоания, AUC на тестовой выборке: 0.847029552952**

Возможность более сложных моделей при наличии ресурсов на кластере:

In [101]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [102]:
paramGrid = ParamGridBuilder().addGrid(classifier.maxIter, [10, 30])\
                              .addGrid(classifier.maxDepth, [7, 12])\
                              .addGrid(classifier.stepSize, [0.2, 0.25])\
                              .build()

In [103]:
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                              evaluator=evaluator, numFolds=3, parallelism=3)

In [112]:
#cv_model = crossval.fit(df_train2)

In [None]:
cv_model.avgMetrics

In [None]:
cv_model.bestModel

In [None]:
predictions_cv = cv_model.transform(df_test1.fillna(-1))