In [1]:
!pip install pyarrow
!pip install fastparquet

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [12]:
import datetime
import multiprocessing
from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception
import warnings

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.figure_factory as ff


# pof.init_notebook_mode()
# pio.renderers.default = "png"


def set_up_printing():
    """Sets up display parameters"""

    max_dimensions_display_limit = 1000
    pd.set_option('display.max_columns', max_dimensions_display_limit)
    pd.set_option('display.max_colwidth', None)

    pd.set_option('display.max_rows', max_dimensions_display_limit)
    pd.set_option('display.width', None)

    pd.set_option('display.max_info_columns', max_dimensions_display_limit)
    pd.set_option('display.max_info_rows', max_dimensions_display_limit)

    warnings.filterwarnings('ignore')


def get_low_informative(dataframe, limit):
    """Gets low informative columns"""
    low_information_cols = []

    for col in dataframe.columns:
        # наибольшая относительная частота значения признака
        top_freq = dataframe[col].value_counts(normalize=True).max()
        # доля уникальных значений признака
        nunique_ratio = dataframe[col].nunique() / dataframe[col].count()
        # сравниваем с пороговым значением заданным экспертно
        if top_freq > limit:
            print(f'{col}: {round(top_freq * 100, 2)}% одинаковых значений')
            low_information_cols.append(col)
        # сравниваем долю уникальных значений с порогом
        if nunique_ratio > limit:
            print(f'{col}: {round(nunique_ratio * 100, 2)}% уникальных значений')
            low_information_cols.append(col)

    return low_information_cols


def info_aux_and_nans(dataframe, message):
    """Prints dataframe info and shows nan stats"""
    print(message, '\n')
    dataframe.info()
    dataframe.isnull().sum()
    print('\n')


def shape(dataframe, message):
    """Prints dataframe shape"""
    print(message, '\n')
    print(' - размерность набора данных - ', dataframe.shape)
    print('\n')


def info_aux_and_head(dataframe, message):
    """Prints dataframe info plas prints first 3 lines"""
    print(message, '\n')
    dataframe.info()
    print(dataframe.head(3))
    print('\n')


def get_vector(x):
    print('debug: summing..')
    x_sum = x.sum()
    x_sum['user_id'] = x.iloc[0, 0]
    return x_sum


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            vector = get_vector(task)
            print(vector)
            tasks_that_are_done.put(vector)
            time.sleep(.5)
    return True


# настроим отображение при выводе в консоль
set_up_printing()

print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))

# считаем данные и одновременно удалим пустые строки
dataframe = pd.read_parquet(r'C:\kM\mml\dataton_2023\train_mfti.parquet')
shape(dataframe, 'Входной датасет')
get_low_informative(dataframe, 0.99)
info_aux_and_nans(dataframe,
                  'Входной датасет после удаления пустых строк, дубликатов и проверки на информативность')

dataframe = dataframe.head(1000).drop_duplicates()

# в случае продакшен проекта скорее всего стоило бы рассчитать на всех событиях, предварительно
# отбирая данные на основе требований заказчика в первую очередь, а также разведочного анализа
# в данном случае отбросим все preview значения, что позволит несколько уменьшить размерность
dataframe = dataframe[dataframe['event_type'].isin(
    ['show_vacancy', 'click_contacts', 'click_favorite', 'click_phone', 'click_response'])]
dataframe['user_id'] = dataframe['user_id'].fillna(dataframe['cookie_id'])
dataframe['activity'] = dataframe['vacancy_id_'].astype(str) + '_' + dataframe['event_type'].astype(str)
dataframe = dataframe[['user_id', 'activity']]

print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print('one hot encoding is in progress..')

dataframe = pd.get_dummies(dataframe, columns=['activity'], sparse=True)

print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print('grouping by user is in progress..')

grouped_by_user = dataframe.groupby(['user_id'], sort=False, group_keys=False)[
    ['user_id'] + dataframe.columns.tolist()]


25/04/2023 05:55:44
Входной датасет 

 - размерность набора данных -  (12292588, 6)


Входной датасет после удаления пустых строк, дубликатов и проверки на информативность 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12292588 entries, 0 to 12292587
Data columns (total 6 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   event_date       object
 1   event_timestamp  int64 
 2   vacancy_id_      int64 
 3   cookie_id        object
 4   user_id          object
 5   event_type       object
dtypes: int64(2), object(4)
memory usage: 562.7+ MB


25/04/2023 05:56:02
one hot encoding is in progress..
25/04/2023 05:56:02
grouping by user is in progress..


In [13]:
print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print('creation of dataframes from groups is in progress..')

tasks_to_accomplish = Queue()
i = 0
for separated_gdf in (grouped_by_user.get_group(x) for x in grouped_by_user.groups):
    i = i + 1
    if i % 1000 == 0:
        print('Separated one more 100 of gdf: ' + str(i))
    tasks_to_accomplish.put(separated_gdf)

tasks_to_accomplish.qsize()

25/04/2023 05:56:45
creation of dataframes from groups is in progress..


85

In [14]:
print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print('multiprocessing user vectorization is in progress..')

number_of_processes = 64
tasks_that_are_done = Queue()
processes = []

for w in range(number_of_processes):
    p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
    processes.append(p)
    p.start()

for p in processes:
    p.join()
    
tasks_that_are_done.qsize()

25/04/2023 05:56:59
multiprocessing user vectorization is in progress..


0

In [7]:
print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print('writing to disc is in progress..')

user_vectors_list = list()
while not tasks_that_are_done.empty():
    print(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
    user_vectors_list.append(tasks_that_are_done.get())

user_vectors = pd.DataFrame(user_vectors_list)
user_vectors.to_csv(r'C:\Users\M_N_K\OneDrive\Desktop\kosta_dataton_2023\user_vectors.to_csv')

user_vectors

25/04/2023 05:47:48
writing to disc is in progress..
