In [1]:
import os
import sys

In [2]:
# Python interpreter for PySpark and the location of the Spark client install.
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
# `pyspark-shell` must be the LAST token: spark-submit treats everything after the
# primary resource as application arguments, so options placed behind it are ignored.
# Client deploy mode is used because Spark shells cannot run in cluster deploy mode.
os.environ["PYSPARK_SUBMIT_ARGS"]='--master yarn --deploy-mode client --num-executors 5 --executor-memory 4g --executor-cores 1 --driver-memory 2g pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make the PySpark sources and the bundled py4j importable from this kernel.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
# Execute PySpark's interactive bootstrap script in the notebook namespace;
# the context manager closes the file handle once the source has been read.
with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as shell_script:
    exec(shell_script.read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [3]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
import json

# Empty configuration object; extra settings can be attached to it before the
# session is requested.
conf = SparkConf()

# NOTE(review): the bootstrap cell above already reports a session available as
# `spark`, so getOrCreate() presumably returns that one - confirm whether the
# application name below is actually applied.
session_builder = SparkSession.builder.config(conf=conf).appName("lab_03_demenev")
spark = session_builder.getOrCreate()

In [4]:
# Display the active session object as the cell output.
spark

In [5]:
from time import time
import datetime
def print_worktime(t):
    """Print an elapsed time, given in seconds, split into hours, minutes and seconds.

    Args:
        t: elapsed time in seconds (int or float), e.g. ``time() - start``.
    """
    h = int(t // 3600)
    # Minutes remaining after whole hours are removed, so the value stays in 0..59.
    m = int((t % 3600) // 60)
    s = (t % 60)
    print('Код отработал за {0} часов {1} минут {2} секунд'.format(h,m,s))

In [6]:
# Load the four lab datasets. Every file has a header row; only the item
# catalogue is tab-separated. No schema is supplied to the reader.
items = (spark.read
         .option("header", True)
         .option("sep", "\t")
         .csv('/labs/slaba03/laba03_items.csv'))
test = spark.read.option("header", True).csv('/labs/slaba03/laba03_test.csv')
train = spark.read.option("header", True).csv('/labs/slaba03/laba03_train.csv')
programmes = spark.read.option("header", True).csv('/labs/slaba03/laba03_views_programmes.csv')

In [7]:
# Preview the first two rows of the test split.
test.show(2)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  94814|    null|
|   1654|  93629|    null|
+-------+-------+--------+
only showing top 2 rows



In [8]:
# Preview the first two rows of the train split.
train.show(2)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|       0|
|   1654|  89249|       0|
+-------+-------+--------+
only showing top 2 rows



In [9]:
# Preview two catalogue records, one field per line and without truncating values.
items.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------
 item_id                     | 65667                                                    
 channel_id                  | null                                                     
 datetime_availability_start | 1970-01-01T00:00:00Z                                     
 datetime_availability_stop  | 2018-01-01T00:00:00Z                                     
 datetime_show_start         | null                                                     
 datetime_show_stop          | null                                                     
 content_type                | 1                                                        
 title                       | на пробах только девушки (all girl auditions)            
 year                        | 2013.0                                                   
 genres                      | Эротика                                                  
 region_id           

In [10]:
# Preview the first five viewing records.
programmes.show(5)

+-------+-------+----------+----------+---------+
|user_id|item_id|  ts_start|    ts_end|item_type|
+-------+-------+----------+----------+---------+
|      0|7101053|1491409931|1491411600|     live|
|      0|7101054|1491412481|1491451571|     live|
|      0|7101054|1491411640|1491412481|     live|
|      0|6184414|1486191290|1486191640|     live|
|    257|4436877|1490628499|1490630256|     live|
+-------+-------+----------+----------+---------+
only showing top 5 rows



In [11]:
import pyspark.sql.functions as f
from pyspark.sql.functions import collect_list
from pyspark.sql.types import LongType, StringType, StructType, StructField, ByteType, ArrayType
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, CountVectorizer

import re

In [12]:
@f.udf(ArrayType(StringType()))
def parsing_data(text):
    """Lower-case every element of a collected list of genre strings.

    Despite its name, ``text`` is the array produced by ``collect_list`` in the
    cells that call this UDF, not a single string.

    Args:
        text: iterable of values to normalise, or ``None``.

    Returns:
        A list with each element converted to ``str`` and lower-cased;
        ``['']`` when ``text`` is ``None``.
    """
    # Identity check is the correct way to test for None.
    if text is None:
        return ['']
    return [str(word).lower() for word in text]

In [13]:
# Attach each item's genres to the train interactions once, then aggregate the
# same joined frame two ways: genres collected per item and per user.
train_item_genres = train.join(items.select('item_id', 'genres'), ['item_id'], how='left')

train_genres = (train_item_genres
                .groupby('item_id')
                .agg(f.collect_list('genres').alias('genre_list'))
                .cache())
train_genres_user = (train_item_genres
                     .groupby('user_id')
                     .agg(f.collect_list('genres').alias('genre_list_user'))
                     .cache())

In [14]:
# Inspect two per-user genre lists, untruncated.
train_genres_user.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
# Inspect two per-item genre lists, untruncated.
train_genres.show(2, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [16]:
# Replace the raw collected lists with their lower-cased versions.
# The source list columns are dropped by the select, so this cell cannot be
# re-run without first re-running the aggregation cell.
item_genres_col = parsing_data(f.col('genre_list')).alias('genres')
user_genres_col = parsing_data(f.col('genre_list_user')).alias('genres_user')

train_genres = train_genres.select('item_id', item_genres_col)
train_genres_user = train_genres_user.select('user_id', user_genres_col)

In [17]:
start = time()

# Item side: fit a vocabulary over the item genre lists and vectorise them.
cv_genre = CountVectorizer(inputCol="genres", outputCol="genre_features")
model_genre = cv_genre.fit(train_genres)
result_genre = model_genre.transform(train_genres)

# User side: same procedure over the per-user genre lists.
cv_genre_us = CountVectorizer(inputCol="genres_user", outputCol="genre_features_user")
model_genre_user = cv_genre_us.fit(train_genres_user)
result_genre_user = model_genre_user.transform(train_genres_user)

print_worktime(time() - start)


Код отработал за 0 часов 0 минут 23.596380472183228 секунд


In [24]:
# Keep only the key and the vectorised genres, one distinct row per combination.
item_feature_cols = ['item_id', 'genre_features']
user_feature_cols = ['user_id', 'genre_features_user']

result_genre = result_genre.select(*item_feature_cols).distinct()
result_genre_user = result_genre_user.select(*user_feature_cols).distinct()

In [26]:
# Length of each viewing record: end timestamp minus start timestamp.
watch_length = f.col('ts_end') - f.col('ts_start')
programmes = programmes.withColumn('length', watch_length)
# programmes.show(5)

In [27]:
# Mean of the `purchase` column in train, aggregated per item and per user.
mean_purchase = f.mean('purchase')

purch_am = train.groupby('item_id').agg(mean_purchase.alias('purch_prob'))
purch_am_user = train.groupby('user_id').agg(mean_purchase.alias('purch_prob_user'))


In [28]:
# watch metrics
# Per-user total and average of the viewing-record length.
watch_aggregates = [
    F.sum('length').alias("sum_watch"),
    F.mean('length').alias("mean_watch"),
]
user_views_df = programmes.groupby('user_id').agg(*watch_aggregates)

# Подход 1: предсказание через GBT classifier (долго и плохо)

In [29]:
def _with_features(pairs):
    """Left-join all engineered features onto a (user_id, item_id) frame.

    The same pipeline is applied to the train and test splits, so it lives in
    one place instead of being copy-pasted.

    Args:
        pairs: Spark DataFrame containing at least `user_id` and `item_id`.

    Returns:
        A cached DataFrame with purchase rates, item year, watch metrics and
        genre vectors attached.
    """
    item_year = items.select(f.col('year').cast(DoubleType()).alias('year'), 'item_id')
    return (pairs
            .join(purch_am, ['item_id'], how='left')
            .join(item_year, ['item_id'], how='left')
            .join(purch_am_user, ['user_id'], how='left')
            .join(user_views_df, ['user_id'], how='left')
            .repartition(100)
            .join(result_genre_user, ['user_id'], how='left')
            .join(result_genre, ['item_id'], how='left')
            # NOTE(review): fillna(0) presumably leaves the genre vector columns
            # untouched - confirm that rows without a genre vector are handled
            # by the assembler downstream.
            .fillna(0)
            .cache())


train_with_features = _with_features(train)

test_with_features = _with_features(test)

In [30]:
# Run an action on the test feature frame and report how long it took.
t0 = time()
test_with_features.count()
elapsed = time() - t0
print_worktime(elapsed)

Код отработал за 0 часов 2 минут 2.287085771560669 секунд


In [31]:
# Run an action on the train feature frame and report how long it took.
t0 = time()
train_with_features.count()
elapsed = time() - t0
print_worktime(elapsed)

Код отработал за 0 часов 3 минут 34.253660440444946 секунд


In [32]:
# List the columns available after all feature joins.
train_with_features.columns

['item_id',
 'user_id',
 'purchase',
 'purch_prob',
 'year',
 'purch_prob_user',
 'sum_watch',
 'mean_watch',
 'genre_features_user',
 'genre_features']

In [33]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single `features` vector, in this order.
assembler_inputs = [
    'purch_prob',
    'year',
    'purch_prob_user',
    'sum_watch',
    'mean_watch',
    'genre_features_user',
    'genre_features',
]
vecAssembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
# Echo the assembler so the cell still displays it.
vecAssembler


VectorAssembler_e72441155914

In [34]:
# Identifier of a (user, item) pair. Adding the two ids does not identify a pair
# (e.g. 1 + 4 == 2 + 3), so the ids are joined with a separator instead.
pair_id = f.concat_ws('_', f.col('user_id'), f.col('item_id'))

train_with_features = train_with_features.withColumn('id', pair_id).fillna(0)
test_with_features = test_with_features.withColumn('id', pair_id).fillna(0)

# Assemble the feature vector. The train label is cast to a numeric type for
# the classifier; the test `purchase` column is passed through as read.
train_df = vecAssembler.transform(train_with_features).select('features', f.col('purchase').cast(ByteType()).alias('purchase'), 'id', 'item_id', 'user_id')
test_df = vecAssembler.transform(test_with_features).select('features', 'purchase', 'id', 'item_id', 'user_id')

In [35]:
# Mark both assembled frames for caching; the count() cells that follow
# presumably trigger the actual materialisation.
test_df = test_df.cache()
train_df = train_df.cache()


In [36]:
# Run an action on the assembled test frame and report how long it took.
t0 = time()
test_df.count()
elapsed = time() - t0
print_worktime(elapsed)

Код отработал за 0 часов 1 минут 40.78083086013794 секунд


In [37]:
# Run an action on the assembled train frame and report how long it took.
t0 = time()
train_df.count()
elapsed = time() - t0
print_worktime(elapsed)

Код отработал за 0 часов 3 минут 38.504204988479614 секунд


In [None]:
# Gradient-boosted trees estimator reading `features` and predicting `purchase`.
gbt = GBTClassifier(
    labelCol="purchase",
    featuresCol='features',
    maxDepth=3,
    maxIter=100,
    stepSize=0.01,
)

In [None]:
# Fit the GBT model on train, score the test frame, and report the elapsed time.
t0 = time()
gbt_model = gbt.fit(train_df)
predictions_gbt = gbt_model.transform(test_df)
elapsed = time() - t0
print_worktime(elapsed)

In [None]:
# Display the feature importances exposed by the fitted GBT model.
gbt_model.featureImportances

In [None]:
# paramGrid = ParamGridBuilder().addGrid(gbt.maxBins, [30, 50])\
#                               .addGrid(gbt.maxDepth, [3, 5])\
#                               .addGrid(gbt.stepSize, [0.1, 0.2])\
#                               .build()
# crossval = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
#                               evaluator=evaluator, numFolds=3, parallelism=5)

In [None]:
# start = time()
# cv_model = crossval.fit(train_df)
# print(cv_model.avgMetrics)
# print_worktime(time()-start)

In [None]:
# cv_model.bestModel

In [None]:
# cv_model.bestModel.extractParamMap()

In [None]:
# train_df = train_df.repartition(100)
# test_df = test_df.repartition(100)

In [None]:
@f.udf(DoubleType())
def predict_to_double(p):
    """Return element 1 of the indexable value `p` as a Python float."""
    return float(p[1])

In [None]:
# The UDF indexes element 1 of its argument, so it must be fed the class
# probability vector; the `prediction` column holds a scalar label and cannot be indexed.
result_df = predictions_gbt.select('user_id', 'item_id', predict_to_double('probability').alias('purchase'))

# Подход 2: ALS - быстро и качественнее

In [None]:
from pyspark.ml.recommendation import ALS

In [None]:
# ALS estimator over (user_id, item_id, purchase) with implicit preferences.
als = ALS(
    userCol='user_id',
    itemCol='item_id',
    ratingCol='purchase',
    rank=6,
    maxIter=20,
    regParam=2.1,
    alpha=5.0,
    implicitPrefs=True,
    nonnegative=False,
    coldStartStrategy="nan",
    seed=42,
)

In [None]:
# ALS input: the three columns of each split cast to integers.
als_int_columns = [
    f.col(name).cast(IntegerType())
    for name in ('user_id', 'item_id', 'purchase')
]
train_als = train.select(*als_int_columns)
test_als = test.select(*als_int_columns)

In [None]:
# Preview the integer-cast test frame.
test_als.show()

In [None]:
# Fit the ALS model on the train interactions and report the elapsed time.
t0 = time()
als_model = als.fit(train_als)
elapsed = time() - t0
print_worktime(elapsed)

In [None]:
# Score the test pairs with the fitted ALS model and report the elapsed time.
t0 = time()
prediction_als = als_model.transform(test_als)
elapsed = time() - t0
print_worktime(elapsed)

In [None]:
# Keep the pair keys and expose the ALS prediction under the name `purchase`.
purchase_score = prediction_als['prediction'].alias('purchase')
result = prediction_als.select('user_id', 'item_id', purchase_score)

In [None]:
# Sort by user and item, then collect into a pandas DataFrame on the driver.
# After this cell `result` is a pandas object, no longer a Spark one.
t0 = time()
result = result.orderBy("user_id", "item_id").toPandas()
elapsed = time() - t0
print_worktime(elapsed)

In [None]:
# Write the collected predictions to a CSV file in the working directory.
# NOTE(review): no `index` argument is passed, so pandas' default index
# handling applies - confirm the expected file layout.
result.to_csv('lab03.csv')

In [None]:
# Row count of the GBT predictions. `predictions_gbt` is the frame produced by
# the GBT scoring cell; no `predictions_gbt_final` is defined in this notebook.
predictions_gbt.count()

In [None]:
# Inspect two GBT prediction rows without truncating column values.
predictions_gbt.show(2, truncate=False)

In [None]:
# List the columns of the GBT prediction frame.
predictions_gbt.columns

In [None]:
# Rebuild the Spark result frame from the ALS predictions: pair keys plus the
# prediction exposed as `purchase`.
purchase_score = prediction_als['prediction'].alias('purchase')
result = prediction_als.select('user_id', 'item_id', purchase_score)
# result = result.withColumnRenamed('probability', 'prediction')

In [None]:
# Preview the ALS predictions.
prediction_als.show()

In [None]:
# result.show(2, truncate=False)

In [None]:
# Sort by user and item, then collect into a pandas DataFrame on the driver.
# After this cell `result` is a pandas object, no longer a Spark one.
t0 = time()
result = result.orderBy("user_id", "item_id").toPandas()
elapsed = time() - t0
print_worktime(elapsed)

In [None]:
# Shape of the collected pandas result. `result_df` is a Spark DataFrame and has
# no `.shape`; the pandas frame produced by toPandas() is `result`.
result.shape

In [None]:
# Write the collected predictions to a CSV file in the working directory.
# NOTE(review): no `index` argument is passed, so pandas' default index
# handling applies - confirm the expected file layout.
result.to_csv('lab03.csv')

In [None]:
# Stop the Spark session once the notebook is finished.
spark.stop()