In [1]:
import pandas as pd
import numpy as np
from scipy.sparse import coo_matrix
import scipy
from scipy.sparse.linalg import svds
from numpy.linalg import svd 
from scipy.sparse import csr_matrix
import time
from sklearn import preprocessing
import matplotlib.pyplot as plt

In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [3]:
import findspark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2/lib/spark2')

from pyspark import SparkConf, SparkContext, HiveContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, regexp_extract, regexp_replace, concat, lit, when, length, unix_timestamp, add_months
from pyspark.sql.functions import month, datediff, lag, to_timestamp, avg, collect_list, stddev, row_number, coalesce
from pyspark.storagelevel import StorageLevel
import pyspark.sql.functions as F
from pyspark.sql.functions import explode

conf = SparkConf().setAll([('spark.yarn.executor.memoryOverhead', '32g'), 
                           ('spark.driver.memory','32g'),
                           ('hive.exec.dynamic.partition','true'),
                           ('hive.exec.dynamic.partition.mode','nonstrict'),
                           #,('spark.rpc.message.maxSize', '512')
                          ])

spark = SparkSession.builder.appName('recsysSparkBaseline').getOrCreate()
sc = spark.sparkContext
hive = HiveContext(sc)

In [4]:
import pyspark.sql.functions as F 

In [5]:
# spark.stop()

## prepare data

In [6]:
data_path = '/data/agoryach/datagym-recsys-01/big-hw-02/input/'
train_path = data_path + 'train_data_full.csv'
test_path = data_path + 'sample_submission_full.csv'
userRecs_path = data_path + 'userRecs.csv'

In [89]:
train = pd.read_csv(train_path)
test = pd.read_csv(test_path)

In [90]:
aggr1 = {
'session_duration': 'max', 
'video_duration': 'max'
}

train_s = train.groupby(['user_id', 'primary_video_id'])\
           .agg(aggr1).reset_index().reset_index()

train_s['session_duration_clean'] = train_s[['session_duration', 'video_duration']].min(axis=1)
train_s['watching_percentage'] = train_s['session_duration_clean']/train_s['video_duration']
train_s = train_s[['user_id', 'primary_video_id', 'watching_percentage']]

user_labels = {k:v for v, k in dict(enumerate(train_s.user_id.unique())).items()}
item_labels = {k:v for v, k in dict(enumerate(train_s.primary_video_id.unique())).items()}

train_s['user_num'] = train_s['user_id'].map(user_labels)
train_s['item_num'] = train_s['primary_video_id'].map(item_labels)

#train_s['target'] = np.where(train_s['watching_percentage']>=0.5, 1, 0)

In [91]:
train_s = train_s.sort_values(['user_num', 'watching_percentage'], ascending=[True, True])
train_s['rating'] = train_s.groupby('user_num').cumcount() + 1

In [92]:
train_spark = train_s[['user_id', 'primary_video_id', 'user_num', 'item_num', 'watching_percentage']]

In [93]:
schema = StructType([StructField("user_id", IntegerType(), True)\
                    ,StructField("primary_video_id", IntegerType(), True)\
                    ,StructField("user_num", IntegerType(), True)\
                    ,StructField("item_num", IntegerType(), True)\
                    ,StructField("rating", FloatType(), True)])
                       
df = spark.createDataFrame(train_spark, schema=schema)

In [12]:
#df = df.withColumn('rating_u', F.floor(col('rating')*10)).drop('rating')

## make features

### item feats

In [15]:
# film info
item_info = pd.read_csv(data_path + 'video_meta_data_full.csv')
# ohe по популярным актерам
credits = pd.read_csv(data_path + 'video_credits_data_full.csv')
# facts about favourite as flag
favs = pd.read_csv(data_path + 'video_favs_data_full.csv')
# facts about like as flag 0, 1, -1
likes = pd.read_csv(data_path + 'video_likes_data_full.csv')
# facts about orders
orders = pd.read_csv(data_path + 'video_orders_data_full.csv')

In [16]:
item_info_agg = item_info.groupby('primary_video_id')\
.max().reset_index()\
[['primary_video_id', 'type', 'year', 'country', 'rating_imdb', 'rating_kinopoisk', \
  'score_by_popular', 'score_by_recommended', 'quality', 'age_limit']]

In [17]:
item_info_agg['item_FILM'] = np.where(item_info_agg['type']=='FILM', 1, 0)
item_info_agg['item_SERIAL'] = np.where(item_info_agg['type']=='SERIAL', 1, 0)
item_info_agg['item_SERIA'] = np.where(item_info_agg['type']=='SERIA', 1, 0)
item_info_agg['item_MULTFILM'] = np.where(item_info_agg['type']=='MULTFILM', 1, 0)
item_info_agg['item_SHOW'] = np.where((item_info_agg['type']=='SHOW')|(item_info_agg['type']=='SHOWFILM'), 1, 0)

item_info_agg['item_year_2000'] = np.where(item_info_agg['year'] >= 2000, 1, 0)
item_info_agg['item_year_1990'] = np.where((item_info_agg['year'] >= 1990)&(item_info_agg['year'] < 2000), 1, 0)
item_info_agg['item_year_1980'] = np.where((item_info_agg['year'] >= 1980)&(item_info_agg['year'] < 1990), 1, 0)
item_info_agg['item_year_1960'] = np.where((item_info_agg['year'] >= 1960)&(item_info_agg['year'] < 1980), 1, 0)
item_info_agg['item_year_1940'] = np.where((item_info_agg['year'] >= 1940)&(item_info_agg['year'] < 1960), 1, 0)
item_info_agg['item_year_1900'] = np.where((item_info_agg['year'] <  1940), 1, 0)

item_info_agg['country_USA'] = np.where((item_info_agg['country'] == 'США'), 1, 0)
item_info_agg['country_RUS'] = np.where((item_info_agg['country'] == 'Россия'), 1, 0)
item_info_agg['country_USSR'] = np.where((item_info_agg['country'] == 'СССР'), 1, 0)
item_info_agg['country_FRANCE'] = np.where((item_info_agg['country'] == 'Франция'), 1, 0)
item_info_agg['country_ENG'] = np.where((item_info_agg['country'] == 'Великобритания'), 1, 0)
item_info_agg['country_UKR'] = np.where((item_info_agg['country'] == 'Украина'), 1, 0)

item_info_agg['quality_FULL'] = np.where((item_info_agg['quality'] == 'FullHD'), 1, 0)
item_info_agg['quality_HD'] = np.where((item_info_agg['quality'] == 'HD'), 1, 0)

item_info_agg['age_limit_18'] = np.where(item_info_agg['age_limit']==18, 1, 0)
item_info_agg['age_limit_16'] = np.where(item_info_agg['age_limit']==16, 1, 0)
item_info_agg['age_limit_12'] = np.where(item_info_agg['age_limit']==12, 1, 0)
item_info_agg['age_limit_6']  = np.where(item_info_agg['age_limit']==6, 1, 0)

In [18]:
item_info_agg.drop(['type', 'year', 'country', 'quality', 'age_limit'], axis=1, inplace=True)

In [19]:
top10_actors = credits[credits['type']=='ROLE'].title.value_counts().head(10).index
top10_directors = credits[credits['type']=='DIRECTOR'].title.value_counts().head(10).index
top10_scenario = credits[credits['type']=='SCENARIO'].title.value_counts().head(10).index

top30_actors = credits[credits['type']=='ROLE'].title.value_counts().head(30).index
top30_directors = credits[credits['type']=='DIRECTOR'].title.value_counts().head(30).index
top30_scenario = credits[credits['type']=='SCENARIO'].title.value_counts().head(30).index

top100_actors = credits[credits['type']=='ROLE'].title.value_counts().head(100).index
top100_directors = credits[credits['type']=='DIRECTOR'].title.value_counts().head(100).index
top100_scenario = credits[credits['type']=='SCENARIO'].title.value_counts().head(100).index

In [20]:
credits['top10_actors'] = np.where(credits.title.isin(top10_actors), 1, 0)
credits['top10_directors'] = np.where(credits.title.isin(top10_directors), 1, 0)
credits['top10_scenario'] = np.where(credits.title.isin(top10_scenario), 1, 0)

credits['top30_actors'] = np.where(credits.title.isin(top30_actors), 1, 0)
credits['top30_directors'] = np.where(credits.title.isin(top30_directors), 1, 0)
credits['top30_scenario'] = np.where(credits.title.isin(top30_scenario), 1, 0)

credits['top100_actors'] = np.where(credits.title.isin(top100_actors), 1, 0)
credits['top100_directors'] = np.where(credits.title.isin(top100_directors), 1, 0)
credits['top100_scenario'] = np.where(credits.title.isin(top100_scenario), 1, 0)

credits_agg = credits.groupby('primary_video_id').sum().reset_index()
item_info_agg = item_info_agg.merge(credits_agg, how='left').fillna(-1)

### user feats

In [42]:
aggregation = {
'primary_video_id': 'count',
'watching_percentage': ['mean', 'max', 'min'],
# 'session_duration': ['mean', 'max', 'min'],
# 'player_position_min': ['mean', 'max', 'min'],
# 'player_position_max': ['mean', 'max', 'min'],
'rating_imdb': 'mean',
'rating_kinopoisk': 'mean',
'score_by_popular': 'mean',
'score_by_recommended': 'mean',
'item_FILM': 'sum',
'item_SERIAL': 'sum',
'item_MULTFILM': 'sum',
'item_SHOW': 'sum',
'item_year_2000': 'sum',
'item_year_1990': 'sum',
'item_year_1980': 'sum',
'item_year_1960': 'sum',
'item_year_1940': 'sum',
'item_year_1900': 'sum',
'country_USA': 'sum',
'country_RUS': 'sum',
'country_USSR': 'sum',
'country_FRANCE': 'sum',
'country_ENG': 'sum',
'country_UKR': 'sum',
'quality_FULL': 'sum',
'quality_HD': 'sum',
'age_limit_18': 'sum',
'age_limit_16': 'sum',
'age_limit_12': 'sum',
'age_limit_6': 'sum',
'top10_actors': 'sum',
'top10_directors': 'sum',
'top10_scenario': 'sum',
'top30_actors': 'sum',
'top30_directors': 'sum',
'top30_scenario': 'sum',
'top100_actors': 'sum',
'top100_directors': 'sum',
'top100_scenario': 'sum'
}

user_info_agg = train_s.groupby('user_id').agg(aggregation)
user_info_agg.columns=['user_'+x[0]+'_'+x[1] for x in user_info_agg.columns]
user_info_agg = user_info_agg.reset_index()

### feats to spark df

In [80]:
integers = ['int16', 'int32', 'int64']
floats = ['float16', 'float32', 'float64']
item_info_cols = []
for col in item_info_agg.columns:
    if item_info_agg.dtypes[col] in integers:
        item_info_cols.append(StructField(col, IntegerType(), True))
    if item_info_agg.dtypes[col] in floats:
        item_info_cols.append(StructField(col, FloatType(), True))
        
item_info_schema = StructType(item_info_cols)
item_info_df = spark.createDataFrame(item_info_agg, schema=item_info_schema)

In [83]:
integers = ['int16', 'int32', 'int64']
floats = ['float16', 'float32', 'float64']
user_info_cols = []

for col in user_info_agg.columns:
    if user_info_agg.dtypes[col] in integers:
        user_info_cols.append(StructField(col, IntegerType(), True))
    if user_info_agg.dtypes[col] in floats:
        user_info_cols.append(StructField(col, FloatType(), True))
        
user_info_schema = StructType(user_info_cols)
user_info_df = spark.createDataFrame(user_info_cols, schema=user_info_schema)

### join feats

In [95]:
join_condition_users = [df.user_id == user_info_df.user_id]

# джоин таргета и недельной витрины + удаление лишних колонок, если они есть
df1 = df.join(user_info_df, join_condition_users, how='left')\
                .drop('user_id')

In [100]:
join_condition_items = [df1.primary_video_id == item_info_df.primary_video_id]

# джоин таргета и недельной витрины + удаление лишних колонок, если они есть
result = df1.join(item_info_df, join_condition_items, how='left')\
                .drop('primary_video_id')

## fit

In [106]:
from pyspark.sql.functions import col

In [108]:
result = result.withColumn('target', when(col('rating') > 0.5, 1).otherwise(0))

In [109]:
(training, test) = result.randomSplit([0.8, 0.2])

In [110]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, 
          regParam=0.01, 
          implicitPrefs=True, 
          userCol="user_num", itemCol="item_num", ratingCol="target",
          coldStartStrategy="drop")
model = als.fit(training)

KeyboardInterrupt: 

In [None]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="target",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In [235]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating_u",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 7.660692592176051


## predict

In [None]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

In [None]:
userRecs.printSchema()

In [None]:
userRecs.show(5, True, True)

In [None]:
userRecs = userRecs.select(userRecs.user_num, explode(userRecs.recommendations).alias('item'))

In [None]:
userRecs.printSchema()

In [241]:
userRecs = userRecs.select(userRecs.user_num, 
                           userRecs.item.item_num.alias('item_num'), 
                           userRecs.item.rating.alias('rating'))

In [None]:
# hot_users_b = sc.broadcast(hot_users)
# hotRecs = userRecs.filter(col('user_num').isin(hot_users_b.value))

In [242]:
# save to hive
# userRecs_table = 'agoryach.recsys_user_recs_baseline'
# #hive.sql(f'drop table {userRecs_table}')

# userRecs.write.format('hive').saveAsTable(userRecs_table)

In [244]:
hdfs_save_path = '/user/agoryach/recsys_user_recs_baseline_10'
userRecs_path = data_path + 'userRecs_baseline_10.csv'

userRecs.write.format('com.databricks.spark.csv').save(hdfs_save_path, header = 'false', sep = '~')

In [245]:
!hdfs dfs -getmerge $hdfs_save_path $userRecs_path

## final step

In [246]:
item_labels_rev = {v:k for k,v in item_labels.items()}
user_labels_rev = {v:k for k,v in user_labels.items()}

In [247]:
names = ['user_num', 'item_num', 'rating']
userRecsDf = pd.read_csv(userRecs_path, names=names, sep='~')

In [248]:
userRecsDf['primary_video_id'] = userRecsDf['item_num'].map(item_labels_rev)
userRecsDf['user_id'] = userRecsDf['user_num'].map(user_labels_rev)

In [249]:
userRecsDf.head()

Unnamed: 0,user_num,item_num,rating,primary_video_id,user_id
0,148,255,0.065378,25397362,48615
1,148,254,0.064217,22838054,48615
2,148,181,0.062919,52363,48615
3,148,253,0.062629,22030859,48615
4,148,112,0.060544,21426265,48615


In [250]:
test = pd.read_csv(test_path).drop('primary_video_id', axis=1)

In [251]:
hot_users = test[test['user_id'].isin(userRecsDf.user_id.unique())].user_id.unique()
cold_users = list(set(test.user_id.unique()) - set(hot_users))

### hot users

In [252]:
import warnings
warnings.filterwarnings('ignore')

In [253]:
hotUserRecs = userRecsDf[userRecsDf['user_id'].isin(hot_users)]

In [254]:
hotUserRecs.isna().sum()

user_num            0
item_num            0
rating              0
primary_video_id    0
user_id             0
dtype: int64

In [255]:
hotResult = hotUserRecs[['user_id', 'primary_video_id']]

In [256]:
result1 = hotResult.groupby('user_id')['primary_video_id'].apply(list).reset_index()

In [257]:
result1['primary_video_id'] = result1['primary_video_id'].apply(lambda x: ' '.join([str(a) for a in x]))

In [258]:
result1.head()

Unnamed: 0,user_id,primary_video_id
0,1410,19134359 10350682 30564130 9278069 30986288 15...
1,1894,3696132 2677761 32222690 19624341 11536584 287...
2,3070,11842669 3696132 27740348 9516870 19624341 159...
3,5344,20320480 30564130 19134359 10350682 15147293 2...
4,5445,26320644 5147187 26997030 11536584 9583642 355...


### cold users

In [259]:
cold_items = ' '.join([str(x) for x in userRecsDf['primary_video_id'].value_counts().index[:10]])

In [260]:
result2 = pd.DataFrame(cold_users, columns=['user_id'])

In [261]:
result2['primary_video_id'] = [cold_items]*result2.shape[0]

In [262]:
result2.head()

Unnamed: 0,user_id,primary_video_id
0,118358019,10350682 2677761 3696132 19134359 21426265 287...
1,113246218,10350682 2677761 3696132 19134359 21426265 287...
2,24903694,10350682 2677761 3696132 19134359 21426265 287...
3,95027220,10350682 2677761 3696132 19134359 21426265 287...
4,37486613,10350682 2677761 3696132 19134359 21426265 287...


### concat

In [263]:
result = pd.concat((result1, result2))

In [264]:
result.user_id.nunique(), result.shape[0], test.shape[0], len(cold_users) + len(hot_users)

(187183, 187183, 187183, 187183)

In [265]:
subm_path = '/data/agoryach/datagym-recsys-01/big-hw-02/output/submission_5.csv'
subm_path

'/data/agoryach/datagym-recsys-01/big-hw-02/output/submission_5.csv'

In [266]:
result.to_csv(subm_path, index=False)