# install stuff

In [None]:
! pip3 install pyspark pandas scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# setup pyspark

In [None]:
import pyspark
import pyspark.sql
from pyspark.sql import functions as sf

In [None]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "512m")\
    .set("spark.driver.memory", "512m")
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).master('local[*]').getOrCreate()

# load data

In [None]:
train_data = ss.read.orc('./drive/MyDrive/hw_data/posts_train.orc')
test_data = ss.read.orc('./drive/MyDrive/hw_data/posts_test.orc')
channel_data = ss.read.orc('./drive/MyDrive/hw_data/channels_orc')

Визуально оценим входные данные для большего понимания их структуры. Для этого выполним train_data.show() и channel_data.show().

In [None]:
train_data.show(5)

+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|channel_id|      id|tg_id|                text|views|has_image|is_forwarded|                date|forwarded_id|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|       221|29741094| 7182|МК, а это вот про...|15368|        f|           f|2018-11-03 13:05:...|        null|
|       221|46751120| 7388|Хочу пошутить, чт...|12448|        f|           f|2018-11-26 16:15:...|        null|
|       221|32631368| 5512|Уточню: Котляр бы...|69984|        f|           f|2018-05-16 16:03:...|        null|
|       221|46751758| 7173|Да не на два лаге...|10241|        f|           f|2018-11-02 13:45:...|        null|
|       221|33073441| 3185|А что касается Уд...|46847|        f|           f|2017-08-10 13:44:...|        null|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------

In [None]:
train_data.show(5)

+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|channel_id|      id|tg_id|                text|views|has_image|is_forwarded|                date|forwarded_id|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------------+
|       221|29741094| 7182|МК, а это вот про...|15368|        f|           f|2018-11-03 13:05:...|        null|
|       221|46751120| 7388|Хочу пошутить, чт...|12448|        f|           f|2018-11-26 16:15:...|        null|
|       221|32631368| 5512|Уточню: Котляр бы...|69984|        f|           f|2018-05-16 16:03:...|        null|
|       221|46751758| 7173|Да не на два лаге...|10241|        f|           f|2018-11-02 13:45:...|        null|
|       221|33073441| 3185|А что касается Уд...|46847|        f|           f|2017-08-10 13:44:...|        null|
+----------+--------+-----+--------------------+-----+---------+------------+--------------------+------

In [None]:
channel_data.show(5)

+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|         description|is_private|         last_parsed|            name|post_count|     tg_id|               title|             updated|user_count|channel_id|
+--------------------+----------+--------------------+----------------+----------+----------+--------------------+--------------------+----------+----------+
|                    |     false|2019-01-26 16:53:...|   MoeinZchannel|     708.0|1002972402|             Moein Z|2019-01-26 16:53:...|   62411.0|      7910|
|Вокруг столько ме...|     false|2019-02-12 00:39:...|  merzotachannel|    1027.0|1336284461|            Мерзость|2019-02-12 00:39:...|   12982.0|     14121|
|🗣እኛስ የተሰቀለውን ክርስ...|     false|2019-02-10 06:57:...|christian_mezmur|    1168.0|1136987361|Christian Mezmur ...|2019-02-10 06:57:...|   21704.0|     17375|
|     Chiroyli_qomatt|     false|2018-12-08 00:54:...

# make features

### Для того, чтобы определить популярность постов (предсказать количество просмотров этих постов) определим параметры, которые могут на это повлиять.

### Сначала рассмотрим tran_data:

Ожидается, что в этом датафрейме параметр **is_forwarded** должен играть роль, так как, если пост переслали в другое место - значит его и посмотрит больше людей.

Второй параметр - это наличие картинки (**has_image**), скорее всего пост в котором есть изображение привлечет больше внимания.

Третий параметр - это объем текста (**len_text**). Здесь будет интересно проверить какие записи более читаемы - короткие сообщения или лонг-риды, я предполагаю, что у котротких записей будет больше просмотров.

Также один из параметров это популярные слова (**pop_words**), которые привлекают внимание. Если их удасться выделить, то скорее всего чем больше таких слов - тем больше читателей.

### Из channel_data, метаданных каналов, возьмем два параметра:

Первый - это приватность канала (**is_private**), так как туда могут попасть меньше людей и размер канала (**user_count**), так как огромный приватный канал тоже должен получить много просмотров.

Теперь приведем эти параметры к виду, которй будет удобно использовать модели, а т.е. заменим все буквенные значения на числовые.

In [None]:
!pip install langdetect

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m46.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langdetect
  Building wheel for langdetect (setup.py) ... [?25l[?25hdone
  Created wheel for langdetect: filename=langdetect-1.0.9-py3-none-any.whl size=993224 sha256=29d44b1916542d67c006bb3bf0691ac4c61c48eb919feb12a2d84ae424991268
  Stored in directory: /root/.cache/pip/wheels/95/03/7d/59ea870c70ce4e5a370638b5462a7711ab78fba2f655d05106
Successfully built langdetect
Installing collected packages: langdetect
Successfully installed langdetect-1.0.9


In [None]:
# Сначала постараемся реализовать выбор оппулярных слов
# Для этого выберем 10000 популярных (самых просматриваемых) статей и выпишем из каждой по 5 самых популярных слов
# После этого составим большой списк самых популярных слов размером 500, уберем дупликаты
# После посмотрим все сообщения, если в сообщении будут встречаться популярные слова, то мы будем записывать их количество, вместо текста самого сообщения

import nltk
nltk.download('punkt')
import langdetect
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from pyspark.ml.feature import Tokenizer


# загрузка списка стоп-слов для всех языков
nltk.download("stopwords")
languages = nltk.corpus.stopwords.fileids()
stop_words = {}
for lang in languages:
    stop_words[lang] = set(stopwords.words(lang))

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
# нахождение топ-10000 строк по параметру views для каждой ячейки
top_rows = train_data.filter(train_data.views >= 900000).collect()
# число 900000 выбрано эвристически, так как такое количество просмотров примерно
# у первых top-10 тыс. постов

In [None]:
# нахождение топ-5 слов для каждой из топ-10000 строк
all_top_words = set()
for row in top_rows:
    text = row.text
    if (str(text) != 'None'):
      try:
        lang = langdetect.detect(text)
      except: lang = ''
      for l in languages:
        if lang in l:
          lang = l
          break

      if len(lang) <=2: lang = 'english'
      tokens = word_tokenize(text)
      words = [word for word in tokens if word.isalpha() and word not in stop_words[lang]]
      freq_dist = nltk.FreqDist(words)
      top_words = [word for word, freq in freq_dist.most_common(5)]

    if str(top_words) != 'None':
      for word in top_words:
          all_top_words.add(word)

In [None]:
# создание списка из 500 популярных слов
popular_words = list(all_top_words)

def replace_text_with_numbers(text):
    tokens = word_tokenize(text)
    words = [word for word in tokens if word.isalpha() and word in popular_words]
    freq_dist = nltk.FreqDist(words)
    total_count = sum(freq_dist.values())
    if total_count > 0:
        return sum([freq_dist[word] for word in freq_dist]) #/ total_count
    else:
        return 0

# регистрация пользовательской функции
replace_text_with_numbers_udf = udf(lambda x:replace_text_with_numbers(x), IntegerType())

# замена значения текста на цифру
train_data = train_data.na.fill(value="", subset=["text"])
train_data = train_data.withColumn("pop_words", replace_text_with_numbers_udf(col("text")))

test_data = test_data.na.fill(value="", subset=["text"])
test_data = test_data.withColumn("pop_words", replace_text_with_numbers_udf(col("text")))

In [None]:
# Теперь заменим формат остальных интересующих нас данных в датафрейме
def make_features(df_main, df_ch_data):

    # Присоединяем данные channel_data к основному датафрейму
    df_main = df_main.join(df_ch_data, on='channel_id')

    cols = ['channel_id', 'id', 'has_image', 'pop_words', 'is_forwarded', 'text_len',
            'user_count', 'post_count', 'is_private'
            ]
    if 'views' in df_main.columns:
        cols.append('views')


    # Приводим данные к "правильному" виду
    # Все данные имющие буквенные показатель true|false переведем в бинарный 1|0 соответственно
    # Так же все, что не формат int переведм в него и избавимся от Nan и пустых значений

    df_main = df_main.withColumn('is_forwarded', (df_main.is_forwarded == 't').cast('int'))
    df_main = df_main.withColumn('has_image', (df_main.has_image == 't').cast('int'))
    df_main = df_main.withColumn('text_len', sf.length('text'))
    df_main = df_main.withColumn('is_private', (df_main.is_private == 'true').cast('int'))
    df_main = df_main.withColumn('user_count', df_main.user_count.cast('int'))
    df_main = df_main.withColumn('post_count', df_main.post_count.cast('int'))

    df_main = df_main.na.fill(0)
    df_main = df_main.na.fill(value="", subset=["text"])
    df_main = df_main.select(*cols)

    return df_main

In [None]:
#train_features = make_features(train_data, channel_data).cache()
#test_features = make_features(test_data, channel_data).cache()

In [None]:
#train_features.write.csv('/content/train_csv', mode='overwrite', header=True)

In [None]:
#test_features.write.csv('/content/test_csv', mode='overwrite', header=True)

# load features to pandas
you also can use .toPandas()

In [None]:
import subprocess
import glob
import os
import shutil
import pandas as pd

def load_and_merge_csv_from_hdfs(path, **kwargs):
    dfs = []
    for g in glob.glob(os.path.join(path, '*.csv')):
        dfs.append(pd.read_csv(g, **kwargs))
    res = pd.concat(dfs)
    res = res.set_index('id')
    return res

In [None]:
trainXY = load_and_merge_csv_from_hdfs('train_csv')
testX = load_and_merge_csv_from_hdfs('test_csv')

In [None]:
import numpy as np

Ycol = 'views'
to_drop = ['channel_id']
trainX, trainY = trainXY.drop(Ycol, axis=1).drop(to_drop, axis=1), trainXY[Ycol]
trainY = np.log(trainY + 100)

testX = testX.drop(to_drop, axis=1)

# train your model and predict test

In [None]:
from sklearn.linear_model import LinearRegression

model_lr = LinearRegression().fit(trainX, trainY)
prediction = model_lr.predict(testX)

In [None]:
prediction.shape

(244386,)

In [None]:
assert prediction.shape == (244386,)

# submit

In [None]:
! curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/mike0sv/lsml_submit_server/2023/src/client.py -o client.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1907  100  1907    0     0  27637      0 --:--:-- --:--:-- --:--:-- 28044


In [None]:
import client

In [None]:
client.make_eval(pd.DataFrame({'views': prediction}, index=testX.index),  final=True)

Enter username:
user6
Enter password:
··········


{'data': {'mape': 13.50858208089605,
  'mean_absolute_error': 1.0075095653315056,
  'mean_squared_error': 1.9025964210013084,
  'rmse': 1.3793463745561911,
  'rmspe': 22.011605678149227},
 'ok': True}

In [None]:
client.check_results()

{'2022-06-16': [{'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 15.51374885768446,
    'mean_absolute_error': 1.1995994387258444,
    'mean_squared_error': 2.36390744538618,
    'rmse': 1.537500388743424,
    'rmspe': 23.18651980592003}}],
 '2022-06-21': [{'baseline_beaten': True,
   'is_final': False,
   'metrics': {'mape': 11.616181456785995,
    'mean_absolute_error': 0.9052580222263774,
    'mean_squared_error': 1.4823002054660392,
    'rmse': 1.2174975176426601,
    'rmspe': 17.56000396971201}},
  {'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 14.580987717036257,
    'mean_absolute_error': 1.0563568704578796,
    'mean_squared_error': 2.12623228903977,
    'rmse': 1.4581605841058007,
    'rmspe': 24.303077489906087}},
  {'baseline_beaten': False,
   'is_final': False,
   'metrics': {'mape': 14.61547861828265,
    'mean_absolute_error': 1.0568801245375414,
    'mean_squared_error': 2.137339817611141,
    'rmse': 1.4619643694738735,
  