# install stuff

In [1]:
from IPython.display import clear_output
! pip3 install pyspark pandas scikit-learn
clear_output()

# setup pyspark

In [2]:
import pyspark.sql
from pyspark.sql import functions as sf
import pyspark

In [3]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "1024m")\
    .set("spark.driver.memory", "1024m")
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[1]').getOrCreate()

# load data

In [4]:
! gdown https://drive.google.com/uc?id=1o_L2a62TCVTgCZfY08N6oMGeyY2tTTol

Downloading...
From: https://drive.google.com/uc?id=1o_L2a62TCVTgCZfY08N6oMGeyY2tTTol
To: /content/hw_data.zip
763MB [00:07, 108MB/s] 


In [5]:
! unzip hw_data.zip
clear_output()

In [6]:
train_data = ss.read.orc('/content/posts_train.orc')
test_data = ss.read.orc('/content/posts_test.orc')
channel_data = ss.read.orc('/content/channels_orc')

# make features

In [7]:
def show(df):
  return df.limit(50).toPandas()

Выделение года в отдельную фичу

In [8]:
@sf.udf
def year(date):
  t = date[:4]
  try: 
    t 
  except:
    t = -1
  return t
  
train_data = train_data.withColumn('year', year('date'))
test_data = test_data.withColumn('year', year('date'))

In [9]:
train_data = train_data.withColumn('year', sf.col('year').cast('int'))
test_data = test_data.withColumn('year', sf.col('year').cast('int'))

Выделение месяца в отдельную фичу

In [10]:
@sf.udf
def month(date):
  t = date[5:7]
  try: 
    t 
  except:
    t = -1
  return t
  
train_data = train_data.withColumn('month', month('date'))
test_data = test_data.withColumn('month', month('date'))

In [11]:
train_data = train_data.withColumn('month', sf.col('month').cast('int'))
test_data = test_data.withColumn('month', sf.col('month').cast('int'))

Выделение дня в отдельную фичу

In [12]:
@sf.udf
def day(date):
  t = date[8:10]
  try: 
    t 
  except:
    t = -1
  return t
  
train_data = train_data.withColumn('day', day('date'))
test_data = test_data.withColumn('day', day('date'))

In [13]:
train_data = train_data.withColumn('day', sf.col('day').cast('int'))
test_data = test_data.withColumn('day', sf.col('day').cast('int'))

Выделение часа в отдельную фичу

In [14]:
@sf.udf
def hour(date):
  t = date[11:13]
  try: 
    t 
  except:
    t = -1
  return t
  
train_data = train_data.withColumn('hour', hour('date'))
test_data = test_data.withColumn('hour', hour('date'))

In [15]:
train_data = train_data.withColumn('hour', sf.col('hour').cast('int'))
test_data = test_data.withColumn('hour', sf.col('hour').cast('int'))

Выделение минут в отдельную фичу

In [16]:
@sf.udf
def minute(date):
  t = date[14:16]
  try: 
    t 
  except:
    t = -1
  return t
  
train_data = train_data.withColumn('minute', minute('date'))
test_data = test_data.withColumn('minute', minute('date'))

In [17]:
train_data = train_data.withColumn('minute', sf.col('minute').cast('int'))
test_data = test_data.withColumn('minute', sf.col('minute').cast('int'))

Фича date больше не нужна

In [18]:
train_data = train_data.drop('date')
test_data = test_data.drop('date')

## 1. Фичи, использующие текстовые поля

- Определение количества слов в тексте

In [19]:
train_data = train_data.withColumn('word_count', sf.size(sf.split(sf.col('text'), ' ')))
test_data = test_data.withColumn('word_count', sf.size(sf.split(sf.col('text'), ' ')))

- определение наличия хэшэй в тексте. Популярные темы и явления отмечаются хэшами, по которым удобно переходить, следовательно, такие посты будут иметь больше просмотров. 

In [20]:
train_data = train_data.withColumn('hash_count', sf.col('text').like('%' + '#' + '%'))
test_data = test_data.withColumn('hash_count', sf.col('text').like('%' + '#' + '%'))

In [21]:
train_data = train_data.drop('text')
test_data = test_data.drop('text')
train_data = train_data.drop('forwarded_id')
test_data = test_data.drop('forwarded_id')

## 2. Фичи, использующие метаданные каналов

Из channel_data я решила отобрать post_count и user_count.

In [22]:
def make_features(df):
    cols = ['channel_id']
    if 'post_count' in df.columns:
        cols.append('post_count')
    if 'user_count' in df.columns:
        cols.append('user_count')
    return df.select(*cols)

In [23]:
channel_count = make_features(channel_data).cache()

In [24]:
train_data = train_data.join(channel_count, on='channel_id', how='left')
test_data = test_data.join(channel_count, on='channel_id', how='left')

In [25]:
train_data = train_data.withColumn('post_count', sf.col('post_count').cast('int'))
test_data = test_data.withColumn('post_count', sf.col('post_count').cast('int'))
train_data = train_data.withColumn('user_count', sf.col('user_count').cast('int'))
test_data = test_data.withColumn('user_count', sf.col('user_count').cast('int'))

## 3. Фичи, использующие window aggregation

In [26]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, sum

- Посчитаем среднее количество слов в постах каждого канала.

In [27]:
window_chan  = Window.partitionBy("channel_id")

In [28]:
train_data = train_data.withColumn("avg", avg(col("word_count")).over(window_chan))
test_data = test_data.withColumn("avg", avg(col("word_count")).over(window_chan))

- Посчитаем количество постов в течение года

In [29]:
window_year  = Window.partitionBy("channel_id") \
          .orderBy('year') \
          .rangeBetween(-1, 0)

In [30]:
train_data = train_data.withColumn('post_count_year', sf.count('id').over(window_year)) \
  .filter(sf.col('year') != -1).orderBy('year')
test_data = test_data.withColumn('post_count_year', sf.count('id').over(window_year)) \
  .filter(sf.col('year') != -1).orderBy('year')

In [31]:
train_data.cache()
test_data.cache()

DataFrame[channel_id: int, id: int, tg_id: int, has_image: string, is_forwarded: string, year: int, month: int, day: int, hour: int, minute: int, word_count: int, hash_count: boolean, post_count: int, user_count: int, avg: double, post_count_year: bigint]

In [32]:
train_data.write.csv('train', mode='overwrite', header=True)
test_data.write.csv('test', mode='overwrite', header=True)

# load features to pandas
you also can use .toPandas()

In [33]:
import subprocess
import glob
import os
import shutil
import pandas as pd

def load_and_merge_csv(path, **kwargs):
    dfs = []
    for g in glob.glob(os.path.join(path, '*.csv')):
        dfs.append(pd.read_csv(g, **kwargs))
    res = pd.concat(dfs)
    res = res.set_index('id')
    return res

In [34]:
trainXY = load_and_merge_csv('train')
testX = load_and_merge_csv('test')

In [35]:
import numpy as np

In [36]:
X_train = trainXY.loc[:, trainXY.columns != 'views']
y_train = trainXY.loc[:, trainXY.columns == 'views']

In [37]:
y_train = np.log(y_train + 100)

In [38]:
X_train.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5460759 entries, 39956839 to 28680033
Data columns (total 15 columns):
 #   Column           Dtype  
---  ------           -----  
 0   channel_id       int64  
 1   tg_id            int64  
 2   has_image        object 
 3   is_forwarded     object 
 4   year             int64  
 5   month            int64  
 6   day              int64  
 7   hour             int64  
 8   minute           int64  
 9   word_count       int64  
 10  hash_count       object 
 11  post_count       int64  
 12  user_count       int64  
 13  avg              float64
 14  post_count_year  int64  
dtypes: float64(1), int64(11), object(3)
memory usage: 666.6+ MB


In [39]:
X_train.isnull().sum(axis = 0)

channel_id               0
tg_id                    0
has_image                0
is_forwarded             0
year                     0
month                    0
day                      0
hour                     0
minute                   0
word_count               0
hash_count         1490599
post_count               0
user_count               0
avg                      0
post_count_year          0
dtype: int64

In [40]:
testX.isnull().sum(axis = 0)

channel_id             0
tg_id                  0
has_image              0
is_forwarded           0
year                   0
month                  0
day                    0
hour                   0
minute                 0
word_count             0
hash_count         50578
post_count             0
user_count             0
avg                    0
post_count_year        0
dtype: int64

Заполнение ячеек, содержащих nan

In [46]:
X_train['hash_count'].fillna('unknown', inplace = True)
testX['hash_count'].fillna('unknown', inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  downcast=downcast,


# train your model and predict test

In [41]:
! pip install catboost
clear_output()

In [42]:
from catboost import CatBoostRegressor

In [43]:
cat_features = ['has_image', 'is_forwarded', 'hash_count']

In [44]:
model = CatBoostRegressor(iterations = 1200,
    random_seed=42,
    verbose = 100)

In [48]:
model.fit(
    X_train, y_train,
    cat_features=cat_features,
    logging_level='Verbose',  
    plot=True);

MetricVisualizer(layout=Layout(align_self='stretch', height='500px'))

Learning rate set to 0.157201
0:	learn: 1.4129604	total: 3.3s	remaining: 1h 6m 2s
100:	learn: 0.9962308	total: 3m 43s	remaining: 40m 31s
200:	learn: 0.9278870	total: 7m 27s	remaining: 37m 5s
300:	learn: 0.8850545	total: 11m 15s	remaining: 33m 36s
400:	learn: 0.8543944	total: 15m 2s	remaining: 29m 58s
500:	learn: 0.8306925	total: 18m 51s	remaining: 26m 19s
600:	learn: 0.8119592	total: 22m 35s	remaining: 22m 31s
700:	learn: 0.7956399	total: 26m 28s	remaining: 18m 51s
800:	learn: 0.7824234	total: 30m 24s	remaining: 15m 8s
900:	learn: 0.7704689	total: 34m 16s	remaining: 11m 22s
1000:	learn: 0.7607227	total: 38m 6s	remaining: 7m 34s
1100:	learn: 0.7518736	total: 42m	remaining: 3m 46s
1199:	learn: 0.7442354	total: 45m 50s	remaining: 0us


In [51]:
preds_gb = model.predict(testX)

In [52]:
preds_gb

array([8.89232394, 8.89232394, 8.89232394, ..., 8.66932226, 7.71420155,
       7.72195262])

In [53]:
assert preds_gb.shape == (244386,)

# submit

In [54]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2021/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1906  100  1906    0     0   9724      0 --:--:-- --:--:-- --:--:--  9724


In [55]:
import client

In [56]:
client.make_eval(pd.DataFrame({'views': preds_gb}, index=testX.index),  final=True)

Enter username:
user15
Enter password:
··········


{'data': {'mape': 13.005709215101932,
  'mean_absolute_error': 0.978996519831543,
  'mean_squared_error': 1.7351197777819296,
  'rmse': 1.3172394534715128,
  'rmspe': 20.03280290995948},
 'ok': True}

In [57]:
client.check_results()

{'2021-06-12': [{'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 451555.5305036703,
    'mean_absolute_error': 39931.497573789195,
    'mean_squared_error': 18144074756.68306,
    'rmse': 134699.94341751988,
    'rmspe': 1432182.57162512}},
  {'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 11.406853562182242,
    'mean_absolute_error': 0.8446788675221267,
    'mean_squared_error': 1.4115751139845738,
    'rmse': 1.1880972662137448,
    'rmspe': 18.925928461450223}}],
 '2021-06-13': [{'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 13.005709215101932,
    'mean_absolute_error': 0.978996519831543,
    'mean_squared_error': 1.7351197777819296,
    'rmse': 1.3172394534715128,
    'rmspe': 20.03280290995948}}]}