In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json

In [3]:
conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .appName("ALS purchase prediction")
         .getOrCreate())

In [4]:
spark

In [15]:


from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
import numpy as np
import pandas as pd
import ast
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from tqdm import tqdm

### Читаем данные ===============================================

In [6]:
# lab10_items.csv — дополнительные данные по items. В данном файле много лишней или ненужной 
# информации, так что задача её фильтрации и отбора ложится на вас. Поля в файле, на которых 
# хотелось бы остановиться:

#     item_id — primary key. Соответствует item_id в предыдущем файле.
#     content_type — тип телепередачи (1 — платная, 0 — бесплатная). Вас интересуют платные передачи.
#     title — название передачи, текстовое поле.
#     year — год выпуска передачи, число.
#     genres — поле с жанрами передачи, разделёнными через запятую.



items = spark.read.csv('/labs/lab10data/lab10_items.csv', header=True, sep='\t')
               
items.limit(5).toPandas()
# items.show(2, vertical=True, truncate=False)

Unnamed: 0,item_id,channel_id,datetime_availability_start,datetime_availability_stop,datetime_show_start,datetime_show_stop,content_type,title,year,genres,region_id
0,65667,,1970-01-01T00:00:00Z,2018-01-01T00:00:00Z,,,1,на пробах только девушки (all girl auditions),2013.0,Эротика,
1,65669,,1970-01-01T00:00:00Z,2018-01-01T00:00:00Z,,,1,скуби ду: эротическая пародия (scooby doo: a x...,2011.0,Эротика,
2,65668,,1970-01-01T00:00:00Z,2018-01-01T00:00:00Z,,,1,горячие девочки для горячих девочек (hot babes...,2011.0,Эротика,
3,65671,,1970-01-01T00:00:00Z,2018-01-01T00:00:00Z,,,1,соблазнительницы женатых мужчин (top heavy hom...,2011.0,Эротика,
4,65670,,1970-01-01T00:00:00Z,2018-01-01T00:00:00Z,,,1,секретные секс-материалы ii: темная секс парод...,2010.0,Эротика,


In [7]:
items.registerTempTable("items")
spark.sql("""
select
    count(distinct item_id) as item_id,
    count(distinct channel_id) as channel_id,
    count(distinct datetime_availability_start) as datetime_availability_start,
    count(distinct datetime_availability_stop) as datetime_availability_stop,
    count(distinct datetime_show_start) as datetime_show_start,
    count(distinct datetime_show_stop) as datetime_show_stop,
    count(distinct content_type) as content_type,
    count(distinct title) as title,
    count(distinct year) as year,
    count(distinct genres) as genres,
    count(distinct region_id) as region_id,
    count(*) as interactions    
from
    items
""").toPandas()

Unnamed: 0,item_id,channel_id,datetime_availability_start,datetime_availability_stop,datetime_show_start,datetime_show_stop,content_type,title,year,genres,region_id,interactions
0,635568,207,2,44071,43070,43356,2,23678,80,1076,18,635568


In [8]:
# lab10_test.csv — тестовый датасет без указанного целевого признака purchase, который вам и 
# предстоит предсказать.

test = spark.read.csv('/labs/lab10data/lab10_test.csv', header=True)
               
test.limit(5).toPandas()
# items.show(2, vertical=True, truncate=False)

Unnamed: 0,user_id,item_id,purchase
0,1654,94814,
1,1654,93629,
2,1654,9980,
3,1654,95099,
4,1654,11265,


In [9]:
test.registerTempTable("test")
spark.sql("""
select
    count(distinct user_id) as user_id,
    count(distinct item_id) as item_id,
    count(*) as interactions    
from
    test
""").toPandas()

Unnamed: 0,user_id,item_id,interactions
0,1941,3704,2156840


In [10]:
# В lab10_train.csv содержатся факты 
# покупки (колонка purchase) 
# пользователями (колонка user_id) 
# телепередач (колонка item_id). 
# Такой формат файла вам уже знаком.

train = spark.read.csv('/labs/lab10data/lab10_train.csv', header=True)
               
train.limit(5).toPandas()
# items.show(2, vertical=True, truncate=False)

Unnamed: 0,user_id,item_id,purchase
0,1654,74107,0
1,1654,89249,0
2,1654,99982,0
3,1654,89901,0
4,1654,100504,0


In [19]:
train.registerTempTable("train")
spark.sql("""
select
    count(distinct user_id) as user_id,
    count(distinct item_id) as item_id,
    count(*) as interactions    
from
    train
""").toPandas()

Unnamed: 0,user_id,item_id,interactions
0,1941,3704,5032624


In [11]:
# Дополнительный файл lab10_views_programmes.csv по просмотрам передач с полями:

#     ts_start — время начала просмотра
#     ts_end — время окончания просмотра
#     item_type— тип просматриваемого контента:
#         live — просмотр "вживую", в момент показа контента в эфире
#         pvr — просмотр в записи, после показа контента в эфире



views = spark.read.csv('/labs/lab10data/lab10_views_programmes.csv', header=True)
               
views.limit(5).toPandas()
# items.show(2, vertical=True, truncate=False)

Unnamed: 0,user_id,item_id,ts_start,ts_end,item_type
0,818606,6739803,1489489378,1489494212,live
1,818606,6271248,1486459714,1486460684,live
2,818606,6963181,1490737862,1490752561,live
3,818606,7371187,1492854851,1492855995,live
4,818606,6538677,1488209891,1488211584,live


In [12]:
views.registerTempTable("views")
spark.sql("""
select
    count(distinct user_id) as user_id,
    count(distinct item_id) as item_id,
    count(distinct ts_start) as ts_start,
    count(distinct ts_end) as ts_end,
    count(distinct item_type) as item_type,
    count(*) as interactions    
from
    views
""").toPandas()

Unnamed: 0,user_id,item_id,ts_start,ts_end,item_type,interactions
0,79385,633840,5126018,5106233,2,20845607


### Оцениваем предыдущий подбор параметров===================================

In [24]:
als_model_results = pd.read_csv('lab10_als_cv.csv')

In [29]:
sns.lineplot(data=als_model_results, x='rank', y='rmse', hue='regParam')
plt.show()

AttributeError: module 'seaborn' has no attribute 'lineplot'

### ALS ===============================================

In [18]:
print('Тестовыe user_id рекомендовали в учебном сете : ' + 
      str(train.groupBy('user_id').count().alias('a')
          .join(test.groupBy('user_id').count().alias('b'), F.col('a.user_id')==F.col('b.user_id'), how='inner').count()) + 
      ' из ' + 
      str(train.groupBy('user_id').count().count()) + 
      '. Всего записей: ' + 
      str(train.count()))

print('Было рекомендовано тестовых item_id в учебном сете : ' + 
      str(train.groupBy('item_id').count().alias('a')
          .join(test.groupBy('item_id').count().alias('b'), F.col('a.item_id')==F.col('b.item_id'), how='inner').count()) + 
      ' из ' + 
      str(train.groupBy('item_id').count().count()) + 
      '. Всего записей: ' + 
      str(train.count()))

Тестовыe user_id рекомендовали в учебном сете : 1941 из 1941. Всего записей: 5032624
Было рекомендовано тестовых item_id в учебном сете : 3704 из 3704. Всего записей: 5032624


In [None]:
# chk_train, chk_val = train_als.randomSplit([0.8, 0.2], seed=42)

In [19]:
%%time
train_als = (train
               .select(
                   F.col('user_id').cast('integer').alias('user_id'),
                   F.col('item_id').cast('integer').alias('item_id'),
                   F.col('purchase').cast('float').alias('purchase')
               ).dropna()
             
              )
train_als.show(5)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|     0.0|
|   1654|  89249|     0.0|
|   1654|  99982|     0.0|
|   1654|  89901|     0.0|
|   1654| 100504|     0.0|
+-------+-------+--------+
only showing top 5 rows

CPU times: user 5.87 ms, sys: 394 µs, total: 6.26 ms
Wall time: 212 ms


In [20]:
train_als.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- purchase: float (nullable = true)



In [21]:
%%time
test_als = (test
               .select(
                   F.col('user_id').cast('integer').alias('user_id'),
                   F.col('item_id').cast('integer').alias('item_id')
               )
              )
test_als.show(5)

+-------+-------+
|user_id|item_id|
+-------+-------+
|   1654|  94814|
|   1654|  93629|
|   1654|   9980|
|   1654|  95099|
|   1654|  11265|
+-------+-------+
only showing top 5 rows

CPU times: user 286 µs, sys: 4.22 ms, total: 4.5 ms
Wall time: 136 ms


In [22]:
%%time
# учим ml.ALS
als = ALS(userCol='user_id',
          itemCol='item_id',
          ratingCol='purchase', 
          implicitPrefs=True, 
          numUserBlocks=16, 
          numItemBlocks=16,
          maxIter=20,
          seed=42,
          nonnegative=True)

from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

grid = (ParamGridBuilder()
        .addGrid(als.rank, [103]) ## Уже подобрали оптимум
        .addGrid(als.regParam, [.005]) ## Уже подобрали оптимум
        .build()
       )

evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="purchase", 
                                predictionCol="prediction")

cv = CrossValidator(estimator=als,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=5
                   )


als_model = cv.fit(train_als)

CPU times: user 594 ms, sys: 174 ms, total: 767 ms
Wall time: 6min 53s


In [23]:
%%time
# собираем параметры для следующего подбора
als_model_params = [{p.name: v for p, v in m.items()} for m in als_model.getEstimatorParamMaps()]

als_model_results = pd.DataFrame.from_dict([{als_model.getEvaluator().getMetricName(): metric, **ps}
                                            for ps, metric in zip(als_model_params, als_model.avgMetrics)
                                           ]
                                          )\
                      .sort_values(by='rmse', ascending=False)
als_model_results.to_csv('lab10_als_cv.csv')
als_model_results.head(20)

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 15.5 ms


In [30]:
sns.lineplot(data=als_model_results, x='rank', y='rmse', hue='regParam')
plt.show()

AttributeError: module 'seaborn' has no attribute 'lineplot'

In [32]:
# посчитали ROC_AUC
chk_val_als = (als_model.bestModel.transform(train_als)).toPandas()

from sklearn.metrics import roc_auc_score
roc_auc_score(chk_val_als.purchase, chk_val_als.prediction)

0.9933840201174169

на скрытом датасете результат 0,89 что тоже хорошо.

In [None]:
# als_model.save(MAIN_PATH + 'model_rank_120_iter_20.mdl')  

In [None]:
# als_model = ALS_Model.read().load(MAIN_PATH + 'model_rank_120_iter_20.mdl')

In [31]:
%%time

# просто обрезаем верхи больше 1
def to_range(x, r):
    if x > r[1]:
        x = r[1]
#     elif x < r[0]:
#         x = r[0]
    return x

preds_als = (als_model.bestModel.transform(test_als)
             .rdd
             .map(lambda x: (x['user_id'], x['item_id'], to_range(x['prediction'], [0.0,1.0])))
             .toDF(['user_id', 'item_id', 'purchase'])
             .select(F.col('user_id').cast('integer'), 
                     F.col('item_id').cast('integer'), 
                     F.col('purchase').cast('float'))
             .na.fill(0)
             .orderBy(F.col('user_id').asc(), F.col('item_id').asc())
            )
            
preds_als.show(20)

+-------+-------+------------+
|user_id|item_id|    purchase|
+-------+-------+------------+
|   1654|    336|         0.0|
|   1654|    678|         0.0|
|   1654|    691|         0.0|
|   1654|    696| 6.175371E-4|
|   1654|    763| 8.159396E-4|
|   1654|    795|0.0016494164|
|   1654|    861|         0.0|
|   1654|   1137|         0.0|
|   1654|   1159|         0.0|
|   1654|   1428| 0.001444104|
|   1654|   1685|0.0014418728|
|   1654|   1686|         0.0|
|   1654|   1704|9.2851755E-4|
|   1654|   2093|         0.0|
|   1654|   2343|         0.0|
|   1654|   2451|         0.0|
|   1654|   2469| 0.028444147|
|   1654|   2603|         0.0|
|   1654|   2609|         0.0|
|   1654|   2621|0.0033966978|
+-------+-------+------------+
only showing top 20 rows

CPU times: user 89.7 ms, sys: 16.1 ms, total: 106 ms
Wall time: 1min 4s


In [33]:
preds_als = preds_als.toPandas()

In [34]:
test_als.count() - preds_als.shape[0]

0

###  пишем результат в файл ===============================================

In [35]:
%%time

preds_als.to_csv('ALS_purchase_prediction.csv', index=False)

CPU times: user 8.95 s, sys: 59.6 ms, total: 9.01 s
Wall time: 9.03 s


In [None]:
spark.stop()