# Подготовка датасета для обучения модели

In [5]:
!pip install loguru



In [21]:
import os
import numpy as np
import pandas as pd
import torch
from loguru import logger as logging
from transformers import AutoTokenizer, AutoModel
from sklearn.base import TransformerMixin, BaseEstimator
from tqdm import tqdm
tqdm.pandas()

# Mount Google Drive when the notebook runs in Google Colab. This is a
# best-effort step: in any other environment the import or the mount fails
# and the notebook simply continues without the drive.
try:
    from google.colab import drive
    drive.mount('/content/drive/')
except Exception:
    # `Exception` (not a bare `except`) so that Ctrl+C / SystemExit raised
    # while the mount is waiting for authorisation is not swallowed.
    pass

In [7]:
# Directory the input files (events, video info, regions dictionary) are read from.
PATH = '/kaggle/input/ghghjg'
# Directory where cached intermediate tables, the tokenizer/model copies and
# the prepared datasets are written.
PATH_PREP = '/kaggle/working'

In [8]:
def fix_name(name):
    """Swap selected region keys for their alternative spelling.

    Keys listed in the table below are replaced by the mapped value; any
    other value is returned unchanged.
    NOTE(review): presumably the mapped values are the spellings used in
    regions_dict.csv - confirm against that file.
    """
    respelled = {
        'jaroslav': 'iaroslav',
        'zabaikal': 'transbai',
        'primorie': 'primor',
        'sebastop': 'sevastop',
        'saratovs': 'saratov',
        'voronezh': 'voron',
    }
    return respelled.get(name, name)


def clear_region_name(region_name):
    """Reduce a raw region name to a short normalised lookup key.

    Steps, in order: lower-case the name, strip administrative words
    (kray/krai/oblast/obl/republic), strip punctuation and spaces, keep the
    first 8 characters, replace 'y' with 'i', and finally pass the key
    through ``fix_name``.
    """
    admin_words = (' kray', ' krai', ' oblast', ' obl', ' republic', 'republic of ')
    noise_chars = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~’ '

    key = region_name.lower()
    # Multi-character words go first: several of them start with a space,
    # which would no longer be present after the character clean-up below.
    for word in admin_words:
        key = key.replace(word, '')
    # Delete every punctuation character and space in a single pass.
    key = key.translate(str.maketrans('', '', noise_chars))

    shortened = key[:8].replace('y', 'i')
    return fix_name(shortened)

In [9]:
class BertTransformer(BaseEstimator, TransformerMixin):
    """Scikit-learn style transformer that embeds video titles with a BERT model.

    For every row of the input frame the ``title`` text is encoded and the
    normalised embedding of the first token is returned next to the row's
    ``rutube_video_id``.

    Parameters
    ----------
    tokenizer : str or tokenizer object
        A path (or model id) understood by ``AutoTokenizer.from_pretrained``,
        or an already constructed tokenizer. When loading from the given
        string fails, the ``DEFAULT_CHECKPOINT`` tokenizer is downloaded and
        saved to that location.
    model : str or model object
        Same convention as ``tokenizer``, loaded with
        ``AutoModel.from_pretrained``.
    """

    # Checkpoint downloaded when the local copy cannot be loaded.
    DEFAULT_CHECKPOINT = "cointegrated/rubert-tiny2"

    def __init__(
        self,
        tokenizer=os.path.join(PATH_PREP, 'tokenizer'),
        model=os.path.join(PATH_PREP, 'model'),
    ):
        logging.info('Loading tokenizer...')
        if isinstance(tokenizer, str):
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(tokenizer)
            except Exception:
                # Any loading failure (missing directory, invalid id, broken
                # files) falls back to downloading the default checkpoint.
                logging.info(f'{tokenizer} is not available. Downloading tokenizer...')
                self.tokenizer = AutoTokenizer.from_pretrained(self.DEFAULT_CHECKPOINT)
                self.tokenizer.save_pretrained(tokenizer)
                logging.info('Tokenizer is downloaded.')
        else:
            self.tokenizer = tokenizer
        logging.info('Tokenizer is loaded.')

        logging.info('Loading language model...')
        if isinstance(model, str):
            try:
                self.model = AutoModel.from_pretrained(model)
            except Exception:
                logging.info(f'{model} is not available. Downloading model...')
                self.model = AutoModel.from_pretrained(self.DEFAULT_CHECKPOINT)
                self.model.save_pretrained(model)
                logging.info('Model is downloaded.')
        else:
            self.model = model
        logging.info('Language model is loaded.')

        # Best effort: use the GPU when possible, otherwise keep the model
        # where it is and say so instead of failing silently.
        try:
            self.model.cuda()
        except Exception as err:
            logging.info(f'Model was not moved to GPU: {err}')

    def embed_bert_cls(self, text: str):
        """Return the L2-normalised first-token embedding of ``text``.

        The text is tokenised (with padding and truncation), passed through
        the model without gradient tracking, and the hidden state at token
        position 0 is normalised. Only the first item of the batch is
        returned, as a NumPy array on the CPU.
        """
        model = self.model
        tokenizer = self.tokenizer
        t = tokenizer(text, padding=True, truncation=True, return_tensors='pt')
        with torch.no_grad():
            model_output = model(**{k: v.to(model.device) for k, v in t.items()})
        embeddings = model_output.last_hidden_state[:, 0, :]
        embeddings = torch.nn.functional.normalize(embeddings)
        return embeddings[0].cpu().numpy()

    def make_emb_feats(self, row):
        """Build one output row: the video id followed by the title embedding.

        ``row`` must provide ``'title'`` and ``'rutube_video_id'``. The
        result is a Series whose first element is the id and whose remaining
        elements are the embedding components.
        """
        emb = self.embed_bert_cls(row['title'])
        return pd.concat([pd.Series(row['rutube_video_id']), pd.Series(list(emb))])

    def transform(self, text: pd.DataFrame) -> pd.DataFrame:
        """Embed every row of ``text`` (a frame with ``title`` and ``rutube_video_id``).

        Rows are processed one by one with a tqdm progress bar; the result
        has one row per input row laid out as described in ``make_emb_feats``.
        """
        return text.progress_apply(self.make_emb_feats, axis=1)

    def fit(self, X, y=None):
        """No fitting necessary so we just return ourselves"""
        return self

In [22]:
class Prepeocessor():
    """Feature builder working on the raw ``video`` and ``events`` tables.

    The three ``gen_*`` methods enrich ``self.video`` / ``self.events`` in
    place. Aggregated lookup tables are cached as parquet files in
    ``PATH_PREP``: if a cache file exists it is reused as is, otherwise it is
    computed from the current data and written.

    Parameters
    ----------
    video : pd.DataFrame
        Video descriptions; the methods read ``author_id``,
        ``rutube_video_id``, ``duration`` and ``title``.
    events : pd.DataFrame
        Event log; the methods read ``event_timestamp``, ``viewer_uid``,
        ``region``, ``total_watchtime``, ``rutube_video_id``,
        ``ua_device_type``, ``ua_client_type`` and ``ua_client_name``.
    """

    def __init__(
        self,
        video,
        events,
    ):
        self.video = video
        self.events = events
        # Filled by gen_videos(); required by gen_mixed().
        self.videos_for_agg = None

    @staticmethod
    def _cached_parquet(file_name, build):
        """Return the table cached as ``file_name`` in PATH_PREP, building and saving it if absent."""
        path = os.path.join(PATH_PREP, file_name)
        if os.path.isfile(path):
            return pd.read_parquet(path)
        frame = build()
        frame.to_parquet(path)
        return frame

    @staticmethod
    def _changed_since_previous(events, column):
        """Return 1.0 where ``column`` differs from the viewer's previous event, else 0.0.

        The first event of every viewer is 0.0. Two consecutive missing
        values count as "no change". Rows must already be in the desired
        chronological order.
        """
        current = events[column]
        by_viewer = current.groupby(events['viewer_uid'])
        previous = by_viewer.shift()
        is_first = by_viewer.cumcount() == 0
        both_missing = current.isna() & previous.isna()
        changed = current.ne(previous) & ~both_missing & ~is_first
        return changed.astype(float)

    def gen_videos(self):
        """Prepare the per-video lookup table and add ``title_freq`` to ``self.video``.

        ``title_freq`` is the number of distinct video ids sharing a title.
        """
        cols = set(self.video.columns)
        logging.info('Generating video features...')
        self.videos_for_agg = self._cached_parquet(
            'videos_for_agg.pq',
            lambda: self.video[['author_id', 'rutube_video_id', 'duration']],
        )
        title_freq = self._cached_parquet(
            'title_freq.pq',
            lambda: (
                self.video.groupby(by='title')['rutube_video_id']
                .agg('nunique')
                .reset_index(drop=False)
                .rename(columns={'rutube_video_id': 'title_freq'})
            ),
        )
        self.video = self.video.merge(title_freq, how='left', on='title')
        logging.info('Video features are generated.')
        logging.info(f'New cols: {", ".join(set(self.video.columns).difference(cols))}.')

    def gen_events(self):
        """Add time, session, user-agent change and popularity features to ``self.events``."""
        logging.info('Generating events features...')
        cols = set(self.events.columns)

        # All "previous / next event" features below rely on this ordering.
        self.events = self.events.sort_values(["event_timestamp", "viewer_uid"])

        # --- local weekday / hour -------------------------------------
        regions = pd.read_csv(os.path.join(PATH, 'regions_dict.csv'))
        self.events['region_for_time'] = self.events['region'].apply(clear_region_name)
        # Drop the '+03:00' suffix so the timestamp is parsed as naive time.
        self.events['event_time'] = pd.to_datetime(
            self.events['event_timestamp'].str.replace('+03:00', '', regex=False)
        )
        self.events = self.events.merge(
            regions.rename(columns={'region': 'region_for_time'}),
            how='left',
            on='region_for_time',
        )
        del regions

        # Regions without a match in the dictionary get no correction.
        hours_corrector = self.events['hours_corrector'].fillna(0)
        local_time = self.events['event_time'] + pd.to_timedelta(hours_corrector, unit='h')
        self.events['weekday'] = local_time.dt.weekday
        self.events['hour'] = local_time.dt.hour
        self.events = self.events.drop(columns=['hours_corrector', 'region_for_time'])

        # --- sessions -------------------------------------------------
        self.events['event_timestamp'] = pd.to_datetime(self.events['event_timestamp'])
        # Minutes between the starts of consecutive events of one viewer.
        self.events['time_diff'] = (
            self.events.groupby('viewer_uid')['event_timestamp'].diff().dt.total_seconds() / 60
        )
        # time_diff2: minutes from the end of this view (start plus
        # total_watchtime, which is treated as seconds here) to the start
        # of the viewer's next event.
        self.events['time_diff2'] = self.events.groupby('viewer_uid')['time_diff'].shift(-1)
        self.events['time_diff2'] = self.events['time_diff2'] - (self.events['total_watchtime'] / 60)
        # time_diff: the same idle gap, but measured before this event.
        self.events['time_diff'] = self.events.groupby('viewer_uid')['time_diff2'].shift(1)
        # A session starts at the viewer's first event or after a gap of
        # more than 120 minutes; session_id counts sessions per viewer.
        self.events['new_session'] = (self.events['time_diff'] > 120) | self.events['time_diff'].isna()
        self.events['session_id'] = self.events.groupby('viewer_uid')['new_session'].cumsum()

        # --- user-agent changes ---------------------------------------
        for column in ('ua_device_type', 'ua_client_type', 'ua_client_name'):
            self.events[f'{column}_set_diff'] = self._changed_since_previous(self.events, column)

        # Gaps are undefined at the borders of a viewer's history.
        self.events[['time_diff', 'time_diff2']] = self.events[['time_diff', 'time_diff2']].fillna(0)

        # --- seconds since the first known view of the video ----------
        first_known_view_timelag = self._cached_parquet(
            'first_known_view_timelag.pq',
            lambda: (
                self.events.groupby(by='rutube_video_id')['event_timestamp']
                .agg('min')
                .reset_index(drop=False)
                .rename(columns={'event_timestamp': 'timelag'})
            ),
        )
        self.events = self.events.merge(first_known_view_timelag, how='left', on='rutube_video_id')
        del first_known_view_timelag
        self.events['timelag'] = (
            pd.to_datetime(self.events['event_timestamp']) - pd.to_datetime(self.events['timelag'])
        ).dt.total_seconds()

        # --- video popularity -----------------------------------------
        # Events per video divided by the number of distinct videos.
        video_id_freq = self._cached_parquet(
            'video_id_freq.pq',
            lambda: (
                self.events.groupby(by='rutube_video_id')['viewer_uid'].agg('count')
                / self.events.rutube_video_id.nunique()
            ).reset_index(drop=False).rename(columns={'viewer_uid': 'video_id_freq'}),
        )
        self.events = self.events.merge(video_id_freq, how='left', on='rutube_video_id')

        logging.info('Events features are generated.')
        logging.info(f'New cols: {", ".join(set(self.events.columns).difference(cols))}.')

    def gen_mixed(self):
        """Add features that need both tables: ``duration``, ``author_id_freq``, ``frac_watch``.

        Raises
        ------
        RuntimeError
            If ``gen_videos()`` has not been called yet.
        """
        if self.videos_for_agg is None:
            raise RuntimeError('gen_videos() must be called before gen_mixed().')

        logging.info('Generating mixed features...')
        cols = set(self.events.columns)

        # The share below is relative to the number of events before the join.
        n_events = len(self.events)
        self.events = self.events.merge(self.videos_for_agg, how='left', on='rutube_video_id')
        author_id_freq = self._cached_parquet(
            'author_id_freq.pq',
            lambda: (
                self.events.groupby(by='author_id')['viewer_uid'].agg('count') / n_events
            ).reset_index(drop=False).rename(columns={'viewer_uid': 'author_id_freq'}),
        )
        self.events = self.events.merge(author_id_freq, how='left', on='author_id').drop(columns='author_id')
        # NOTE(review): gen_events treats total_watchtime as seconds (it is
        # divided by 60 to get minutes), so the factor 60 here yields a true
        # fraction only if `duration` is in 1/60-second units. Confirm the
        # unit of `duration` (milliseconds would need a factor of 1000).
        # Left unchanged so existing feature values stay comparable.
        self.events['frac_watch'] = self.events['total_watchtime'] * 60 / self.events['duration']
        logging.info('Mixed features are generated.')
        logging.info(f'New cols: {", ".join(set(self.events.columns).difference(cols))}.')

## Чтение данных и сбор датасета

In [23]:
def _read_csvs(paths):
    """Read one CSV (``str`` path) or concatenate several (iterable of paths)."""
    if isinstance(paths, str):
        return pd.read_csv(paths)
    return pd.concat([pd.read_csv(path) for path in paths])


def join_csvs(path_events: "list[str] | str" = [
    os.path.join(PATH, 'all_events.csv'),
    os.path.join(PATH, 'train_events.csv')
                                          ],
              path_videos: "list[str] | str" = [
    os.path.join(PATH, 'video_info_v2.csv'),
                                          ],
              saving_path: "str | None" = None,
              make_title_emb: bool = False) -> pd.DataFrame:
    """Build the events table enriched with event, video and mixed features.

    The event logs and video descriptions are loaded, passed through
    ``Prepeocessor`` (``gen_events``, ``gen_videos``, ``gen_mixed``) and the
    resulting events table is returned. Optionally, title embeddings of the
    videos are left-joined to the events on ``rutube_video_id``.

    Parameters
    ----------
    path_events : list of str or str
        Path, or list of paths, to the CSV files with event logs. Several
        files are concatenated. Defaults to ``all_events.csv`` and
        ``train_events.csv`` in ``PATH``.
    path_videos : list of str or str
        Path, or list of paths, to the CSV files with video descriptions.
        Defaults to ``video_info_v2.csv`` in ``PATH``.
    saving_path : str or None, default None
        Where to write the resulting table as CSV. Nothing is written when
        the value is ``None`` (or empty).
    make_title_emb : bool, default False
        When true, compute title embeddings with ``BertTransformer`` and add
        them to the events as columns ``e_1`` ... ``e_k`` (together with the
        ``title`` column).

    Returns
    -------
    pd.DataFrame
        The events table with all generated features.
    """
    logging.info('Loading events datasets...')
    events = _read_csvs(path_events)
    logging.info('Events datasets are loaded.')

    logging.info('Loading video info datasets...')
    videos = _read_csvs(path_videos)
    logging.info('Videos info datasets are loaded.')

    logging.info('Extracting features...')
    p = Prepeocessor(videos, events)
    p.gen_events()
    p.gen_videos()
    p.gen_mixed()
    events = p.events
    logging.info('Features are extracted.')

    if make_title_emb:
        logging.info('Cooking videos titles embeddings...')
        bert_dicti = pd.DataFrame()
        bert_dicti['title'] = pd.Series(videos.title.unique())
        bert_dicti = videos[['title', 'rutube_video_id']].merge(
            bert_dicti, how='left', on='title'
        )
        # The transformer is created once (loading the model is expensive).
        embeddings = BertTransformer().fit_transform(bert_dicti)
        # The first output column repeats rutube_video_id (already present
        # in bert_dicti); the remaining columns are the embedding values.
        embeddings = embeddings.iloc[:, 1:].astype('float32')
        embeddings.columns = [f'e_{i}' for i in range(1, embeddings.shape[1] + 1)]
        bert_dicti = pd.concat([bert_dicti, embeddings], axis=1)
        logging.info('Videos titles embeddings are ready.')

        logging.info('Merging embeddings...')
        # Attach to every event the embedding of the video it refers to.
        events = events.merge(bert_dicti, how='left', on='rutube_video_id')
        logging.info('Data is merged.')

    # Save the final table as CSV when a destination is given.
    if saving_path:
        logging.info('Saving prepared table as csv...')
        events.to_csv(saving_path)
        logging.info('csv-file is saved.')
    return events

In [24]:
# Build the full feature table from the default input files, without title embeddings.
data = join_csvs(make_title_emb=False)

[32m2024-09-29 02:55:09.730[0m | [1mINFO    [0m | [36m__main__[0m:[36mjoin_csvs[0m:[36m39[0m - [1mLoading events datasets...[0m
[32m2024-09-29 02:55:33.620[0m | [1mINFO    [0m | [36m__main__[0m:[36mjoin_csvs[0m:[36m45[0m - [1mEvents datasets are loaded.[0m
[32m2024-09-29 02:55:35.305[0m | [1mINFO    [0m | [36m__main__[0m:[36mjoin_csvs[0m:[36m52[0m - [1mVideos info datasets are loaded.[0m
[32m2024-09-29 02:55:35.306[0m | [1mINFO    [0m | [36m__main__[0m:[36mjoin_csvs[0m:[36m54[0m - [1mExtracting features...[0m
[32m2024-09-29 02:55:35.307[0m | [1mINFO    [0m | [36m__main__[0m:[36mgen_events[0m:[36m30[0m - [1mGenerating events features...[0m
[32m2024-09-29 03:02:38.033[0m | [1mINFO    [0m | [36m__main__[0m:[36mgen_events[0m:[36m93[0m - [1mEvents features are generated.[0m
[32m2024-09-29 03:02:38.035[0m | [1mINFO    [0m | [36m__main__[0m:[36mgen_events[0m:[36m94[0m - [1mNew cols: video_id_freq, timelag, week

In [25]:
# Persist the complete feature table to the working directory.
data.to_parquet(os.path.join(PATH_PREP, 'all_features.parquet'))

In [30]:
%cd /kaggle/working/
from IPython.display import FileLink
csv_file = 'val.parquet'
print("CSV file 'all_features.parquet' exported successfully.")
FileLink(csv_file)

/kaggle/working
CSV file 'all_features.parquet' exported successfully.


In [28]:
# Keep only the viewers present in each target table (right join on
# viewer_uid) and store the train / validation datasets. Input files are
# located through PATH instead of a repeated hard-coded directory.
data.merge(pd.read_parquet(os.path.join(PATH, 'train_targets.pq')), on='viewer_uid', how='right').to_parquet(os.path.join(PATH_PREP, 'train.parquet'))
data.merge(pd.read_parquet(os.path.join(PATH, 'val_targets.pq')), on='viewer_uid', how='right').to_parquet(os.path.join(PATH_PREP, 'val.parquet'))

In [32]:
# Read the saved validation dataset back to inspect it.
pd.read_parquet(os.path.join(PATH_PREP, 'val.parquet'))

Unnamed: 0,event_timestamp,region,ua_device_type,ua_client_type,ua_os,ua_client_name,total_watchtime,rutube_video_id,viewer_uid,event_time,...,ua_client_type_set_diff,ua_client_name_set_diff,timelag,video_id_freq,duration,author_id_freq,frac_watch,age,sex,age_class
0,2024-06-26 22:41:17+03:00,Nizhny Novgorod Oblast,desktop,browser,Windows,Yandex Browser,784,video_163973,10451714,2024-06-26 22:41:17,...,0.0,0.0,2238666.0,0.000135,787482,0.001338,0.059735,20,female,0
1,2024-06-27 20:06:42+03:00,Nizhny Novgorod Oblast,desktop,browser,Windows,Yandex Browser,103,video_20869,10451714,2024-06-27 20:06:42,...,0.0,0.0,2240910.0,0.000040,767257,0.001338,0.008055,20,female,0
2,2024-06-27 21:16:46+03:00,Nizhny Novgorod Oblast,desktop,browser,Windows,Yandex Browser,1932,video_315162,10451714,2024-06-27 21:16:46,...,0.0,0.0,1130274.0,0.005119,3714347,0.003729,0.031209,20,female,0
3,2024-06-27 21:51:19+03:00,Nizhny Novgorod Oblast,desktop,browser,Windows,Yandex Browser,2773,video_51778,10451714,2024-06-27 21:51:19,...,0.0,0.0,1129700.0,0.005500,3628416,0.003729,0.045855,20,female,0
4,2024-06-11 09:51:05+03:00,Lipetsk Oblast,smartphone,browser,Android,Firefox Mobile,535,video_193920,10380437,2024-06-11 09:51:05,...,0.0,0.0,468371.0,0.003920,220550,0.000258,0.145545,36,male,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
352502,2024-06-13 13:00:02+03:00,Moscow,smartphone,browser,Android,Chrome Mobile,72,video_172538,10684284,2024-06-13 13:00:02,...,0.0,0.0,446677.0,0.001620,197684,0.000811,0.021853,28,male,1
352503,2024-06-13 13:01:26+03:00,Moscow,smartphone,browser,Android,Chrome Mobile,62,video_148871,10684284,2024-06-13 13:01:26,...,0.0,0.0,446682.0,0.002133,319150,0.000811,0.011656,28,male,1
352504,2024-06-16 19:17:19+03:00,Moscow,smartphone,browser,Android,Chrome Mobile,42,video_156614,10684284,2024-06-16 19:17:19,...,0.0,0.0,334260.0,0.003931,190681,0.001353,0.013216,28,male,1
352505,2024-06-26 05:35:16+03:00,Moscow Oblast,smartphone,browser,Android,Chrome Mobile,33,video_175738,10684284,2024-06-26 05:35:16,...,0.0,0.0,226937.0,0.001688,181904,0.001353,0.010885,28,male,1
