# Установка пакетов

In [1]:
! pip3 install pyspark pandas scikit-learn catboost

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Настройки pyspark'а

In [2]:
import pyspark.sql
from pyspark.sql import functions as sf
import pyspark

In [3]:
# Enable IPython's autoreload extension in mode 2, so imported modules are re-imported
# before each cell execution and edits to local .py files are picked up.
%load_ext autoreload
%autoreload 2

In [4]:
# Memory settings for the Spark driver and executors.
sparkConf = (
    pyspark.SparkConf()
    .set("spark.executor.memory", "16g")
    .set("spark.driver.memory", "32g")
    .set("spark.driver.memoryOverhead", '2g')
    .set("spark.executor.memoryOverhead", '2g')
)

# Build (or reuse) a local session with a single worker thread.
ss = (
    pyspark.sql.SparkSession.builder
    .config(conf=sparkConf)
    .master('local[1]')
    .getOrCreate()
)

# Загрузка данных

Для получения данных нужно загрузить ноутбук в Colab, создать на Google Drive папку BigData и загрузить в неё датасеты. Для запуска в Jupyter закомментируйте следующие две ячейки и замените пути в ячейке чтения данных на локальные.

In [5]:
# Mount Google Drive under /content/gdrive so the dataset folder is reachable from Colab.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [6]:
# List the dataset folder to confirm that the expected ORC inputs are present.
!ls '/content/gdrive/My Drive/BigData'

channels_orc  posts_test.orc  posts_train.orc


In [7]:
# Load the train posts, test posts and channel metadata from ORC, in that order.
train_data, test_data, channel_data = (
    ss.read.orc(orc_path)
    for orc_path in (
        '/content/gdrive/My Drive/BigData/posts_train.orc',
        '/content/gdrive/My Drive/BigData/posts_test.orc',
        '/content/gdrive/My Drive/BigData/channels_orc',
    )
)

In [8]:
# Inspect the column names and data types of the train set.
train_data

DataFrame[channel_id: int, id: int, tg_id: int, text: string, views: int, has_image: string, is_forwarded: string, date: string, forwarded_id: string]

In [9]:
# Inspect the column names and data types of the test set.
test_data

DataFrame[channel_id: int, id: int, tg_id: int, text: string, has_image: string, is_forwarded: string, date: string, forwarded_id: string]

In [10]:
# Inspect the column names and data types of the channel metadata table.
channel_data

DataFrame[description: string, is_private: boolean, last_parsed: timestamp, name: string, post_count: double, tg_id: bigint, title: string, updated: timestamp, user_count: double, channel_id: bigint]

In [11]:
# Attach channel metadata to every post; a left join keeps posts whose channel has no metadata row.
train_data_joined, test_data_joined = (
    posts.join(channel_data, on='channel_id', how='left')
    for posts in (train_data, test_data)
)

# Создание признаков

In [12]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType, BooleanType, TimestampType

Из исходного датасета можно взять признаки:<br>

* `has_image` (данные из train/test)
* `is_forwarded` (данные из train/test)
* `is_private` (метаданные по каналу)
* `post_count` (метаданные по каналу)
* `user_count` (метаданные по каналу)
* `views` (таргет)

## Преобразование строкового типа данных в булевый тип

In [13]:
def _flags_to_boolean(df, flag_columns=('is_forwarded', 'has_image')):
    """Cast the string flag columns of ``df`` to BooleanType.

    Spark's string-to-boolean cast already understands 't'/'f' (as well as
    'true'/'false', '1'/'0', 'y'/'n', 'yes'/'no'), so no textual pre-processing
    is needed. The previous regexp_replace('f' -> '0', 't' -> '1') rewrote every
    'f'/'t' character, which would turn e.g. 'true' into '1rue' and then NULL.

    Args:
        df: Spark DataFrame containing the columns named in ``flag_columns``.
        flag_columns: names of the string columns to convert.

    Returns:
        A DataFrame with the same columns, the flags converted to boolean.
    """
    for flag in flag_columns:
        # Replacing a column under its own name keeps its position in the schema.
        df = df.withColumn(flag, df[flag].cast(BooleanType()))
    return df


train_data_joined = _flags_to_boolean(train_data_joined)
test_data_joined = _flags_to_boolean(test_data_joined)

## Преобразование строкового типа в дату

In [14]:
# Parse the string `date` column into a proper timestamp, for both splits.
train_data_joined = train_data_joined.withColumn(
    "date",
    train_data_joined["date"].cast(TimestampType()),
)
test_data_joined = test_data_joined.withColumn(
    "date",
    test_data_joined["date"].cast(TimestampType()),
)

In [15]:
# Check that the type conversions were applied (test split).
test_data_joined

DataFrame[channel_id: int, id: int, tg_id: int, text: string, has_image: boolean, is_forwarded: boolean, date: timestamp, forwarded_id: string, description: string, is_private: boolean, last_parsed: timestamp, name: string, post_count: double, tg_id: bigint, title: string, updated: timestamp, user_count: double]

In [16]:
# Check that the type conversions were applied (train split).
train_data_joined

DataFrame[channel_id: int, id: int, tg_id: int, text: string, views: int, has_image: boolean, is_forwarded: boolean, date: timestamp, forwarded_id: string, description: string, is_private: boolean, last_parsed: timestamp, name: string, post_count: double, tg_id: bigint, title: string, updated: timestamp, user_count: double]

## Создание признака "Количество символов в текстовом поле поста" (используется текстовое поле).

Обоснование: возможно, пользователь не захочет прочитать пост с большим количеством символов второй раз, просмотров будет меньше. 

In [17]:
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
import pandas as pd

In [18]:
# Vectorised pandas UDF that counts the characters in a string column.
@pandas_udf(IntegerType())
def string_length(s: pd.Series) -> pd.Series:
    """Return the length of each string in ``s`` using the pandas ``.str`` accessor."""
    char_counts = s.str.len()
    return char_counts

In [19]:
# Number of characters in the post `text`; rows where the length is null get 0.
train_data_joined = (
    train_data_joined
    .withColumn('text_length', string_length(train_data_joined.text))
    .fillna(0, subset='text_length')
)
test_data_joined = (
    test_data_joined
    .withColumn('text_length', string_length(test_data_joined.text))
    .fillna(0, subset='text_length')
)

In [20]:
# Show a few rows to verify the new text_length column.
test_data_joined.select(['text', 'text_length']).show(5, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Создание признака "Наличие ссылки в текстовом поле поста" (используется текстовое поле).

Обоснование: возможно, при наличии ссылки в текстовом поле, пользователь захочет несколько раз по ней перейти в разные промежутки времени, количество просмотров возрастет. 

In [21]:
from pyspark.sql.functions import udf

In [22]:
# A bare @udf defaults to a StringType result; declare BooleanType so the flag column is a real boolean.
@udf(returnType=BooleanType())
def link_in_text(t):
    """Tell whether a post text contains a link.

    Args:
        t: the post text, or None when the post has no text.

    Returns:
        True if the substring 'http' occurs in ``t``; False otherwise,
        including when ``t`` is None.
    """
    return t is not None and 'http' in t

In [23]:
# Flag posts whose `text` contains a link.
train_link_flag = link_in_text(train_data_joined.text)
test_link_flag = link_in_text(test_data_joined.text)

train_data_joined = train_data_joined.withColumn('link_in_text', train_link_flag)
test_data_joined = test_data_joined.withColumn('link_in_text', test_link_flag)

In [24]:
# Show a few rows to verify the new link_in_text column.
test_data_joined.select(['text', 'link_in_text']).show(5, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Создание признака "Количество часов, прошедшее с публикации предыдущего поста" (используется оконная функция).

Обоснование: возможно, от частоты публикации постов зависит количество просмотров.

In [25]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
import pyspark.sql.functions as F

In [26]:
# Per-channel window ordered by post date; used below to look up the previous post of each channel.
windowSpec  = Window.partitionBy("channel_id").orderBy("date")

In [27]:
def _with_hours_after_previous_post(df):
    """Add `date_of_previous_post` and `hours_after_previous_post` columns to ``df``.

    The previous post is taken with ``lag`` over ``windowSpec``. The gap is the
    difference of the two unix timestamps divided by 3600; rows without a
    previous post (null gap) get 0.
    """
    df = df.withColumn("date_of_previous_post", lag("date").over(windowSpec))
    gap_seconds = F.unix_timestamp(df.date) - F.unix_timestamp(df.date_of_previous_post)
    df = df.withColumn('hours_after_previous_post', gap_seconds / 3600)
    return df.fillna(0, subset='hours_after_previous_post')


train_data_joined = _with_hours_after_previous_post(train_data_joined)
test_data_joined = _with_hours_after_previous_post(test_data_joined)

In [28]:
# Show the lagged date and the resulting gap in hours for the first rows.
test_data_joined.select(['channel_id', 'date', 'date_of_previous_post', 'hours_after_previous_post']).show()

+----------+-------------------+---------------------+-------------------------+
|channel_id|               date|date_of_previous_post|hours_after_previous_post|
+----------+-------------------+---------------------+-------------------------+
|         2|2019-01-04 10:25:58|                 null|                      0.0|
|         2|2019-01-04 10:33:44|  2019-01-04 10:25:58|      0.12944444444444445|
|         2|2019-01-05 11:56:49|  2019-01-04 10:33:44|       25.384722222222223|
|         2|2019-01-08 11:24:49|  2019-01-05 11:56:49|        71.46666666666667|
|         2|2019-01-11 18:02:36|  2019-01-08 11:24:49|        78.62972222222223|
|         2|2019-01-12 06:57:37|  2019-01-11 18:02:36|       12.916944444444445|
|         2|2019-01-16 07:38:04|  2019-01-12 06:57:37|        96.67416666666666|
|         2|2019-01-17 10:25:16|  2019-01-16 07:38:04|       26.786666666666665|
|         2|2019-01-18 10:53:34|  2019-01-17 10:25:16|       24.471666666666668|
|         2|2019-01-22 08:29

## Создание признака "Среднее количество постов за последнюю неделю на канале" (используется оконная функция и rangeBetween).

In [29]:
from pyspark.sql.functions import mean, col

def days(i):
    """Return the number of seconds in ``i`` days."""
    return i * 86400


# Trailing seven-day window per channel, ordered by the post time in epoch seconds.
# It gets its own name so that it does not overwrite `windowSpec`, which the
# previous-post (`lag`) cell relies on when it is re-run.
windowSpecWeek = (
    Window.partitionBy("channel_id")
    .orderBy(col("date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

# NOTE(review): `post_count` comes from the channel metadata, so it looks constant
# within a channel and this rolling mean then just equals `post_count` (see the
# sample output further down). Confirm whether a count of posts in the window was intended.
train_data_joined = train_data_joined.withColumn("mean_count_per_last_week", mean("post_count").over(windowSpecWeek))
train_data_joined = train_data_joined.fillna(0, subset='mean_count_per_last_week')

test_data_joined = test_data_joined.withColumn("mean_count_per_last_week", mean("post_count").over(windowSpecWeek))
test_data_joined = test_data_joined.fillna(0, subset='mean_count_per_last_week')

## Создание признака "Количество символов в названии канала" (используется текстовое поле и метаданные по каналу).

Обоснование: каналы с коротким названием легко найти в поиске, соответственно, больше потенциально просмотров

In [30]:
# Number of characters in the channel `title`; rows where the length is null get 0.
train_data_joined = (
    train_data_joined
    .withColumn('title_length', string_length(train_data_joined.title))
    .fillna(0, subset='title_length')
)
test_data_joined = (
    test_data_joined
    .withColumn('title_length', string_length(test_data_joined.title))
    .fillna(0, subset='title_length')
)

In [31]:
# Show a few rows to verify the new title_length column.
test_data_joined.select(['title', 'title_length']).show(5)

+--------------+------------+
|         title|title_length|
+--------------+------------+
|ANTIHIPSTASWAG|          14|
|ANTIHIPSTASWAG|          14|
|   જ્ઞાન સારથિ|          11|
|   જ્ઞાન સારથિ|          11|
|   જ્ઞાન સારથિ|          11|
+--------------+------------+
only showing top 5 rows



## Создание признака "Порядковый номер публикации в канале" (используется оконная функция и rowsBetween).

Обоснование: с ростом количества постов в канале должно увеличиваться количество просмотров поста.

In [32]:
from pyspark.sql.functions import row_number

In [33]:
# Per-channel window ordered by date, framed from the start of the partition up to the current row.
windowSpecNumber  = Window.partitionBy("channel_id").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [34]:
# Sequential number of each post within its channel, in date order.
post_number = row_number().over(windowSpecNumber)
train_data_joined = train_data_joined.withColumn("number_of_post_in_channel", post_number)
test_data_joined = test_data_joined.withColumn("number_of_post_in_channel", post_number)

# Verify the result on the test split.
test_data_joined.select(['channel_id', 'date', 'number_of_post_in_channel']).show()

+----------+-------------------+-------------------------+
|channel_id|               date|number_of_post_in_channel|
+----------+-------------------+-------------------------+
|         2|2019-01-04 10:25:58|                        1|
|         2|2019-01-04 10:33:44|                        2|
|         2|2019-01-05 11:56:49|                        3|
|         2|2019-01-08 11:24:49|                        4|
|         2|2019-01-11 18:02:36|                        5|
|         2|2019-01-12 06:57:37|                        6|
|         2|2019-01-16 07:38:04|                        7|
|         2|2019-01-17 10:25:16|                        8|
|         2|2019-01-18 10:53:34|                        9|
|         2|2019-01-22 08:29:15|                       10|
|         2|2019-01-24 07:38:31|                       11|
|         2|2019-01-25 07:33:28|                       12|
|         2|2019-01-28 08:12:24|                       13|
|         2|2019-01-29 08:43:24|                       1

In [35]:
# Print three full records, one field per line, to review all columns built so far.
test_data_joined.show(3, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Преобразование SparkDataframe в PandasDataframe

In [36]:
# Model features, grouped by origin; the concatenation order matches the original list.
post_flag_features = ['has_image', 'is_forwarded']
channel_meta_features = ['is_private', 'post_count', 'user_count']
engineered_features = [
    'text_length',
    'link_in_text',
    'hours_after_previous_post',
    'title_length',
    'number_of_post_in_channel',
    'mean_count_per_last_week',
]
features = [*post_flag_features, *channel_meta_features, *engineered_features]

# Target column.
target = 'views'

# Column used as the row identifier / index.
# NOTE(review): this rebinds the builtin `id`; the name is kept because later cells refer to it.
id = 'id'

In [37]:
# Keep only the feature columns, the id and (for train) the target; drop everything else, then cache.
train_keep = set(features) | {target, id}
test_keep = set(features) | {id}

train_extra = set(train_data_joined.columns).difference(train_keep)
test_extra = set(test_data_joined.columns).difference(test_keep)

train_data_joined = train_data_joined.drop(*train_extra).cache()
test_data_joined = test_data_joined.drop(*test_extra).cache()

In [38]:
# Preview the final set of columns that will be exported.
test_data_joined.show(5)

+--------+---------+------------+----------+----------+----------+-----------+------------+-------------------------+------------------------+------------+-------------------------+
|      id|has_image|is_forwarded|is_private|post_count|user_count|text_length|link_in_text|hours_after_previous_post|mean_count_per_last_week|title_length|number_of_post_in_channel|
+--------+---------+------------+----------+----------+----------+-----------+------------+-------------------------+------------------------+------------+-------------------------+
|57193448|    false|       false|     false|     791.0|    4868.0|          0|       false|                      0.0|                   791.0|          24|                        1|
|57193446|    false|       false|     false|     791.0|    4868.0|          0|       false|     0.008611111111111111|                   791.0|          24|                        2|
|57193444|    false|       false|     false|     791.0|    4868.0|          0|       false

In [39]:
# Write both Spark DataFrames to local CSV directories (test first, then train),
# replacing any previous export and including a header row.
for spark_frame, out_dir in (
    (test_data_joined, 'test_csv'),
    (train_data_joined, 'train_csv'),
):
    spark_frame.write.csv(out_dir, mode='overwrite', header=True)

In [40]:
import subprocess
import glob
import os
import shutil
import pandas as pd
import numpy as np


def load_and_merge_csv(path, **kwargs):
    """Read every ``*.csv`` file in a directory and merge them into one DataFrame.

    Args:
        path: directory containing the CSV part files (e.g. a Spark CSV export).
        **kwargs: forwarded unchanged to ``pandas.read_csv`` for each file.

    Returns:
        A pandas DataFrame with the rows of all files, indexed by the ``id`` column.

    Raises:
        ValueError: if the directory contains no ``*.csv`` files.
    """
    # glob returns files in arbitrary order; sort so the row order is reproducible.
    part_files = sorted(glob.glob(os.path.join(path, '*.csv')))
    if not part_files:
        raise ValueError(f"No CSV files found in {path!r}")
    merged = pd.concat([pd.read_csv(part, **kwargs) for part in part_files])
    return merged.set_index('id')

In [41]:
# Read the exported CSVs back into pandas. The train frame gets `id` back as a
# regular column; the test frame stays indexed by `id`.
trainXY = load_and_merge_csv('train_csv').reset_index()
testX = load_and_merge_csv('test_csv')

In [42]:
# Transform the target: train on log(views + 100) and drop the raw target and the id column.
log_views = np.log(trainXY['views'] + 100)
trainXY['log_views'] = log_views
trainXY = trainXY.drop(columns=['views', 'id'])

# Обучение модели

In [52]:
from catboost import CatBoostRegressor

In [53]:
# CatBoost regressor created with no arguments, i.e. the library's default settings.
model = CatBoostRegressor()

In [54]:
# Fit the CatBoost model: every column except `log_views` is a feature, `log_views` is the label.
train_features = trainXY.drop(columns='log_views')
train_labels = trainXY['log_views']
model.fit(train_features, train_labels)

Learning rate set to 0.159465
0:	learn: 1.4171025	total: 750ms	remaining: 12m 29s
1:	learn: 1.3698516	total: 1.26s	remaining: 10m 29s
2:	learn: 1.3320723	total: 1.76s	remaining: 9m 45s
3:	learn: 1.3034352	total: 2.25s	remaining: 9m 20s
4:	learn: 1.2811562	total: 2.73s	remaining: 9m 3s
5:	learn: 1.2622925	total: 3.2s	remaining: 8m 49s
6:	learn: 1.2467061	total: 3.68s	remaining: 8m 42s
7:	learn: 1.2344537	total: 4.18s	remaining: 8m 38s
8:	learn: 1.2249054	total: 4.62s	remaining: 8m 28s
9:	learn: 1.2170999	total: 5.07s	remaining: 8m 22s
10:	learn: 1.2094052	total: 5.56s	remaining: 8m 19s
11:	learn: 1.2039140	total: 6.02s	remaining: 8m 15s
12:	learn: 1.1987473	total: 6.5s	remaining: 8m 13s
13:	learn: 1.1930881	total: 6.96s	remaining: 8m 9s
14:	learn: 1.1891268	total: 7.4s	remaining: 8m 6s
15:	learn: 1.1862417	total: 7.86s	remaining: 8m 3s
16:	learn: 1.1819340	total: 8.33s	remaining: 8m 1s
17:	learn: 1.1781076	total: 8.73s	remaining: 7m 56s
18:	learn: 1.1757246	total: 9.17s	remaining: 7m 53

<catboost.core.CatBoostRegressor at 0x7fde7369d950>

In [55]:
# Predict for the test rows. The model was fitted on `log_views`, so predictions are on that scale.
prediction = model.predict(testX)

In [56]:
# Sanity check on the prediction vector length (244386 is presumably the test set size; TODO confirm).
assert prediction.shape == (244386,)

# Сабмит решения

In [57]:
# Download the submission client script and save it as client.py in the working directory.
# NOTE(review): this fetches remote code that is imported and executed below; review it before running.
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2021/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1906  100  1906    0     0   8359      0 --:--:-- --:--:-- --:--:--  8359


In [58]:
import client

In [None]:
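# Credentials are entered interactively when client.make_eval prompts for them.
# Do not store the login or password in the notebook.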

In [59]:
# Wrap the predictions in a single-column frame indexed like the test set, then submit it.
submission = pd.DataFrame({'views': prediction}, index=testX.index)
client.make_eval(submission)  # original author's hint: optionally pass final=True

Enter username:
user14
Enter password:
··········


{'data': {'mape': 14.373649391376023,
  'mean_absolute_error': 1.1970854816526635,
  'mean_squared_error': 2.3878908105716956,
  'rmse': 1.5452801721926337,
  'rmspe': 19.340244798255636},
 'ok': True}

In [60]:
# Show the recorded submission results (the output is keyed by date).
client.check_results()

{'2022-06-21': [{'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 14.055298627263133,
    'mean_absolute_error': 1.1924354252995746,
    'mean_squared_error': 2.568408590454289,
    'rmse': 1.602625530326498,
    'rmspe': 19.552182150383207}},
  {'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 14.055298627263133,
    'mean_absolute_error': 1.1924354252995746,
    'mean_squared_error': 2.568408590454289,
    'rmse': 1.602625530326498,
    'rmspe': 19.552182150383207}},
  {'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 14.309883766567497,
    'mean_absolute_error': 1.1930475444922712,
    'mean_squared_error': 2.376865275607308,
    'rmse': 1.5417085572854903,
    'rmspe': 19.3046049248594}},
  {'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 14.309883766567497,
    'mean_absolute_error': 1.1930475444922712,
    'mean_squared_error': 2.376865275607308,
    'rmse': 1.5417085572854903,
    'rmspe': 19.

In [None]:
# Stop the Spark session created at the top of the notebook.
ss.stop()

# Мем

In [61]:
# Display the GIF at the given URL as the cell output.
from IPython.display import Image
Image(url='https://c.tenor.com/hQvr-iA6_1cAAAAd/funny-memes.gif')