### Дипломная работа Александра Соколова

#### Предобработка данных для градиентного бустинга
Кернел 2 из 5 в разделе ML (отредактирован 21.04.2021)
---

# 1. Импорт библиотек, инициализация глобальных констант
## 1.1. Импорт библиотек

In [1]:
import pandas as pd
import gc
import os
import tqdm
import numpy as np

np.warnings.filterwarnings('ignore')

## 1.2. Глобальные константы

In [2]:
# CURRENT_DIR = './'  # имя текущей директории для локальной машины 
CURRENT_DIR = '../'  # имя текущей директории для каггл

PATH_TO_TRAIN = CURRENT_DIR + 'input/alfabattle2-sandbox/alfabattle2_sand_alfabattle2_train_transactions_contest/train_transactions_contest'
PATH_TO_TEST = CURRENT_DIR + 'input/alfabattle2-sandbox/alfabattle2_sand_alfabattle2_test_transactions_contest/test_transactions_contest'

PATH_TO_TRAIN_TARGET = CURRENT_DIR + 'input/alfabattle2-sandbox/alfabattle2_sand_alfabattle2_train_target.csv'
PATH_TO_TEST_TARGET = CURRENT_DIR + 'input/alfabattle2-sandbox/alfabattle2_sand_alfabattle2_test_target_contest.csv'

PATH_TO_WORKDIR = CURRENT_DIR + 'working/'

In [3]:
!pip freeze > requirements.txt

# 2. Вспомогательные функции

In [4]:
def read_parquet_dataset_from_local(path_to_dataset: str, 
                                    start_from: int = 0,
                                    num_parts_to_read: int = 2, 
                                    columns=None, 
                                    verbose=False,
                                    info_num_parts=False) -> pd.DataFrame:
    """
    читает num_parts_to_read партиций, преобразует их к pd.DataFrame и возвращает
    :param path_to_dataset: путь до директории с партициями
    :param start_from: номер партиции, с которой начать чтение
    :param num_parts_to_read: количество партиций, которые требуется прочитать
    :param columns: список колонок, которые нужно прочитать из партиции
    :return: pd.DataFrame
    """

    res = []
    list_paths = sorted([os.path.join(path_to_dataset, filename) for filename in os.listdir(path_to_dataset) 
                              if filename.startswith('part')])
    if info_num_parts:
        print(f'Кол-во партиций в папке: {len(list_paths)}')
    start_from = max(0, start_from)
    list_path_to_partitions = list_paths[start_from: start_from + num_parts_to_read]
    if verbose:
        print('Reading chunks:\n')
        for path_to_partition in list_path_to_partitions:
            print(path_to_pirtition)
    for path_to_parquet in tqdm.tqdm_notebook(list_path_to_partitions, 
                                              desc="Читаем файлы:"):
        temp_parquet = pd.read_parquet(path_to_parquet,columns=columns)
        res.append(temp_parquet)
        del temp_parquet
        gc.collect()
    return pd.concat(res).reset_index(drop=True)


def __amnt_pivot_table_by_column_as_frame(frame, column, agg_funcs=None) -> pd.DataFrame:
    """
    Строит pivot table для между колонкой `amnt`  и column на основе переданных aggregations_on
    :param frame: pd.DataFrame транзакций
    :param column: название колонки, на основе `amnt`  и column будет построен pivot_table
    :param agg_funcs: список из функций, которые нужно применить, по умолчанию ['mean', 'count']
    :return: pd.DataFrame
    """
    if agg_funcs is None:
        agg_funcs = ['mean', 'count']
    aggs = pd.pivot_table(frame, values='amnt',
                          index=['app_id'], columns=[column],
                          aggfunc={'amnt': agg_funcs},
                          fill_value=0.0)
    aggs.columns = [f'{col[0]}_{column}_{col[1]}' for col in aggs.columns.values]
    return aggs

def extract_basic_aggregations(transactions_frame: pd.DataFrame, 
                               cat_columns=None, 
                               agg_funcs=None) -> pd.DataFrame:
    """
    :param transactions_frame: pd.DataFrame с транзакциями
    :param cat_columns: список категориальных переменных, для которых будут построены агрегаты по `amnt`
    :param agg_funcs: список функций, который нужно применить для подсчета агрегатов, 
    :по умолчанию ['sum', 'mean', 'count']
    :return: pd.DataFrame с извлеченными признаками
    """
    if not cat_columns:
        cat_columns = CAT_COLUMNS

    pivot_tables = []
    for col in cat_columns:
        pivot_tables.append(__amnt_pivot_table_by_column_as_frame(transactions_frame, column=col,
                                                                  agg_funcs=agg_funcs))
    pivot_tables = pd.concat(pivot_tables, axis=1)

    aggs = {
        # посчитаем статистики для транзакций
        'amnt': ['mean', 'median', 'sum', 'std'],
        # посчитаем разумные агрегаты для разницы в часах между транзакциями
        'hour_diff': ['max', 'mean', 'median', 'var', 'std'],
        # добавим самую раннюю/позднюю и среднюю дату транзакции до подачи заявки на кредит
        'days_before': ['min', 'max', 'median']}

    numeric_stats = transactions_frame.groupby(['app_id']).agg(aggs)

    # дадим разумные имена новым колонкам; может не работать в python 3.5, так как порядок ключей в словаре не
    # гарантирован
    numeric_stats.columns = [k + '_' + agg for k in aggs.keys() for agg in aggs[k]]

    return pd.concat([pivot_tables, numeric_stats], axis=1).reset_index()

def prepare_transactions_dataset(path_to_dataset: str, 
                                 num_parts_to_preprocess_at_once: int = 1, 
                                 num_parts_total: int=50, 
                                 save_to_path=None, 
                                 verbose: bool=False):
    """
    :возвращает готовый pd.DataFrame с признаками, на которых можно учить модель для целевой задачи.
    :path_to_dataset: str - путь до датасета с партициями
    :num_parts_to_preprocess_at_once: int - количество партиций, которые будут одновременно держаться в памяти и обрабатываться
    :num_parts_total: int - общее количество партиций, которые нужно обработать
    :save_to_path: str - путь до папки, в которой будет сохранен каждый обработанный блок в .parquet формате. Если None, то не будет сохранен 
    :verbose: bool - логирует каждый обрабатываемый кусок данных
    """
    preprocessed_frames = []
    block = 0
    for step in tqdm.tqdm_notebook(range(0, num_parts_total, num_parts_to_preprocess_at_once), 
                                   desc="Общий прогресс препроцессинга:"):
        transactions_frame = read_parquet_dataset_from_local(path_to_dataset, 
                                                             step, 
                                                             num_parts_to_preprocess_at_once, 
                                                             verbose=verbose)
        features = extract_basic_aggregations(transactions_frame, 
                                              cat_columns=['mcc_category', 
                                                           'day_of_week', 
                                                           'operation_type'])
        if save_to_path:
            block_as_str = str(block)
            if len(block_as_str) == 1:
                block_as_str = '00' + block_as_str
            else:
                block_as_str = '0' + block_as_str
            features.to_parquet(os.path.join(save_to_path, f'processed_chunk_{block_as_str}.parquet'))
            
        preprocessed_frames.append(features)
    return pd.concat(preprocessed_frames)

def reduce_mem_usage_df(d_df: pd.DataFrame)-> [pd.DataFrame, list]:
    """
    :перебирает все столбцы датафрейма и изменяет тип данных для уменьшения использования памяти.
    """
    start_mem = d_df.memory_usage().sum() / 1024**2
    print('Размер памяти исходного датафрейма {:.2f} MB'.format(start_mem))
    
    d_log = []
    for col in d_df.columns:
        col_type = d_df[col].dtype

        if col_type != object:
            c_min = d_df[col].min()
            c_max = d_df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    d_df[col] = d_df[col].astype(np.int8)
                    d_log.append(f'{col} :from int64 to int8')
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    d_df[col] = d_df[col].astype(np.int16)
                    d_log.append(f'{col} :from int64 to int16')

    end_mem = d_df.memory_usage().sum() / 1024**2
    print('Размер памяти после оптимизации: {:.2f} MB'.format(end_mem))
    print('Уменьшено на {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    print('Список внесенных изменений вы можете посмотреть в логе')
    
    return d_df, d_log

# 3. Импорт и препроцессинг данных
---
## 3.1 Оценка размеров данных и необходимой памяти
В данном соревновании сырые данные представлены в формате Parquet. Это крайне эффективный бинарный формат сжатия данных по колонкам. Однако для непосредственной работы с данными и построением моделей, нам нужно их прочитать и трансформировать в pandas.DataFrame. При этом сделать это эффективно по памяти. Для примера прочитаем одну партицию в память и оценим, сколько RAM она занимает.

In [5]:
%%time
transactions_frame = read_parquet_dataset_from_local(PATH_TO_TRAIN, 
                                                     start_from=0, 
                                                     num_parts_to_read=1,
                                                     info_num_parts=True)

memory_usage_of_frame = transactions_frame.memory_usage(index=True).sum() / 10**9
expected_memory_usage = memory_usage_of_frame * 50
print(f'Объем памяти в  RAM одной партиции данных с транзакциями: {round(memory_usage_of_frame, 3)} Gb')
print(f'Ожидаемый размер в RAM всего датасета: {round(expected_memory_usage, 3)} Gb')

Кол-во партиций в папке: 50


Читаем файлы::   0%|          | 0/1 [00:00<?, ?it/s]

Объем памяти в  RAM одной партиции данных с транзакциями: 0.476 Gb
Ожидаемый размер в RAM всего датасета: 23.798 Gb
CPU times: user 1.73 s, sys: 1.71 s, total: 3.44 s
Wall time: 4.17 s


На платформе у нас выделено 16Гб (но по факту с учетом реализации python через docker менее 12ГБ). Датасет в памяти не уместится - нам потребуется значительный объем RAM или дополнительные ресурсы. Решение - читать данные итеративно. Партиции организованы таким образом, что  для конкретного клиента вся информация о его транзакциях до момента подачи заявки на кредит расположена внутри одной партиции (транзакции сгруппированы по полю `app_id`). Это позволяет загружать данные в память кусками, выделять все необходимые признаки и получать результирующий фрейм для моделирования. Для этих целей мы будем использовать функцию `utils.read_parquet_dataset_from_local`.  
По времени обработка одной партиции занимает 4,5 секунды. С учетом 50 партиций, общее время обработки должно быть в пределах 5 минут.

In [6]:
del transactions_frame
gc.collect()

20

## 3.2 Импорт и препроцессинг данных трейна

Опытным путем было установлено, что максимальное кол-во файлов, которое способна переварить оперативка за одну итерацию = 5 

In [7]:
%%time
train_data = prepare_transactions_dataset(PATH_TO_TRAIN, 
                                    num_parts_to_preprocess_at_once=5, 
                                    num_parts_total=50)

Общий прогресс препроцессинга::   0%|          | 0/10 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

CPU times: user 3min 32s, sys: 1min 54s, total: 5min 26s
Wall time: 5min 7s


In [8]:
print(f'Объем в RAM всего train датасета с агрегатами: {round(train_data.memory_usage(index=True).sum() / 10**9, 3)} Gb')

Объем в RAM всего train датасета с агрегатами: 0.979 Gb


## 3.3 Импорт и препроцессинг данных теста

In [9]:
%%time
test_data = prepare_transactions_dataset(PATH_TO_TEST, 
                                         num_parts_to_preprocess_at_once=5, 
                                         num_parts_total=50)

Общий прогресс препроцессинга::   0%|          | 0/10 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

Читаем файлы::   0%|          | 0/5 [00:00<?, ?it/s]

CPU times: user 2min 14s, sys: 58.8 s, total: 3min 13s
Wall time: 3min 12s


In [10]:
print(f'Объем в RAM всего test датасета с агрегатами: {round(test_data.memory_usage(index=True).sum() / 10**9, 3)} Gb')

Объем в RAM всего test датасета с агрегатами: 0.511 Gb


## 4. Сокращение размеров датасетов
---
Перед сохранением датасетов для дальнейшего их использования в кернелах при построении моделей и их анализа желательно сократить их размер, для этого мы используем функцию 'reduce_mem_usage_df'

In [11]:
train_data, log_reduce_mem_train = reduce_mem_usage_df(train_data)

Размер памяти исходного датафрейма 933.87 MB
Размер памяти после оптимизации: 602.05 MB
Уменьшено на 35.5%
Список внесенных изменений вы можете посмотреть в логе


In [12]:
# log_reduce_mem_train

In [13]:
test_data, log_reduce_mem_test = reduce_mem_usage_df(test_data)

Размер памяти исходного датафрейма 487.10 MB
Размер памяти после оптимизации: 314.50 MB
Уменьшено на 35.4%
Список внесенных изменений вы можете посмотреть в логе


In [14]:
# log_reduce_mem_test

# 5. Мерджинг датасетов с признаком product

In [15]:
train_targets = pd.read_csv(PATH_TO_TRAIN_TARGET)
train_targets.head()

Unnamed: 0,app_id,product,flag
0,0,3,0
1,1,1,0
2,2,1,0
3,3,1,0
4,4,1,0


In [16]:
merged_train_data = train_data.merge(train_targets[['app_id', 'product', 'flag']], on=['app_id'])

In [17]:
test_target =  pd.read_csv(PATH_TO_TEST_TARGET)
test_target.head()

Unnamed: 0,app_id,product
0,1063620,0
1,1063621,0
2,1063622,1
3,1063623,1
4,1063624,2


In [18]:
merged_test_data = test_data.merge(test_target[['app_id', 'product']], on='app_id')

# 6. Сохранение датасетов

In [19]:
merged_train_data.to_csv('./merged_train_data.csv', index=None)
merged_test_data.to_csv('./merged_test_data.csv', index=None)