In [1]:
#from google.colab import drive
#drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# install stuff

In [2]:
! pip3 install pyspark pandas scikit-learn



# setup pyspark

In [3]:
import pyspark.sql
import pyspark
from pyspark.sql import SparkSession, functions as sf, Window
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Word2Vec
from pyspark.mllib.linalg import Vectors
from pyspark.sql.types import DoubleType

In [4]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "512m")\
    .set("spark.driver.memory", "512m")
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[1]').getOrCreate()

# load data

In [5]:
#train_data = ss.read.orc('/content/drive/MyDrive/hw_data/posts_train.orc')
#test_data = ss.read.orc('/content/drive/MyDrive/hw_data/posts_test.orc')
#channel_data = ss.read.orc('/content/drive/MyDrive/hw_data/channels_orc')

In [6]:
train_data = ss.read.orc('./hw_data/posts_train.orc')
test_data = ss.read.orc('./hw_data/posts_test.orc')
channel_data = ss.read.orc('./hw_data/channels_orc')

In [7]:
train_data

DataFrame[channel_id: int, id: int, tg_id: int, text: string, views: int, has_image: string, is_forwarded: string, date: string, forwarded_id: string]

In [8]:
test_data

DataFrame[channel_id: int, id: int, tg_id: int, text: string, has_image: string, is_forwarded: string, date: string, forwarded_id: string]

In [9]:
channel_data

DataFrame[description: string, is_private: boolean, last_parsed: timestamp, name: string, post_count: double, tg_id: bigint, title: string, updated: timestamp, user_count: double, channel_id: bigint]

In [10]:
channel_data.select('title').show(15)

+--------------------+
|               title|
+--------------------+
|             Moein Z|
|            Мерзость|
|Christian Mezmur ...|
|    Ideal qomat siri|
|Parivartan by Abh...|
|             Банкста|
|    WarisanSalaf.Com|
|                РИАN|
|                 ОНФ|
|Lawak Netizen Kini🍁|
|         Grantlar.uz|
|           Offerzone|
| ▶CiNEMAWORLD HEVC ◀|
|              Шапито|
|Ustozlar va Pedag...|
+--------------------+
only showing top 15 rows



заджойним метаданные

In [11]:
train_data = train_data.join(channel_data, on='channel_id', how='left')
test_data = test_data.join(channel_data, on='channel_id', how='left')

# посмотрим чего у нас там есть


In [12]:
display(train_data.show(5))
display(test_data.show(5))
display(channel_data.show(5))

+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+--------------------+----------+--------------------+------------+----------+----------+------------+--------------------+----------+
|channel_id|      id|tg_id|                text|views|has_image|is_forwarded|                date|forwarded_id|         description|is_private|         last_parsed|        name|post_count|     tg_id|       title|             updated|user_count|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+--------------------+----------+--------------------+------------+----------+----------+------------+--------------------+----------+
|       221|29741094| 7182|МК, а это вот про...|15368|        f|           f|2018-11-03 13:05:...|        NULL|Политика, сплетни...|     false|2018-12-07 00:04:...|ekvinokurova|    1015.0|1030852584|Ekvinokurova|2018-12-07 00:04:...|    8732.0|
|       221|46751120

None

+----------+--------+-----+--------------------+---------+------------+--------------------+------------+--------------------+----------+--------------------+--------------+----------+----------+--------------+--------------------+----------+
|channel_id|      id|tg_id|                text|has_image|is_forwarded|                date|forwarded_id|         description|is_private|         last_parsed|          name|post_count|     tg_id|         title|             updated|user_count|
+----------+--------+-----+--------------------+---------+------------+--------------------+------------+--------------------+----------+--------------------+--------------+----------+----------+--------------+--------------------+----------+
|      4127|53225416| 2137|История про песик...|        f|           f|2019-01-14 15:42:...|        NULL|antihipstaswag@gm...|     false|2018-12-08 16:19:...|antihipstaswag|    1974.0|1063226247|ANTIHIPSTASWAG|2018-12-08 16:19:...|    5399.0|
|      4127|53225442| 2135|Н

None

+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|         description|is_private|         last_parsed|            name|post_count|     tg_id|               title|             updated|user_count|channel_id|
+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|                    |     false|2019-01-26 16:53:...|   MoeinZchannel|     708.0|1002972402|             Moein Z|2019-01-26 16:53:...|   62411.0|      7910|
|Вокруг столько ме...|     false|2019-02-12 00:39:...|  merzotachannel|    1027.0|1336284461|            Мерзость|2019-02-12 00:39:...|   12982.0|     14121|
|🗣እኛስ የተሰቀለውን ክርስ...|     false|2019-02-10 06:57:...|christian_mezmur|    1168.0|1136987361|Christian Mezmur ...|2019-02-10 06:57:...|   21704.0|     17375|
|     Chiroyli_qomatt|     false|2018-12-08 00:54:...

None

In [20]:
columns_to_analyze = ['has_image', 'is_forwarded']

for column in columns_to_analyze:
    num_train_rows_with_t = train_data.select(sf.col(column)).filter(sf.col(column)=='t').count()

    num_test_rows_with_t = test_data.select(sf.col(column)).filter(sf.col(column)=='t').count()

    train_percentage = round(num_train_rows_with_t / train_data.count() * 100, 2)
    test_percentage = round(num_test_rows_with_t / test_data.count() * 100, 2)

    print(f'train: {column} {num_train_rows_with_t} ({train_percentage}%)')
    print(f'test: {column} {num_test_rows_with_t} ({test_percentage}%)')


train: has_image 2043958 (37.43%)
test: has_image 98825 (40.44%)
train: is_forwarded 595694 (10.91%)
test: is_forwarded 33286 (13.62%)


In [21]:
channel_summary = channel_data.select('channel_id', 'updated')
channel_summary = channel_summary.groupby('channel_id')
channel_summary = channel_summary.agg(
    sf.count('*').alias('num_occurrences'),
    sf.max('updated').alias('last_updated'),
    sf.min('updated').alias('first_updated')
)
channel_summary = channel_summary.orderBy('num_occurrences', ascending=False)
channel_summary.show(10, 0)


+----------+---------------+-----------------------+-----------------------+
|channel_id|num_occurrences|last_updated           |first_updated          |
+----------+---------------+-----------------------+-----------------------+
|45166     |1              |2019-02-13 06:23:33.062|2019-02-13 06:23:33.062|
|56242     |1              |2019-01-26 22:01:37.681|2019-01-26 22:01:37.681|
|4894      |1              |2018-12-07 07:15:24.675|2018-12-07 07:15:24.675|
|4823      |1              |2019-02-12 02:29:14.729|2019-02-12 02:29:14.729|
|11938     |1              |2018-12-11 07:35:03.549|2018-12-11 07:35:03.549|
|15194     |1              |2019-02-12 16:06:03.971|2019-02-12 16:06:03.971|
|15057     |1              |2018-12-09 10:05:50.788|2018-12-09 10:05:50.788|
|62989     |1              |2019-02-10 23:56:27.604|2019-02-10 23:56:27.604|
|42969     |1              |2018-12-08 16:17:51.693|2018-12-08 16:17:51.693|
|541       |1              |2019-02-10 01:00:38.254|2019-02-10 01:00:38.254|

В channel_data лежит только по одной строчке для каждого из каналов, прикольные фичи с количеством юзеров во времени и их просмотрами не посчитать :(

In [15]:
display('Уникальные каналы в train:', train_data.select('channel_id').distinct().count())
display('Уникальные каналы в test:', test_data.select('channel_id').distinct().count())

'Уникальные каналы в train:'

2000

'Уникальные каналы в test:'

1197

Кажется, что можем заджойнить количество просмотров предудщих постов в тест и что-нибудь посчитать. Проверим, можно ли так


In [16]:
(train_data.groupby('channel_id')
            .agg(sf.max(sf.col('date')).alias('last_train'))
            .join(test_data
                        .groupby('channel_id')
                        .agg(sf.min(sf.col('date')).alias('first_test')),
                  on='channel_id', how='inner')
            .withColumn('days_between_train_test',
                        sf.datediff(sf.col('first_test'),
                                   sf.col('last_train')))
            .orderBy('days_between_train_test')
).show(10, 0)

+----------+----------------------+----------------------+-----------------------+
|channel_id|last_train            |first_test            |days_between_train_test|
+----------+----------------------+----------------------+-----------------------+
|114       |2018-12-31 13:19:10+00|2019-01-01 14:34:35+00|1                      |
|184       |2018-12-31 23:33:36+00|2019-01-01 10:16:08+00|1                      |
|164       |2018-12-31 21:09:50+00|2019-01-01 07:03:55+00|1                      |
|25        |2018-12-31 22:13:23+00|2019-01-01 05:55:59+00|1                      |
|166       |2018-12-31 22:51:51+00|2019-01-01 11:26:54+00|1                      |
|47        |2018-12-31 22:07:45+00|2019-01-01 10:22:56+00|1                      |
|167       |2018-12-31 17:53:43+00|2019-01-01 13:04:16+00|1                      |
|187       |2018-12-31 17:36:56+00|2019-01-01 07:12:14+00|1                      |
|168       |2018-12-31 16:40:51+00|2019-01-01 08:39:50+00|1                      |
|42 

In [17]:
(train_data.groupby('channel_id')
            .agg(sf.max(sf.col('date')).alias('last_train'))
            .join(test_data
                        .groupby('channel_id')
                        .agg(sf.min(sf.col('date')).alias('first_test')),
                  on='channel_id', how='inner')
            .withColumn('days_between_train_test',
                        sf.datediff(sf.col('first_test'),
                                   sf.col('last_train')))
            .orderBy('days_between_train_test')
).count()

1197

Действительно можем. Дальше реализуем фичу со средним по трейн датасету в тесте

# make features

In [24]:
from pyspark.sql.functions import length, size, trim, lower, expr, avg, row_number, array, when, split, col, to_json, udf, explode, hour, max, count, to_timestamp, countDistinct

def make_features(df, is_train = True, train_data=None):
  cols = ['id', 'has_image', 'is_forwarded', 'is_private', 'post_count', 'user_count',
          'previous_views_mean', 'original_post_ratio', 'hours_diff', 'media_post_ratio',
          'active_last_3_days', 'text_length', 'word_count', 'language', 'has_emoji', 'has_less_than_30_words']
  if 'views' in df.columns:
    cols.append('views')
##Метаданные
    # Доля постов с медиа (картинки приятнее-интереснее смотреть, хочется чаще заходить)
  df = df.withColumn('media_post_ratio', sf.mean(sf.when(sf.col('has_image') == True, 1.0).otherwise(0.0)).over(Window.partitionBy("channel_id")))

    # Доля собственных постов (гипотеза - у каналов, которые много репостят (мало уникального контента) - просмотров меньше)
  df = df.withColumn("is_forwarded_numeric", sf.when(df.is_forwarded == True, 0.0).otherwise(1.0))
  df = df.withColumn('original_post_ratio', sf.avg("is_forwarded_numeric").over(Window.partitionBy("channel_id")))

##Текстовые фичи
  #определим язык по описанию(тут еще и метаданные)))
  #логика-тг распространен больше в россии и снг - каналы и посты на русском будут иметь больше просмотров
  df = df.withColumn("language", sf.when(sf.regexp_extract(col("description"), r'\b[а-яА-Я]+\b', 0) != "", 1).otherwise(0))

  #Проверим наличие эмодзи и спецсимволов (привлекают больше внимания)))
  df = df.withColumn("has_emoji", sf.when(sf.regexp_extract(col("text"), r"[\u2700-\u27BF]|\u2B50|\u2B55|\u203C|\u2049|[\u2500-\u25FF]|\u2600-\u26FF|[\u2190-\u21FF]", 0) != "", 1).otherwise(0))

  # Подсчет слов и символов
  df = df.withColumn('text_length', length(trim(lower(col('text')))))
  df = df.withColumn('word_count', size(split(trim(lower(col('text'))), ' ')))
  #Проверим, что количество слов меньше 30 (гипотеза, что к постам к коротким текстам чаще возвращаются,
  # они популярнее - по аналогии с твиттером каким-нибудь)
  df = df.withColumn("has_less_than_30_words", sf.when(col("word_count") < 30, 1).otherwise(0))

##Виндоу функции
    # Флажок активности за последние 3 дня (обозначим какие-нибудь мертвые каналы, которые давно/совсем не часто постят)
  last_3_days_window = Window.partitionBy("channel_id").orderBy(col("date").cast("long")).rangeBetween(-3*24*3600, -1)
  df = df.withColumn(
       'active_last_3_days',
       (sf.size(sf.collect_set("date").over(last_3_days_window)) > 0).cast("integer")
  )

#количество часов разницы между постами
  wd = Window.partitionBy("channel_id").orderBy("date")
  df = df.withColumn('previous_date',sf.lag('date', 1).over(wd)
          ).withColumn(
          'hours_diff', sf.datediff(sf.col('date'), sf.col('previous_date'))*24 +
          sf.hour(sf.col('date')) - sf.hour(sf.col('previous_date'))
          ).fillna(0, subset='hours_diff')


#cоздадим фичу в трейне по среднему количеству просмотров постов
  if is_train:
    w1 = Window.partitionBy("channel_id").orderBy("date")
    df = df.withColumn(
        'previous_views_mean',
        sf.mean('views').over(w1.rowsBetween(Window.unboundedPreceding, -1))
    )

#и передадим значения в тестовый датасет
  else:
    w1 = Window.partitionBy("channel_id").orderBy(sf.desc("date"))
    df_prev_views = train_data.withColumn(
        "rn",
        sf.row_number().over(w1)
    ).filter("rn = 1").select(
        "channel_id",
        sf.col("views").alias('previous_views_mean')
    )
    df = df.hint('merge').join(df_prev_views, on='channel_id', how='left')

  df = df.select(*cols)
  return df


In [25]:
train_features = make_features(train_data).cache()
test_features = make_features(test_data, is_train=False, train_data=train_data).cache()

In [26]:
#test_features.show()

+--------+---------+------------+----------+----------+----------+-------------------+-------------------+----------+------------------+------------------+-----------+----------+--------+---------+----------------------+
|      id|has_image|is_forwarded|is_private|post_count|user_count|previous_views_mean|original_post_ratio|hours_diff|  media_post_ratio|active_last_3_days|text_length|word_count|language|has_emoji|has_less_than_30_words|
+--------+---------+------------+----------+----------+----------+-------------------+-------------------+----------+------------------+------------------+-----------+----------+--------+---------+----------------------+
|57193448|        f|           f|     false|     791.0|    4868.0|               3026|                1.0|         0|               0.0|                 1|       NULL|        -1|       1|        0|                     1|
|57193446|        f|           f|     false|     791.0|    4868.0|               3026|                1.0|         0

In [27]:
#train_features.show()

+--------+---------+------------+----------+----------+----------+-------------------+-------------------+----------+------------------+------------------+-----------+----------+--------+---------+----------------------+-----+
|      id|has_image|is_forwarded|is_private|post_count|user_count|previous_views_mean|original_post_ratio|hours_diff|  media_post_ratio|active_last_3_days|text_length|word_count|language|has_emoji|has_less_than_30_words|views|
+--------+---------+------------+----------+----------+----------+-------------------+-------------------+----------+------------------+------------------+-----------+----------+--------+---------+----------------------+-----+
|39957267|        t|           f|     false|     791.0|    4868.0|               NULL| 0.9823008849557522|         0|0.2503160556257901|                 1|       NULL|        -1|       1|        0|                     1| 1217|
|39957266|        t|           f|     false|     791.0|    4868.0|             1217.0| 0.982

In [28]:
train_features.write.csv('train_csv', mode='overwrite', header=True)
test_features.write.csv('test_csv', mode='overwrite', header=True)

# load features to pandas
you also can use .toPandas()

In [29]:
import subprocess
import glob
import os
import shutil
import pandas as pd

def load_and_merge_csv(path, **kwargs):
    dfs = []
    for g in glob.glob(os.path.join(path, '*.csv')):
        dfs.append(pd.read_csv(g, **kwargs))
    res = pd.concat(dfs)
    res = res.set_index('id')
    return res

In [30]:
trainXY = load_and_merge_csv('train_csv')
testX = load_and_merge_csv('test_csv')

In [31]:
trainXY

Unnamed: 0_level_0,has_image,is_forwarded,is_private,post_count,user_count,previous_views_mean,original_post_ratio,hours_diff,media_post_ratio,active_last_3_days,text_length,word_count,language,has_emoji,has_less_than_30_words,views
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
6851167,f,f,False,733.0,20622.0,,0.988355,0,0.020378,1,1792.0,247,1,0,0,5472
6851166,f,f,False,733.0,20622.0,5472.000000,0.988355,5,0.020378,1,1119.0,157,1,0,0,4578
6851165,f,f,False,733.0,20622.0,5025.000000,0.988355,20,0.020378,1,2442.0,364,1,0,0,5182
6851164,f,f,False,733.0,20622.0,5077.333333,0.988355,7,0.020378,1,1540.0,215,1,0,0,3633
6851163,f,f,False,733.0,20622.0,4716.250000,0.988355,12,0.020378,1,2251.0,319,1,0,0,3700
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
47528347,f,f,False,1847.0,8536.0,3268.013072,0.987507,0,0.313960,1,,-1,0,0,1,2617
47528341,f,f,False,1847.0,8536.0,3267.658683,0.987507,3,0.313960,1,271.0,43,0,0,0,4818
47528338,t,f,False,1847.0,8536.0,3268.502176,0.987507,53,0.313960,1,,-1,0,0,1,3135
47528339,t,f,False,1847.0,8536.0,3268.429581,0.987507,0,0.313960,1,,-1,0,0,1,3377


In [32]:
testX

Unnamed: 0_level_0,has_image,is_forwarded,is_private,post_count,user_count,previous_views_mean,original_post_ratio,hours_diff,media_post_ratio,active_last_3_days,text_length,word_count,language,has_emoji,has_less_than_30_words
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
53061668,f,f,False,2161.0,43444.0,11573,0.994178,0,0.253275,1,270.0,30,1,0,0
53061667,f,f,False,2161.0,43444.0,11573,0.994178,4,0.253275,1,125.0,14,1,0,1
53061666,f,f,False,2161.0,43444.0,11573,0.994178,1,0.253275,1,172.0,25,1,0,1
53061665,f,f,False,2161.0,43444.0,11573,0.994178,0,0.253275,1,176.0,21,1,0,1
53061663,t,f,False,2161.0,43444.0,11573,0.994178,1,0.253275,1,,-1,1,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
57031265,f,f,False,573.0,9642.0,11910,1.000000,25,0.111111,1,1578.0,317,0,0,0
57031264,f,f,False,573.0,9642.0,11910,1.000000,405,0.111111,1,140.0,18,0,0,1
57031263,f,f,False,573.0,9642.0,11910,1.000000,20,0.111111,1,1167.0,218,0,0,0
57031262,t,f,False,573.0,9642.0,11910,1.000000,0,0.111111,1,28.0,1,0,0,1


In [33]:
import numpy as np

Ycol = 'views'
trainX, trainY = trainXY.drop(Ycol, axis=1), trainXY[Ycol]
trainY = np.log(trainY + 100)
trainX['previous_views_mean'] = np.log(trainX['previous_views_mean']+100)
testX['previous_views_mean'] = np.log(testX['previous_views_mean']+100)


Разберемся с Null значениями

In [34]:
trainX['text_length'] = trainX['text_length'].fillna(0)
testX['text_length'] = testX['text_length'].fillna(0)
trainX['word_count'] = trainX['word_count'].replace(-1, 0)
testX['word_count'] = testX['word_count'].replace(-1, 0)

testX.loc[testX.isna().any(axis=1), 'hours_diff'] = 1
trainX.loc[trainX.isna().any(axis=1), 'previous_views_mean'] = trainX.user_count


In [35]:
trainX_dummy = pd.get_dummies(trainX)
testX_dummy = pd.get_dummies(testX)

# train your model and predict test

In [37]:
from sklearn.ensemble import GradientBoostingRegressor
model = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, random_state=52)
model.fit(trainX_dummy, trainY)

testY = model.predict(testX_dummy)


In [38]:
prediction = testY

In [39]:
prediction.shape

(244386,)

In [40]:
assert prediction.shape == (244386,)

# submit

In [41]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2024/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1899  100  1899    0     0   8239      0 --:--:-- --:--:-- --:--:--  8220


In [42]:
import client

In [43]:
client.make_eval(pd.DataFrame({'views': prediction}, index=testX.index))

Enter username:
hse-17
Enter password:
··········


{'data': {'mape': 9.010809152074037,
  'mean_absolute_error': 0.7298100964933767,
  'mean_squared_error': 1.5812998143659356,
  'rmse': 1.257497441097172,
  'rmspe': 16.255820162461866},
 'ok': True}

In [44]:
#client.make_eval(pd.DataFrame({'views': prediction}, index=testX.index), final=True)

In [45]:
client.check_results()

{'2024-06-18': [{'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 9.010809152074032,
    'mean_absolute_error': 0.7298100964933757,
    'mean_squared_error': 1.5812998143659354,
    'rmse': 1.257497441097172,
    'rmspe': 16.25582016246188}},
  {'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 9.010809152074032,
    'mean_absolute_error': 0.7298100964933757,
    'mean_squared_error': 1.5812998143659354,
    'rmse': 1.257497441097172,
    'rmspe': 16.25582016246188}},
  {'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 9.010809152074037,
    'mean_absolute_error': 0.7298100964933767,
    'mean_squared_error': 1.5812998143659356,
    'rmse': 1.257497441097172,
    'rmspe': 16.255820162461866}}]}