# install stuff

In [29]:
# ! pip3 install pyspark pandas scikit-learn polyglot pyicu pycld2 morfessor optuna catboost

# setup pyspark

In [1]:
import pyspark.sql
from pyspark.sql import functions as sf
import pyspark
from polyglot.detect import Detector
from pyspark.sql.window import Window
import numpy as np

In [3]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "16384m")\
    .set("spark.driver.memory", "16384m")
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[1]').getOrCreate()

Тут необходимо воспользоваться google.drive

In [4]:
# from google.colab import drive
# drive.mount('/content/drive')

In [5]:
# import zipfile
# with zipfile.ZipFile('./drive/MyDrive/HW2/hw_data.zip', 'r') as zip_ref:
#     zip_ref.extractall('./drive/MyDrive/HW2/')

# load data

In [6]:
train_data = ss.read.orc('./drive/MyDrive/HW2/posts_train.orc')
test_data = ss.read.orc('./drive/MyDrive/HW2/posts_test.orc')
channel_data = ss.read.orc('./drive/MyDrive/HW2/channels_orc')

Переименование полей и визуализация

In [7]:
channel_data = channel_data.withColumnRenamed("tg_id", "tg_id_channel")

In [19]:
def show(df):
  return df.limit(5).toPandas()

Соединим трейн и тест с данными о каналах

In [20]:
train_all = train_data.join(channel_data, on=['channel_id'], how='left')
test_all = test_data.join(channel_data, on=['channel_id'], how='left')

Приведем дату к нормальному виду

In [21]:
train_all = train_all.withColumn('date_time', sf.col('date').cast('timestamp'))
train_all = train_all.withColumn('date', sf.col('date').cast('date'))
train_all = train_all.withColumn('last_parsed', sf.col('last_parsed').cast('date'))

test_all = test_all.withColumn('date_time', sf.col('date').cast('timestamp'))
test_all = test_all.withColumn('date', sf.col('date').cast('date'))
test_all = test_all.withColumn('last_parsed', sf.col('last_parsed').cast('date'))

Посчитаем количество дней между сбором информации и сообщением

In [22]:
@sf.udf
def days_until_parse(date1, date2):
  return (date2 - date1).days

In [23]:
train_all = train_all.withColumn('days_until_parse', days_until_parse('date', 'last_parsed').cast('int'))

test_all = test_all.withColumn('days_until_parse', days_until_parse('date', 'last_parsed').cast('int'))

Поработаем с текстовыми признаками:
1. Сформируем признаки - язык: сообщения, описания канала, наименования канала

In [24]:
@sf.udf
def text_lang(text):
  try:
    lang = Detector(text).language.code
  except:
    lang = -1
  return lang

In [25]:
train_all = train_all.withColumn('text_lang', text_lang('text'))
train_all = train_all.withColumn('description_lang', text_lang('description'))
train_all = train_all.withColumn('titte_lang', text_lang('title'))

test_all = test_all.withColumn('text_lang', text_lang('text'))
test_all = test_all.withColumn('description_lang', text_lang('description'))
test_all = test_all.withColumn('titte_lang', text_lang('title'))

2. Посчитаем длину сообщения

In [26]:
@sf.udf
def text_len(text):
  try:
    long = len(text)
  except:
    long = 0
  return long

In [27]:
train_all = train_all.withColumn('text_len', text_len('text').cast('int'))

test_all = test_all.withColumn('text_len', text_len('text').cast('int'))

Удалим ненужные признаки (которые далее не будем использовать)

In [28]:
columns_to_drop = ['text', 'forwarded_id', 'description', 'last_parsed', 'name', 'title', 'updated']

train_all = train_all.drop(*columns_to_drop)

test_all = test_all.drop(*columns_to_drop)

In [29]:
# show(train_all)

#### Window aggregation

1. Номер поста за последние 7 дней в канале (активность в канале)

In [30]:
ws = Window.partitionBy('channel_id') \
          .orderBy(sf.col("date_time").cast("long")) \
          .rangeBetween(-604800, 0)

post_number_week_train = train_all.select('id', 'channel_id', 'date_time')\
  .withColumn('post_number_week', sf.count('id').over(ws)) \
  .orderBy('channel_id')

columns_to_drop = ['date_time', 'channel_id']
post_number_week_train = post_number_week_train.drop(*columns_to_drop)

train_all = train_all.join(post_number_week_train, on=['id'], how='left')


post_number_week_test = test_all.select('id', 'channel_id', 'date_time')\
  .withColumn('post_number_week', sf.count('id').over(ws)) \
  .orderBy('channel_id')

post_number_week_test = post_number_week_test.drop(*columns_to_drop)

test_all = test_all.join(post_number_week_test, on=['id'], how='left')

In [31]:
# show(train_all)

2. Максимальная длина поста в канале (с картинкой и без) за неделю на текущий момент 

In [32]:
ws = Window.partitionBy('channel_id', 'has_image') \
          .orderBy(sf.col("date_time").cast("long")) \
          .rangeBetween(-604800, 0)

post_len_week_max_train = train_all.select('id', 'channel_id', 'has_image', 'date_time', 'text_len')\
  .withColumn('post_len_week_max', sf.max('text_len').over(ws)) \
  .orderBy('channel_id')

columns_to_drop = ['channel_id', 'has_image', 'date_time', 'text_len']

post_len_week_max_train = post_len_week_max_train.drop(*columns_to_drop)
train_all = train_all.join(post_len_week_max_train, on=['id'], how='left')

post_len_week_max_test = test_all.select('id', 'channel_id', 'has_image', 'date_time', 'text_len')\
  .withColumn('post_len_week_max', sf.max('text_len').over(ws)) \
  .orderBy('channel_id')

post_len_week_max_test = post_len_week_max_test.drop(*columns_to_drop)
test_all = test_all.join(post_len_week_max_test, on=['id'], how='left')

Нагенерим обычные признаки:
1. Среднее и стандартное  отклонение количества участников от приватности канала
3. Среднее и стандартное  отклонение количества постов от приватности канала
4. Среднее и стандартное  отклонение постов и участников от языка описания канала


In [33]:
channel_data = channel_data.withColumn('description_lang', text_lang('description'))

In [34]:
private_user_count = channel_data.groupby('is_private').agg(sf.mean('user_count').alias('mean_user_count'), 
                                              sf.stddev('user_count').alias('stddev_user_count'))

private_post = channel_data.groupby('is_private').agg(sf.mean('post_count').alias('mean_post'), 
                                              sf.stddev('post_count').alias('stddev_samp_post'))

lang_discr = channel_data.groupby('description_lang').agg(sf.mean('post_count').alias('lang_mean_post'), 
                                              sf.stddev('post_count').alias('lang_stddev_samp_post'), 
                                              sf.mean('user_count').alias('lang_mean_user_count'), 
                                              sf.stddev('user_count').alias('lang_stddev_user_count'))

In [35]:
train_all = train_all.join(private_user_count, on=['is_private'], how='left')
train_all = train_all.join(private_post, on=['is_private'], how='left')
train_all = train_all.join(lang_discr, on=['description_lang'], how='left')

test_all = test_all.join(private_user_count, on=['is_private'], how='left')
test_all = test_all.join(private_post, on=['is_private'], how='left')
test_all = test_all.join(lang_discr, on=['description_lang'], how='left')

4. Среднее и стандартное отклонение длины сообщения от языка сообщения

In [36]:
text_lang_train = train_all.groupby('text_lang').agg(sf.mean('text_len').alias('mean_text_lang'), 
                                              sf.stddev('text_len').alias('stddev_samp_text_lang'))

text_lang_test = test_all.groupby('text_lang').agg(sf.mean('text_len').alias('mean_text_lang'), 
                                              sf.stddev('text_len').alias('stddev_samp_text_lang'))

In [37]:
train_all = train_all.join(text_lang_train, on=['text_lang'], how='left')

test_all = test_all.join(text_lang_test, on=['text_lang'], how='left')

# make features

In [38]:
def make_features(df):
    cols = df.columns
    return df.select(*cols)

In [39]:
train_features = make_features(train_all).cache()
test_features = make_features(test_all).cache()

In [40]:
train_features.write.csv('train_csv', mode='overwrite', header=True)
test_features.write.csv('test_csv', mode='overwrite', header=True)

# load features to pandas
you also can use .toPandas()

In [1]:
import subprocess
import glob
import os
import shutil
import pandas as pd
import numpy as np

def load_and_merge_csv(path, **kwargs):
    dfs = []
    for g in glob.glob(os.path.join(path, '*.csv')):
        dfs.append(pd.read_csv(g, **kwargs))
    res = pd.concat(dfs)
    res = res.set_index('id')
    return res

In [2]:
# trainXY = load_and_merge_csv('train_csv')
# testX = load_and_merge_csv('test_csv')

Сохраним данные, они очень долго грузились (на всякий случай)

In [3]:
# trainXY.reset_index().to_csv('./drive/MyDrive/HW2/train_data.csv', index=False)
# testX.reset_index().to_csv('./drive/MyDrive/HW2/test_data.csv', index=False)

Загрузим данные, если они были сохранены

In [4]:
trainXY = pd.read_csv('train_data.csv')
testX = pd.read_csv('test_data.csv')

trainXY = trainXY.set_index('id')
testX = testX.set_index('id')

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Преобразуем данные к нормальному виду, если они были сохранены неправильном формате.

In [5]:
def change_data_type(df):
    df.text_lang = df.text_lang.astype(str)
    df.description_lang = df.description_lang.astype(str)
    df.titte_lang = df.titte_lang.astype(str)
    df.tg_id = df.tg_id.astype(int) 
    if 'views' in df.columns:
        df.views = df.views.astype(int)
    df.date = df.date.astype('datetime64')
    df.post_count = df.post_count.astype(int)
    df.tg_id_channel = df.tg_id_channel.astype(np.int64)
    df.user_count = df.user_count.astype(int)
    df.days_until_parse = df.days_until_parse.astype(int)
    df.text_len = df.text_len.astype(int)             
    df.post_number_week = df.post_number_week.astype(int)     
    df.post_len_week_max = df.post_len_week_max.astype(int)
    return df   

trainXY = change_data_type(trainXY)     
testX = change_data_type(testX)               

In [6]:
Ycol = 'views'
to_drop = ['channel_id', 'date_time']
trainX, trainY = trainXY.drop(Ycol, axis=1).drop(to_drop, axis=1), trainXY[Ycol]
trainY = np.log(trainY + 100)

testX = testX.drop(to_drop, axis=1)

In [7]:
del trainXY

## LightGBM

In [8]:
from sklearn.model_selection import train_test_split
trainX, a, trainY, b = train_test_split(trainX, trainY, train_size=0.05, random_state=42)
del a, b

Тренеровать будем бустинг

In [9]:
import lightgbm as lgb
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import mean_squared_error as MSE
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import LabelEncoder

import optuna
from optuna.samplers import TPESampler

Создадим все метрики качества, которые используются в бейзлайне

In [10]:
def MAPE(y_true, y_pred):
    epsilon = np.finfo(np.float64).eps
    mape = np.abs(y_pred - y_true) / np.maximum(np.abs(y_true), epsilon)
    return np.mean(mape.values)*100

def RMSE(y_true, y_pred):
    return np.sqrt(MSE(y_true, y_pred))

def RMSPE(y_true, y_pred):
    epsilon = np.finfo(np.float64).eps
    rmspe = (np.sqrt(np.mean(np.square((y_true - y_pred) / (y_true + epsilon))))) * 100
    return rmspe

class MultiColumnLabelEncoder:
    def __init__(self,columns = None):
        self.columns = columns 
        
    def fit(self,X,y=None):
        return self 

    def transform(self,X):
        '''
        Transforms columns of X specified in self.columns using
        LabelEncoder(). If no columns specified, transforms all
        columns in X.
        '''
        output = X.copy()
        if self.columns is not None:
            for col in self.columns:
                output[col] = LabelEncoder().fit_transform(output[col])
        else:
            for colname,col in output.iteritems():
                output[colname] = LabelEncoder().fit_transform(col)
        return output

    def fit_transform(self,X,y=None):
        return self.fit(X,y).transform(X)

Объединим трейн, тест и сделаем лейбл енкодинг

In [11]:
trainX['train'] = 1
testX['train'] = 0
X = pd.concat((trainX, testX), axis=0, sort=False)

In [12]:
cat = ['text_lang', 'description_lang', 'is_private', 'tg_id', 'has_image', 
       'is_forwarded', 'tg_id_channel', 'titte_lang']
X = MultiColumnLabelEncoder(columns = cat).fit_transform(X)

Разъединим тест и трейн

In [13]:
trainX = X[X.train == 1].drop('train', axis=1)
testX = X[X.train == 0].drop(['train', 'date'], axis=1)

In [14]:
trainX['target'] = trainY.values
trainX = trainX.sort_values(by='date').reset_index().drop('id', axis=1) 

y = trainX['target']
trainX.drop(['date', 'target'], axis=1, inplace=True)

In [15]:
del X

Поскольку тестовая часть приходит из будущего, то будем  обучать модель с учетом этого фактора.

In [16]:
train_inx = range(int(y.shape[0]*0.7)) 
val_inx = range(int(y.shape[0]*0.7)+1, int(y.shape[0]*0.85))
test_inx = range(int(y.shape[0]*0.85)+1, y.shape[0])

In [17]:
X_train, X_val, X_test = trainX.loc[train_inx], trainX.loc[val_inx], trainX.loc[test_inx]
X_global_train = pd.concat([X_train, X_val], ignore_index=True)
y_train, y_val, y_test = y.loc[train_inx], y.loc[val_inx], y.loc[test_inx]
y_global_train = pd.concat([y_train, y_val], ignore_index=True)

Обучим модель

In [18]:
dtrain = lgb.Dataset(X_train, label=y_train, categorical_feature=cat, free_raw_data=False)
dtrain_global = lgb.Dataset(X_global_train, label=y_global_train, categorical_feature=cat, free_raw_data=False)
dX = lgb.Dataset(trainX, label=trainY, categorical_feature=cat, free_raw_data=False)

In [None]:
sampler=TPESampler(seed=10)

def objective_lgb(trial):    
    param = {'num_leaves': trial.suggest_int('num_leaves', 10, 500),
          'min_child_samples': trial.suggest_int('min_child_samples', 10, 1000),
          'objective': 'rmse',
          'max_depth': trial.suggest_int('max_depth', 50, 300),
          'learning_rate': trial.suggest_uniform('learning_rate', 0.01, 0.4),
          "metric": 'rmse',
          'reg_alpha': trial.suggest_uniform('reg_alpha', 0.01, 0.7),
          'reg_lambda': trial.suggest_uniform('reg_lambda', 0.01, 0.7),
          'num_boost_round': 1000}
    
    gbm = lgb.train(param, dtrain, verbose_eval=False)
    preds = gbm.predict(X_val)
    RMSPE_score = RMSPE(y_test, preds)
    return RMSPE_score

study_lgb = optuna.create_study(direction='minimize', sampler=sampler)
study_lgb.optimize(objective_lgb, n_trials=50, timeout=None, n_jobs=-1)

[32m[I 2021-06-06 15:17:43,287][0m A new study created in memory with name: no-name-d9beab65-cd6b-4514-abfd-3432d6cd9919[0m
[32m[I 2021-06-06 15:17:55,466][0m Trial 7 finished with value: 19.8218737396597 and parameters: {'num_leaves': 13, 'min_child_samples': 210, 'max_depth': 285, 'learning_rate': 0.10691841874191638, 'reg_alpha': 0.5450430095327169, 'reg_lambda': 0.021132400174327638}. Best is trial 7 with value: 19.8218737396597.[0m
[32m[I 2021-06-06 15:17:56,263][0m Trial 5 finished with value: 100.0 and parameters: {'num_leaves': 413, 'min_child_samples': 705, 'max_depth': 206, 'learning_rate': 0.30619109059796407, 'reg_alpha': 0.23574790250260907, 'reg_lambda': 0.5330825378668302}. Best is trial 7 with value: 19.8218737396597.[0m
[32m[I 2021-06-06 15:19:58,399][0m Trial 1 finished with value: 24.80969767693165 and parameters: {'num_leaves': 38, 'min_child_samples': 467, 'max_depth': 131, 'learning_rate': 0.10651488971060097, 'reg_alpha': 0.5559348210371101, 'reg_lambd

In [26]:
param = {'objective': 'mape', 'seed': 77}
gbm = lgb.train(param, dtrain_global, num_boost_round=500)
preds = gbm.predict(X_test)
RMSPE_score = RMSPE(y_test, preds)
MAPE_score = MAPE(y_test, preds)
MAE_score = MAE(y_test, preds)
RMSE_score = RMSE(y_test, preds)
MSE_score = MSE(y_test, preds)



In [27]:
results = pd.DataFrame({'Baseline': [23.50065988751091, 
                                     15.707128974856676,
                                     1.219070382113261,
                                     1.5596274837655963,
                                     2.4324378881170055],
    'LightGBM': [RMSPE_score, MAPE_score, MAE_score, RMSE_score, MSE_score]})
results.index = ['RMSPE', 'MAPE', 'MAE', 'RMSE', 'MSE']
results

Unnamed: 0,Baseline,LightGBM
RMSPE,23.50066,16.308871
MAPE,15.707129,9.19084
MAE,1.21907,0.691817
RMSE,1.559627,1.113427
MSE,2.432438,1.239719


Поскольку модель со стандартными гиперпарметрами уже показывает результат лучше бейзлайно, то перебирать гиперпараметры не будем. Просто обучим модель на всех данных и сделаем сабмит

Сформируем предсказания

In [18]:
param = {'objective': 'rmse', 'seed': 77}
gbm = lgb.train(param, dX, num_boost_round=500)
preds_test = np.round(gbm.predict(testX), 5)



In [19]:
assert preds_test.shape == (244386,)

In [20]:
# testX = testX.reset_index().rename(columns={'index':'id'})
# testX = testX.set_index('id')
# testX

# submit

In [21]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2021/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1906  100  1906    0     0   8251      0 --:--:-- --:--:-- --:--:--  8251


In [22]:
import client

In [23]:
client.make_eval(pd.DataFrame({'views': preds_test}, index=testX.index))

Enter username:
user25
Enter password:
··········


{'data': {'mape': 15.155165215400004,
  'mean_absolute_error': 1.133835040144799,
  'mean_squared_error': 2.392502846343855,
  'rmse': 1.5467717499178264,
  'rmspe': 24.51301322038978},
 'ok': True}

In [None]:
client.check_results()

{}