<a href="https://colab.research.google.com/github/Benetti-Hub/Benetti-Hub-Kaggle-Home-Credit-Risk/blob/main/0_feature_engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Re-import edited modules automatically before each cell executes (IPython autoreload extension).
%load_ext autoreload
%autoreload 2

In [2]:
%%capture
!pip install --upgrade polars
!pip install --upgrade pandas
!pip install lightgbm
!pip install pyarrow

In [3]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = # LINK FOR THE DATASOURCES
KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / int(total_length))
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            if filename.endswith('.zip'):
              with ZipFile(tfile) as zfile:
                zfile.extractall(destination_path)
            else:
              with tarfile.open(tfile.name) as tarfile:
                tarfile.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')



Downloading home-credit-credit-risk-model-stability, 3375704785 bytes compressed
Downloaded and uncompressed: home-credit-credit-risk-model-stability
Downloading credit-default-extras, 1147475 bytes compressed
Downloaded and uncompressed: credit-default-extras
Downloading homecredit-models-public/other/lgb/1, 16608576 bytes compressed
Downloaded and uncompressed: homecredit-models-public/other/lgb/1
Downloading homecredit-models-public/other/cat/1, 49048908 bytes compressed
Downloaded and uncompressed: homecredit-models-public/other/cat/1
Data source import complete.


In [4]:
import gc
import os
import pickle
import re
from datetime import datetime
from functools import partial
from glob import glob
from pathlib import Path

import numpy as np
import pandas as pd
import polars as pl
from google.colab import drive
from tqdm import tqdm


In [5]:
drive.mount('/content/drive')
# Set up the Google Drive directory where this notebook saves its outputs.
DRIVE_PATH = "/content/drive/MyDrive/Projects/KaggleDefaults"
os.makedirs(DRIVE_PATH, exist_ok=True)

# Work in the Kaggle working directory itself. The "../working" symlink only
# resolves to /working when the notebook started one level below "/", so use
# the real path rather than relying on that link.
os.chdir('/kaggle/working')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
# Maximum number of train_base rows per credit_bureau_a_2 processing batch.
BATCH_SIZE = 5_000_000

ROOT = Path("/kaggle/input/home-credit-credit-risk-model-stability")
TRAIN_DIR = ROOT / "parquet_files" / "train"

# Sorted training case ids; their positions define the case_id batch boundaries.
train_base = (
    pl.read_parquet(TRAIN_DIR / "train_base.parquet")
    .select(pl.col('case_id'))
    .sort(pl.col('case_id'))
)

NUM_SAMPLES_TRAIN = train_base.shape[0]
num_batches_train = int(np.ceil(NUM_SAMPLES_TRAIN / BATCH_SIZE))

# Feature name -> whether its description mentions "active" (case-insensitive).
# Missing descriptions map to False instead of NaN.
IS_ACTIVE_DICT = (
    pd.read_csv(ROOT / "feature_definitions.csv")
    .assign(
        is_active=lambda x: x['Description']
        .str.lower()
        .str.contains('active', regex=False, na=False)
    )
    .set_index('Variable')['is_active']
    .to_dict()
)


In [7]:
# Utils func for the problem
def to_pandas(df_data: pl.DataFrame) -> pd.DataFrame:
    """Convert a polars frame to pandas, turning every ``object`` column into ``category``.

    Args:
        df_data: eager polars DataFrame to convert.

    Returns:
        A new pandas DataFrame whose ``object``-dtype columns have been cast to ``category``.
    """
    frame = df_data.to_pandas()
    object_columns = frame.select_dtypes(include="object").columns.tolist()
    frame[object_columns] = frame[object_columns].astype("category")
    return frame


def reduce_mem_usage(data: pd.DataFrame) -> pd.DataFrame:
    """Downcast the columns of a pandas DataFrame in place to smaller dtypes.

    * ``case_id`` and ``category`` columns are left untouched.
    * ``object`` columns become ``category``.
    * Signed-integer (``int*``) columns are cast to the narrowest of int8/int16/int32/int64
      whose range contains the column's min and max (bounds inclusive).
    * ``float*`` columns become float32 when their min and max lie within the float32
      range, or when the column is entirely NaN. Otherwise they become float64.
    * Any other dtype (bool, datetime, unsigned or nullable ints, ...) is unchanged.

    Args:
        data: frame to shrink; it is modified in place.

    Returns:
        The same ``data`` object, so the function can be used in ``.pipe`` chains.
    """
    float32_info = np.finfo(np.float32)

    for col in data.columns:
        if col == "case_id":
            continue

        col_type = str(data[col].dtype)
        if col_type == "category":
            continue
        if col_type == "object":
            data[col] = data[col].astype("category")
        elif col_type.startswith("int"):
            c_min, c_max = data[col].min(), data[col].max()
            for int_type in (np.int8, np.int16, np.int32, np.int64):
                int_info = np.iinfo(int_type)
                # Inclusive bounds: a column spanning exactly [-128, 127] fits in int8.
                if int_info.min <= c_min and c_max <= int_info.max:
                    data[col] = data[col].astype(int_type)
                    break
        elif col_type.startswith("float"):
            c_min, c_max = data[col].min(), data[col].max()
            # An all-NaN column has NaN min/max; NaN is representable in float32.
            fits_float32 = pd.isna(c_min) or (
                float32_info.min <= c_min and c_max <= float32_info.max
            )
            data[col] = data[col].astype(np.float32 if fits_float32 else np.float64)

    return data


In [8]:
class Pipeline:
    """Column-level cleaning steps applied while building the feature tables."""

    @staticmethod
    def set_table_dtypes(data: pl.LazyFrame) -> pl.LazyFrame:
        """Cast every column to a dtype chosen from its name.

        A column whose full name is a key of ``mapping`` (``case_id``, ``WEEK_NUM``, ...)
        gets that dtype. Otherwise the last character of its name decides
        (``P``/``A`` -> Float32, ``M`` -> String, ``D`` -> Date), and any other column is
        left unchanged. Casts are non-strict, so unconvertible values become null.

        If building the cast for a column raises, that column is dropped, except for the
        identifier/target columns listed in ``to_keep``.
        """
        mapping = {
            "case_id": pl.Int32,
            "date_decision": pl.Date,
            "WEEK_NUM": pl.Int16,
            "num_group1": pl.Int16,
            "num_group2": pl.Int16,
            "P": pl.Float32,
            "A": pl.Float32,
            "M": pl.String,
            "D": pl.Date,
        }
        # We cannot drop any of these columns!
        to_keep = ["target", "case_id", "WEEK_NUM", "num_group1", "num_group2"]

        for col in data.columns:
            try:
                if col in mapping:
                    dtype = mapping[col]
                elif col[-1] in mapping:
                    dtype = mapping[col[-1]]
                else:
                    continue
                data = data.with_columns(pl.col(col).cast(dtype, strict=False))
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt still stops the run.
                if col not in to_keep:
                    data = data.drop(col)

        return data

    @staticmethod
    def handle_dates(data: pl.LazyFrame) -> pl.LazyFrame:
        """Express date columns as day offsets from ``date_decision``, then drop it.

        Each column whose name ends in ``D`` and whose dtype is Date or Datetime is
        replaced by ``(col - date_decision)`` in whole days. Columns whose name contains
        ``year`` are dropped. ``date_decision`` must be present.
        """
        data_schema = data.schema
        for col in data.columns:
            # `in (pl.Date, pl.Datetime)` compares by dtype class, so it matches both
            # Date columns (as cast by set_table_dtypes) and Datetime of any time unit.
            if col[-1] == "D" and data_schema[col] in (pl.Date, pl.Datetime):
                data = data.with_columns(pl.col(col) - pl.col("date_decision"))
                data = data.with_columns(pl.col(col).dt.total_days())
            if "year" in col:
                data = data.drop(col)

        return data.drop("date_decision")

    @staticmethod
    def filter_cols(
        data: pl.DataFrame,
        null_threshold: float = 0.95,
        max_categories: int = 200,
    ) -> pl.DataFrame:
        """Replace low-information feature columns with a boolean ``is_null`` indicator.

        Requires an eager ``pl.DataFrame``, because columns are accessed as ``data[col]``.
        Identifier/target columns and ``date_decision`` are never touched. Any other
        column is replaced (keeping its name) by ``col.is_null()`` when:

        * its fraction of nulls exceeds ``null_threshold``, or
        * it is a String column whose ``n_unique()`` is 1 or greater than ``max_categories``.

        Args:
            data: eager frame to filter.
            null_threshold: null fraction above which a column is replaced.
            max_categories: String columns with more distinct values than this are replaced.

        Returns:
            The frame with the selected columns replaced.
        """
        to_keep = [
            "target",
            "case_id",
            "WEEK_NUM",
            "date_decision",
            "num_group1",
            "num_group2",
        ]
        for col in data.columns:
            if col not in to_keep and data[col].is_null().mean() > null_threshold:
                data = data.with_columns(pl.col(col).is_null().alias(col))

        for col in data.columns:
            if col not in to_keep and data[col].dtype == pl.String:
                freq = data[col].n_unique()
                if freq == 1 or freq > max_categories:
                    # Here using another type of encoding might be beneficial
                    data = data.with_columns(pl.col(col).is_null().alias(col))
        return data


class Aggregator:
    """Builds polars aggregation expressions and applies them per ``case_id``.

    The aggregations a column receives depend on the last character of its name:

    * ``P``/``A``: max/min/sum/mean/first/last/std, quantiles, index-mass-quantiles,
      kurtosis and ``abs_diff``.
    * ``D``: max/min/sum/mean/first/last/std.
    * ``M``: first/last/max/min and mode.
    * ``T``/``L``: first/last/sum/max/min.
    * Columns containing ``num_group``: count.

    Output columns are named ``{aggregation}_{column}``.
    """

    @staticmethod
    def _aggregate(cols, methods):
        """Apply every expression builder in ``methods`` to every column in ``cols``.

        Each builder maps a column name to a polars expression. The result is aliased
        ``f"{builder.__name__}_{col}"``. ``methods`` may be a one-shot iterator because
        it is consumed only once (outer loop).
        """
        return [
            method(col).alias(f"{method.__name__}_{col}")
            for method in methods
            for col in cols
        ]

    @staticmethod
    def quantile_expr(data: pl.LazyFrame):
        """Quantiles 0.1/0.5/0.75/0.9 of each ``P``/``A`` column, with nulls filled as 0."""

        def build_quantile(q):
            func = lambda x: pl.col(x).fill_null(0).quantile(q)
            func.__name__ = f"quantile_{q:.2f}"
            return func

        cols = [col for col in data.columns if col[-1] in ("P", "A")]
        agg_funcs = (build_quantile(q) for q in [0.1, 0.5, 0.75, 0.9])
        return Aggregator._aggregate(cols, agg_funcs)

    @staticmethod
    def imq_expr(data: pl.LazyFrame):
        """Index-mass-quantile of each ``P``/``A`` column for q = 0.1, 0.75, 0.9.

        The expression returns the first position at which the running sum divided by its
        maximum is >= ``q``.
        """

        def _index_mass_quantile(q):
            def imq(x: str) -> pl.Expr:
                cumsum = pl.col(x).cum_sum()
                distribution = cumsum / cumsum.max()
                return pl.arg_where(
                    (distribution >= q).cast(pl.Boolean, strict=False)
                ).first()

            imq.__name__ = f"imq_{q}"
            return imq

        cols = [col for col in data.columns if col[-1] in ("P", "A")]
        agg_funcs = (_index_mass_quantile(q) for q in [0.1, 0.75, 0.9])
        return Aggregator._aggregate(cols, agg_funcs)

    @staticmethod
    def abs_sum_diff(data: pl.LazyFrame):
        """``abs_diff_<col>``: sum of consecutive differences of ``|col|`` for ``P``/``A`` columns.

        NOTE(review): ``abs()`` is applied before ``diff()``, so this sums the changes of
        the absolute values, not the absolute changes (``diff().abs().sum()``). It is kept
        as-is so features stay consistent with code or models built on this definition.
        Confirm which one was intended.
        """

        def abs_diff(x: str) -> pl.Expr:
            return pl.col(x).abs().diff().sum()

        cols = [col for col in data.columns if col[-1] in ("P", "A")]
        return Aggregator._aggregate(cols, (abs_diff,))

    @staticmethod
    def kurtosis(data: pl.LazyFrame):
        """Kurtosis of each ``P``/``A`` column, named ``ks__<col>``."""

        def ks_(x: str) -> pl.Expr:
            return pl.col(x).kurtosis()

        cols = [col for col in data.columns if col[-1] in ("P", "A")]
        return Aggregator._aggregate(cols, (ks_,))

    @staticmethod
    def num_expr(data: pl.LazyFrame):
        """max/min/sum/mean/first/last/std of every ``P``/``A``/``D`` column."""
        cols = [col for col in data.columns if col[-1] in ("P", "A", "D")]
        agg_funcs = (pl.max, pl.min, pl.sum, pl.mean, pl.first, pl.last, pl.std)
        return Aggregator._aggregate(cols, agg_funcs)

    @staticmethod
    def str_expr(data: pl.LazyFrame):
        """first/last/max/min and the most frequent non-null value of every ``M`` column."""
        cols = [col for col in data.columns if col[-1] in ("M",)]
        expr_all = Aggregator._aggregate(cols, (pl.first, pl.last, pl.max, pl.min))
        expr_mode = [
            pl.col(col).drop_nulls().mode().first().alias(f"mode_{col}") for col in cols
        ]
        return expr_all + expr_mode

    @staticmethod
    def other_expr(data: pl.LazyFrame):
        """first/last/sum/max/min of every ``T``/``L`` column."""
        cols = [col for col in data.columns if col[-1] in ("T", "L")]
        agg_funcs = (pl.first, pl.last, pl.sum, pl.max, pl.min)
        return Aggregator._aggregate(cols, agg_funcs)

    @staticmethod
    def count_expr(data: pl.LazyFrame):
        """``pl.count`` of every column whose name contains ``num_group``."""
        cols = [col for col in data.columns if "num_group" in col]
        return Aggregator._aggregate(cols, (pl.count,))

    @staticmethod
    def date_diff(data: pl.LazyFrame) -> pl.LazyFrame:
        """Add ``diff_<col>``: days between consecutive records of a case, for every ``*D`` column.

        The frame is first sorted by (``case_id``, ``num_group1``), so ``diff`` runs over
        consecutive records of the same case and each result stays on its own row. The
        previous form, ``.over("case_id").sort_by("num_group1")``, reordered the
        already-computed column globally by ``num_group1``. ``with_columns`` then assigned
        it positionally, which misaligned values across rows and cases.
        """
        date_cols = [c for c in data.columns if c[-1] == "D"]
        if not date_cols:
            return data
        return data.sort("case_id", "num_group1").with_columns(
            pl.col(c).diff().over("case_id").dt.total_days().alias(f"diff_{c}")
            for c in date_cols
        )

    @staticmethod
    def _all_exprs(dataset: pl.LazyFrame) -> list:
        """Every aggregation expression applicable to the columns of ``dataset``."""
        return [
            *Aggregator.num_expr(dataset),
            *Aggregator.str_expr(dataset),
            *Aggregator.other_expr(dataset),
            *Aggregator.count_expr(dataset),
            *Aggregator.quantile_expr(dataset),
            *Aggregator.imq_expr(dataset),
            *Aggregator.kurtosis(dataset),
            *Aggregator.abs_sum_diff(dataset),
        ]

    @staticmethod
    def full_aggregation(dataset: pl.LazyFrame):
        """Add date diffs, then aggregate to one row per ``case_id``, ordered by ``num_group1``."""
        dataset = Aggregator.date_diff(dataset)
        return (
            dataset.sort(pl.col("num_group1"))
            .group_by("case_id")
            .agg(*Aggregator._all_exprs(dataset))
        )

    @staticmethod
    def partial_aggregation(dataset: pl.LazyFrame):
        """Aggregate to one row per (``case_id``, ``num_group1``), ordered by ``num_group2``."""
        return (
            dataset.sort(pl.col("num_group2"))
            .group_by("case_id", "num_group1")
            .agg(*Aggregator._all_exprs(dataset))
        )


In [9]:
def process_file(
    path: str,
    prefix: str,
    p_id: int,
    start: int,
    stop: int,
    keep_cols: list[str] | None = None,
    suffix: str = "",
    output_dir: str = "/kaggle/working",
) -> None:
    """Load one raw table, cast dtypes, keep a ``case_id`` range, aggregate it and save it.

    Args:
        path: table name relative to ``prefix``, without ``.parquet``. A ``*`` turns it
            into a glob over partition files (e.g. ``"train_static_0_*"``).
        prefix: directory that holds the parquet files.
        p_id: batch index appended to the output file name.
        start: inclusive lower bound on ``case_id``.
        stop: exclusive upper bound on ``case_id``.
        keep_cols: columns to load; all columns when ``None`` or empty.
        suffix: extra text placed before ``_{p_id}`` in the output file name.
        output_dir: directory the result is written to.

    Tables whose name (without ``_*``) ends in ``1`` are aggregated per ``case_id``.
    Names ending in ``2`` are aggregated per (``case_id``, ``num_group1``) and then per
    ``case_id``. Other tables are saved unaggregated. The output path is
    ``{output_dir}/{name}{suffix}_{p_id}.parquet``.

    Raises:
        FileNotFoundError: if a glob ``path`` matches no partition file.
    """
    if "*" in str(path):
        # Sorted so partitions are always concatenated in the same order.
        partitions = sorted(glob(f"{prefix}/{path}.parquet"))
        if not partitions:
            raise FileNotFoundError(f"No parquet files match {prefix}/{path}.parquet")
        data = pl.concat(
            [pl.scan_parquet(p, low_memory=True) for p in partitions],
            how="vertical_relaxed",
        )
    else:
        data = pl.scan_parquet(f"{prefix}/{path}.parquet", low_memory=True)

    keep_cols = keep_cols or data.columns
    data = (
        data.select(keep_cols)
        .pipe(Pipeline.set_table_dtypes)
        .filter((pl.col("case_id") >= start) & (pl.col("case_id") < stop))
    )
    name = str(path).replace("_*", "")
    if name[-1] == "1":
        data = data.pipe(Aggregator.full_aggregation)
    elif name[-1] == "2":
        data = data.pipe(Aggregator.partial_aggregation).pipe(
            Aggregator.full_aggregation
        )

    save_path = f"{output_dir}/{name}{suffix}_{p_id}.parquet"
    # NOTE(review): written through pandas as in the original pipeline. The round trip
    # may change dtypes (e.g. dates), and later steps may rely on that; confirm before
    # switching to polars' write_parquet.
    data.collect().to_pandas().to_parquet(save_path, index=False)


def merge_datasets(files, name, suffix: str = "*"):
    """Left-join all processed tables onto the base table and write ``{name}.parquet``.

    Args:
        files: processed table names. ``files[0]`` is the base table, which must contain
            ``case_id`` and ``date_decision``. The remaining tables are joined on ``case_id``.
        name: output file stem, written to the current working directory.
        suffix: pattern appended as ``_{suffix}`` to each table name. With the default
            ``"*"`` it is a glob picking up every batch file of a table.

    ``month_decision`` and ``weekday_decision`` are derived from ``date_decision``.
    Each non-base table first goes through ``Pipeline.filter_cols`` and is saved as
    ``filtered_{file}_{suffix}.parquet``; with the default suffix that file name contains
    a literal ``*`` and is read back through the same glob. Colliding column names get
    the join suffix ``_{i}``, where ``i`` is the table's index among the non-base files.
    Finally ``Pipeline.handle_dates`` converts dates to day offsets.
    """

    def _filter_dataset(path):
        """Eagerly load ``path``, apply ``Pipeline.filter_cols`` and save it as ``filtered_{path}``."""
        (
            pl.read_parquet(path)
            .pipe(Pipeline.filter_cols)
            .to_pandas()
            .to_parquet(f"filtered_{path}")
        )

    base = pl.scan_parquet(
        f"./{files[0]}_{suffix}.parquet", low_memory=True
    ).with_columns(
        month_decision=pl.col("date_decision").dt.month(),
        weekday_decision=pl.col("date_decision").dt.weekday(),
    )
    for i, file in enumerate(files[1:]):
        path = f"{file}_{suffix}.parquet"

        _filter_dataset(path)
        gc.collect()  # release the eager frame loaded by _filter_dataset

        data = pl.scan_parquet(f"filtered_{path}", low_memory=True)
        base = base.join(data, how="left", on="case_id", suffix=f"_{i}", coalesce=True)
        print(f"joining {len(data.columns)} columns from {file}")

    base.pipe(Pipeline.handle_dates).collect().write_parquet(f"{name}.parquet")


In [10]:
# Raw training tables processed in one pass. Names are relative to TRAIN_DIR and
# lack the "train_" prefix; a trailing "_*" marks a table split into several files.
sources = [
    "base",
    "applprev_2",
    "person_2",
    "static_cb_0",
    "static_0_*",
    "other_1",
    "applprev_1_*",
    "tax_registry_a_1",
    "tax_registry_b_1",
    "tax_registry_c_1",
    "credit_bureau_a_1_*",
    "credit_bureau_b_1",
    "person_1",
    "deposit_1",
    "debitcard_1",
    "credit_bureau_b_2",
]


# credit_bureau_a_2_* is processed one feature at a time: each run loads the
# key columns in common_cols plus a single column from cba2_cols.
common_cols = ["case_id", "num_group1", "num_group2"]
cba2_cols = [
    "collater_typofvalofguarant_298M",
    "collater_valueofguarantee_1124L",
    "pmts_dpd_1073P",
    "pmts_overdue_1140A",
    "pmts_dpd_303P",
    "pmts_overdue_1152A",
    "subjectroles_name_541M",
    "collaterals_typeofguarante_359M",
    "subjectroles_name_838M",
    "collaterals_typeofguarante_669M",
]

def preprocess_data(files, prefix, start, stop, p_id):
    """Run ``process_file`` on every table in ``files`` for one ``case_id`` range.

    Args:
        files: table names relative to ``prefix`` (no extension; may contain ``*``).
        prefix: directory holding the raw parquet files.
        start: inclusive lower bound on ``case_id``.
        stop: exclusive upper bound on ``case_id``.
        p_id: batch index used in the output file names.
    """
    progress = tqdm(files, total=len(files))
    for table in progress:
        progress.set_description(f"Processing {table}")
        process_file(table, prefix=prefix, p_id=p_id, start=start, stop=stop)

In [11]:
train_files = [f"train_{file}" for file in sources]

# Base preprocessing for most tables: a single pass over every case_id.
preprocess_data(train_files, str(TRAIN_DIR), -1, np.inf, 0)

# CreditBureau2 processing: one feature column at a time, in case_id batches whose
# boundaries come from the sorted train_base (presumably to bound memory use).
for batch_idx in (pbar := tqdm(range(num_batches_train), total=num_batches_train)):

    start_idx = batch_idx * BATCH_SIZE
    end_idx = min((batch_idx + 1) * BATCH_SIZE, NUM_SAMPLES_TRAIN - 1)

    # process_file keeps start <= case_id < stop, so consecutive batches share a
    # boundary value without overlapping.
    start = train_base[start_idx].item()
    stop = train_base[end_idx].item()
    if stop == train_base.max().item():
        stop += 1  # Include right

    process_batch = partial(
        process_file, prefix=str(TRAIN_DIR), p_id=batch_idx, start=start, stop=stop
    )
    for i, col in enumerate(cba2_cols, start=1):
        pbar.set_description(f"Processing {col}: {i}/{len(cba2_cols)}")
        process_batch(
            path="train_credit_bureau_a_2_*",
            keep_cols=common_cols + [col],
            suffix=f"_{col}",
        )
    gc.collect()


total_files = [
    *[f.replace("_*", "") for f in train_files],
    *[f"train_credit_bureau_a_2_{col}" for col in cba2_cols],
]

# Join every processed table onto the base table. merge_datasets writes
# train_data.parquet relative to the current directory (the Kaggle working
# directory chosen earlier); the next cell loads that file.
merge_datasets(total_files, "train_data")

Processing train_credit_bureau_b_2: 100%|██████████| 16/16 [09:04<00:00, 34.03s/it]
Processing collaterals_typeofguarante_669M: 10/10: 100%|██████████| 1/1 [16:45<00:00, 1005.04s/it]


In [12]:
# Keep WEEK_NUM <= 62 or >= 71, i.e. drop weeks 63-70.
# NOTE(review): the reason for excluding this window is not stated here; document it.
weeks_filter = (pl.col("WEEK_NUM") >= 71) | (pl.col("WEEK_NUM") <= 62)
train_data = reduce_mem_usage(
    to_pandas(
        pl.scan_parquet('/kaggle/working/train_data.parquet')
        .filter(weeks_filter)
        .collect()
    )
)
train_data.to_parquet(f"{DRIVE_PATH}/train_data_pandas.parquet", index=False)