In [79]:
import os
from typing import List, Optional, Dict, Tuple, Any
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import roc_auc_score
from catboost import CatBoostClassifier, Pool
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.impute import SimpleImputer

In [52]:
# Создаём внутреннюю папку проекта
os.makedirs('pipeline', exist_ok=True) 

# Lists of features for the functions

In [29]:
# rn - уникальный признак

# Бинаризированные
pre_features = [
    'pre_since_opened',
    'pre_since_confirmed',
    'pre_pterm',
    'pre_fterm',
    'pre_till_pclose',
    'pre_till_fclose',
    'pre_loans_credit_limit',
    'pre_loans_next_pay_summ',
    'pre_loans_outstanding',
    'pre_loans_max_overdue_sum',
    'pre_loans_credit_cost_rate',
    'pre_loans5',
    'pre_loans530',
    'pre_loans3060',
    'pre_loans6090',
    'pre_loans90',
    'pre_util',
    'pre_over2limit',
    'pre_maxover2limit'
]

# Закодированные
enc_features = [
    'enc_loans_account_holder_type',
    'enc_loans_credit_status',
    'enc_loans_credit_type',
    'enc_loans_account_cur'
]

# Статусы ежемесячных платежей
enc_paym_features = [
    'enc_paym_0',
    'enc_paym_1',
    'enc_paym_2',
    'enc_paym_3',
    'enc_paym_4',
    'enc_paym_5',
    'enc_paym_6',
    'enc_paym_7',
    'enc_paym_8',
    'enc_paym_9',
    'enc_paym_10',
    'enc_paym_11',
    'enc_paym_12',
    'enc_paym_13',
    'enc_paym_14',
    'enc_paym_15',
    'enc_paym_16',
    'enc_paym_17',
    'enc_paym_18',
    'enc_paym_19',
    'enc_paym_20',
    'enc_paym_21',
    'enc_paym_22',
    'enc_paym_23',
    'enc_paym_24'
]

#  Флаги
flag_features = [
    'is_zero_loans5',
    'is_zero_loans530',
    'is_zero_loans3060',
    'is_zero_loans6090',
    'is_zero_loans90',
    'is_zero_util',
    'is_zero_over2limit',
    'is_zero_maxover2limit',
    'pclose_flag',
    'fclose_flag'
]

## List of features to download from the original dataset

In [18]:
df_source = pd.read_csv('prepared_data/source_data_train_1.csv')
df_source.shape

(20931476, 61)

In [19]:
df_result = pd.read_csv('prepared_data/cut_corr_imp_train.csv')
df_result.shape

(2400000, 61)

In [60]:
df_source_columns = df.columns.tolist()
df_source_columns[:10]

['id',
 'rn',
 'pre_since_opened',
 'pre_since_confirmed',
 'pre_pterm',
 'pre_fterm',
 'pre_till_pclose',
 'pre_till_fclose',
 'pre_loans_credit_limit',
 'pre_loans_next_pay_summ']

In [61]:
df_result_columns = df_train.columns.tolist()
df_result_columns[:10]

['id',
 'flag',
 'is_zero_sum_prop_1',
 'enc_paym_avg_0_1_this_year_diff',
 'pre_util_prop_3',
 'enc_loans_credit_type_prop_0',
 'pre_till_pclose_prop_10',
 'pre_util_prop_6',
 'pre_loans_outstanding_prop_1',
 'pre_util_mean_freq']

In [62]:
'''
Формируем список колонок из df_source_columns,
которые НЕ встречаются ни в одном названии из df_result_columns как подстрока.
'''
drop_list = []
for col_source in df_source_columns:
    found = False
    for col_result in df_result_columns:
        if col_source in col_result:
            found = True
            break
    if not found:
        drop_list.append(col_source)

print(len(drop_list))
drop_list

30


['pre_loans_total_overdue',
 'pre_loans3060',
 'pre_loans6090',
 'pre_loans90',
 'is_zero_loans3060',
 'is_zero_loans6090',
 'is_zero_loans90',
 'pre_maxover2limit',
 'is_zero_util',
 'is_zero_maxover2limit',
 'enc_paym_3',
 'enc_paym_4',
 'enc_paym_5',
 'enc_paym_6',
 'enc_paym_7',
 'enc_paym_11',
 'enc_paym_12',
 'enc_paym_13',
 'enc_paym_14',
 'enc_paym_15',
 'enc_paym_16',
 'enc_paym_17',
 'enc_paym_18',
 'enc_paym_19',
 'enc_paym_20',
 'enc_paym_21',
 'enc_paym_22',
 'enc_paym_23',
 'pclose_flag',
 'fclose_flag']

In [63]:
needed_columns = [x for x in df_source_columns if x not in drop_list]

print(len(needed_columns))
needed_columns[:10]

31


['id',
 'rn',
 'pre_since_opened',
 'pre_since_confirmed',
 'pre_pterm',
 'pre_fterm',
 'pre_till_pclose',
 'pre_till_fclose',
 'pre_loans_credit_limit',
 'pre_loans_next_pay_summ']

In [64]:
'''
Добавим недостающие признаки из групп flag_features и enc_paym _features, 
для правильной работы функций обрабатывающих эти группы. 
'''
features_list= [
    'is_zero_loans3060',
    'is_zero_loans6090',
    'is_zero_loans90',
    'enc_paym_3',
    'enc_paym_4',
    'enc_paym_5',
    'enc_paym_6',
    'enc_paym_7',
    'enc_paym_11',
    'enc_paym_12',
    'enc_paym_13',
    'enc_paym_14',
    'enc_paym_15',
    'enc_paym_16',
    'enc_paym_17',
    'enc_paym_18',
    'enc_paym_19',
    'enc_paym_20',
    'enc_paym_21',
    'enc_paym_22',
    'enc_paym_23'
]

# Список признаков для скачивания из исходного датасета
needed_columns = needed_columns + features_list

print(len(needed_columns))
needed_columns[:10]

52


['id',
 'rn',
 'pre_since_opened',
 'pre_since_confirmed',
 'pre_pterm',
 'pre_fterm',
 'pre_till_pclose',
 'pre_till_fclose',
 'pre_loans_credit_limit',
 'pre_loans_next_pay_summ']

# Pipeline

In [68]:
# СКАЧИВАЕМ ИСХОДНЫЙ ДАТАСЕТ
# Путь до данных в проекте
path = 'train_data/'

def read_parquet_dataset_from_local(
    path_to_dataset: str,
    start_from: int = 0,
    num_parts_to_read: int = 2,
    columns: Optional[List[str]] = None,
    verbose: bool = False
) -> pd.DataFrame:
    '''
    Функция читает num_parts_to_read партиций, преобразовывает их к pd.DataFrame и возвращает.

    Параметры:
    path_to_dataset : путь до директории с партициями
    start_from : номер партиции, с которой нужно начать чтение
    num_parts_to_read : количество партиций, которые требуется прочитать
    columns : список колонок, которые нужно прочитать из партиции
    verbose : выводить ли дополнительную информацию
    
    Возвращает:
    pd.DataFrame 
    '''
    res = []
    dataset_paths = sorted(
        os.path.join(path_to_dataset, filename)
        for filename in os.listdir(path_to_dataset)
        if filename.startswith('train')
    )

    if verbose:
        print('Dataset paths:')
        for path in dataset_paths:
            print(path)

    start_from = max(0, start_from)
    chunks = dataset_paths[start_from: start_from + num_parts_to_read]

    if verbose:
        print('Reading chunks:')
        for chunk in chunks:
            print(chunk)

    for chunk_path in tqdm(chunks, desc="Reading dataset with pandas"):
        if verbose:
            print('Reading chunk:', chunk_path)
        chunk = pd.read_parquet(chunk_path, columns=columns)
        res.append(chunk)

    return pd.concat(res).reset_index(drop=True)

def prepare_transactions_dataset(
    path_to_dataset: str,
    num_parts_to_preprocess_at_once: int = 1,
    num_parts_total: int = 50,
    save_to_path: str = None,
    verbose: bool = False,
    columns: Optional[List[str]] = None
) -> pd.DataFrame:
    '''
    Функция возвращает исходный pd.DataFrame с признаками из которых нужно собрать
    учебный датасет.

    Параметры:
    path_to_dataset : путь до датасета с партициями
    num_parts_to_preprocess_at_once : количество партиций, 
        которые будут одновременно держаться и обрабатываться в памяти
    num_parts_total : общее количество партиций, которые нужно обработать
    save_to_path : путь до папки для сохранения обработанных блоков в .parquet-формате; 
        если None, сохранение не происходит
    verbose : логировать каждую обрабатываемую часть данных
    columns : список колонок, которые нужно оставить

    Возвращает:
    pd.DataFrame : датафрейм с объединёнными данными
    '''
    preprocessed_frames = []

    for step in tqdm(range(0, num_parts_total, num_parts_to_preprocess_at_once),
                     desc="Transforming transactions data"):
        transactions_frame = read_parquet_dataset_from_local(
            path_to_dataset,
            start_from=step,
            num_parts_to_read=num_parts_to_preprocess_at_once,
            verbose=verbose,
            columns=columns
        )

       # Записываем подготовленные данные в файл
        if save_to_path:
            block_as_str = str(step)
            if len(block_as_str) == 1:
                block_as_str = '00' + block_as_str
            else:
                block_as_str = '0' + block_as_str
            transactions_frame.to_parquet(os.path.join(save_to_path, f'processed_chunk_{block_as_str}.parquet'))

        preprocessed_frames.append(transactions_frame)
    
    return pd.concat(preprocessed_frames)

In [143]:
'''
Собираем исходный датасет из parquet файлов,  
скачиваем только необходимые колонки
'''
data = prepare_transactions_dataset(
    path,
    num_parts_to_preprocess_at_once=1,
    num_parts_total=12,
    save_to_path='train_data/',
    columns=needed_columns) 

# Загружаем датасет с целевой переменной
target = pd.read_csv('train_target.csv')

# Делим датасет с целевой переменной на train/test части
y_train, y_test  = train_test_split(target, train_size=0.8, random_state=0, stratify=target.flag)

# Забираем наборы id из train/test
train_id = y_train['id'].values
test_id = y_test['id'].values

# На основе наборов id делим исходный датасет на train/test части
X_train = data.set_index('id').loc[train_id].reset_index()
X_test = data.set_index('id').loc[test_id].reset_index()

# Сбросим индексы для приведения к единому виду с X_train/X_test 
y_train = y_train.reset_index(drop=True)['flag']
y_test = y_test.reset_index(drop=True)['flag']

X_train.shape, X_test.shape, y_train.shape, y_test.shape

Transforming transactions data:   0%|          | 0/12 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

Reading dataset with pandas:   0%|          | 0/1 [00:00<?, ?it/s]

((20931476, 52), (5231241, 52), (2400000,), (600000,))

In [144]:
# Сохраним разделённые данные
X_train.to_csv('pipeline/X_train.csv', index=False)
X_test.to_csv('pipeline/X_test.csv', index=False)
y_train.to_csv('pipeline/y_train.csv', index=False)
y_test.to_csv('pipeline/y_test.csv', index=False)
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((20931476, 52), (5231241, 52), (2400000,), (600000,))

In [145]:
# Загружаем исходные разделённые данные 
X_train = pd.read_csv('pipeline/X_train.csv')
X_test = pd.read_csv('pipeline/X_test.csv')
y_train = pd.read_csv('pipeline/y_train.csv')
y_test = pd.read_csv('pipeline/y_test.csv')
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((20931476, 52), (5231241, 52), (2400000, 1), (600000, 1))

In [160]:
# Возьмём небольшую часть датасета для быстрой отладки пайплайна
X_train_1 = X_train[:1000].copy()
y_train_1 = y_train[:1000].copy()
print('X_train_1.shape', X_train_1.shape)
print('y_train_1.shape', y_train_1.shape)

X_test_1 = X_test[:1000].copy()
y_test_1 = y_test[:1000].copy()
print('X_test_1.shape', X_test_1.shape)
print('y_test_1.shape', y_test_1.shape)

X_train_1.shape (1000, 52)
y_train_1.shape (1000, 1)
X_test_1.shape (1000, 52)
y_test_1.shape (1000, 1)


In [161]:
# ПРЕПРОЦЕССИНГ ДАННЫХ

def convert_all_to_numeric(df):
    '''
    Функция преобразует типы всех колоноки в числовые с заменой ошибок на NaN.
    '''
    return df.apply(lambda col: pd.to_numeric(col, errors='coerce'))

In [162]:
# Создаём SimpleImputer и настраиваем вывод в pandas DataFrame
imputer = SimpleImputer(strategy='median')
imputer.set_output(transform='pandas')

# Создаём паплайн препроцессинга
preprocessing_pipe = Pipeline([
    ('to_numeric', FunctionTransformer(convert_all_to_numeric)),
    ('imputer', imputer),
    ('to_int', FunctionTransformer(lambda df: df.astype(int), validate=False)),
    ('drop_duplicates', FunctionTransformer(lambda df: df.drop_duplicates(), validate=False))
])

# Создаём пайплайн
main_pipe = Pipeline([
    ('preprocessing', preprocessing_pipeline),
])


# Трансформируем train датасет
X_train_1 = main_pipe.fit_transform(X_train_1)

X_train_1

Unnamed: 0,id,rn,pre_since_opened,pre_since_confirmed,pre_pterm,pre_fterm,pre_till_pclose,pre_till_fclose,pre_loans_credit_limit,pre_loans_next_pay_summ,...,enc_paym_14,enc_paym_15,enc_paym_16,enc_paym_17,enc_paym_18,enc_paym_19,enc_paym_20,enc_paym_21,enc_paym_22,enc_paym_23
0,1506130,1,10,6,1,16,3,5,0,5,...,0,0,0,0,0,0,1,0,0,0
1,1506130,2,10,6,4,13,1,7,15,5,...,0,0,0,0,0,0,1,0,0,0
2,1506130,3,5,9,17,8,1,11,15,2,...,0,0,0,0,0,0,1,0,0,0
3,1506130,4,11,1,15,9,2,6,2,2,...,3,3,3,3,3,3,4,3,3,3
4,1506130,5,1,9,15,8,1,11,2,5,...,3,3,3,3,3,3,4,3,3,3
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,348416,4,7,0,16,8,12,11,16,2,...,3,3,3,3,3,3,4,3,3,3
996,2451179,1,16,7,4,8,1,11,4,2,...,0,0,0,0,0,0,1,0,0,0
997,2451179,2,15,4,4,8,1,11,4,2,...,0,0,0,0,0,0,1,0,0,3
998,2451179,3,15,12,12,16,9,13,2,2,...,0,0,0,0,0,0,1,0,0,0
