In [1]:
# Remove the checkpoint directory left from a previous run (the output below shows it did not exist yet).
!hdfs dfs -rm -R /user/andrey.chubin/check_point_dir

rm: `/user/andrey.chubin/check_point_dir': No such file or directory


In [None]:
# Remove a parquet output from an earlier run; this path is not referenced anywhere else in the notebook.
!hdfs dfs -rm -R /user/andrey.chubin/w_test_adj.parquet

In [None]:
# Copy the programmes-views CSV from HDFS into the local working directory.
# NOTE(review): the Spark read of this file uses the absolute path /labs/lab10data/...,
# which presumably resolves against HDFS, so this local copy looks unused — confirm.
!hadoop fs -get /labs/lab10data/lab10_views_programmes.csv

In [None]:
# List the user's HDFS home directory to see what earlier runs left behind.
!hadoop fs -ls /user/andrey.chubin/

#### Запускаю Spark

In [2]:
import os
import sys

# Cluster-specific locations: the Python interpreter PySpark should use, the HDP
# Spark 2 install, and the spark-submit arguments read by pyspark-shell.
os.environ.update({
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
    "PYSPARK_SUBMIT_ARGS": '--num-executors 6 pyspark-shell',
})

spark_home = os.environ.get('SPARK_HOME', None)

# Make Spark's bundled Python package and py4j importable. Each entry is inserted
# at position 0, so the py4j zip ends up ahead of the python/ directory.
for spark_subpath in ('python', 'python/lib/py4j-0.10.7-src.zip'):
    sys.path.insert(0, os.path.join(spark_home, spark_subpath))

In [3]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# SparkConf.set returns the conf itself, so it can be built in one expression.
# The app name is given both here and on the builder (same value in both places).
conf = SparkConf().set("spark.app.name", "andrey lab10 app")

spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("andrey lab10 app")
    .getOrCreate()
)

In [4]:
# Handle to the underlying SparkContext (used next to set the checkpoint directory).
sc = spark.sparkContext

#### Устанавливаю папку для чекпоинта на всякий случай

In [5]:
# Directory for RDD checkpoints. NOTE(review): no .checkpoint() call is visible in this
# notebook, so this is only a safety net.
sc.setCheckpointDir("/user/andrey.chubin/check_point_dir/")

In [6]:
import numpy as np
import pandas as pd

import warnings
# NOTE(review): this silences *all* warnings, including pandas SettingWithCopy /
# chained-assignment warnings that would flag the in-place column fillna calls below.
warnings.filterwarnings('ignore')

from pyspark.sql import functions as f
# Star import supplies StructType, StructField, StringType, DoubleType, LongType,
# IntegerType and ArrayType used throughout the notebook.
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer

In [7]:
# Schema shared by the train and test CSVs: one row per (user_id, item_id) with a purchase label.
t_schema = StructType([
    StructField("user_id", StringType()),
    StructField("item_id", StringType()),
    StructField("purchase", DoubleType()),
])

#### Загружаю файлы. В файле с информацией о фильмах и программах сразу превращаю год создания в возраст в годах (математические операции с возрастом имеют больше смысла чем с годом)

In [8]:
# Labelled interactions (train) and the pairs to score (test), both with t_schema.
train = spark.read.csv("/labs/lab10data/lab10_train.csv", sep=',', header=True, schema=t_schema)
test = spark.read.csv("/labs/lab10data/lab10_test.csv", sep=',', header=True, schema=t_schema)

# Tab-separated item catalogue: keep only content_type '1' rows and turn the
# release year into an age in years relative to 2020.
items = (
    spark.read.csv("/labs/lab10data/lab10_items.csv", sep='\t', header=True)
    .filter(f.col('content_type') == '1')
    .select(
        'item_id',
        (2020 - f.col('year').cast(IntegerType())).alias('film_age'),
        'genres',
    )
)

#### Группирую данные, чтобы узнать, сколько юзер посмотрел фильмов (потом эта фича не используется, так как она ухудшала результаты) и сколько юзеров посмотрели фильм. Проще говоря, узнаю активность и популярность

In [9]:
# Activity (distinct items per user) and popularity (distinct users per item) in train.
v_user = train.groupBy('user_id').agg(f.countDistinct('item_id').alias('v_user'))
v_item = train.groupBy('item_id').agg(f.countDistinct('user_id').alias('v_item'))

#### На этом этапе нахожу, сколько в среднем часов (adjusted mean) провёл каждый юзер за просмотром контента, а также число его просмотров, которое масштабирую по MinMax (adj_views)

In [10]:
# Per-user viewing statistics from the programmes log.
# hours_spend: mean session length in hours, shrunk towards PRIOR_HOURS with weight
#   PRIOR_WEIGHT ("adjusted mean"); sessions whose length cannot be computed
#   (null timestamps) count as PRIOR_HOURS.
# adj_views: number of sessions, min-max scaled with the hard-coded bounds
#   MIN_VIEWS..MAX_VIEWS (presumably the observed min/max — confirm).
PRIOR_HOURS = 1.04
PRIOR_WEIGHT = 250
MIN_VIEWS, MAX_VIEWS = 1.0, 113796.0

details = (
    spark.read.csv("/labs/lab10data/lab10_views_programmes.csv", sep=',', header=True)
    .select(
        f.col('user_id'),
        f.round(
            (f.col('ts_end').cast(LongType()) - f.col('ts_start').cast(LongType())) / 3600, 2
        ).alias('hours_spend'),
    )
    .fillna(PRIOR_HOURS, subset=['hours_spend'])
    .groupBy('user_id')
    .agg(
        f.sum('hours_spend').alias("sum_hours_spend"),
        f.count('hours_spend').alias("count_hours_spend"),
    )
    .select(
        f.col('user_id'),
        f.round(
            (f.col('sum_hours_spend') + PRIOR_WEIGHT * PRIOR_HOURS)
            / (f.col('count_hours_spend') + PRIOR_WEIGHT),
            2,
        ).alias('hours_spend'),
        f.round(
            (f.col('count_hours_spend') - MIN_VIEWS) / (MAX_VIEWS - MIN_VIEWS), 4
        ).alias('adj_views'),
    )
    .cache()
)

#### Токенизатор жанров контента (каждый item мог иметь более одного жанра). Сейчас бы я применил на этом моменте заранее подготовленную Scala UDF, но тогда я этого не умел

In [11]:
@f.pandas_udf(ArrayType(StringType()))
def presplit(dataseries):
    """Split each comma-separated genre string into a list of genre tokens.

    Missing values (None, NaN, or anything that is not a string) become an
    empty list rather than raising, so items without genres end up with an
    all-zero genre vector after CountVectorizer.
    """
    return dataseries.apply(lambda x: x.split(',') if isinstance(x, str) else [])


@f.pandas_udf(ArrayType(StringType()))
def split(dataseries):
    """Lower-case every genre token in each row's array."""
    return dataseries.apply(lambda x: [y.lower() for y in x])

In [12]:
# Tokenise genres: split the comma-separated string, then lower-case each token.
items = items.withColumn('genres', presplit('genres')).withColumn('genres', split('genres'))

In [13]:
def merge_two_lists(x, y):
    """Concatenate two genre lists into a new list (combiner for reduceByKey).

    ``None`` is treated as an empty list: the left join with ``items`` that feeds
    this function yields ``None`` genres for items missing from the film catalogue.
    Neither input list is mutated.
    """
    return (x or []) + (y or [])

In [14]:
# Schema of user_pref: each user's concatenated list of genre tokens.
u_schema = StructType([
    StructField("user_id", StringType()),
    StructField("genres", ArrayType(StringType())),
])

In [15]:
# Schemas for the dense genre vectors, as plain lists of doubles, per user and per item.
res_u_schema = StructType([
    StructField("user_id", StringType()),
    StructField("u_features", ArrayType(DoubleType())),
])

res_i_schema = StructType([
    StructField("item_id", StringType()),
    StructField("i_features", ArrayType(DoubleType())),
])

In [16]:
# Per-user bag of genres over purchased items. Each purchase contributes its item's
# genre list, and the lists are concatenated per user with duplicates kept, so the
# CountVectorizer below counts how often each genre was bought.
# NOTE(review): numPartitions=1 funnels the whole reduce into a single partition.
user_pref = (
    train.filter(f.col('purchase') == 1.0)
    .join(items.select('item_id', 'genres'), on='item_id', how='left')
    .select('user_id', 'genres')
    .rdd
    .map(tuple)
    .reduceByKey(merge_two_lists, numPartitions=1)
    .toDF(schema=u_schema)
    .cache()
)

#### Применяю CountVectorizer к жанрам, чтобы представить жанровую принадлежность item в виде числового вектора

In [17]:
# Genre vocabulary fitted on the item catalogue. A token needs to appear in at least
# two items (minDF=2) to get a slot in the vector.
cv = CountVectorizer(
    minDF=2.0,
    inputCol="genres",
    outputCol="features",
)
model = cv.fit(items)

In [18]:
# Dense genre-count vector per item, converted from an ML Vector to a list of doubles.
res_items = (
    model.transform(items)
    .select('item_id', 'features')
    .rdd
    .map(lambda row: (row.item_id, row.features.toArray().tolist()))
    .toDF(schema=res_i_schema)
    .cache()
)

#### Применяю тот же подход к юзерам => задача - определить любимые (самые часто просматриваемые) жанры

In [19]:
# Same vectorisation for users: counts of each genre over the user's purchases.
res_users = (
    model.transform(user_pref)
    .select('user_id', 'features')
    .rdd
    .map(lambda row: (row.user_id, row.features.toArray().tolist()))
    .toDF(schema=res_u_schema)
    .cache()
)

In [20]:
# Drop user_pref from the cache.
# NOTE(review): no Spark action appears to run on user_pref (or on res_users, which is
# derived from it) between its .cache() and this call. The cache was therefore likely
# never populated, and later actions will recompute it from train/items.
user_pref.unpersist()

DataFrame[user_id: string, genres: array<string>]

In [21]:
# Schema of the per-(user, item) genre-overlap vectors.
res_all_schema = StructType([
    StructField("user_id", StringType()),
    StructField("item_id", StringType()),
    StructField("features", ArrayType(DoubleType())),
])

#### Создаю вектор для холодного старта. Если у контента не было указано жанра, то он получает нулевой вектор, если у юзера, то вектор из единиц (считаем, что все жанры нравятся ему одинаково).
#### Делю вектор юзера на его длину.
#### И нахожу пересечения векторов контента и юзера (идея: предугадываем по жанру вероятность того, что контент понравится)
#### Дополнительно нахожу для test, покупал ли юзер что-то раньше

In [22]:
# Length of the genre vectors produced by the fitted CountVectorizer. It was
# previously hard-coded as 75, which silently breaks if the vocabulary size changes.
n_genres = len(model.vocabulary)

# Cold start: users without u_features (no purchases) are treated as liking all
# genres equally; items without i_features get an all-zero vector.
cold_array_user = [1.0] * n_genres
cold_array_film = [0.0] * n_genres


def _user_item_genre_overlap(row):
    """Map (user_id, item_id, u_features, i_features) to (user_id, item_id, overlap).

    overlap is the element-wise product of the user's genre counts, normalised to
    sum to ~1, with the item's genre vector.
    """
    user_vec = np.array(row[2] if row[2] is not None else cold_array_user)
    item_vec = np.array(row[3] if row[3] is not None else cold_array_film)
    # The tiny epsilon avoids division by zero for users whose vector sums to 0.
    overlap = (user_vec / (user_vec.sum() + 0.000001)) * item_vec
    return (row[0], row[1], overlap.tolist())


res_all = (
    test.drop('purchase').union(train.drop('purchase'))
    .join(res_items, on='item_id', how='left')
    .join(res_users, on='user_id', how='left')
    .select('user_id', 'item_id', 'u_features', 'i_features')
    .rdd
    .map(_user_item_genre_overlap)
    .toDF(schema=res_all_schema)
    # NOTE(review): inner join. Rows whose item is not in `items` (non-film content)
    # are dropped here, so the cold_array_film fallback should never reach the output.
    .join(items.select('item_id', 'film_age'), on='item_id')
    .cache()
)

In [23]:
# NOTE(review): items is never .cache()'d in this notebook, so this call has nothing to release.
items.unpersist()

DataFrame[item_id: string, film_age: int, genres: array<string>]

In [24]:
# Drop res_items from the cache.
# NOTE(review): res_all, which reads res_items, does not appear to have been
# materialized by any action yet. The cache is likely released before it is ever used.
res_items.unpersist()

DataFrame[item_id: string, i_features: array<double>]

In [25]:
# Drop res_users from the cache.
# NOTE(review): as with res_items, no action appears to have materialized res_all yet,
# so this cache was likely never used.
res_users.unpersist()

DataFrame[user_id: string, u_features: array<double>]

#### Дообрабатываем тренировочные и тестовые данные

In [26]:
# Test feature table: genre overlap and film age, per-user viewing stats, and the
# user/item activity counts.
# NOTE(review): despite its name, hours_inffered is 1.0 when the user HAS viewing
# history (hours_spend present) and 0.0 when it is missing.
w_test = (
    test
    .join(res_all, on=['item_id', 'user_id'], how='left')
    .join(details, on='user_id', how='left')
    .join(v_user, on='user_id', how='left')
    .join(v_item, on='item_id', how='left')
    .select(
        'user_id', 'item_id', 'film_age', 'features', 'hours_spend',
        f.when(f.col('hours_spend').isNull(), 0.0).otherwise(1.0).alias('hours_inffered'),
        'adj_views', 'v_user', 'v_item', 'purchase',
    )
    .cache()
)

In [27]:
# Train feature table, built exactly like the test one.
# hours_inffered: 1.0 when hours_spend is present, 0.0 when it is missing.
w_train = (
    train
    .join(res_all, on=['item_id', 'user_id'], how='left')
    .join(details, on='user_id', how='left')
    .join(v_user, on='user_id', how='left')
    .join(v_item, on='item_id', how='left')
    .select(
        'user_id', 'item_id', 'film_age', 'features', 'hours_spend',
        f.when(f.col('hours_spend').isNull(), 0.0).otherwise(1.0).alias('hours_inffered'),
        'adj_views', 'v_user', 'v_item', 'purchase',
    )
    .cache()
)

In [28]:
# Drop res_all from the cache.
# NOTE(review): w_test/w_train are only .cache()'d (lazy) above; the first action on
# them is toPandas further down. res_all's cache is likely released before use.
res_all.unpersist()

DataFrame[item_id: string, user_id: string, features: array<double>, film_age: int]

In [29]:
# Drop details from the cache.
# NOTE(review): no action appears to have materialized details yet, so the later
# toPandas calls will recompute it from the CSV.
details.unpersist()

DataFrame[user_id: string, hours_spend: double, adj_views: double]

#### Переносим результат предыдущих этапов в pandas, так как впереди мудрёная запись в формат .libfm (встроенную запись в формат .libsvm использовать не удалось)

In [64]:
# Collect the test feature table to the driver as a pandas DataFrame.
test_df = w_test.select(
    'user_id', 'item_id', 'film_age', 'hours_spend', 'hours_inffered',
    'adj_views', 'purchase', 'v_user', 'v_item', 'features',
).toPandas()

In [65]:
# Collect the train feature table to the driver, with the same column order as test_df.
train_df = w_train.select(
    'user_id', 'item_id', 'film_age', 'hours_spend', 'hours_inffered',
    'adj_views', 'purchase', 'v_user', 'v_item', 'features',
).toPandas()

In [32]:
# w_test has been collected into test_df; release its cache.
w_test.unpersist()

DataFrame[user_id: string, item_id: string, film_age: int, features: array<double>, hours_spend: double, hours_inffered: double, adj_views: double, v_user: bigint, v_item: bigint, purchase: double]

In [33]:
# w_train has been collected into train_df; release its cache.
w_train.unpersist()

DataFrame[user_id: string, item_id: string, film_age: int, features: array<double>, hours_spend: double, hours_inffered: double, adj_views: double, v_user: bigint, v_item: bigint, purchase: double]

#### Заполняем оставшиеся пропуски нулём, минимумом или медианой, в зависимости от фичи

In [66]:
# Missing labels in test (presumably every test row) become 0.0. The result is assigned
# back because the chained `test_df['purchase'].fillna(..., inplace=True)` form does not
# update test_df under pandas Copy-on-Write.
test_df['purchase'] = test_df['purchase'].fillna(0.0)

In [67]:
# Cast ids to int and order both frames by (user_id, item_id) with a fresh 0..n-1 index.
train_df = (
    train_df
    .astype({'user_id': int, 'item_id': int})
    .sort_values(by=['user_id', 'item_id'], ascending=[True, True])
    .reset_index(drop=True)
)

test_df = (
    test_df
    .astype({'user_id': int, 'item_id': int})
    .sort_values(by=['user_id', 'item_id'], ascending=[True, True])
    .reset_index(drop=True)
)

In [68]:
# Missing film_age -> the minimum age within the same frame. Assigned back instead of
# a chained fillna(inplace=True), which is a no-op under pandas Copy-on-Write.
train_df['film_age'] = train_df['film_age'].fillna(train_df['film_age'].min())
test_df['film_age'] = test_df['film_age'].fillna(test_df['film_age'].min())

In [69]:
# Users with no viewing history -> the median hours_spend of the same frame.
# Assigned back because chained fillna(inplace=True) does not work under Copy-on-Write.
train_df['hours_spend'] = train_df['hours_spend'].fillna(train_df['hours_spend'].median())
test_df['hours_spend'] = test_df['hours_spend'].fillna(test_df['hours_spend'].median())

In [70]:
# Missing scaled view counts -> the median of the same frame (assigned back, not a
# chained inplace fillna, so it also works under pandas Copy-on-Write).
train_df['adj_views'] = train_df['adj_views'].fillna(train_df['adj_views'].median())
test_df['adj_views'] = test_df['adj_views'].fillna(test_df['adj_views'].median())

In [71]:
# Items never seen in train have no popularity count -> 0 (assigned back, not a
# chained inplace fillna, so it also works under pandas Copy-on-Write).
train_df['v_item'] = train_df['v_item'].fillna(0)
test_df['v_item'] = test_df['v_item'].fillna(0)

In [72]:
# Fill any numeric gaps not handled above (e.g. v_user) with the column median.
# numeric_only=True is required because the frames also hold the list-valued
# 'features' column, on which DataFrame.median() raises TypeError in pandas >= 2.0.
train_df = train_df.fillna(train_df.median(numeric_only=True))
test_df = test_df.fillna(test_df.median(numeric_only=True))

In [73]:
# Users with at least one purchase in train (one (user_id, purchase=1.0) row each).
b = train_df.loc[train_df['purchase'] == 1.0, ['user_id', 'purchase']].drop_duplicates()

In [74]:
# Renumber the buyer rows 0..n-1.
b = b.reset_index(drop=True)

In [75]:
# The purchase flag becomes the per-user 'bought_before' feature.
b = b.rename(columns={'purchase': 'bought_before'})

In [76]:
# bought_before = 1.0 if the user bought anything in train, else 0.0. Test rows are
# flagged from the same train purchases.
# NOTE(review): this feature is derived from the train labels themselves.
train_df = pd.merge(train_df, b, how='left', on='user_id')
train_df['bought_before'] = train_df['bought_before'].fillna(0.0)

test_df = pd.merge(test_df, b, how='left', on='user_id')
test_df['bought_before'] = test_df['bought_before'].fillna(0.0)

#### Применяю MinMaxScaler к возрасту фильма и к популярности контента v_item (кастомный был быстрее встроенного в sklearn)

In [77]:
def min_max_scaler(df, column, ref=None):
    """Min-max scale ``df[column]`` into [0, 1].

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the values to scale.
    column : str
        Name of the numeric column to scale.
    ref : pandas.DataFrame, optional
        Frame whose ``column`` supplies the min/max bounds; defaults to ``df``.
        Pass the training frame here to put a test frame on the same scale.

    Returns
    -------
    pandas.Series
        Scaled values with the same index and name as ``df[column]``. A constant
        reference column (max == min) maps to 0.0 instead of NaN; NaN inputs stay NaN.
    """
    source = df if ref is None else ref
    x_min = source[column].min()
    x_max = source[column].max()
    span = x_max - x_min
    shifted = df[column] - x_min
    if span == 0:
        # Avoid 0/0; keep NaNs as NaN.
        return shifted * 0.0
    # Vectorised: same per-element arithmetic as an element-wise apply, much faster.
    return shifted / span

In [78]:
# Scale film_age and v_item to [0, 1].
# NOTE(review): train and test are each scaled by their OWN min/max, so the same raw
# value can map to different scaled values in the two libFM files. Scaling test with
# the train bounds would keep the feature consistent.
for scaled_col in ('film_age', 'v_item'):
    train_df[scaled_col] = min_max_scaler(train_df, scaled_col)
    test_df[scaled_col] = min_max_scaler(test_df, scaled_col)

In [87]:
# Stack train over test with a fresh 0..n-1 index; rows [0, len(train_df)) are train.
full_df = pd.concat([train_df, test_df], ignore_index=True)

#### Далее провожу ручную конвертацию в формат .libfm (чем-то схоже со sparse матрицами)

In [88]:
# One-hot user block: map each user_id (ascending) to a libFM index 0..n_users-1.
users = sorted(set(full_df['user_id']))
user_dic = {usr: idx for idx, usr in enumerate(users)}

full_df['fm_user'] = full_df['user_id'].map(user_dic).astype(str) + ':1'

In [89]:
# Number of user slots; item indices start right after them.
point1=len(user_dic)

In [90]:
# One-hot item block: item_ids (ascending) get libFM indices starting at point1.
films = sorted(set(full_df['item_id']))
film_dic = {flm: idx + point1 for idx, flm in enumerate(films)}

full_df['fm_item'] = full_df['item_id'].map(film_dic).astype(str) + ':1'

In [91]:
# First index after the user and item one-hot blocks; dense features are numbered from here.
point2 = point1+len(film_dic)

In [92]:
# Turn each dense feature into a libFM "index:value" token at consecutive indices
# point2 .. point2+5, rounding the value with the given format first.
for col, offset, pattern in (
    ('film_age', 0, "{0:.4f}"),
    ('hours_spend', 1, "{0:.4f}"),
    ('hours_inffered', 2, "{0:.1f}"),
    ('adj_views', 3, "{0:.4f}"),
    ('bought_before', 4, "{0:.1f}"),
    ('v_item', 5, "{0:.4f}"),
):
    full_df[col] = full_df[col].apply(
        lambda x, idx=point2 + offset, pattern=pattern: str(idx) + ':' + str(float(pattern.format(x)))
    )

In [93]:
# Keep the label followed by the feature tokens, in libFM column order.
full_df = full_df.loc[:, [
    'purchase', 'fm_user', 'fm_item', 'film_age', 'hours_spend',
    'hours_inffered', 'adj_views', 'bought_before', 'v_item', 'features',
]].copy()

#### Проверяю промежуточный результат

In [94]:
# Preview the libFM-formatted rows (the features column is still a raw list here).
full_df.head()

Unnamed: 0,purchase,fm_user,fm_item,film_age,hours_spend,hours_inffered,adj_views,bought_before,v_item,features
0,0.0,0:1,1941:1,5645:0.0495,5646:0.81,5647:1.0,5648:0.0017,5649:1.0,5650:0.6233,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,..."
1,0.0,0:1,1943:1,5645:0.0495,5646:0.81,5647:1.0,5648:0.0017,5649:1.0,5650:0.5205,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.2499999687500..."
2,0.0,0:1,1944:1,5645:0.099,5646:0.81,5647:1.0,5648:0.0017,5649:1.0,5650:0.6164,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,..."
3,0.0,0:1,1945:1,5645:0.0693,5646:0.81,5647:1.0,5648:0.0017,5649:1.0,5650:0.7603,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,..."
4,0.0,0:1,1946:1,5645:0.0891,5646:0.81,5647:1.0,5648:0.0017,5649:1.0,5650:0.6027,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,..."


In [95]:
# Expand the genre-overlap vector into one libFM "index:value" column per component,
# numbered after the six dense features (point2 .. point2+5). The vector length is
# read from the data instead of a hard-coded 75, so a different CountVectorizer
# vocabulary size cannot silently drop columns or raise IndexError.
n_overlap = len(full_df['features'].iloc[0]) if len(full_df) else 0
num = 6

for el in range(n_overlap):
    # el / idx are bound as lambda defaults so each call uses this iteration's values.
    full_df['f_' + str(el)] = full_df['features'].apply(
        lambda x, el=el, idx=point2 + num: str(idx) + ':' + str(float("{0:.4f}".format(x[el])))
    )
    num += 1

full_df = full_df.drop(columns='features')

In [96]:
# Split back into train/test by position (full_df is train_df rows, then test_df rows).
# .copy() makes both independent frames, so the assignments below modify them rather
# than a slice of full_df. This avoids SettingWithCopy and chained-assignment no-ops.
n_train = train_df.shape[0]
fm_train = full_df.iloc[:n_train].copy()
fm_test = full_df.iloc[n_train:].copy()

# libFM expects integer labels; unlabelled test rows get 0.
fm_test['purchase'] = fm_test['purchase'].fillna(0).astype(int)
fm_train['purchase'] = fm_train['purchase'].astype(int)

In [97]:
# Write space-separated libFM files with no header row and no index column.
for frame, path in ((fm_train, 'train.libfm'), (fm_test, 'test.libfm')):
    frame.to_csv(path, header=None, index=False, sep=' ')

In [98]:
# Strip any double quotes that to_csv may have put around values, since libFM cannot parse them.
!sed -i 's/"//g' train.libfm 
!sed -i 's/"//g' test.libfm 

#### Сверяюсь, что ничего не забыл

In [106]:
# Sanity check: numeric train features and the raw overlap vector before libFM formatting.
train_df.head()

Unnamed: 0,user_id,item_id,film_age,hours_spend,hours_inffered,adj_views,purchase,v_user,v_item,features,bought_before
0,1654,326,0.049505,0.81,1.0,0.0017,0.0,2568,0.623288,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,...",1.0
1,1654,357,0.049505,0.81,1.0,0.0017,0.0,2568,0.520548,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.2499999687500...",1.0
2,1654,396,0.09901,0.81,1.0,0.0017,0.0,2568,0.616438,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,...",1.0
3,1654,400,0.069307,0.81,1.0,0.0017,0.0,2568,0.760274,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,...",1.0
4,1654,423,0.089109,0.81,1.0,0.0017,0.0,2568,0.60274,"[0.12499998437500197, 0.0, 0.0, 0.0, 0.0, 0.0,...",1.0


In [63]:
# NOTE(review): stale cell (execution count 63 ran before 99). Its output predates the
# v_item feature and has one column fewer than the run below; safe to delete.
!head -n1 train.libfm

0 0:1 1941:1 5645:0.0495 5646:0.81 5647:1.0 5648:0.0017 5649:1.0 5650:0.125 5651:0.0 5652:0.0 5653:0.0 5654:0.0 5655:0.0 5656:0.0 5657:0.0 5658:0.0 5659:0.0 5660:0.0 5661:0.0 5662:0.0 5663:0.0 5664:0.0 5665:0.0 5666:0.0 5667:0.0 5668:0.0 5669:0.0 5670:0.0 5671:0.0 5672:0.0 5673:0.0 5674:0.0 5675:0.0 5676:0.0 5677:0.0 5678:0.0 5679:0.0 5680:0.0 5681:0.0 5682:0.0 5683:0.0 5684:0.0 5685:0.0 5686:0.0 5687:0.0 5688:0.0 5689:0.0 5690:0.0 5691:0.0 5692:0.0 5693:0.0 5694:0.0 5695:0.0 5696:0.0 5697:0.0 5698:0.0 5699:0.0 5700:0.0 5701:0.0 5702:0.0 5703:0.0 5704:0.0 5705:0.0 5706:0.0 5707:0.0 5708:0.0 5709:0.0 5710:0.0 5711:0.0 5712:0.0 5713:0.0 5714:0.0 5715:0.0 5716:0.0 5717:0.0 5718:0.0 5719:0.0 5720:0.0 5721:0.0 5722:0.0 5723:0.0 5724:0.0


In [99]:
# Inspect the first line of the written libFM training file.
!head -n1 train.libfm

0 0:1 1941:1 5645:0.0495 5646:0.81 5647:1.0 5648:0.0017 5649:1.0 5650:0.6233 5651:0.125 5652:0.0 5653:0.0 5654:0.0 5655:0.0 5656:0.0 5657:0.0 5658:0.0 5659:0.0 5660:0.0 5661:0.0 5662:0.0 5663:0.0 5664:0.0 5665:0.0 5666:0.0 5667:0.0 5668:0.0 5669:0.0 5670:0.0 5671:0.0 5672:0.0 5673:0.0 5674:0.0 5675:0.0 5676:0.0 5677:0.0 5678:0.0 5679:0.0 5680:0.0 5681:0.0 5682:0.0 5683:0.0 5684:0.0 5685:0.0 5686:0.0 5687:0.0 5688:0.0 5689:0.0 5690:0.0 5691:0.0 5692:0.0 5693:0.0 5694:0.0 5695:0.0 5696:0.0 5697:0.0 5698:0.0 5699:0.0 5700:0.0 5701:0.0 5702:0.0 5703:0.0 5704:0.0 5705:0.0 5706:0.0 5707:0.0 5708:0.0 5709:0.0 5710:0.0 5711:0.0 5712:0.0 5713:0.0 5714:0.0 5715:0.0 5716:0.0 5717:0.0 5718:0.0 5719:0.0 5720:0.0 5721:0.0 5722:0.0 5723:0.0 5724:0.0 5725:0.0


In [107]:
# Build the submission: test (user_id, item_id) pairs in the same order as test.libfm,
# paired with libFM's predicted probabilities (prob1.txt, one value per line).
f_df = pd.concat([train_df, test_df], ignore_index=True)

prob = pd.read_csv('prob1.txt', header=None)
# Column selection by name yields an independent frame, so adding 'purchase' below
# does not write into a slice of f_df.
out_test = f_df.iloc[train_df.shape[0]:][['user_id', 'item_id']].reset_index(drop=True)
# libFM writes exactly one prediction per test line; a mismatch means the files are out of sync.
assert len(prob) == len(out_test), 'prob1.txt does not match the number of test rows'
out_test['purchase'] = prob.iloc[:, 0].values
out_test.to_csv('lab10s(1_1).csv', index=False)

In [100]:
# Shut down the Spark session; the remaining steps are pandas / libFM only.
spark.stop()

#### После этого обучаю на полученных файлах модель libFM методом ALS

### Итог: первое место среди всех учащихся на соревновании

<img src="image.jpg" width="800" height="400">