<a href="https://colab.research.google.com/github/CasiCode/credit-scoring/blob/main/credit_scoring.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Рассмотрим задачу кредитного скоринга на основе кредитной истории клиента. Мы располагаем огромным датасетом - более полутора миллионов записей с кредитной историей анонимизированных клиентов Альфа Банка. На основе кредитной истории клиента до момента подачи заявки на новый кредит нужно оценить, насколько благонадежным является клиент, и определить вероятность его ухода в дефолт по новому кредиту, то есть предсказывать, насколько вероятна невыплата кредита со стороны потенциального клиента банка. Каждый кредит описывается набором из 60 категориальных признаков.

В этом решении мы будем использовать градиентный бустинг на основе LightGBM для регрессии значения, равного вероятности ухода клиента в дефолт.

Датасет представляет собой набор директорий с parquet файлами. Этот бинарный формат крайне эффективно сжимает данные по колонкам. Однако, для непосредственной работы с данными и построения моделей нам нужно прочитать их и трансформировать в pandas.DataFrame. При этом сделать это эффективно по памяти.

Устанавливаем модуль fastparquet - он будет использоваться для быстрого разархивирования parquet файлов в csv.

In [None]:
%%capture
!pip install fastparquet

Импортим необходимые модули:

In [None]:
import os
import getpass

if not os.environ.get('KAGGLE_USERNAME'):
  os.environ['KAGGLE_USERNAME'] = getpass.getpass('Enter username for Kaggle: ')

if not os.environ.get('KAGGLE_KEY'):
  os.environ['KAGGLE_KEY'] = getpass.getpass('Enter API key for Kaggle: ')

In [None]:
from typing import List
import sys
import zipfile

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

from fastparquet import ParquetFile

from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score
import lightgbm as lgb

from kaggle.api.kaggle_api_extended import KaggleApi

Подтягиваем датасет с Kaggle:

In [None]:
api = KaggleApi()
api.authenticate()


competition_name = 'alfa-bank-pd-credit-history'
download_path = './kaggle_data'

if not os.path.exists(download_path):
    os.makedirs(download_path)

try:
    api.competition_download_files(competition_name, path=download_path, quiet=False)
    print(f'Competition files downloaded to: {download_path}')

except Exception as e:
    print(f'Error downloading competition data: {e}')
    print('Check the competition name and your Kaggle API credentials.')


files = os.listdir(download_path)

for file in files:
    if file.endswith('.zip'):
        zip_path = os.path.join(download_path, file)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(download_path)
            print(f'Extracted: {file}')

        except Exception as e:
            print(f'Error extracting {file}: {e}')

print('Download and extraction complete.')

При чтении всех данных сразу, они займут значительный объем памяти (более 11 GB). Решение – читать данные итеративно небольшими чанками. Чанки организованы таким образом, что для конкретного клиента вся информация о его кредитной истории до момента подачи заявки на кредит расположена внутри одного чанка. Это позволяет загружать данные в память небольшими порциями, выделять все необходимые признаки и получать результирующий фрейм для моделирования. Для этих целей объявим функцию read_parquet_from_local.

In [None]:
def read_parquet_from_local(
        path: str, start_from: int = 0,
        num_parts_to_read: int = 2, columns: List[str] = None,
        verbose: bool = False) -> pd.DataFrame:

    res = []
    start_from = max(0, start_from)
    dataset_paths = {
        int(os.path.splitext(filename)[0].split("_")[-1]):
            os.path.join(path, filename)
            for filename in os.listdir(path)
    }
    chunks = [dataset_paths[num] for num in sorted(
        dataset_paths.keys()) if num >= start_from][:num_parts_to_read]

    if verbose:
        print("Reading chunks:", *chunks, sep="\n")

    for chunk_path in tqdm(chunks, desc="Reading dataset with Pandas"):
        pf = ParquetFile(chunk_path)
        chunk = pf.to_pandas(columns)
        res.append(chunk)

    return pd.concat(res).reset_index(drop=True)

Проведем тест функции и оценим занимаемую память:

In [None]:
data_frame = read_parquet_from_local('./kaggle_data/data_for_competition/train_data', start_from=0, num_parts_to_read=1)

memory_usage_of_frame = data_frame.memory_usage(index=True).sum() / 10**9
expected_memory_usage = memory_usage_of_frame * 12
print(f"Объем памяти в RAM одной партиции данных с кредитными историями: {round(memory_usage_of_frame, 3)} GB")
print(f"Ожидаемый размер в RAM всего датасета: {round(expected_memory_usage, 3)} GB")

Все фичи нашего датафрейма являются категориальными. Выведем для каждой фичи количество ее уникальных значений.

In [None]:
for feat, count in zip(data_frame.columns.values, data_frame.nunique()):
    print(f'{feat}: {count}')

Видим, что 12 фичей имеют более 10 уникальных значений. Если кодировать их через OneHotEncoding, это приведет к "взрыву" необходимого объема памяти, так как каждая из 12 фичей будет порождать N_i столбцов, где N_i - количество уникальных значений у i-й фичи.

Почистим память:

In [None]:
import gc

del data_frame
gc.collect()

Базовым подходом к решению этой задачи является построение классической модели машинного обучения на аггрегациях от последовательностей категориальных признаков. В данном случае мы закодируем признаки с помощью count-encoding'а, применим к ним аггрегирование (наиболее очевидными аггрегациями являются среднее и сумма) и обучим на этом градиентный бустинг из реализации lightgbm.

Описанный подход к обработке кредитных историй клиентов реализован в виде класса-трансформера DataAggregator ниже:

In [None]:
%load_ext autoreload
%autoreload 2

class DataAggregator(object):
    def __init__(self):
        self.encoded_feats = None

    def __extract_count_aggregations(
            self,
            dataframe: pd.DataFrame
        ) -> pd.DataFrame:
        feat_cols = list(dataframe.columns.values)
        feat_cols.remove('id')
        feat_cols.remove('rn')

        encoded_feats = dataframe[feat_cols].apply(
            lambda col: col.map(col.value_counts())
        )
        encoded_feats['id'] = dataframe['id']

        feats = encoded_feats.groupby('id').agg(['mean', 'sum']).reset_index()
        feats.columns = ['_'.join(col) if col[1] else col[0] for col in feats.columns.values]

        return feats

    def __transform_data(
            self,
            path_to_dataset: str,
            num_parts_to_preprocess_at_once: int = 1,
            num_parts_total: int = 25,
            mode: str = 'fit_transform',
            save_to_path=None,
            verbose: bool = False):
        assert mode in ['fit_transform', 'transform'
            ],f'Unrecognized mode: {mode}. Available modes: fit_transform, transform'

        preprocessed_frames = []

        for step in tqdm(range(0, num_parts_total, num_parts_to_preprocess_at_once),
                         desc='Transforming sequential data'):
            dataframe = read_parquet_from_local(
                path_to_dataset,
                start_from=step,
                num_parts_to_read=num_parts_to_preprocess_at_once,
                verbose=verbose
            )
            feats = self.__extract_count_aggregations(dataframe)
            if save_to_path:
                feats.to_parquet(os.path.join(
                    save_to_path, f'processed_chunk_{step}.pq'
                ))
                preprocessed_frames.append(feats)

        feats = pd.concat(preprocessed_frames)
        feats.fillna(np.uint8(0), inplace=True)
        dummies = list(feats.columns.values)
        dummies.remove('id')

        if (mode == 'fit_transform'):
            self.encoded_feats = dummies
        else:
            assert not self.encoded_feats is None, 'Transformer not fitted'
            for col in self.encoded_feats:
                if not col in dummies:
                    feats[col] = np.uint8(0)

        return feats[['id']+self.encoded_feats]

    def fit_transform(
            self,
            path_to_dataset: str,
            num_parts_to_preprocess_at_once: int = 1,
            num_parts_total: int = 50,
            save_to_path=None,
            verbose: bool = False):
        return self.__transform_data(
            path_to_dataset=path_to_dataset,
            num_parts_to_preprocess_at_once=num_parts_to_preprocess_at_once,
            num_parts_total=num_parts_total,
            mode='fit_transform',
            save_to_path=save_to_path,
            verbose=verbose)

    def transform(
            self,
            path_to_dataset: str,
            num_parts_to_preprocess_at_once: int = 1,
            num_parts_total: int = 50,
            save_to_path=None,
            verbose: bool = False):
        return self.__transform_data(
            path_to_dataset=path_to_dataset,
            num_parts_to_preprocess_at_once=num_parts_to_preprocess_at_once,
            num_parts_total=num_parts_total,
            mode='transform',
            save_to_path=save_to_path,
            verbose=verbose)

Создадим директорию для предобработанных признаков обучающей выборки:

In [None]:
!mkdir ./kaggle_data/data_for_competition/train_features

Заполним эту директорию агрегированными данными:

In [None]:
%%time
aggregator = DataAggregator()
train_data = aggregator.fit_transform(
    './kaggle_data/data_for_competition/train_data',
    num_parts_to_preprocess_at_once=1,
    num_parts_total=12,
    save_to_path='./kaggle_data/data_for_competition/train_features',
    verbose=True)

Рассмотрим типы данных полученного датафрейма (Заметим также, что он один занимает около 3 GB памяти):

In [None]:
train_data.dtypes

Вес датафрейма можно уменьшить вдвое безо всякой потери качества, переведя 64-битные данные внутри в 32-битные. Точность чисел с плавающей запятой и размер целочисленных значений внутри датафрейма умещаются в 32-битный формат данных целиком.

In [None]:
train_data.memory_usage(deep=True)

In [None]:
for col in train_data.select_dtypes(include=['int64']).columns:
    train_data[col] = pd.to_numeric(train_data[col], downcast='integer')

for col in train_data.select_dtypes(include=['float64']).columns:
    train_data[col] = train_data[col].astype(np.float32)

In [None]:
train_data.dtypes

In [None]:
train_data.memory_usage(deep=True)

Рассмотрим теперь механизм кросс-валидации. Учитывая, что мы загружаем данные отдельными партациями, нам не подходит "классический" алгоритм разбиения датасета на train\val выборки.

Дело в том, что при разбиении каждой загружаемой партации на фолды "на месте" мы теряем репрезентативность, так как out-of-fold часть выборки будет локальной, а для адекватной валидации нужно, чтобы OOF охватывала весь датасет.

Более того, без механизма согласования OOF по фолдам мы не сможем собрать аггрегированное предсказание наших моделей на тесте. Это связано с архитектурой решения и будет видно позже.

Итак, рассмотрим теперь следующий механизм кросс-валидации:
1. Создается датафрейм, содержащий id объектов всей (!) выборки. Учитывая, что это единственный int столбец, он не займет критически много места;
2. Этот датафрейм расширяется колонкой fold, которая будет указывать, к валидации какого по счету фолда относится элемент;
3. Колонка fold заполняется с использованием инструмента KFold из scikit_learn. Это стандартный инструмент для разбиения наборов данных на фолды;

На этом этапе мы получаем датафрейм, глобально делящий все объекты из всех партаций в совокупности на фолды для кросс-валидации. Учитывая огромный объем каждой отдельной партации и случайность разбиения на фолды, мы получим примерно одинаковое соотношение объемов train и val частей в индивидуальных партациях.

При обучении модели каждая партация загружается и рассматривается независимо от остальных. Используя полученный на этапе 3 датафрейм, из партации выбираются соответствующие текущему фолду строки. Для итогового случая создается отдельная модель. Она обучается на финальных данных и добавляется в общий ансамбль. Это гарантирует, что все модели ансамбля в совокупности будут обучены на полном объеме данных без исключения.  

К концу обучения общее количество моделей в ансамбле будет равно MxN, по M моделей на каждую партацию, рассмотренную в N разных вариациях разбиения.

Итоговое предсказание на тесте будет получено аггрегированием предсказаний всего ансамбля моделей. Именно на этом месте "стреляет" согласованность моделей в ансамбле.

In [None]:
train_paths = {
        os.path.join('./kaggle_data/data_for_competition/train_features', filename)
        for filename in os.listdir('./kaggle_data/data_for_competition/train_features')
}

id_dfs = []
for path in train_paths:
    pf = ParquetFile(path)
    df = pf.to_pandas(columns=['id'])
    id_dfs.append(df)

kf_ids = pd.concat(id_dfs, ignore_index=True)
kf_ids['fold'] = -1

kf = KFold(n_splits=5, random_state=100, shuffle=True)
for fold, (_, val_idx) in enumerate(kf.split(kf_ids)):
    kf_ids.loc[val_idx, 'fold'] = fold

In [None]:
kf_ids.fold.value_counts()

In [None]:
feat_cols = list(train_data.columns.values)
feat_cols.remove('id')

In [None]:
feat_cols

In [None]:
del train_data
gc.collect()

Подгружаем в память целевую переменную из train_target.csv:

In [None]:
train_target = pd.read_csv('./kaggle_data/data_for_competition/train_target.csv')

In [None]:
%%time

models = []

tree_parameters = {
    'objective': 'binary',
    'metric': 'auc',
    'learning_rate': 0.05,
    'max_depth': 5,
    'reg_lambda': 1,
    'num_leaves': 64,
    'n_estimators': 2000,
    'min_data_in_leaf': 256,
    'verbosity': -1
}

for fold_ in range(5):
    print(f'Training with fold {fold_} started')

    val_preds_list = []
    val_targets_list = []

    for path in train_paths:
        pf = ParquetFile(path)
        df = pf.to_pandas()
        df = df.merge(kf_ids, on='id')
        df = df.merge(train_target, on='id')

        val = df[df['fold'] == fold]
        train = df[df['fold'] != fold]

        if val.empty or train.empty:
            print('Skipped: no val or train data for this fold')
            continue

        features = [c for c in train.columns if c not in ('id', 'flag', 'fold')]

        lgb_model = lgb.LGBMClassifier(**tree_parameters)
        lgb_model.fit(
            train[features], train.flag.values,
            eval_set=[(val[features], val.flag.values)]
        )

        preds = lgb_model.predict_proba(val[features])[:, 1]
        val_preds_list.append(preds)
        val_targets_list.append(val['flag'].values)
        models.append(lgb_model)

        del df, val, train
        gc.collect()

    fold_preds = np.concatenate(val_preds_list)
    fold_targets = np.concatenate(val_targets_list)
    print('CV ROC-AUC: ', roc_auc_score(fold_targets, fold_preds))

    print(f'Training with fold {fold_} completed')

Теперь проделаем то же самое для тестовой выборки:

In [None]:
!mkdir ./kaggle_data/data_for_competition/test_features

In [None]:
test_data = aggregator.transform(
    './kaggle_data/data_for_competition/test_data',
    num_parts_to_preprocess_at_once=2,
    num_parts_total=2,
    save_to_path='./kaggle_data/data_for_competition/test_features',
    verbose=True)

for col in test_data.select_dtypes(include=['int64']).columns:
    test_data[col] = pd.to_numeric(test_data[col], downcast='integer')

for col in test_data.select_dtypes(include=['float64']).columns:
    test_data[col] = test_data[col].astype(np.float32)

In [None]:
test_target = pd.read_csv('./kaggle_data/data_for_competition/test_target.csv')

score = np.zeros(len(test_data))
for model in tqdm(models):
    score += model.predict_proba(test_data[feat_cols])[:, 1]/len(models)
submission = pd.DataFrame({
    'id': test_data['id'].values,
    'score': score
})

print('TEST ROC-AUC: ', roc_auc_score(test_target, score))