In [1]:
# =========================
# 环境与基础导入
# =========================
import os
import glob
import math
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_predict
from sklearn.preprocessing import OneHotEncoder, QuantileTransformer, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    roc_auc_score, average_precision_score, f1_score, precision_recall_curve,
    brier_score_loss, precision_score, recall_score
)

from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.svm import LinearSVC

# 可选：如安装了 xgboost / catboost，会自动启用；未安装则跳过
try:
    import xgboost as xgb
    HAS_XGB = True
except Exception:
    HAS_XGB = False

try:
    from catboost import CatBoostClassifier
    HAS_CAT = True
except Exception:
    HAS_CAT = False

SEED = 2025
np.random.seed(SEED)


In [2]:
# ==================================
# 数据加载 + 目标变量统一
# ==================================

DATA_PATH = 'churn_clean.csv'
df = pd.read_csv(DATA_PATH)
print(f"已加载数据：{DATA_PATH}   形状={df.shape}")

# 统一目标：y=1 表示流失
if "Exited" in df.columns:
    y = df["Exited"].astype(int).values
    X = df.drop(columns=["Exited"]).copy()
elif "Attrition_Flag" in df.columns:
    y = (df["Attrition_Flag"].astype(str).str.lower().str.contains("attrited")).astype(int).values
    X = df.drop(columns=["Attrition_Flag"]).copy()
elif "Churn" in df.columns:
    y = df["Churn"]
    if y.dtype.kind in "biu":
        y = y.astype(int).values
    else:
        y = y.astype(str).str.lower().isin(["1", "yes", "true", "churn"]).astype(int).values
    X = df.drop(columns=["Churn"]).copy()
else:
    raise ValueError("未识别的目标列，请手工指定。")

# 去除明显的ID/无信息列（若存在）
drop_cols = [c for c in [
    "RowNumber","CustomerId","CustomerID","customerID","CLIENTNUM",
    "Surname","Id","ID","Unnamed: 0","Unnamed: 0.1"
] if c in X.columns]
if drop_cols:
    X = X.drop(columns=drop_cols)

print("特征列数：", X.shape[1], "；示例列：", list(X.columns)[:10])


已加载数据：churn_clean.csv   形状=(10000, 11)
特征列数： 10 ；示例列： ['CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']


In [3]:
# ========================================
# 特征类型识别 + 通用预处理器
# 数值：中位数填充 + 分位数变换（抗极端值）
# 类别：众数填充 + OneHot
# ========================================
# 将布尔型统一为 object 以便 OneHot（如果你更希望当成数值，也可以改）
for c in X.columns:
    if X[c].dtype == bool:
        X[c] = X[c].astype(object)

num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()

num_pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("qtf", QuantileTransformer(output_distribution="normal", subsample=2_000_000, n_quantiles=min(1000, max(10, len(X)//5)))),
])

# 兼容老/新 sklearn：sparse=False 或 sparse_output=False 二选一
try:
    cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
except TypeError:
    cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse=False)

cat_pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ohe", cat_encoder),
])

preprocessor = ColumnTransformer(
    transformers=[
        ("num", num_pipe, num_cols),
        ("cat", cat_pipe, cat_cols),
    ],
    remainder="drop"
)

print(f"数值列 {len(num_cols)} 个；类别列 {len(cat_cols)} 个。")


数值列 8 个；类别列 2 个。


In [None]:
# ============================================
# 构造客户价值 v_i（可解释、可复现）
# - 自动在常见列中择优选择；若没有则退化为选择前若干数值列
# - 每个分量做 1%~99% 分位截尾 + [0,1] 归一化
# - 再按权重加权求和，最终再归一化到 [0,1]
# ============================================
def compute_customer_value(df_features: pd.DataFrame) -> pd.Series:
    df_ = df_features.copy()

    # Kaggle Churn_Modelling 的优先字段及权重---可学习
    main_scheme = [
        ("Balance", 0.45),
        ("EstimatedSalary", 0.20),
        ("NumOfProducts", 0.15),
        ("Tenure", 0.10),
        ("IsActiveMember", 0.10),
    ]

    # BankChurners 的备选字段及权重
    alt_scheme = [
        ("Total_Trans_Amt", 0.40),
        ("Credit_Limit", 0.20),
        ("Total_Trans_Ct", 0.15),
        ("Total_Relationship_Count", 0.10),
        ("Months_on_book", 0.10),
        ("Avg_Utilization_Ratio", 0.05),
    ]

    # 选择可用的方案
    scheme = [(c, w) for c, w in main_scheme if c in df_.columns]
    if len(scheme) < 3:
        scheme = [(c, w) for c, w in alt_scheme if c in df_.columns]

    # 若数据集完全不同，退化为前若干数值列等权
    if len(scheme) == 0:
        num_candidates = df_.select_dtypes(include=[np.number]).columns.tolist()
        # 避免把目标泄露进来，这里只用 X 的列，不包含 y
        num_candidates = [c for c in num_candidates]
        num_candidates = num_candidates[:min(6, len(num_candidates))]
        scheme = [(c, 1.0) for c in num_candidates]

    components = []
    weights = []
    for col, w in scheme:
        s = pd.to_numeric(df_[col], errors="coerce")
        s = s.fillna(s.median())
        q1, q99 = s.quantile([0.01, 0.99])
        s = s.clip(q1, q99)
        s = (s - s.min()) / (s.max() - s.min() + 1e-9)
        components.append(s.values)
        weights.append(w)

    weights = np.asarray(weights, dtype=float)
    if weights.sum() == 0:
        weights = np.ones_like(weights)
    weights = weights / weights.sum()

    V = np.zeros(len(df_), dtype=float)
    for comp, w in zip(components, weights):
        V += w * comp

    V = (V - V.min()) / (V.max() - V.min() + 1e-9)
    return pd.Series(V, index=df_.index, name="cust_value")

# 计算并附加到 X，便于分析（模型输入不一定使用该列，这里主要用于 sample_weight 与指标）
cust_value_all = compute_customer_value(X)
X_with_value = X.copy()
X_with_value["cust_value"] = cust_value_all
X_with_value.head()


Unnamed: 0,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,cust_value
0,619,France,Female,42,2,0.0,1,1,1,101348.88,0.220513
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0.433502
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,0.751582
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0.176011
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0.513615


In [5]:
# ===========================================
# 切分训练/测试 + 计算样本权重
# w_i = class_weight[y_i] * (1 + gamma * v_i)
# gamma 控制价值权重强度，默认 1.0，可调
# ===========================================
gamma = 1.0  # 可以调成 0.5~2.0 观察敏感性

X_train, X_test, y_train, y_test = train_test_split(
    X_with_value.drop(columns=["cust_value"]),
    y,
    test_size=0.2,
    random_state=SEED,
    stratify=y
)
v_train = cust_value_all.loc[X_train.index]  # 与训练样本对齐的客户价值
v_test  = cust_value_all.loc[X_test.index]

# 类别不平衡权重
cls_w = compute_class_weight(class_weight="balanced", classes=np.array([0, 1]), y=y_train)
class_weight_dict = {0: cls_w[0], 1: cls_w[1]}

# 样本权重
sample_weight_train = np.array([class_weight_dict[int(yi)] * (1.0 + gamma * vi)
                                for yi, vi in zip(y_train, v_train)], dtype=float)

print("训练集大小：", X_train.shape, "；测试集大小：", X_test.shape)
print("类别权重：", class_weight_dict)
print("样本权重示例（前5个）：", sample_weight_train[:5])


训练集大小： (8000, 10) ；测试集大小： (2000, 10)
类别权重： {0: 0.6279434850863422, 1: 2.4539877300613497}
样本权重示例（前5个）： [0.96808298 0.809095   0.82696893 3.85786522 0.90799112]


In [6]:
# ===================================================
# 评估函数（价值指标 + 校准质量 + 利润阈值）
# ===================================================
def value_precision_recall_at_k(y_true, scores, v, k=0.1):
    """返回 (Value-Precision@K, Value-Recall@K, 价值覆盖率)"""
    n = len(scores)
    top = max(1, int(math.ceil(k * n)))
    order = np.argsort(-scores)
    sel = order[:top]
    # Value-Precision@K：选中集合中真正流失的价值占选中价值的比例
    vp = v[sel][(y_true[sel] == 1)].sum() / (v[sel].sum() + 1e-12)
    # Value-Recall@K：选中集合中真正流失的价值占全部流失价值的比例
    vr = v[sel][(y_true[sel] == 1)].sum() / (v[(y_true == 1)].sum() + 1e-12)
    # 价值覆盖率：选中集合的总价值 / 全部价值
    cov = v[sel].sum() / (v.sum() + 1e-12)
    return float(vp), float(vr), float(cov)

def value_pr_auc(y_true, scores, v):
    """价值加权 PR-AUC：对 precision_recall_curve 使用 sample_weight=v"""
    precision, recall, _ = precision_recall_curve(y_true, scores, sample_weight=v)
    # AUC（梯形法则）
    return float(np.trapz(precision, recall))

def expected_calibration_error(y_true, prob, n_bins=15):
    """ECE：概率校准误差（等频分箱）"""
    order = np.argsort(prob)
    y_sorted = np.array(y_true)[order]
    p_sorted = np.array(prob)[order]
    bins = np.array_split(np.arange(len(prob)), n_bins)
    ece = 0.0
    for idx in bins:
        if len(idx) == 0:
            continue
        yb = y_sorted[idx]
        pb = p_sorted[idx]
        conf = pb.mean()
        acc = yb.mean()
        ece += len(idx) / len(prob) * abs(acc - conf)
    return float(ece)

def search_tau_by_profit(scores, y_true, v, C_contact=1.0, gain=1.5, budget=0.1):
    """
    在验证集或测试集上，以预算上限（Top-K 占比）为约束，搜索带来最高利润的阈值。
    - scores：预测概率
    - y_true：真实标签（0/1）
    - v：客户价值（非负）
    - C_contact：联系一个客户的成本
    - gain：当 y=1 且被联系时的收益系数（收益 = gain * v_i）
    - budget：最多联系的样本占比（0~1）
    返回：best_tau, best_profit, profit_curve（DataFrame: k, tau, profit）
    """
    n = len(scores)
    order = np.argsort(-scores)
    max_k = max(1, int(math.floor(budget * n)))
    ks = np.arange(1, max_k + 1)

    # 预先按分数排序后的 y 与 v
    y_sorted = y_true[order]
    v_sorted = v[order]
    s_sorted = scores[order]

    # 累计量，便于 O(1) 计算前 k 的利润
    cum_tp_value = np.cumsum(v_sorted * (y_sorted == 1))
    cum_all_cost = np.arange(1, n + 1) * C_contact

    profits = gain * cum_tp_value[:max_k] - cum_all_cost[:max_k]
    # 阈值取为第 k 个分数
    taus = s_sorted[ks - 1]

    best_idx = int(np.argmax(profits))
    best_tau = float(taus[best_idx])
    best_profit = float(profits[best_idx])

    import pandas as pd
    curve = pd.DataFrame({"k": ks, "tau": taus, "profit": profits})
    return best_tau, best_profit, curve


In [10]:
# ===================================================
# 基模型（带校准）构造器（兼容 sklearn 新旧 API）
# - 每个模型外面都包一层预处理 Pipeline
# - 采用 CalibratedClassifierCV(method='isotonic' 或 'sigmoid')
# ===================================================
import inspect
from sklearn.calibration import CalibratedClassifierCV

def calibrated(method="auto"):
    # 数据量较小时，用 'sigmoid' 更稳定；否则 'isotonic'
    if method == "auto":
        return "isotonic" if len(X_train) >= 2000 else "sigmoid"
    return method

def _build_calibrator(base_est, method="isotonic", cv=3):
    """
    兼容性封装：sklearn>=1.2 使用 estimator，旧版本使用 base_estimator
    """
    params = inspect.signature(CalibratedClassifierCV).parameters
    if "estimator" in params:  # 新版
        return CalibratedClassifierCV(estimator=base_est, method=method, cv=cv)
    else:                      # 旧版
        return CalibratedClassifierCV(base_estimator=base_est, method=method, cv=cv)

def make_base_models(preprocessor, class_weight_dict, seed=SEED):
    models = {}

    # 1) 逻辑回归（本身校准较好，这里不再套校准器）
    lr = Pipeline(steps=[
        ("prep", preprocessor),
        ("clf", LogisticRegression(
            max_iter=2000,
            class_weight=class_weight_dict,
            solver="lbfgs"
        ))
    ])
    models["LR"] = lr

    # 2) 随机森林 + 校准
    rf_cal = Pipeline(steps=[
        ("prep", preprocessor),
        ("clf", _build_calibrator(
            base_est=RandomForestClassifier(
                n_estimators=400,
                max_depth=None,
                min_samples_leaf=2,
                n_jobs=-1,
                random_state=seed,
                class_weight=class_weight_dict
            ),
            method=calibrated(),
            cv=3
        ))
    ])
    models["RF_cal"] = rf_cal

    # 3) 直方图GBDT（速度快）+ 校准
    hgb_cal = Pipeline(steps=[
        ("prep", preprocessor),
        ("clf", _build_calibrator(
            base_est=HistGradientBoostingClassifier(
                max_depth=None,
                learning_rate=0.06,
                l2_regularization=0.0,
                max_leaf_nodes=31,
                random_state=seed
            ),
            method=calibrated(),
            cv=3
        ))
    ])
    models["HGB_cal"] = hgb_cal

    # 4) 线性 SVM + 校准（SVM 原生无概率）
    lsvm_cal = Pipeline(steps=[
        ("prep", preprocessor),
        ("clf", _build_calibrator(
            base_est=LinearSVC(
                C=1.0,
                class_weight=class_weight_dict,
                random_state=seed
            ),
            method="sigmoid",  # 对于线性 SVM 用 Platt 更稳
            cv=3
        ))
    ])
    models["LinearSVM_cal"] = lsvm_cal

    # 5) 可选：XGBoost + 校准（若已安装）
    if HAS_XGB:
        scale_pos_weight = class_weight_dict[1] / max(1e-9, class_weight_dict[0])
        xgb_cal = Pipeline(steps=[
            ("prep", preprocessor),
            ("clf", _build_calibrator(
                base_est=xgb.XGBClassifier(
                    n_estimators=400,
                    max_depth=4,
                    learning_rate=0.06,
                    subsample=0.9,
                    colsample_bytree=0.8,
                    reg_lambda=1.0,
                    objective="binary:logistic",
                    eval_metric="logloss",
                    tree_method="hist",
                    random_state=seed,
                    n_jobs=-1,
                    scale_pos_weight=scale_pos_weight
                ),
                method=calibrated(),
                cv=3
            ))
        ])
        models["XGB_cal"] = xgb_cal

    # 6) 可选：CatBoost（自身带概率，通常较稳，这里不额外校准）
    if HAS_CAT:
        cat = Pipeline(steps=[
            ("prep", preprocessor),
            ("clf", CatBoostClassifier(
                iterations=600,
                depth=6,
                learning_rate=0.05,
                loss_function="Logloss",
                verbose=False,
                random_state=seed
            ))
        ])
        models["CatBoost"] = cat

    return models

base_models = make_base_models(preprocessor, class_weight_dict, seed=SEED)
list(base_models.keys())


['LR', 'RF_cal', 'HGB_cal', 'LinearSVM_cal']

In [11]:
# ===================================================
# 得到每个基模型的 OOF 概率（训练元学习器）
#         同时在全训数据上拟合一个最终基模型用于推理
# ===================================================
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)

meta_train_list = []
meta_test_list  = []
fitted_bases = {}  # 存放全量拟合后的基模型

for name, est in base_models.items():
    print(f"[{name}] 生成 OOF 概率 ...")
    # cross_val_predict 会对 fit_params 自动按训练索引切片
    oof_proba = cross_val_predict(
        est, X_train, y_train,
        cv=skf,
        method="predict_proba",
        n_jobs=None,
        fit_params={"clf__sample_weight": sample_weight_train}
    )
    # 有些分类器的 predict_proba 只有两列（负类、正类）
    if oof_proba.ndim == 2 and oof_proba.shape[1] == 2:
        oof_pos = oof_proba[:, 1]
    else:
        oof_pos = oof_proba.ravel()
    meta_train_list.append(oof_pos)

    # 用全部训练集拟合一个最终版本，供测试/推理使用
    est.fit(X_train, y_train, clf__sample_weight=sample_weight_train)
    fitted_bases[name] = est

    # 测试集上也给出对应的基模型概率，作为元特征
    test_proba = est.predict_proba(X_test)
    if test_proba.ndim == 2 and test_proba.shape[1] == 2:
        test_pos = test_proba[:, 1]
    else:
        test_pos = test_proba.ravel()
    meta_test_list.append(test_pos)

# 组装元特征矩阵
meta_X_train = np.vstack(meta_train_list).T
meta_X_test  = np.vstack(meta_test_list).T

print("元特征维度（训练/测试）：", meta_X_train.shape, meta_X_test.shape)


[LR] 生成 OOF 概率 ...
[RF_cal] 生成 OOF 概率 ...
[HGB_cal] 生成 OOF 概率 ...
[LinearSVM_cal] 生成 OOF 概率 ...
元特征维度（训练/测试）： (8000, 4) (2000, 4)


In [13]:
# ===================================================
# 元学习器训练（逻辑回归 + 轻量校准）
# 
# ===================================================
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
import inspect

# 若前面未定义 _build_calibrator，这里补充一个兼容性封装
try:
    _build_calibrator
except NameError:
    def _build_calibrator(base_est, method="isotonic", cv=3):
        """
        兼容性封装：sklearn>=1.2 使用 estimator，旧版本使用 base_estimator
        """
        params = inspect.signature(CalibratedClassifierCV).parameters
        if "estimator" in params:  # 新版
            return CalibratedClassifierCV(estimator=base_est, method=method, cv=cv)
        else:                      # 旧版
            return CalibratedClassifierCV(base_estimator=base_est, method=method, cv=cv)

# 元特征标准化
meta_scaler = StandardScaler()
meta_X_train_std = meta_scaler.fit_transform(meta_X_train)
meta_X_test_std  = meta_scaler.transform(meta_X_test)

# 元学习器：逻辑回归 + 轻量校准（小数据用 sigmoid，大数据用 isotonic）
meta_base = LogisticRegression(max_iter=500, class_weight=class_weight_dict)
cal_method = "isotonic" if len(X_train) >= 2000 else "sigmoid"
meta_model = _build_calibrator(base_est=meta_base, method=cal_method, cv=3)

# 训练（将样本权重传入以体现价值权重）
meta_model.fit(meta_X_train_std, y_train, sample_weight=sample_weight_train)

# 在测试集上的最终概率
final_proba_test = meta_model.predict_proba(meta_X_test_std)[:, 1]
print("测试集概率（前10个）：", np.round(final_proba_test[:10], 4))


测试集概率（前10个）： [0.3746 0.3732 0.0982 0.1238 0.0982 0.1677 0.9899 0.3732 0.2694 0.3732]


In [14]:
# ===================================================
# 综合评估
# ===================================================
roc = roc_auc_score(y_test, final_proba_test)
pr  = average_precision_score(y_test, final_proba_test)
brier = brier_score_loss(y_test, final_proba_test)
ece = expected_calibration_error(y_test, final_proba_test, n_bins=15)

# F1 / 召回等（用 0.5 只是参考，真正业务阈值见下一单元格）
pred_05 = (final_proba_test >= 0.5).astype(int)
f1 = f1_score(y_test, pred_05)
rec = recall_score(y_test, pred_05)
prec = precision_score(y_test, pred_05)

# 价值指标（以 v_test）
v = v_test.values
vp10, vr10, cov10 = value_precision_recall_at_k(y_test, final_proba_test, v, k=0.10)
vp20, vr20, cov20 = value_precision_recall_at_k(y_test, final_proba_test, v, k=0.20)
vpr_auc = value_pr_auc(y_test, final_proba_test, v)

print(f"ROC-AUC      : {roc:.4f}")
print(f"PR-AUC       : {pr:.4f}")
print(f"Brier        : {brier:.4f}")
print(f"ECE(15bins)  : {ece:.4f}")
print(f"F1@0.5       : {f1:.4f}  Precision@0.5: {prec:.4f}  Recall@0.5: {rec:.4f}")
print(f"Value-Precision@10%: {vp10:.4f}  Value-Recall@10%: {vr10:.4f}  价值覆盖率@10%: {cov10:.4f}")
print(f"Value-Precision@20%: {vp20:.4f}  Value-Recall@20%: {vr20:.4f}  价值覆盖率@20%: {cov20:.4f}")
print(f"价值加权 PR-AUC    : {vpr_auc:.4f}")


ROC-AUC      : 0.8616
PR-AUC       : 0.6968
Brier        : 0.1428
ECE(15bins)  : 0.1750
F1@0.5       : 0.6260  Precision@0.5: 0.5401  Recall@0.5: 0.7445
Value-Precision@10%: 0.8389  Value-Recall@10%: 0.4126  价值覆盖率@10%: 0.1069
Value-Precision@20%: 0.6648  Value-Recall@20%: 0.6220  价值覆盖率@20%: 0.2034
价值加权 PR-AUC    : -0.7158


In [15]:
# ===================================================
# 利润驱动的阈值搜索（带预算）
# 可调参数：
#   C_contact：联系成本（单位化即可）
#   gain     ：成功挽留时的收益系数（乘以客户价值 v_i）
#   budget   ：最多联系的比例（例如 10%）
# ===================================================
C_contact = 1.0
gain = 2.0
budget = 0.10  # 只联系前 10%

best_tau, best_profit, profit_curve = search_tau_by_profit(
    scores=final_proba_test,
    y_true=y_test,
    v=v_test.values,
    C_contact=C_contact,
    gain=gain,
    budget=budget
)

print(f"预算上限={int(budget*100)}%，最优阈值 tau={best_tau:.4f}，对应利润={best_profit:.2f}")
profit_curve.head()


预算上限=10%，最优阈值 tau=0.9899，对应利润=0.21


Unnamed: 0,k,tau,profit
0,1,0.997313,-0.030327
1,2,0.995341,-0.356109
2,3,0.989922,0.213056
3,4,0.989922,0.042845
4,5,0.989922,0.035628


In [16]:
# ===================================================
# 按照最优阈值或 Top-K 进行选客，并输出业务汇总
# ===================================================
scores = final_proba_test.copy()
order = np.argsort(-scores)
k = max(1, int(math.floor(budget * len(scores))))
sel_idx = order[:k]

y_sel = y_test[sel_idx]
v_sel = v_test.values[sel_idx]

# 价值指标（Top-K）
vp, vr, cov = value_precision_recall_at_k(y_test, scores, v_test.values, k=budget)
# 利润（Top-K）
profit_topk = gain * (v_sel[y_sel == 1].sum()) - C_contact * len(sel_idx)

# 利润（最优阈值）
sel_tau = np.where(scores >= best_tau)[0]
profit_tau = gain * (v_test.values[sel_tau][y_test[sel_tau] == 1].sum()) - C_contact * len(sel_tau)

print(f"Top-{int(budget*100)}%：Value-Precision={vp:.4f}  Value-Recall={vr:.4f}  覆盖率={cov:.4f}  利润={profit_topk:.2f}")
print(f"阈值 tau={best_tau:.4f}：  选中人数={len(sel_tau)}  利润={profit_tau:.2f}")


Top-10%：Value-Precision=0.8389  Value-Recall=0.4126  覆盖率=0.1069  利润=-43.90
阈值 tau=0.9899：  选中人数=35  利润=-4.36


In [18]:
# ===================================================
# 可复用封装（训练 -> 预测 -> 选客）
# 为兼容 sklearn 新旧版本，这里提供 _build_calibrator 封装
# ===================================================
from dataclasses import dataclass
import joblib
import inspect
import numpy as np

from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV

# ---- 兼容封装：sklearn>=1.2 使用 estimator，旧版使用 base_estimator ----
def _build_calibrator(base_est, method="isotonic", cv=3):
    params = inspect.signature(CalibratedClassifierCV).parameters
    if "estimator" in params:  # 新版
        return CalibratedClassifierCV(estimator=base_est, method=method, cv=cv)
    else:                      # 旧版
        return CalibratedClassifierCV(base_estimator=base_est, method=method, cv=cv)

@dataclass
class VStackConfig:
    seed: int = SEED
    gamma: float = 1.0
    n_splits: int = 5
    calibrate_method: str = "auto"  # 'auto' / 'isotonic' / 'sigmoid'
    class_weight: dict = None

class VStackChurnModel:
    def __init__(self, preprocessor, config: VStackConfig):
        self.preprocessor = preprocessor
        self.cfg = config
        self.base_models = None
        self.fitted_bases = None
        self.meta_scaler = None
        self.meta_model = None

    def _make_bases(self):
        # 复用前面单元格中定义的 make_base_models（已内置校准）
        return make_base_models(self.preprocessor, self.cfg.class_weight, seed=self.cfg.seed)

    def fit(self, X, y, v, sample_weight):
        """
        X, y：训练集
        v：客户价值（目前未直接用到，保留以便扩展）
        sample_weight：样本权重（已包含类别不平衡与价值权重）
        """
        self.base_models = self._make_bases()
        skf = StratifiedKFold(n_splits=self.cfg.n_splits, shuffle=True, random_state=self.cfg.seed)

        meta_train = []
        self.fitted_bases = {}
        for name, est in self.base_models.items():
            # 生成 OOF 概率作为元特征
            oof = cross_val_predict(
                est, X, y, cv=skf, method="predict_proba",
                fit_params={"clf__sample_weight": sample_weight}
            )
            oof = oof[:, 1] if oof.ndim == 2 else oof.ravel()
            meta_train.append(oof)

            # 在全量训练集上拟合一个最终基模型供推理使用
            est.fit(X, y, clf__sample_weight=sample_weight)
            self.fitted_bases[name] = est

        # 组装并标准化元特征
        meta_train = np.vstack(meta_train).T
        self.meta_scaler = StandardScaler()
        meta_train_std = self.meta_scaler.fit_transform(meta_train)

        # 元学习器：逻辑回归 + 轻量校准（小数据用 sigmoid，大数据用 isotonic）
        base = LogisticRegression(max_iter=500, class_weight=self.cfg.class_weight)
        cal_method = "isotonic" if len(X) >= 2000 else "sigmoid"
        self.meta_model = _build_calibrator(base_est=base, method=cal_method, cv=3)
        self.meta_model.fit(meta_train_std, y, sample_weight=sample_weight)
        return self

    def predict_proba(self, X):
        """使用拟合好的基模型得到元特征 -> 元学习器概率"""
        meta_list = []
        for name in self.fitted_bases:
            p = self.fitted_bases[name].predict_proba(X)
            p = p[:, 1] if p.ndim == 2 else p.ravel()
            meta_list.append(p)
        meta = np.vstack(meta_list).T
        meta_std = self.meta_scaler.transform(meta)
        return self.meta_model.predict_proba(meta_std)[:, 1]

    def select_by_budget(self, X, y, v, C_contact=1.0, gain=2.0, budget=0.1):
        """
        按预算上限在集合 X 中选择客户：
        返回 (最优阈值 tau, 最优利润, 选中索引, 概率分数, 利润曲线DataFrame)
        """
        scores = self.predict_proba(X)
        tau, best_profit, curve = search_tau_by_profit(scores, y, v, C_contact, gain, budget)
        idx_tau = np.where(scores >= tau)[0]
        return tau, best_profit, idx_tau, scores, curve

    def save(self, path):
        obj = {
            "preprocessor": self.preprocessor,
            "fitted_bases": self.fitted_bases,
            "meta_scaler": self.meta_scaler,
            "meta_model": self.meta_model,
            "config": self.cfg,
        }
        joblib.dump(obj, path)
        return path

    @staticmethod
    def load(path):
        obj = joblib.load(path)
        model = VStackChurnModel(obj["preprocessor"], obj["config"])
        model.fitted_bases = obj["fitted_bases"]
        model.meta_scaler = obj["meta_scaler"]
        model.meta_model = obj["meta_model"]
        return model

# ====== 使用示例（可按需重训/保存）======
cfg = VStackConfig(seed=SEED, gamma=gamma, class_weight=class_weight_dict)
vstack_model = VStackChurnModel(preprocessor, cfg).fit(X_train, y_train, v_train.values, sample_weight_train)
proba_check = vstack_model.predict_proba(X_test)[:5]
print("封装模型测试概率（前5个）：", np.round(proba_check, 4))

# 保存（可选）
# path = "vstack_churn_model.joblib"
# vstack_model.save(path)
# print("已保存到：", path)


封装模型测试概率（前5个）： [0.3746 0.3732 0.0982 0.1238 0.0982]
