## 基线：全局 XGBoost + 传统三支决策 (TWD)

In [None]:
import os
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, train_test_split
from xgboost import XGBClassifier

from bttwdlib.config_loader import load_yaml_cfg, show_cfg
from bttwdlib.data_loader import load_dataset
from bttwdlib.preprocessing import prepare_features_and_labels
from bttwdlib.metrics import (
    compute_binary_metrics,
    compute_s3_metrics,
    log_metrics,
    predict_binary_by_cost,
)
from bttwdlib.threshold_search import (
    search_thresholds_with_regret,
    compute_regret,
)
from bttwdlib.utils_logging import log_info

root_path = Path("..").resolve()
log_info(f"[基线-XGB+TWD] 项目根路径: {root_path}")

In [None]:
# 读取 YAML 配置
cfg_name = "bank_bttwd.yaml"  # 可切换为 "adult_bttwd.yaml" 或 "airlines_delay.yaml"
cfg_path = root_path / "configs" / cfg_name
cfg = load_yaml_cfg(str(cfg_path))
show_cfg(cfg)

In [None]:
# 数据加载与预处理（与 BT-TWD 主流程保持一致）
df_raw, target_col = load_dataset(cfg)
X, y, meta = prepare_features_and_labels(df_raw, cfg)
log_info(
    f"[基线-XGB+TWD] 数据加载完成，样本数={len(y)}，特征维度={X.shape[1]}，正类比例={y.mean():.4f}"
)

In [None]:
# 辅助函数定义

def predict_s3_by_thresholds(y_score: np.ndarray, alpha: float, beta: float) -> np.ndarray:
    """基于全局 alpha/beta 生成三支预测 (1=POS, 0=NEG, -1=BND)。"""
    return np.where(y_score >= alpha, 1, np.where(y_score <= beta, 0, -1))

def run_xgb_twd_fold(X: np.ndarray, y: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray, cfg: dict) -> dict:
    data_cfg = cfg.get("DATA", {})
    thresh_cfg = cfg.get("THRESHOLDS", {})
    metrics_cfg = cfg.get("METRICS", {})
    xgb_cfg = cfg.get("BTTWD", {}).get("global_xgb", {})

    random_state = data_cfg.get("random_state", 42)
    bcfg = cfg.get("BTTWD", {})
    val_ratio = bcfg.get("val_ratio", 0.2)

    costs = thresh_cfg.get(
        "costs",
        {"C_TP": 0.0, "C_TN": 0.0, "C_FP": 2.0, "C_FN": 2.0, "C_BP": 1.5, "C_BN": 1.5},
    )
    alpha_grid = thresh_cfg.get("alpha_grid", np.linspace(0.1, 0.9, 9))
    beta_grid = thresh_cfg.get("beta_grid", np.linspace(0.0, 0.5, 6))
    gap_min = thresh_cfg.get("gap_min", 0.0)

    X_train_full, X_test = X[train_idx], X[test_idx]
    y_train_full, y_test = y[train_idx], y[test_idx]

    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full,
        y_train_full,
        test_size=val_ratio,
        stratify=y_train_full,
        random_state=random_state,
    )

    model = XGBClassifier(
        n_estimators=xgb_cfg.get("n_estimators", 300),
        max_depth=xgb_cfg.get("max_depth", 4),
        learning_rate=xgb_cfg.get("learning_rate", 0.1),
        subsample=xgb_cfg.get("subsample", 0.8),
        colsample_bytree=xgb_cfg.get("colsample_bytree", 0.8),
        reg_lambda=xgb_cfg.get("reg_lambda", 1.0),
        random_state=xgb_cfg.get("random_state", 42),
        n_jobs=xgb_cfg.get("n_jobs", -1),
        eval_metric="logloss",
    )
    model.fit(X_train, y_train)
    log_info("[基线-XGB+TWD] 全局 XGB 模型训练完成。")

    y_proba_val = model.predict_proba(X_val)[:, 1]
    y_proba_test = model.predict_proba(X_test)[:, 1]

    best_alpha, best_beta, best_stats = search_thresholds_with_regret(
        prob=y_proba_val,
        y_true=y_val,
        alpha_grid=alpha_grid,
        beta_grid=beta_grid,
        costs=costs,
        gap_min=gap_min,
    )
    log_info(
        f"[基线-XGB+TWD] 全局阈值搜索完成: alpha={best_alpha:.3f}, beta={best_beta:.3f}, Regret={best_stats.get("Regret", best_stats.get("regret", float("nan"))):.4f}"
    )

    y_pred_s3 = predict_s3_by_thresholds(y_proba_test, best_alpha, best_beta)
    s3_metrics = compute_s3_metrics(y_true=y_test, y_s3_pred=y_pred_s3, y_score=y_proba_test, cfg_metrics=metrics_cfg, costs=costs)
    log_metrics("[基线-XGB+TWD] 测试集指标: ", s3_metrics)

    return {
        "metrics": s3_metrics,
        "alpha": best_alpha,
        "beta": best_beta,
        "val_stats": best_stats,
    }

In [None]:
# K 折训练 + 阈值搜索 + 指标计算
fold_metrics = []

data_cfg = cfg.get("DATA", {})
n_splits = data_cfg.get("n_splits", 5)
shuffle = data_cfg.get("shuffle", True)
random_state = data_cfg.get("random_state", 42)

skf = StratifiedKFold(n_splits=n_splits, shuffle=shuffle, random_state=random_state)

for fold_id, (train_idx, test_idx) in enumerate(skf.split(X, y), start=1):
    log_info(f"[基线-XGB+TWD] 开始第 {fold_id}/{n_splits} 折")
    result = run_xgb_twd_fold(X, y, train_idx, test_idx, cfg)
    metrics = result["metrics"]
    metrics["alpha"] = result["alpha"]
    metrics["beta"] = result["beta"]
    metrics["fold"] = fold_id
    fold_metrics.append(metrics)

fold_metrics_df = pd.DataFrame(fold_metrics)
fold_metrics_df

In [None]:
# 汇总 K 折均值与标准差，并保存为 CSV
metric_names = [
    "Precision",
    "Recall",
    "F1",
    "BAC",
    "AUC",
    "MCC",
    "Kappa",
    "BND_ratio",
    "POS_Coverage",
    "Regret",
]

summary = {"model": "Baseline_XGB_TWD"}
for name in metric_names:
    values = fold_metrics_df[name] if name in fold_metrics_df else []
    summary[f"{name}_mean"] = float(np.mean(values)) if len(values) else np.nan
    summary[f"{name}_std"] = float(np.std(values)) if len(values) else np.nan

summary_df = pd.DataFrame([summary])
summary_df

results_dir = root_path / "notebooks" / "results"
results_dir.mkdir(parents=True, exist_ok=True)
out_path = results_dir / "metrics_kfold_summary_xgb_twd.csv"
summary_df.to_csv(out_path, index=False)
log_info(f"[基线-XGB+TWD] K折指标汇总已保存到: {out_path}")

In [None]:
# （可选）单次 holdout 流程示例
use_holdout = False
if use_holdout:
    data_cfg = cfg.get("DATA", {})
    test_ratio = data_cfg.get("test_size", data_cfg.get("test_ratio", 0.2))
    random_state = data_cfg.get("random_state", 42)

    X_train_full, X_test, y_train_full, y_test = train_test_split(
        X,
        y,
        test_size=test_ratio,
        stratify=y,
        random_state=random_state,
    )

    bcfg = cfg.get("BTTWD", {})
    val_ratio = bcfg.get("val_ratio", 0.2)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full,
        y_train_full,
        test_size=val_ratio,
        stratify=y_train_full,
        random_state=random_state,
    )

    result = run_xgb_twd_fold(
        X=np.vstack([X_train, X_val, X_test]),
        y=np.hstack([y_train, y_val, y_test]),
        train_idx=np.arange(len(y_train) + len(y_val)),
        test_idx=np.arange(len(y_train) + len(y_val), len(y_train) + len(y_val) + len(y_test)),
        cfg=cfg,
    )
    holdout_metrics = result["metrics"]
    log_metrics("[基线-XGB+TWD] Holdout 指标: ", holdout_metrics)