### Unzip data
Напрямую у меня файл не закачивался, так что я сначала загрузил файл на Drive, потом распаковал в папку hw_data

In [1]:
! gdown --id 1z8seE9AeTEoYBdMqoo-LXQAgHYRjejFj

Downloading...
From: https://drive.google.com/uc?id=1z8seE9AeTEoYBdMqoo-LXQAgHYRjejFj
To: /content/hw_data.zip
763MB [00:07, 98.6MB/s]


In [2]:
! mkdir hw_data || unzip hw_data.zip -d hw_data

mkdir: cannot create directory ‘hw_data’: File exists
Archive:  hw_data.zip
replace hw_data/channels_orc/.part-00003-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc.crc? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
 extracting: hw_data/channels_orc/.part-00003-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc.crc  
  inflating: hw_data/channels_orc/part-00000-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc  
  inflating: hw_data/channels_orc/part-00002-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc  
 extracting: hw_data/channels_orc/._SUCCESS.crc  
 extracting: hw_data/channels_orc/.part-00002-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc.crc  
 extracting: hw_data/channels_orc/.part-00000-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc.crc  
 extracting: hw_data/channels_orc/_SUCCESS  
 extracting: hw_data/channels_orc/.part-00001-6b7f7a7f-ffd7-44da-bd14-b7b142917916-c000.snappy.orc.crc  
  inflating: hw_data/channels_orc/part-00003-6b7f7a7f-ffd7-44da-bd14-b

### Setup
Устанавливаем все как в примере

In [3]:
! pip3 install pyspark pandas scikit-learn



In [4]:
! pip install pycld3

import cld3  # либа для определения языка
import pyspark.sql
from pyspark.sql import functions as sf
from pyspark.sql import Window
import pyspark
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error

import client

In [5]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "4g")\
    .set("spark.driver.memory", "4g")\
    .set("spark.sql.broadcastTimeout", "600")  # увеличил timeout для бродкаст join'а
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[1]').getOrCreate()
sc = ss.sparkContext

In [6]:
train_data = ss.read.orc('./hw_data/posts_train.orc')
test_data = ss.read.orc('./hw_data/posts_test.orc')
channel_data = ss.read.orc('./hw_data/channels_orc')

In [7]:
! nohup ssh -o StrictHostKeyChecking=no -R mars.ru77.ru:47033:*:4040 aig@mars.ru77.ru -p 2222 &

nohup: appending output to 'nohup.out'


### Make features

1. 1 балл за фичу, использующую window aggregation, до 2 штук, различающихся по смыслу (макс 2 балла)
1. 1 балл за фичу, использующую метаданные каналов (макс 1 балл)
1. 2 балла за фичу, использующую текстовые поля (макс 2 балла) фичи должны быть осмысленными для задачи
1. -1 балла за грязный код и -1 за отсутствие комментариев
1. -10 баллов за списывание))) (обоим участникам) (макс -10 баллов)


#### Helpers & Pre-process

In [8]:
def show(df):
  return df.limit(5).toPandas()

In [9]:
# кастуем даты в даты
train_data = train_data.withColumn("datetime", sf.col("date").cast("timestamp"))
train_data = train_data.withColumn("date", sf.col("date").cast("date"))

test_data = test_data.withColumn("datetime", sf.col("date").cast("timestamp"))
test_data = test_data.withColumn("date", sf.col("date").cast("date"))
# трансформируем таргет
train_data = train_data.withColumn("transformed_views", sf.log1p(sf.col("views") + 100))

#### Window aggregation features

In [10]:
# Первый "оконный" признак

def get_avg_num_posts(df):
  """Среднее число постов за день за предыдущие дни"""
  ws = Window\
        .partitionBy("channel_id")\
        .orderBy("date")\
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  # Сначала находим дату первого поста
  df = df.withColumn("first_post_day", sf.min("date").over(ws))
  # Потом сколько прошло дней с даты первого поста по дату текущего поста
  df = df.withColumn("days_since_first_post", sf.datediff(sf.col("date"), sf.col("first_post_day")))
  # Делим кол-во постов на текущую дату на кол-во дней + 1 (чтобы не было NaN)
  df = df.withColumn("avg_num_posts_per_day", sf.count("id").over(ws) / (sf.col("days_since_first_post") + 1))
  return df


# Второй "оконный" признак

def get_post_row_number_for_date(df):
  """Номер поста за день"""
  ws = Window.partitionBy("channel_id", "date").orderBy("datetime")
  df = df.withColumn("post_row_number_over_day", sf.row_number().over(ws))
  return df


def get_window_features(df):
  df = get_avg_num_posts(df)
  df = get_post_row_number_for_date(df)
  return df

#### Meta features

In [11]:
def get_meta_features(df, meta_df):
  """Признаки из мета-данных канала"""
  # берем несколько мета признаков
  meta_df = meta_df.select("channel_id", "is_private", "post_count", "user_count")
  df = df.join(meta_df, on="channel_id", how="left")
  # активность пользователей (число постов делить на число юзеров)
  df = df.withColumn("users_activity", sf.col("post_count") / sf.col("user_count"))
  return df

#### Text features

In [14]:
@sf.udf
def get_lang_of_post(post):
  """Определяем язык сообщения"""
  # https://stackoverflow.com/questions/39142778/python-how-to-determine-the-language
  if not post: return "nan"
  lang = cld3.get_language(post).language
  return lang


@sf.udf
def get_post_link(post):
  """Есть ли ссылка в посте"""
  if not post: return 0  # пустой пост => ссылки нет
  return 1 if "http" in post else 0


def get_text_features(df):
  """Объединяем все признаки"""
  df = df.withColumn("post_lang", get_lang_of_post("text"))  # слишком медленно считает...
  df = df.withColumn("post_w_link", get_post_link("text"))
  # длина поста
  df = df.withColumn("post_length", sf.length("text")).fillna(0)
  return df

#### Соберем все вместе

In [15]:
def get_features(df, meta_df=channel_data):
  """
  Просто последовательно применим все преобразования
  -----
  Для оконных функций правильнее будет сначала объединить два сета (train и test)
  поэтому это посчитаем отдельно
  Для остальных - считаем раздельно train и test
  """
  df = get_meta_features(df, meta_df)
  df = get_text_features(df)
  return df

In [16]:
  # оконные фичи
  union_set = train_data\
                .select("id", "channel_id", "date", "datetime")\
                .union(test_data.select("id", "channel_id", "date", "datetime"))
  union_set = get_window_features(union_set)

In [17]:
train_data = train_data.join(union_set, on=["id", "channel_id", "date", "datetime"], how="left")
test_data = test_data.join(union_set, on=["id", "channel_id", "date", "datetime"], how="left")

In [18]:
train_data = get_features(train_data)
test_data  = get_features(test_data)

In [19]:
trainset = (train_data
       .select('transformed_views', 'days_since_first_post',
               'avg_num_posts_per_day', 'post_row_number_over_day','is_private',
               'post_count', 'user_count', 'users_activity', 'post_lang', 'post_w_link',
               'post_length')
       .toPandas()
       )

testset = (test_data
       .select('id', 'days_since_first_post',
               'avg_num_posts_per_day', 'post_row_number_over_day','is_private',
               'post_count', 'user_count', 'users_activity', 'post_lang', 'post_w_link',
               'post_length')
       .toPandas()
       )

In [21]:
LE = LabelEncoder().fit(trainset["post_lang"])

trainset.loc[:, "post_lang"] = LE.transform(trainset.loc[:, "post_lang"])
testset.loc[:, "post_lang"]  = LE.transform(testset.loc[:, "post_lang"])

In [23]:
ridge = Ridge(random_state=42)
ridge.fit(trainset.drop(["transformed_views"], axis=1), trainset["transformed_views"])
print(mean_absolute_error(trainset["transformed_views"], 
                    ridge.predict(trainset.drop(["transformed_views"], axis=1))))

0.9050244383753526


#### Делаем прогноз на тесте и сабмитим

In [24]:
prediction = ridge.predict(testset.drop(["id"], axis=1))
assert prediction.shape == (244386,)

In [25]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2021/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1906  100  1906    0     0  18326      0 --:--:-- --:--:-- --:--:-- 18326


`BASELINE = {
    'mape': 15.707128974856676,
    'mean_absolute_error': 1.219070382113261,
    'mean_squared_error': 2.4324378881170055,
    'rmse': 1.5596274837655963,
    'rmspe': 23.50065988751091
}`

In [27]:
client.make_eval(pd.DataFrame({'views': prediction}, index=testset.id), final=True)

Enter username:
user33
Enter password:
··········


{'data': {'mape': 13.22467009468363,
  'mean_absolute_error': 0.9813200586263412,
  'mean_squared_error': 1.8763377085496036,
  'rmse': 1.3697947687699803,
  'rmspe': 21.49472443945848},
 'ok': True}