# install stuff

In [1]:
# Install PySpark, pandas and scikit-learn with pip (IPython shell escape).
! pip3 install pyspark pandas scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# Mount Google Drive at /content/drive/ so the ORC files read later from
# /content/drive/MyDrive/hw_data are accessible.
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


# setup pyspark

In [3]:
import pyspark.sql
from pyspark.sql import functions as sf
import pyspark

In [4]:
import numpy as np
import pandas as pd

In [5]:
# Spark configuration: 512 MB for the executor and 512 MB for the driver.
sparkConf = pyspark.SparkConf()
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.driver.memory", "512m")

# Create (or reuse) a session running in local mode with a single worker thread.
ss = (
    pyspark.sql.SparkSession.builder
    .config(conf=sparkConf)
    .master('local[1]')
    .getOrCreate()
)

# load data

In [38]:
# Read the train posts, test posts and channel tables from ORC on the mounted Drive.
# NOTE(review): train_data and test_data are rebound to assembled feature frames
# in a later cell, so re-running earlier cells out of order will not see these raw frames.
train_data = ss.read.orc('/content/drive/MyDrive/hw_data/posts_train.orc')
test_data = ss.read.orc('/content/drive/MyDrive/hw_data/posts_test.orc')
channel_data = ss.read.orc('/content/drive/MyDrive/hw_data/channels_orc')

In [39]:
# Data loading as given in the template (relative local paths), left commented out
# train_data = ss.read.orc('./hw_data/posts_train.orc')
# test_data = ss.read.orc('./hw_data/posts_test.orc')
# channel_data = ss.read.orc('./hw_data/channels_orc')

# make features

In [40]:
def make_features(df):
    """Project ``df`` onto the columns used by the rest of the notebook.

    ``id`` is always selected. ``views``, ``channel_id``, ``has_image``,
    ``is_forwarded``, ``date`` and ``text`` are selected only when ``df``
    has them, always in that order.

    Args:
        df: Object exposing ``columns`` and ``select(*names)``
            (a Spark DataFrame in this notebook).

    Returns:
        The result of ``df.select`` called with the chosen column names.
    """
    optional = ('views', 'channel_id', 'has_image', 'is_forwarded', 'date', 'text')
    wanted = ['id'] + [name for name in optional if name in df.columns]
    return df.select(*wanted)

In [41]:
# Select the working columns from each raw frame and mark the results for caching.
train_features, test_features = (
    make_features(raw).cache() for raw in (train_data, test_data)
)

In [42]:
# Display the column names of the channel table.
channel_data.columns

['description',
 'is_private',
 'last_parsed',
 'name',
 'post_count',
 'tg_id',
 'title',
 'updated',
 'user_count',
 'channel_id']

In [43]:
# Add a post_len column holding the length of each post's text.
import pyspark.sql.functions as F
train_features, test_features = (
    posts.withColumn("post_len", F.length("text"))
    for posts in (train_features, test_features)
)

# feature engineering

In [44]:
# Drop the channel description, name, tg_id and title columns.
channel_data = channel_data.drop(*('description', 'name', 'tg_id', 'title'))
# Drop the raw post text from both feature frames (post_len was derived from it earlier).
train_features, test_features = (
    posts.drop('text') for posts in (train_features, test_features)
)

In [45]:
# Replace nulls with 0 in both feature frames.
# NOTE(review): with an integer fill value Spark presumably leaves non-numeric
# columns untouched; confirm against the column dtypes.
train_features = train_features.na.fill(0)
test_features = test_features.na.fill(0)

In [46]:
from pyspark.sql import Row
from pyspark.sql.functions import desc

In [47]:
# Attach channel attributes to each post with a left join on channel_id.
# Posts without a matching channel keep their row and get nulls in the channel
# columns (the earlier fillna ran before this join, so those nulls are not filled).
traindf = train_features.join(channel_data, on=['channel_id'], how="left")
testdf = test_features.join(channel_data, on=['channel_id'], how="left")

In [48]:
import datetime as dt

In [49]:
# Recode the textual flags as '0'/'1' strings: 'f'/'t' for has_image and
# is_forwarded, 'false'/'true' for is_private. The integer cast happens in a later cell.
traindf, testdf = (
    joined
    .replace({'f': '0', 't': '1'}, subset=["has_image", "is_forwarded"])
    .replace({'false': '0', 'true': '1'}, subset=["is_private"])
    for joined in (traindf, testdf)
)

In [50]:
# Drop the last_parsed column, which is not used as a feature.
traindf, testdf = (joined.drop('last_parsed') for joined in (traindf, testdf))

In [51]:
# Cast the three flag columns to integers in both frames.
from pyspark.sql.types import IntegerType, DateType
for flag in ('has_image', 'is_forwarded', 'is_private'):
    traindf = traindf.withColumn(flag, traindf[flag].cast(IntegerType()))
    testdf = testdf.withColumn(flag, testdf[flag].cast(IntegerType()))

In [52]:
# Cast the post date and the channel's updated column to DateType in both frames.
for moment in ('date', 'updated'):
    traindf = traindf.withColumn(moment, traindf[moment].cast(DateType()))
    testdf = testdf.withColumn(moment, testdf[moment].cast(DateType()))

In [53]:
# Convert the date columns to numeric Unix timestamps (seconds), not ordinals,
# so they can be used as regression features.
import pyspark.sql.functions as F
for moment in ('date', 'updated'):
    traindf = traindf.withColumn(moment, F.unix_timestamp(traindf[moment]))
    testdf = testdf.withColumn(moment, F.unix_timestamp(testdf[moment]))

In [54]:
from pyspark.sql.window import Window

In [55]:
from pyspark.sql.functions import log

# vectorizing, fitting and prediction

In [56]:
# Transform the target as the assignment specifies: natural log of (views + 100).
# The original author notes that view counts follow a roughly exponential distribution.
traindf = traindf.withColumn('views', log(traindf['views'] + 100))

In [57]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [58]:
# Display the joined training frame's columns to pick the model inputs.
traindf.columns

['channel_id',
 'id',
 'views',
 'has_image',
 'is_forwarded',
 'date',
 'post_len',
 'is_private',
 'post_count',
 'updated',
 'user_count']

In [59]:
# Pack the numeric feature columns into a single vector column named "features".
# id, channel_id and the views target are deliberately left out of the inputs.
assembler = VectorAssembler(
    inputCols=[
        'has_image', 'is_forwarded', 'date', 'post_len',
        'is_private', 'post_count', 'updated', 'user_count',
    ],
    outputCol="features",
)

Трансформируем столбцы в векторный вид фичи + таргет

In [60]:
# Add the assembled "features" vector column to the training frame.
output = assembler.transform(traindf)

In [61]:
# Add the assembled "features" vector column to the test frame.
outputtest = assembler.transform(testdf)

In [62]:
# Preview the feature vectors next to the log-transformed target.
output.select("features", "views").show()

+--------------------+------------------+
|            features|             views|
+--------------------+------------------+
|[0.0,0.0,1.541203...| 9.646528652727188|
|[0.0,0.0,1.543190...|  9.43731656931057|
|[0.0,0.0,1.526428...|11.157449801606978|
|[0.0,0.0,1.541116...| 9.243871855184976|
|[0.0,0.0,1.502323...|10.756774584831282|
|[0.0,0.0,1.540857...| 4.605170185988092|
|[0.0,0.0,1.540339...| 8.671115273688494|
|[0.0,0.0,1.543190...|10.516373251493265|
|[1.0,0.0,1.539907...| 8.781401907682376|
|[0.0,0.0,1.544054...| 8.962007209588313|
|[0.0,0.0,1.53792E...| 9.619199713152362|
|[0.0,1.0,1.544054...| 9.104535313079205|
|[0.0,0.0,1.528502...| 8.987946539552519|
|[0.0,0.0,1.544054...| 8.723394022000136|
|[0.0,0.0,1.52496E...| 9.167432870964905|
|[0.0,0.0,1.544054...| 8.753371421000903|
|[0.0,0.0,1.524182...| 9.217514575724183|
|[0.0,0.0,1.544054...| 7.689828668736484|
|[0.0,0.0,1.522368...|  9.70509758756311|
|[1.0,0.0,1.544054...| 7.675081857716334|
+--------------------+------------

In [63]:
# Keep only the columns the estimator needs.
# NOTE(review): this rebinds train_data/test_data, which previously held the raw ORC frames.
# NOTE(review): the test frame drops `id`, so predictions are later matched to post ids
# by row position only.
train_data = output.select("features", "views")
test_data = outputtest.select("features")

In [64]:
# Preview the training frame passed to the model.
train_data.show()

+--------------------+------------------+
|            features|             views|
+--------------------+------------------+
|[0.0,0.0,1.541203...| 9.646528652727188|
|[0.0,0.0,1.543190...|  9.43731656931057|
|[0.0,0.0,1.526428...|11.157449801606978|
|[0.0,0.0,1.541116...| 9.243871855184976|
|[0.0,0.0,1.502323...|10.756774584831282|
|[0.0,0.0,1.540857...| 4.605170185988092|
|[0.0,0.0,1.540339...| 8.671115273688494|
|[0.0,0.0,1.543190...|10.516373251493265|
|[1.0,0.0,1.539907...| 8.781401907682376|
|[0.0,0.0,1.544054...| 8.962007209588313|
|[0.0,0.0,1.53792E...| 9.619199713152362|
|[0.0,1.0,1.544054...| 9.104535313079205|
|[0.0,0.0,1.528502...| 8.987946539552519|
|[0.0,0.0,1.544054...| 8.723394022000136|
|[0.0,0.0,1.52496E...| 9.167432870964905|
|[0.0,0.0,1.544054...| 8.753371421000903|
|[0.0,0.0,1.524182...| 9.217514575724183|
|[0.0,0.0,1.544054...| 7.689828668736484|
|[0.0,0.0,1.522368...|  9.70509758756311|
|[1.0,0.0,1.544054...| 7.675081857716334|
+--------------------+------------

In [65]:
from pyspark.ml.regression import LinearRegression
# Create a linear regression estimator that uses the 'views' column as the label.
# The features column is not set here, so the estimator's default is used.
lr = LinearRegression(labelCol='views')

In [66]:
# Fit the model on the training data (features vector + log-transformed views).
lrModel = lr.fit(train_data)

In [67]:
# Score the test features. Predictions are on the same log(views + 100) scale
# as the training target.
lr_predictions = lrModel.transform(test_data)

In [68]:
# Bring the prediction column to the driver as a list of rows.
row_list = lr_predictions.select('prediction').collect()

In [69]:
# Extract the predicted value from each row.
# Despite its name, id_array holds predictions, not ids.
id_array = [row.prediction for row in row_list]

Преобразовываем предсказания к виду массива

In [70]:
# Convert the list of predictions to a NumPy array.
prediction = np.array(id_array)

In [71]:
# Convert the test frame to pandas, indexed by post id, for the submission.
# NOTE(review): predictions are attached to this index by position; this assumes
# Spark returns rows here in the same order as in the earlier collect(). Confirm.
testX = testdf.toPandas().set_index('id')

# submit

In [None]:
# Download the submission client script from GitHub and save it as client.py.
# NOTE(review): the downloaded file is imported (executed) in the next cell;
# review its contents before running.
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2023/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1907  100  1907    0     0  10308      0 --:--:-- --:--:-- --:--:-- 10252100  1907  100  1907    0     0  10308      0 --:--:-- --:--:-- --:--:-- 10252


In [None]:
import client

user39
18661894

In [None]:
# Submit the predictions, indexed by test post id, as a final evaluation (final=True).
# The values are on the log(views + 100) scale the model was trained on.
client.make_eval(pd.DataFrame({'views': prediction}, index=testX.index), final=True)

{'data': {'mape': 13.349016863556527,
  'mean_absolute_error': 0.9534329712756189,
  'mean_squared_error': 1.8926469781501714,
  'rmse': 1.3757350682999148,
  'rmspe': 23.380359576731895},
 'ok': True}

In [None]:
# Query the submission client for previously recorded evaluation results
# (shown below, grouped by date).
client.check_results()

{'2022-06-22': [{'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 8.881464534228046,
    'mean_absolute_error': 0.7089464917938992,
    'mean_squared_error': 1.0720933724507733,
    'rmse': 1.035419418617776,
    'rmspe': 14.643067936852601}},
  {'baseline_beaten': True,
   'is_final': True,
   'metrics': {'mape': 8.899982652212794,
    'mean_absolute_error': 0.710523614059252,
    'mean_squared_error': 1.0798580448380761,
    'rmse': 1.0391621840877756,
    'rmspe': 14.707010297307093}}],
 '2023-06-13': [{'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 93.9294967848517,
    'mean_absolute_error': 8.075612894075983,
    'mean_squared_error': 67.65591656079468,
    'rmse': 8.225321669138216,
    'rmspe': 94.0097933517592}},
  {'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 15.696422344578455,
    'mean_absolute_error': 1.2476065193789692,
    'mean_squared_error': 2.5399464531397196,
    'rmse': 1.5937209458182193,
    