## 1. Internal validation data: save the best model from 5-fold cross-validation

In [None]:
import numpy as np  
import pandas as pd
from sklearn import metrics
from sklearn.model_selection import KFold, train_test_split
from catboost import CatBoostClassifier, Pool
import os

def bootstrap_metric_ci(y_true, y_pred, metric_fn, n_bootstrap=2000, seed=42):
    """Estimate a metric and its 95% percentile interval by bootstrapping.

    Rows are resampled with replacement ``n_bootstrap`` times. Resamples whose
    labels contain fewer than two distinct values are skipped, since metrics
    such as AUROC cannot be computed on a single class.

    Args:
        y_true: Array-like of ground-truth labels.
        y_pred: Array-like of predictions, aligned element-wise with ``y_true``.
        metric_fn: Callable invoked as ``metric_fn(y_true_sample, y_pred_sample)``
            that returns a number.
        n_bootstrap: Number of resamples to draw.
        seed: Seed for the ``numpy.random.RandomState`` that draws the indices.

    Returns:
        A tuple ``(mean, lower, upper)``: the mean of the bootstrap scores and
        their 2.5th and 97.5th percentiles.

    Raises:
        ValueError: If the inputs are empty, differ in length, or no resample
            contained at least two distinct labels.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    n_samples = len(y_true)
    if n_samples == 0:
        raise ValueError("bootstrap_metric_ci requires at least one sample")
    if len(y_pred) != n_samples:
        # Without this check a longer y_pred would be silently truncated by
        # the index-based resampling below.
        raise ValueError(
            f"y_true and y_pred must have the same length, "
            f"got {n_samples} and {len(y_pred)}"
        )

    rng = np.random.RandomState(seed)
    scores = []
    for _ in range(n_bootstrap):
        idx = rng.randint(0, n_samples, n_samples)
        sampled_true = y_true[idx]
        if len(np.unique(sampled_true)) < 2:
            continue
        scores.append(metric_fn(sampled_true, y_pred[idx]))

    if not scores:
        # Fail with a clear message instead of letting NumPy reduce an empty
        # list of scores.
        raise ValueError(
            "no bootstrap resample contained at least two distinct labels; "
            "cannot compute the metric"
        )

    return np.mean(scores), np.percentile(scores, 2.5), np.percentile(scores, 97.5)

def train_catboost_5fold_cv_fixed(
    dataX,
    dataY,
    cat_features=None,
    save_path='/home/mailiyi/Poisoning_Prediction/ML/catboost_predict_death/internal_valid/',
    seed=9762,
    early_stopping_rounds=30,
    params=None
):
    """Train CatBoost with fixed hyper-parameters under 5-fold cross-validation.

    For each fold the held-out fifth is the test set; one eighth of the
    remaining rows is split off (stratified) as a validation set used for
    early stopping, which gives a 70% / 10% / 20% train / validation / test
    split. Every fold's model and test predictions are written to a
    sub-directory of ``save_path`` named after the hyper-parameters, and the
    pooled test predictions are bootstrapped for AUROC and AUPRC.

    Args:
        dataX: pandas DataFrame of features (copied, never modified in place).
        dataY: Array-like of labels; the class counts below assume 0/1 labels.
        cat_features: Optional list of categorical column names; these columns
            are cast to strings before training.
        save_path: Root output directory.
        seed: Seed shared by the fold splitter, the inner split and CatBoost.
        early_stopping_rounds: Passed through to ``CatBoostClassifier.fit``.
        params: Extra ``CatBoostClassifier`` keyword arguments. Defaults to
            ``{'depth': 5, 'iterations': 200, 'learning_rate': 0.05}``.

    Returns:
        dict with keys ``params``, ``AUROC_mean``, ``AUROC_CI``, ``AUPRC_mean``,
        ``AUPRC_CI``, ``AllResults`` (DataFrame of pooled test predictions) and
        ``BestModelPath`` (model file of the fold with the highest validation
        AUC).
    """
    if params is None:
        # Built per call so the returned dict never aliases a shared default.
        params = {'depth': 5, 'iterations': 200, 'learning_rate': 0.05}

    X = dataX.copy()
    if cat_features is not None:
        for c in cat_features:
            # NOTE(review): astype(str) appears to turn NaN into the string
            # "nan" before fillna runs, so fillna("missing") is likely a no-op.
            # The order is kept as-is so features stay consistent with
            # load_best_catboost_model and with models already saved; confirm
            # before changing it (both places must change together).
            X[c] = X[c].astype(str).fillna("missing")
    y = np.array(dataY)

    param_name = "_".join([f"{k}_{v}" for k, v in params.items()])
    param_path = os.path.join(save_path, param_name)
    # Also creates save_path itself, since param_path lives inside it.
    os.makedirs(param_path, exist_ok=True)
    print(f"\n===== 使用固定参数: {params} =====")

    kf = KFold(n_splits=5, shuffle=True, random_state=seed)
    all_results = []
    val_scores = []
    model_paths = []

    for fold_idx, (train_val_index, test_index) in enumerate(kf.split(X), start=1):
        X_train_val, X_test = X.iloc[train_val_index], X.iloc[test_index]
        y_train_val, y_test = y[train_val_index], y[test_index]

        # 1/8 of the 80% train+val portion == 10% of all rows for validation.
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val,
            test_size=1/8, random_state=seed, stratify=y_train_val
        )

        num_pos = np.sum(y_train == 1)
        num_neg = np.sum(y_train == 0)

        print(f"Seed {seed}, Fold={fold_idx}: Train={len(y_train)}, Val={len(y_val)}, Test={len(y_test)}")
        print(f"  Train - Pos: {num_pos}, Neg: {num_neg}")
        print(f"  Val   - Pos: {np.sum(y_val == 1)},   Neg: {np.sum(y_val == 0)}")
        print(f"  Test  - Pos: {np.sum(y_test == 1)},  Neg: {np.sum(y_test == 0)}")

        # Weight positives by the negative/positive ratio of the training
        # split; max(..., 1) avoids dividing by zero when there are none.
        scale_pos_weight = num_neg / max(num_pos, 1)

        train_pool = Pool(X_train, label=y_train, cat_features=cat_features)
        val_pool = Pool(X_val, label=y_val, cat_features=cat_features)
        test_pool = Pool(X_test, label=y_test, cat_features=cat_features)

        model = CatBoostClassifier(
            **params,
            loss_function="Logloss",
            eval_metric="AUC",
            scale_pos_weight=scale_pos_weight,
            random_seed=seed,
            verbose=False
        )
        model.fit(train_pool, eval_set=val_pool, early_stopping_rounds=early_stopping_rounds)

        # Validation AUC is the score used to pick the "best" fold afterwards.
        y_val_pred = model.predict_proba(val_pool)[:, 1]
        val_auc = metrics.roc_auc_score(y_val, y_val_pred)
        val_scores.append(val_auc)

        y_pred_prob = model.predict_proba(test_pool)[:, 1]

        # Built once: written per fold and pooled across folds later.
        fold_df = pd.DataFrame({
            "fold": fold_idx,
            "y_test": y_test,
            "y_pred": y_pred_prob
        })
        fold_csv = os.path.join(param_path, f"fold_{fold_idx}_results.csv")
        fold_df.to_csv(fold_csv, index=False)
        all_results.append(fold_df)

        model_path = os.path.join(param_path, f"fold_{fold_idx}_model.cbm")
        model.save_model(model_path)
        model_paths.append(model_path)

        print(f"Fold {fold_idx} validation set AUC={val_auc:.4f}，Model saved to {model_path}")

    best_index = int(np.argmax(val_scores))
    best_fold = best_index + 1  # folds are reported 1-based
    best_model_path = model_paths[best_index]
    print(f"\n===== The optimal model is Fold {best_fold}，validation set AUC={val_scores[best_index]:.4f} =====")

    all_results_df = pd.concat(all_results, ignore_index=True)
    all_csv = os.path.join(param_path, "all_folds_results.csv")
    all_results_df.to_csv(all_csv, index=False)

    # Bootstrap AUROC / AUPRC over the test predictions pooled from all folds.
    mean_auroc, auroc_lower, auroc_upper = bootstrap_metric_ci(
        all_results_df["y_test"], all_results_df["y_pred"], metrics.roc_auc_score
    )
    mean_auprc, auprc_lower, auprc_upper = bootstrap_metric_ci(
        all_results_df["y_test"], all_results_df["y_pred"], metrics.average_precision_score
    )

    print(f"AUROC: Mean={mean_auroc:.4f}, 95% CI=({auroc_lower:.4f},{auroc_upper:.4f})")
    print(f"AUPRC: Mean={mean_auprc:.4f}, 95% CI=({auprc_lower:.4f},{auprc_upper:.4f})")

    return {
        "params": params,
        "AUROC_mean": mean_auroc,
        "AUROC_CI": (auroc_lower, auroc_upper),
        "AUPRC_mean": mean_auprc,
        "AUPRC_CI": (auprc_lower, auprc_upper),
        "AllResults": all_results_df,
        "BestModelPath": best_model_path
    }


# ===================== Load optimal model and predict example =====================
def load_best_catboost_model(model_path, X_new, cat_features=None):
    """Load a saved CatBoost model and score ``X_new`` with it.

    Args:
        model_path: Path to a model file written by ``CatBoostClassifier.save_model``.
        X_new: pandas DataFrame of features to score. It is not modified; a
            copy is taken before categorical columns are converted.
        cat_features: Optional list of categorical column names, converted to
            strings the same way as at training time.

    Returns:
        The second column of ``predict_proba`` (probability of class index 1)
        for each row of ``X_new``.
    """
    model = CatBoostClassifier()
    model.load_model(model_path)
    if cat_features is not None:
        # Copy first so the caller's DataFrame is left untouched, matching
        # what the training function does with its input.
        X_new = X_new.copy()
        for c in cat_features:
            # NOTE(review): astype(str) appears to turn NaN into "nan" before
            # fillna runs; the order is kept to match how the saved models
            # were trained. Confirm before changing.
            X_new[c] = X_new[c].astype(str).fillna("missing")
    y_pred_prob = model.predict_proba(X_new)[:, 1]
    return y_pred_prob


#### 2. External validation data: load the best model directly for prediction