In [None]:
# Import required libraries
import gc
import os
import warnings
from datetime import datetime
from glob import glob
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import polars as pl
import seaborn as sns
from catboost import CatBoostClassifier
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
# Root of the competition dataset and the train/test parquet directories below it
ROOT = Path("/kaggle/input/home-credit-credit-risk-model-stability")
TRAIN_DIR = ROOT.joinpath("parquet_files", "train")
TEST_DIR = ROOT.joinpath("parquet_files", "test")

In [None]:
# File-reading helpers
def read_file(path, depth=None):
    """Load a single parquet file and run it through ``process_dataframe``.

    Args:
        path: Location of the parquet file.
        depth: Forwarded unchanged to ``process_dataframe``.

    Returns:
        Whatever ``process_dataframe`` returns for the loaded frame.
    """
    return process_dataframe(pl.read_parquet(path), depth)

def read_files(regex_path, depth=None):
    """Load every parquet file matching a glob pattern into one frame.

    Each matching file is read and passed through ``process_dataframe``
    separately; the results are stacked vertically and de-duplicated on
    ``case_id``.

    Args:
        regex_path: Glob pattern (str or path-like) selecting the files.
        depth: Forwarded unchanged to ``process_dataframe``.

    Returns:
        A polars DataFrame with at most one row per ``case_id``.

    Raises:
        FileNotFoundError: If the pattern matches no files.
    """
    # ``glob`` is imported as the function (``from glob import glob``), so it
    # must be called directly; ``glob.glob`` would raise AttributeError.
    # Sorting makes the read order independent of the filesystem listing order.
    paths = sorted(glob(str(regex_path)))
    if not paths:
        raise FileNotFoundError(f"No parquet files match pattern: {regex_path}")

    chunks = [process_dataframe(pl.read_parquet(path), depth) for path in paths]
    df = pl.concat(chunks, how="vertical_relaxed")
    df = df.unique(subset=["case_id"])
    return df

def process_dataframe(df, depth=None):
    """Normalise column dtypes and, for depth 1/2 tables, aggregate per case.

    Args:
        df: Polars DataFrame as read from a parquet file.
        depth: When 1 or 2, the frame is grouped by ``case_id`` and reduced
            with the expressions from ``Aggregator.get_exprs``. Any other
            value skips the aggregation.

    Returns:
        The dtype-normalised frame, aggregated when ``depth`` is 1 or 2.
    """
    # feature_eng and Pipeline.handle_dates use ``.dt`` accessors and date
    # subtraction on ``date_decision`` and the ``*D`` columns, so those columns
    # must be pl.Date. Pipeline.set_table_dtypes is the only place that performs
    # the casts, so apply it here for every table that is read.
    # NOTE(review): presumably the raw parquet stores dates as strings — confirm.
    df = df.pipe(Pipeline.set_table_dtypes)
    if depth in (1, 2):
        df = df.group_by("case_id").agg(Aggregator.get_exprs(df))
    return df

In [None]:
# Data preprocessing
class Pipeline:
    """Stateless polars preprocessing steps, exposed as static methods."""

    @staticmethod
    def set_table_dtypes(df):
        """Cast columns to fixed dtypes chosen by column name.

        ``case_id``, ``WEEK_NUM``, ``num_group1`` and ``num_group2`` become
        Int32 and ``date_decision`` becomes Date. Any other column is cast by
        its last character: ``P``/``A`` -> Float64, ``M`` -> String,
        ``D`` -> Date. Columns matching none of these are left unchanged.
        """
        by_name = {
            "case_id": pl.Int32,
            "WEEK_NUM": pl.Int32,
            "num_group1": pl.Int32,
            "num_group2": pl.Int32,
            "date_decision": pl.Date,
        }
        by_suffix = {
            "P": pl.Float64,
            "A": pl.Float64,
            "M": pl.String,
            "D": pl.Date,
        }
        casts = []
        for name in df.columns:
            # An exact-name match takes precedence over the suffix rule.
            target = by_name.get(name, by_suffix.get(name[-1]))
            if target is not None:
                casts.append(pl.col(name).cast(target))
        return df.with_columns(casts) if casts else df

    @staticmethod
    def handle_dates(df):
        """Turn every ``*D`` column into a day offset from ``date_decision``.

        Each column whose name ends in ``D`` is replaced by
        ``column - date_decision`` expressed in whole days as Float32.
        ``date_decision`` and ``MONTH`` are dropped afterwards.
        """
        day_offsets = [
            (pl.col(name) - pl.col("date_decision"))
            .dt.total_days()
            .cast(pl.Float32)
            .alias(name)
            for name in df.columns
            if name[-1] == "D"
        ]
        if day_offsets:
            df = df.with_columns(day_offsets)
        return df.drop(["date_decision", "MONTH"])

    @staticmethod
    def filter_cols(df):
        """Drop sparse columns and uninformative string columns.

        Removes columns whose null share exceeds 0.95, then String columns
        that have exactly one distinct value or more than 200 distinct values.
        ``target``, ``case_id`` and ``WEEK_NUM`` are never removed.
        """
        protected = ["target", "case_id", "WEEK_NUM"]

        null_share = df.select(
            [pl.col(name).is_null().mean().alias(name) for name in df.columns]
        )
        mostly_null = [
            name
            for name in null_share.columns
            if null_share[name][0] > 0.95 and name not in protected
        ]
        df = df.drop(mostly_null)

        uninformative = []
        for name, dtype in zip(df.columns, df.dtypes):
            if dtype != pl.String or name in protected:
                continue
            n_levels = df[name].n_unique()
            if n_levels == 1 or n_levels > 200:
                uninformative.append(name)

        return df.drop(uninformative)

In [None]:
# Aggregators
class Aggregator:
    """Builds per-group ``max`` expressions, selected by column-name pattern.

    Every builder returns a list of polars expressions, one per matching
    column, each aliased ``max_<column>``.
    """

    @staticmethod
    def _max_of(df, keep):
        """Return ``max`` expressions for the columns of ``df`` accepted by ``keep``."""
        return [
            pl.max(name).alias(f"max_{name}")
            for name in df.columns
            if keep(name)
        ]

    @staticmethod
    def num_expr(df):
        """Max over columns whose name ends in ``P`` or ``A``."""
        return Aggregator._max_of(df, lambda name: name[-1] in ("P", "A"))

    @staticmethod
    def date_expr(df):
        """Max over columns whose name ends in ``D``."""
        return Aggregator._max_of(df, lambda name: name[-1] == "D")

    @staticmethod
    def str_expr(df):
        """Max over columns whose name ends in ``M``."""
        return Aggregator._max_of(df, lambda name: name[-1] in ("M",))

    @staticmethod
    def other_expr(df):
        """Max over columns whose name ends in ``T`` or ``L``."""
        return Aggregator._max_of(df, lambda name: name[-1] in ("T", "L"))

    @staticmethod
    def count_expr(df):
        """Max over columns whose name contains ``num_group``."""
        return Aggregator._max_of(df, lambda name: "num_group" in name)

    @staticmethod
    def get_exprs(df):
        """Concatenate all builders' expressions: num, date, str, other, count."""
        builders = (
            Aggregator.num_expr,
            Aggregator.date_expr,
            Aggregator.str_expr,
            Aggregator.other_expr,
            Aggregator.count_expr,
        )
        exprs = []
        for build in builders:
            exprs.extend(build(df))
        return exprs

In [None]:
# Feature engineering
def feature_eng(df_base, depth_0, depth_1, depth_2):
    """Assemble the modelling frame from the base table and the extra tables.

    Adds ``month_decision`` and ``weekday_decision`` derived from
    ``date_decision``, left-joins every table in ``depth_0``, ``depth_1`` and
    ``depth_2`` (in that order) on ``case_id``, then applies
    ``Pipeline.handle_dates``.

    Args:
        df_base: Base polars frame containing ``case_id`` and ``date_decision``.
        depth_0: List of frames to join.
        depth_1: List of frames to join.
        depth_2: List of frames to join.

    Returns:
        The joined frame after ``Pipeline.handle_dates``.
    """
    decision_date = pl.col("date_decision")
    merged = df_base.with_columns(
        decision_date.dt.month().alias("month_decision"),
        decision_date.dt.weekday().alias("weekday_decision"),
    )

    tables = depth_0 + depth_1 + depth_2
    for position, table in enumerate(tables):
        # The join position is used as suffix for clashing column names.
        merged = merged.join(table, how="left", on="case_id", suffix=f"_{position}")

    return Pipeline.handle_dates(merged)

In [None]:
# Load the training tables.
# The keys match the parameter names of feature_eng so the dict can be passed as **data_store.
# Tables read with depth 1 or 2 are aggregated per case_id inside process_dataframe.
data_store = {
    "df_base": read_file(TRAIN_DIR / "train_base.parquet"),
    # Read without a depth argument, so no per-case aggregation is applied
    "depth_0": [
        read_file(TRAIN_DIR / "train_static_cb_0.parquet"),
        read_files(TRAIN_DIR / "train_static_0_*.parquet"),
    ],
    "depth_1": [
        read_files(TRAIN_DIR / "train_applprev_1_*.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_a_1.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_b_1.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_c_1.parquet", 1),
        read_files(TRAIN_DIR / "train_credit_bureau_a_1_*.parquet", 1),
        read_file(TRAIN_DIR / "train_credit_bureau_b_1.parquet", 1),
        read_file(TRAIN_DIR / "train_other_1.parquet", 1),
        read_file(TRAIN_DIR / "train_person_1.parquet", 1),
        read_file(TRAIN_DIR / "train_deposit_1.parquet", 1),
        read_file(TRAIN_DIR / "train_debitcard_1.parquet", 1),
    ],
    "depth_2": [
        read_file(TRAIN_DIR / "train_credit_bureau_b_2.parquet", 2),
        read_files(TRAIN_DIR / "train_credit_bureau_a_2_*.parquet", 2),
    ]
}

In [None]:
# Build the training frame: join every loaded table onto the base table and derive the date features
df_train = feature_eng(**data_store)
print("Train data shape:", df_train.shape)

In [None]:
# Filter columns: drop those that are more than 95% null, and string columns
# with a single distinct value or more than 200 distinct values
df_train = Pipeline.filter_cols(df_train)

In [None]:
# Conversion to a pandas DataFrame
def to_pandas(df_data, cat_cols=None):
    """Convert a frame to pandas and mark its categorical columns.

    Args:
        df_data: Object exposing ``to_pandas()`` (a polars DataFrame here).
        cat_cols: Names of the columns to cast to ``category``. When None,
            every ``object``-dtype column of the converted frame is used.

    Returns:
        Tuple ``(frame, cat_cols)``: the pandas DataFrame with the selected
        columns cast to ``category``, and the list of those column names.
    """
    frame = df_data.to_pandas()
    if cat_cols is None:
        cat_cols = frame.select_dtypes("object").columns.tolist()
    frame[cat_cols] = frame[cat_cols].astype("category")
    return frame, cat_cols

# Convert the training frame to pandas; cat_cols holds the names of the columns cast to category
df_train, cat_cols = to_pandas(df_train)

In [None]:
# Separate the features from the target; case_id and WEEK_NUM are also left out of the feature matrix
X = df_train.drop(columns=["target", "case_id", "WEEK_NUM"])
y = df_train["target"]

In [None]:
# Hold out 20% of the rows for evaluation. The split is stratified on the
# target so both parts keep the same class proportions, which keeps the
# per-class metrics in the classification reports comparable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

In [None]:
# Logistic regression baseline
# LogisticRegression accepts neither NaN nor non-numeric columns, while X_train
# still holds pandas ``category`` columns (see to_pandas) and the nulls left by
# the left joins. Preprocess inside a pipeline so the identical transformation
# is applied to X_test at prediction time.
logreg_cat_cols = X_train.select_dtypes(include=["category", "object"]).columns.tolist()
logreg_num_cols = X_train.select_dtypes(include=["number", "bool"]).columns.tolist()

logreg = make_pipeline(
    ColumnTransformer(
        [
            # Median-impute then standardise the numeric features
            ("num", make_pipeline(SimpleImputer(strategy="median"), StandardScaler()), logreg_num_cols),
            # One-hot encode categoricals; categories unseen during fit encode as all zeros
            ("cat", OneHotEncoder(handle_unknown="ignore"), logreg_cat_cols),
        ],
        # Columns of any other dtype are dropped (ColumnTransformer default).
        # Keep the combined matrix sparse so the one-hot block is never densified.
        sparse_threshold=1.0,
    ),
    LogisticRegression(max_iter=1000, random_state=42),
)
logreg.fit(X_train, y_train)
y_pred_logreg = logreg.predict(X_test)

# Evaluate the logistic regression model
print("Logistic Regression Classification Report:")
print(classification_report(y_test, y_pred_logreg))

In [None]:
# GBDT classifier
# GradientBoostingClassifier accepts neither NaN nor non-numeric columns, while
# X_train still holds pandas ``category`` columns (see to_pandas) and the nulls
# left by the left joins. Preprocess inside a pipeline so the identical
# transformation is applied to X_test at prediction time.
gbdt_cat_cols = X_train.select_dtypes(include=["category", "object"]).columns.tolist()
gbdt_num_cols = X_train.select_dtypes(include=["number", "bool"]).columns.tolist()

gbdt = make_pipeline(
    ColumnTransformer(
        [
            ("num", SimpleImputer(strategy="median"), gbdt_num_cols),
            (
                "cat",
                make_pipeline(
                    # Integer codes are sufficient for tree splits; unseen categories map to -1
                    OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
                    # The encoder passes missing values through as NaN; give them the -1 code too
                    SimpleImputer(strategy="constant", fill_value=-1),
                ),
                gbdt_cat_cols,
            ),
        ]
        # Columns of any other dtype are dropped (ColumnTransformer default).
    ),
    GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42),
)
gbdt.fit(X_train, y_train)
y_pred_gbdt = gbdt.predict(X_test)

# Evaluate the GBDT model
print("GBDT Classification Report:")
print(classification_report(y_test, y_pred_gbdt))

In [None]:
# CatBoost classifier
cat_features = [col for col in X_train.columns if X_train[col].dtype.name in ['object', 'category']]

# CatBoost requires categorical feature values to be strings or integers and
# rejects NaN in them, but the categorical columns can contain nulls coming
# from the left joins. Cast them to strings (missing values become the literal
# "nan" category). Work on copies so X_train / X_test stay unchanged for the
# other model cells.
X_train_cb = X_train.copy()
X_test_cb = X_test.copy()
if cat_features:
    X_train_cb[cat_features] = X_train_cb[cat_features].astype(str)
    X_test_cb[cat_features] = X_test_cb[cat_features].astype(str)

model = CatBoostClassifier(
    iterations=1000,  # number of boosting iterations
    learning_rate=0.03,  # learning rate
    depth=6,  # tree depth
    loss_function='Logloss',  # training loss
    eval_metric='Accuracy',  # metric reported on the eval set and used for early stopping
    random_seed=42,  # random seed
    verbose=100,  # log every 100 iterations
    task_type='GPU',  # train on GPU
    devices='0:1'  # GPU device ids to use; '0:1' selects devices 0 and 1
)

# Train the CatBoost model
# NOTE(review): the hold-out set drives early stopping / best-model selection
# and is also used for the final report below, so that report is optimistic.
model.fit(
    X_train_cb, y_train,
    eval_set=(X_test_cb, y_test),
    cat_features=cat_features,  # pass the categorical features explicitly
    early_stopping_rounds=100,  # stop when the eval metric has not improved for 100 rounds
    use_best_model=True,  # keep the best iteration on the eval set
    plot=True  # draw the learning curves during training
)

# Predict
y_pred_catboost = model.predict(X_test_cb)

# Evaluate the CatBoost model
print("CatBoost Classification Report:")
print(classification_report(y_test, y_pred_catboost))

# Save the model
model.save_model('catboost_gpu_model.cbm')