In [4]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 10 --executor-cores 3 --executor-memory 4g --driver-memory 5g --conf spark.yarn.executor.memoryOverhead=400 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [5]:
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import Row
import json

conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .appName("lab03_sai")
         .getOrCreate())

In [6]:
spark

In [383]:
spark.stop()

In [4]:
!hdfs dfs -ls /labs/slaba03/

Found 4 items
-rw-r--r--   3 hdfs hdfs   91066524 2022-01-06 18:46 /labs/slaba03/laba03_items.csv
-rw-r--r--   3 hdfs hdfs   29965581 2022-01-06 18:46 /labs/slaba03/laba03_test.csv
-rw-r--r--   3 hdfs hdfs   74949368 2022-01-06 18:46 /labs/slaba03/laba03_train.csv
-rw-r--r--   3 hdfs hdfs  871302535 2022-01-06 18:46 /labs/slaba03/laba03_views_programmes.csv


In [9]:
!hdfs dfs -cat /labs/slaba03/laba03_train.csv | head -3

user_id,item_id,purchase
1654,74107,0
1654,89249,0
cat: Unable to write to output stream.


In [10]:
!hdfs dfs -cat /labs/slaba03/laba03_items.csv | head -3

item_id	channel_id	datetime_availability_start	datetime_availability_stop	datetime_show_start	datetime_show_stop	content_type	title	year	genres	region_id
65667		1970-01-01T00:00:00Z	2018-01-01T00:00:00Z			1	на пробах только девушки (all girl auditions)	2013.0	Эротика	
65669		1970-01-01T00:00:00Z	2018-01-01T00:00:00Z			1	скуби ду: эротическая пародия (scooby doo: a xxx parody)	2011.0	Эротика	
cat: Unable to write to output stream.


In [11]:
!hdfs dfs -cat /labs/slaba03/laba03_test.csv | head -3

user_id,item_id,purchase
1654,94814,
1654,93629,
cat: Unable to write to output stream.


In [12]:
!hdfs dfs -cat /labs/slaba03/laba03_views_programmes.csv | head -3

user_id,item_id,ts_start,ts_end,item_type
0,7101053,1491409931,1491411600,live
0,7101054,1491412481,1491451571,live
cat: Unable to write to output stream.


In [None]:
# content_type = 1, жанры через запятую, ts-end-

In [7]:
train = (
    spark
    .read
    .option("header", True)
    .schema(StructType([
        StructField("user_id", IntegerType(), True),
        StructField("item_id", IntegerType(), True),
        StructField("purchase", IntegerType(), True)
    ]))
    .csv("/labs/slaba03/laba03_train.csv")
    .cache()
)

In [8]:
train.count()

5032624

In [9]:
items = (
    spark
    .read
    .option("header", True)
    .schema(StructType([
        StructField("item_id", IntegerType()),
        StructField("channel_id", IntegerType()),
        StructField("datetime_availability_start", StringType()),
        StructField("datetime_availability_stop", StringType()),
        StructField("datetime_show_start", StringType()),
        StructField("datetime_show_stop", StringType()),
        StructField("content_type", IntegerType()),
        StructField("title", StringType(), True),
        StructField("year", FloatType(), True),
        StructField("genres", StringType()),
        StructField("region_id", IntegerType())
    ]))
    .option("delimiter", "\t")
    .csv("/labs/slaba03/laba03_items.csv")
    .cache()
)

In [10]:
items.count()

635568

In [265]:
items.where("genres is null").show(2)

+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+------+---------+
|item_id|channel_id|datetime_availability_start|datetime_availability_stop|datetime_show_start|datetime_show_stop|content_type|               title|  year|genres|region_id|
+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+------+---------+
|   6151|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|              родина|2011.0|  null|     null|
|   9055|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|американская исто...|2011.0|  null|     null|
+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------

In [11]:
fill = F.array().cast("array<string>")

In [12]:
items = (
    items
    .where("content_type = 1")
)

In [13]:
test = (
    spark
    .read
    .option("header", True)
    .schema(StructType([
        StructField("user_id", IntegerType(), True),
        StructField("item_id", IntegerType(), True)
    ]))
    .csv("/labs/slaba03/laba03_test.csv")
    .cache()
)

In [14]:
test.count()

2156840

In [15]:
views = (
    spark
    .read
    .option("header", True)
    .schema(StructType([
        StructField("user_id", IntegerType()),
        StructField("item_id", IntegerType()),
        StructField("ts_start", IntegerType()),
        StructField("ts_end", IntegerType()),
        StructField("item_type", StringType())
    ]))
    .csv("/labs/slaba03/laba03_views_programmes.csv")
    .cache()
)

In [16]:
views.count()

20845607

In [78]:
train.show(1)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|       0|
+-------+-------+--------+
only showing top 1 row



In [151]:
items.select("genres").distinct().show(truncate=False)

+--------------------------------------------------------------------------+
|genres                                                                    |
+--------------------------------------------------------------------------+
|Ужасы                                                                     |
|Драмы,Зарубежные,Спорт                                                    |
|Для детей,Наши                                                            |
|Детективы,Триллеры,Драмы,Фантастика                                       |
|Мистические,Приключения,Драмы,Мелодрамы,Зарубежные                        |
|Приключения,Советское кино,Исторические,Русские,Для всей семьи,Музыкальные|
|Русские мультфильмы,Про животных,Для всей семьи,Для детей                 |
|Западные мультфильмы,Сериалы,Для детей,Зарубежные                         |
|Военные,Приключения,Драмы                                                 |
|Комедии,Советское кино,Экранизации,Мелодрамы,Для всей семьи               |

In [199]:
items.show(10)

+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+--------------------+---------+-----------+--------------------+--------------------+
|item_id|channel_id|datetime_availability_start|datetime_availability_stop|datetime_show_start|datetime_show_stop|content_type|               title|  year|              genres|region_id|first_genre|          all_genres|         prep_genres|
+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+--------------------+---------+-----------+--------------------+--------------------+
|  65667|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|на пробах только ...|2013.0|             Эротика|     null|    эротика|           [эротика]|           [эротика]|
|  65669|      null|       1970-01-0

In [200]:
items.where("prep_genres is null").show(2)

+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+------+---------+-----------+----------+-----------+
|item_id|channel_id|datetime_availability_start|datetime_availability_stop|datetime_show_start|datetime_show_stop|content_type|               title|  year|genres|region_id|first_genre|all_genres|prep_genres|
+-------+----------+---------------------------+--------------------------+-------------------+------------------+------------+--------------------+------+------+---------+-----------+----------+-----------+
|   6151|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|              родина|2011.0|  null|     null|       null|        []|       null|
|   9055|      null|       1970-01-01T00:00:00Z|      2018-01-01T00:00:00Z|               null|              null|           1|американская исто...|2011.0|  null|     n

In [80]:
test.show(1)

+-------+-------+
|user_id|item_id|
+-------+-------+
|   1654|  94814|
+-------+-------+
only showing top 1 row



In [81]:
views.show(1)

+-------+-------+----------+----------+---------+
|user_id|item_id|  ts_start|    ts_end|item_type|
+-------+-------+----------+----------+---------+
|      0|7101053|1491409931|1491411600|     live|
+-------+-------+----------+----------+---------+
only showing top 1 row



In [61]:
train.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- purchase: integer (nullable = true)



In [70]:
items.printSchema()

root
 |-- item_id: integer (nullable = true)
 |-- channel_id: integer (nullable = true)
 |-- datetime_availability_start: string (nullable = true)
 |-- datetime_availability_stop: string (nullable = true)
 |-- datetime_show_start: string (nullable = true)
 |-- datetime_show_stop: string (nullable = true)
 |-- content_type: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: float (nullable = true)
 |-- genres: string (nullable = true)
 |-- region_id: integer (nullable = true)



In [64]:
test.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)



In [76]:
views.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- ts_start: integer (nullable = true)
 |-- ts_end: integer (nullable = true)
 |-- item_type: string (nullable = true)



In [17]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, VectorAssembler
from pyspark.ml.pipeline import PipelineModel

In [18]:
df_user_stat = (
    train
    .groupBy("user_id")
    .agg(
        F.count("purchase").alias("user_count"),
        F.sum("purchase").alias("user_sum"),
        F.mean("purchase").alias("user_avg")
    )
)

In [19]:
df_item_stat = (
    train
    .groupBy("item_id")
    .agg(
        F.count("purchase").alias("item_count"),
        F.sum("purchase").alias("item_sum"),
        F.mean("purchase").alias("item_avg")
    )
)

In [20]:
df_user_stat_views = (
    views
    .withColumn("duration", F.col("ts_end") - F.col("ts_start"))
    .groupBy("user_id")
    .agg(
        F.count("user_id").alias("user_views_count"),
        F.sum("duration").alias("user_duration_sum"),
        F.min("duration").alias("user_duration_min"),
        F.max("duration").alias("user_duration_max"),
        F.mean("duration").alias("user_duration_avg"),
    )
)

In [21]:
df_user_stat_views_live = (
    views
    .where("item_type = 'live'")
    .withColumn("duration", F.col("ts_end") - F.col("ts_start"))
    .groupBy("user_id")
    .agg(
        F.count("user_id").alias("user_views_live_count"),
        F.sum("duration").alias("user_duration_live_sum"),
        F.min("duration").alias("user_duration_live_min"),
        F.max("duration").alias("user_duration_live_max"),
        F.mean("duration").alias("user_duration_live_avg"),
    )
)

In [22]:
df_user_stat_views_pvr = (
    views
    .where("item_type = 'pvr'")
    .withColumn("duration", F.col("ts_end") - F.col("ts_start"))
    .groupBy("user_id")
    .agg(
        F.count("user_id").alias("user_views_pvr_count"),
        F.sum("duration").alias("user_duration_pvr_sum"),
        F.min("duration").alias("user_duration_pvr_min"),
        F.max("duration").alias("user_duration_pvr_max"),
        F.mean("duration").alias("user_duration_pvr_avg"),
    )
)

In [23]:
train_df = (
    train
    .join(items, "item_id", "left")
    .join(df_user_stat, "user_id", "left")
    .join(df_item_stat, "item_id", "left")
    .join(df_user_stat_views, "user_id", "left")
    .join(df_user_stat_views_live, "user_id", "left")
    .join(df_user_stat_views_pvr, "user_id", "left")
    .withColumn("all_genres", F.split(F.lower(F.trim(F.coalesce(F.col("genres"), F.lit("")))), ","))
    .na.fill(0)
)

In [24]:
test_df = (
    test
    .join(items, "item_id", "left")
    .join(df_user_stat, "user_id", "left")
    .join(df_item_stat, "item_id", "left")
    .join(df_user_stat_views, "user_id", "left")
    .join(df_user_stat_views_live, "user_id", "left")
    .join(df_user_stat_views_pvr, "user_id", "left")
    .withColumn("all_genres", F.split(F.lower(F.trim(F.coalesce(F.col("genres"), F.lit("")))), ","))
    .na.fill(0)
)

In [25]:
cols = [
    'user_count',
    'user_sum',
    'user_avg',
    'item_count',
    'item_sum',
    'item_avg',
    'user_views_count',
    'user_duration_sum',
    'user_duration_min',
    'user_duration_max',
    'user_duration_avg',
    'user_views_live_count',
    'user_duration_live_sum',
    'user_duration_live_min',
    'user_duration_live_max',
    'user_duration_live_avg',
    'user_views_pvr_count',
    'user_duration_pvr_sum',
    'user_duration_pvr_min',
    'user_duration_pvr_max',
    'user_duration_pvr_avg',
    'genres_vector'
]
cols

['user_count',
 'user_sum',
 'user_avg',
 'item_count',
 'item_sum',
 'item_avg',
 'user_views_count',
 'user_duration_sum',
 'user_duration_min',
 'user_duration_max',
 'user_duration_avg',
 'user_views_live_count',
 'user_duration_live_sum',
 'user_duration_live_min',
 'user_duration_live_max',
 'user_duration_live_avg',
 'user_views_pvr_count',
 'user_duration_pvr_sum',
 'user_duration_pvr_min',
 'user_duration_pvr_max',
 'user_duration_pvr_avg',
 'genres_vector']

In [317]:
count_vectorizer = CountVectorizer(inputCol="all_genres", outputCol="genres_vector")

In [318]:
assembler = VectorAssembler(inputCols=cols, outputCol="features")

In [319]:
gbt = GBTClassifier(labelCol="purchase", maxDepth=4, minInstancesPerNode=3)

In [320]:
pipeline = Pipeline(stages=[
    count_vectorizer,
    assembler,
    gbt
])

In [321]:
pipeline_model = pipeline.fit(train_df)

In [323]:
pipeline_model.write().save("tmp/lab03/pipeline_model")

In [26]:
pipeline_model = PipelineModel.load("tmp/lab03/pipeline_model")

In [27]:
predictions = pipeline_model.transform(test_df)

In [365]:
predictions.select("user_id", "item_id", "features", "rawPrediction", "probability", "prediction").show(1, False)

+-------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+-----------------------------------------+----------+
|user_id|item_id|features                                                                                                                                                                                                                                                                                                            |rawPrediction                           |probability                              |prediction|
+-------+-------+---------------------------------------------------------------------------------------------------------------------------------------------

In [367]:
predictions.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- channel_id: integer (nullable = true)
 |-- datetime_availability_start: string (nullable = true)
 |-- datetime_availability_stop: string (nullable = true)
 |-- datetime_show_start: string (nullable = true)
 |-- datetime_show_stop: string (nullable = true)
 |-- content_type: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: float (nullable = false)
 |-- genres: string (nullable = true)
 |-- region_id: integer (nullable = true)
 |-- first_genre: string (nullable = true)
 |-- user_count: long (nullable = true)
 |-- user_sum: long (nullable = true)
 |-- user_avg: double (nullable = false)
 |-- item_count: long (nullable = true)
 |-- item_sum: long (nullable = true)
 |-- item_avg: double (nullable = false)
 |-- user_views_count: long (nullable = true)
 |-- user_duration_sum: long (nullable = true)
 |-- user_duration_min: integer (nullable = true)
 |-- user_duration_max: integer 

In [28]:
first_element = F.udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

In [32]:
final_df = predictions.select("user_id", "item_id", first_element(F.col("probability")).getItem(1).alias("purchase")).cache()

In [30]:
final_df.show(5)

+-------+-------+-----------+
|user_id|item_id|   purchase|
+-------+-------+-----------+
| 754230|  93486|0.057003632|
| 754230|  94819| 0.04769586|
| 754230|  73041| 0.04769586|
| 754230|  74440| 0.06884378|
| 754230|  74452| 0.05287387|
+-------+-------+-----------+
only showing top 5 rows



In [33]:
final_df.count()

2156840

In [34]:
views.unpersist()

DataFrame[user_id: int, item_id: int, ts_start: int, ts_end: int, item_type: string]

In [35]:
train.unpersist()

DataFrame[user_id: int, item_id: int, purchase: int]

In [36]:
items.unpersist()

DataFrame[item_id: int, channel_id: int, datetime_availability_start: string, datetime_availability_stop: string, datetime_show_start: string, datetime_show_stop: string, content_type: int, title: string, year: float, genres: string, region_id: int, first_genre: string]

In [37]:
train.unpersist()

DataFrame[user_id: int, item_id: int, purchase: int]

In [38]:
res = final_df.toPandas()

In [39]:
res = res.sort_values(["user_id", "item_id"])

In [43]:
res.reset_index(inplace=True)

In [45]:
res = res.drop("index", axis=1)

In [49]:
res.head(2)

Unnamed: 0,user_id,item_id,purchase
0,1654,336,0.043958
1,1654,678,0.043958


In [50]:
res.to_csv("lab03.csv")

In [51]:
spark.stop()