# In [None]:
# -*- coding: utf-8 -*-
"""
SMOTE增强与XGBoost分类的严谨实验框架 (消融研究对比版本)
=========================================================
本脚本是为响应审稿人“消融研究”要求而创建的对比实验版本。
它使用SMOTE作为数据增强方法，与WGAN-GP版本形成直接对比。

核心方法学与WGAN-GP版本保持一致，以确保对比的公平性：
1.  **严格的数据隔离**: 独立的“开发集”和“最终测试集”。
2.  **动态嵌套式数据增强**: 在K-折交叉验证的每一折内部，动态地使用SMOTE增强当前折的训练数据。
3.  **混合数据类型处理**: SMOTE应用于预处理后的数据空间，生成样本后再逆转换为原始格式。
4.  **正确的模型与评估**: 全程使用XGBoost分类器及分类指标。
"""

# --- 基础库导入 ---
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import KFold, RandomizedSearchCV, train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, roc_auc_score, ConfusionMatrixDisplay,
                             make_scorer)
# SMOTE需要从imblearn库导入
# 如果您尚未安装，请运行: pip install -U imbalanced-learn
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import os
import joblib
from pathlib import Path

# --- 0. Global configuration (kept identical to the WGAN-GP version) ---

# --- Path configuration ---
# The project root is the parent of the current working directory; the data
# and output folders hang off that root.
CURRENT_DIR = Path.cwd()
PROJECT_ROOT = CURRENT_DIR.parent
DATA_DIR, OUTPUT_DIR = PROJECT_ROOT / "data", PROJECT_ROOT / "output"

DEV_SET_FILE = DATA_DIR.joinpath("development_set.xlsx")
TEST_SET_FILE = DATA_DIR.joinpath("final_test_set.xlsx")

# The SMOTE run writes into its own sub-folder so that its artefacts are not
# mixed up with those of the WGAN-GP run.
SMOTE_OUTPUT_DIR = OUTPUT_DIR.joinpath("smote_experiment")
OUTPUT_PLOT_PATH = SMOTE_OUTPUT_DIR
AUGMENTED_DATA_OUTPUT_FOLDER = SMOTE_OUTPUT_DIR.joinpath("augmented_outputs")
MODEL_OUTPUT_PATH = SMOTE_OUTPUT_DIR.joinpath("trained_models")

# Creating the two leaf folders also creates SMOTE_OUTPUT_DIR (their parent).
AUGMENTED_DATA_OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)
MODEL_OUTPUT_PATH.mkdir(parents=True, exist_ok=True)

# --- Experiment parameters ---
N_SPLITS_KFOLD = 5          # number of cross-validation folds
RANDOM_STATE = 42           # seed shared by every randomised component
TARGET_COLUMN = 'target'    # name of the label column

# --- XGBoost hyper-parameter search space for RandomizedSearchCV ---
N_ITER_RANDOMIZED_SEARCH = 30   # number of parameter settings sampled
XGB_PARAM_GRID = dict(
    n_estimators=[100, 200, 300, 400],
    max_depth=[3, 5, 7],
    learning_rate=[0.01, 0.05, 0.1],
    subsample=[0.7, 0.8, 0.9, 1],
    colsample_bytree=[0.7, 0.8, 0.9, 1],
    gamma=[0, 0.1, 0.2],
    reg_alpha=[0, 0.01, 0.1],
    reg_lambda=[0.5, 1, 1.5],
    scale_pos_weight=[1, 5, 10, 20],
)

# --- Device configuration ---
# SMOTE itself runs on the CPU, but XGBoost can still make use of a GPU.
# torch is only used to probe for CUDA; when it is not installed the script
# falls back to 'cpu'.
device_type = 'cpu'
try:
    import torch
    if torch.cuda.is_available():
        device_type = 'cuda'
except ImportError:
    # torch is optional here: keep the CPU default.
    pass
print(f"XGBoost将使用设备: {device_type}")


# --- Data feature definitions ---
# Valid codes for every categorical column. The label column is listed as
# well, so it is treated like any other categorical column.
CATEGORY_MAPPINGS = {
    'gender':            list(range(2)),
    'fastingbloodsugar': list(range(2)),
    'chestpain':         list(range(4)),
    'restingrelectro':   list(range(3)),
    'exerciseangia':     list(range(2)),
    'slope':             list(range(1, 4)),
    'noofmajorvessels':  list(range(4)),
    'target':            list(range(2)),
}

# (lower, upper) bounds used to clip generated continuous values.
CONTINUOUS_BOUNDS = dict(
    age=(20, 80),
    restingBP=(94, 200),
    serumcholestrol=(126, 564),
    maxheartrate=(71, 202),
    oldpeak=(0, 6.2),
)

# Column lists follow the insertion order of the two dictionaries above.
CONTINUOUS_COLS = list(CONTINUOUS_BOUNDS)
CATEGORICAL_COLS = list(CATEGORY_MAPPINGS)

# --- 1. Core SMOTE data-augmentation function ---
def augment_with_smote(input_original_df,
                       fold_num_for_logging=None,
                       output_augmented_data_path=None):
    """Generate synthetic minority-class rows with SMOTE.

    The continuous columns are standardised and the categorical feature
    columns are one-hot encoded; SMOTE is applied in that space so that the
    minority class reaches the size of the majority class. Only the newly
    generated rows are mapped back to the original column format: continuous
    values are inverse-scaled and clipped to ``CONTINUOUS_BOUNDS``,
    categorical values are decoded by arg-max and snapped to the nearest
    value listed in ``CATEGORY_MAPPINGS``.

    The label column is deliberately kept out of the SMOTE feature matrix;
    the label of every synthetic row is the one returned by SMOTE itself.

    Args:
        input_original_df: DataFrame holding ``CONTINUOUS_COLS``, the
            categorical columns and ``TARGET_COLUMN``.
        fold_num_for_logging: Optional tag added to the log prefix.
        output_augmented_data_path: Optional path; when given, the synthetic
            rows are also written there as an Excel file.

    Returns:
        A DataFrame containing only the synthetic rows (continuous columns
        first, then the categorical columns including the label). An empty
        DataFrame is returned when nothing can or needs to be generated:
        fewer than two classes, classes already balanced, or fewer than two
        minority rows.
    """
    log_prefix = "[SMOTE"
    if fold_num_for_logging:
        log_prefix += f" {fold_num_for_logging}"
    log_prefix += "]"
    print(f"\n{log_prefix} 开始数据增强，输入形状: {input_original_df.shape}")

    # 1. Decide whether augmentation is needed/possible before doing any
    #    fitting, so empty or single-class input never reaches the scaler.
    y_original = input_original_df[TARGET_COLUMN]
    class_counts = y_original.value_counts()
    if len(class_counts) < 2 or class_counts.max() == class_counts.min():
        print(f"{log_prefix} 数据已平衡或只有一个类别，跳过SMOTE。")
        return pd.DataFrame()

    majority_class_label = class_counts.idxmax()
    minority_class_label = class_counts.idxmin()
    n_majority = int(class_counts.loc[majority_class_label])
    n_minority = int(class_counts.loc[minority_class_label])

    # SMOTE needs k_neighbors + 1 minority rows (each row counts itself as a
    # neighbour). Keep the library default of 5 whenever it is feasible and
    # shrink it for very small minority classes instead of crashing.
    k_neighbors = min(5, n_minority - 1)
    if k_neighbors < 1:
        print(f"{log_prefix} 警告: 少数类样本不足 ({n_minority})，无法应用SMOTE。返回空DataFrame。")
        return pd.DataFrame()

    # 2. Pre-processing: standardise continuous columns, one-hot encode the
    #    categorical *feature* columns (the label is not a feature).
    feature_cat_cols = [c for c in CATEGORICAL_COLS if c != TARGET_COLUMN]

    scaler = StandardScaler()
    cont_std = scaler.fit_transform(input_original_df[CONTINUOUS_COLS])

    encoder = OneHotEncoder(sparse_output=False, dtype=np.float32, handle_unknown='ignore')
    cat_oh = encoder.fit_transform(input_original_df[feature_cat_cols])

    X_processed = np.hstack([cont_std, cat_oh])

    # 3. Apply SMOTE: raise the minority class to the majority class size.
    sampling_strategy = {minority_class_label: n_majority}
    print(f"{log_prefix} 应用SMOTE... 原始少数类({minority_class_label})样本数: {n_minority}, 多数类({majority_class_label})样本数: {n_majority}")
    smote = SMOTE(sampling_strategy=sampling_strategy,
                  k_neighbors=k_neighbors,
                  random_state=RANDOM_STATE)
    X_resampled, y_resampled = smote.fit_resample(X_processed, y_original)

    # The resampled output lists the original rows first, followed by the
    # synthetic ones, so everything past the original length is new.
    num_original_samples = len(input_original_df)
    generated_processed = X_resampled[num_original_samples:]
    generated_labels = np.asarray(y_resampled)[num_original_samples:]

    if len(generated_processed) == 0:
        print(f"{log_prefix} 警告: SMOTE未生成新样本。返回空DataFrame。")
        return pd.DataFrame()

    print(f"{log_prefix} SMOTE完成，生成了 {len(generated_processed)} 个新样本。")

    # 4. Post-processing: map the synthetic rows back to the original format.
    num_cont_cols = len(CONTINUOUS_COLS)
    gen_cont = scaler.inverse_transform(generated_processed[:, :num_cont_cols])
    for i, col in enumerate(CONTINUOUS_COLS):
        lo, hi = CONTINUOUS_BOUNDS[col]
        gen_cont[:, i] = np.clip(gen_cont[:, i], lo, hi)
    augmented_df = pd.DataFrame(gen_cont, columns=CONTINUOUS_COLS)

    # Interpolated one-hot blocks are fractional; the largest entry of each
    # block decides the category, which is then snapped to the nearest value
    # allowed by CATEGORY_MAPPINGS.
    gen_cat_oh = generated_processed[:, num_cont_cols:]
    block_start = 0
    for col, categories in zip(feature_cat_cols, encoder.categories_):
        block_width = len(categories)
        winners = np.argmax(gen_cat_oh[:, block_start:block_start + block_width], axis=1)
        block_start += block_width

        decoded = pd.to_numeric(pd.Series(categories[winners]), errors='coerce')
        allowed = CATEGORY_MAPPINGS[col]
        augmented_df[col] = decoded.apply(
            lambda value, allowed=allowed: (
                min(allowed, key=lambda v: abs(v - value)) if pd.notna(value) else value
            )
        )

    augmented_df[TARGET_COLUMN] = generated_labels

    # Keep the column layout callers already rely on: continuous columns
    # followed by the categorical columns in their configured order.
    ordered_cols = list(CONTINUOUS_COLS) + list(CATEGORICAL_COLS)
    if TARGET_COLUMN not in ordered_cols:
        ordered_cols.append(TARGET_COLUMN)
    augmented_df = augmented_df[ordered_cols]

    # 5. (Optional) persist the synthetic rows.
    if output_augmented_data_path:
        augmented_df.to_excel(output_augmented_data_path, index=False)
        print(f"{log_prefix} 增强数据已保存到: {output_augmented_data_path}")

    return augmented_df

# --- 2. Main pipeline ---
# Load the development set and the final test set from their Excel files.
try:
    development_df_original = pd.read_excel(DEV_SET_FILE)
    final_test_df_original = pd.read_excel(TEST_SET_FILE)
except FileNotFoundError as e:
    print(f"错误: 数据文件未找到，请检查路径。 {e}")
    # exit() is the interactive helper injected by the site module and would
    # report success (status 0); raise SystemExit with a failure status instead.
    raise SystemExit(1) from e
else:
    print(f"成功加载数据: 开发集形状 {development_df_original.shape}, 最终测试集形状 {final_test_df_original.shape}")

# Separate features and label for both datasets.
X_dev_original = development_df_original.drop(columns=[TARGET_COLUMN])
y_dev_original = development_df_original[TARGET_COLUMN]
X_final_test = final_test_df_original.drop(columns=[TARGET_COLUMN])
y_final_test = final_test_df_original[TARGET_COLUMN]

# --- Step 3.1: hyper-parameter optimisation (HPO) ---
print("\n--- [步骤 3.1] XGBoost 超参数调优 (使用SMOTE增强) ---")
print("为HPO生成开发集的SMOTE增强版本...")

# Augmenting the whole development set *before* the search's internal
# cross-validation lets synthetic rows interpolated from validation rows end
# up in the training folds (data leakage, inflated HPO scores). Instead, the
# folds are fixed up front and SMOTE is run on each fold's training part
# only. All synthetic rows are appended to one combined table and the search
# receives explicit (train, validation) index pairs:
#   train      = original training rows of the fold + that fold's synthetic rows
#   validation = original validation rows of the fold (never augmented)
# NOTE(review): if the WGAN-GP counterpart tunes on a pre-augmented
# development set, apply the same per-fold scheme there to keep the
# comparison fair.
hpo_fold_splitter = KFold(n_splits=N_SPLITS_KFOLD, shuffle=True, random_state=RANDOM_STATE)
hpo_feature_parts = [X_dev_original.reset_index(drop=True)]
hpo_target_parts = [y_dev_original.reset_index(drop=True)]
hpo_cv_splits = []
hpo_next_free_row = len(X_dev_original)

for hpo_fold_no, (hpo_train_pos, hpo_val_pos) in enumerate(
        hpo_fold_splitter.split(development_df_original), start=1):
    hpo_synthetic_df = augment_with_smote(
        input_original_df=development_df_original.iloc[hpo_train_pos].copy(),
        fold_num_for_logging=f"HPO_Fold_{hpo_fold_no}"
    )

    # Positions the synthetic rows of this fold occupy in the combined table
    # (an empty range when augmentation produced nothing).
    hpo_synthetic_pos = np.arange(hpo_next_free_row, hpo_next_free_row + len(hpo_synthetic_df))
    if not hpo_synthetic_df.empty:
        hpo_feature_parts.append(hpo_synthetic_df.drop(columns=[TARGET_COLUMN]))
        hpo_target_parts.append(hpo_synthetic_df[TARGET_COLUMN])
        hpo_next_free_row += len(hpo_synthetic_df)
    else:
        print("HPO阶段数据增强未生成样本，仅使用原始数据进行调优。")

    hpo_cv_splits.append((np.concatenate([hpo_train_pos, hpo_synthetic_pos]), hpo_val_pos))

X_combined_dev_for_hpo = pd.concat(hpo_feature_parts, ignore_index=True)
y_combined_dev_for_hpo = pd.concat(hpo_target_parts, ignore_index=True)

xgb_classifier_for_hpo = xgb.XGBClassifier(objective='binary:logistic', eval_metric='logloss',
                                           random_state=RANDOM_STATE, use_label_encoder=False,
                                           tree_method='gpu_hist' if device_type == 'cuda' else 'auto')

# refit=False: only the best parameter set is needed afterwards, and a refit
# on the combined table would train on the synthetic rows of every fold.
random_search_hpo = RandomizedSearchCV(
    estimator=xgb_classifier_for_hpo, param_distributions=XGB_PARAM_GRID,
    n_iter=N_ITER_RANDOMIZED_SEARCH, cv=hpo_cv_splits,
    scoring='roc_auc', refit=False,
    verbose=1, random_state=RANDOM_STATE, n_jobs=-1
)
print(f"开始在 {len(X_dev_original)} 个样本上进行XGBoost超参数搜索...")
random_search_hpo.fit(X_combined_dev_for_hpo, y_combined_dev_for_hpo)
best_overall_xgboost_params = random_search_hpo.best_params_
print(f"\n找到的最佳XGBoost超参数 (SMOTE): {best_overall_xgboost_params}")
print(f"最佳HPO ROC AUC Score (SMOTE): {random_search_hpo.best_score_:.4f}")

# --- Step 3.2: K-fold cross-validation with per-fold SMOTE augmentation ---
print(f"\n--- [步骤 3.2] 在开发集上进行 {N_SPLITS_KFOLD}-折交叉验证 (动态SMOTE增强) ---")
kf = KFold(n_splits=N_SPLITS_KFOLD, shuffle=True, random_state=RANDOM_STATE)

kfold_cv_val_metrics_list = []
kfold_cv_train_metrics_list = []
cv_train_loss_curves = []
cv_val_loss_curves = []


def _score_binary_predictions(y_true, positive_proba):
    """Return accuracy, F1 and AUC for positive-class probabilities.

    Hard labels are obtained with a strict ``> 0.5`` threshold.
    """
    hard_labels = (positive_proba > 0.5).astype(int)
    return {
        'Accuracy': accuracy_score(y_true, hard_labels),
        'F1 Score': f1_score(y_true, hard_labels),
        'AUC': roc_auc_score(y_true, positive_proba)
    }


for fold_idx, (train_indices, val_indices) in enumerate(kf.split(development_df_original)):
    print(f"\n--- K-Fold: 第 {fold_idx + 1}/{N_SPLITS_KFOLD} 折 ---")
    cv_train_original_fold_df = development_df_original.iloc[train_indices]
    cv_val_original_fold_df = development_df_original.iloc[val_indices]

    # SMOTE only ever sees the training part of the fold; the validation
    # part stays untouched original data.
    cv_augmented_fold_df = augment_with_smote(
        input_original_df=cv_train_original_fold_df.copy(),
        fold_num_for_logging=f"Fold_{fold_idx + 1}"
    )

    X_cv_train_original_fold = cv_train_original_fold_df.drop(columns=[TARGET_COLUMN])
    y_cv_train_original_fold = cv_train_original_fold_df[TARGET_COLUMN]

    if not cv_augmented_fold_df.empty:
        X_cv_augmented_fold = cv_augmented_fold_df.drop(columns=[TARGET_COLUMN])
        y_cv_augmented_fold = cv_augmented_fold_df[TARGET_COLUMN]
        X_cv_train_combined_fold = pd.concat([X_cv_train_original_fold, X_cv_augmented_fold], ignore_index=True)
        y_cv_train_combined_fold = pd.concat([y_cv_train_original_fold, y_cv_augmented_fold], ignore_index=True)
    else:
        # Nothing was generated: train on the original rows only.
        X_cv_train_combined_fold = X_cv_train_original_fold
        y_cv_train_combined_fold = y_cv_train_original_fold

    X_cv_val_fold = cv_val_original_fold_df.drop(columns=[TARGET_COLUMN])
    y_cv_val_fold = cv_val_original_fold_df[TARGET_COLUMN]

    model_fold = xgb.XGBClassifier(
        **best_overall_xgboost_params, objective='binary:logistic', eval_metric='logloss',
        random_state=RANDOM_STATE, use_label_encoder=False,
        tree_method='gpu_hist' if device_type == 'cuda' else 'auto'
    )

    # The eval sets are passed only to record the log-loss curves. No early
    # stopping is used: stopping on the fold that is scored right afterwards
    # would let that fold pick the number of trees (optimistic bias), the
    # final model is trained without early stopping as well, and the
    # `early_stopping_rounds` fit() argument no longer exists in XGBoost 2.x.
    eval_set_fold = [(X_cv_train_combined_fold, y_cv_train_combined_fold), (X_cv_val_fold, y_cv_val_fold)]
    model_fold.fit(X_cv_train_combined_fold, y_cv_train_combined_fold,
                   eval_set=eval_set_fold, verbose=False)

    # 'validation_0' / 'validation_1' follow the order of eval_set_fold.
    fold_eval_results = model_fold.evals_result()
    cv_train_loss_curves.append(fold_eval_results['validation_0']['logloss'])
    cv_val_loss_curves.append(fold_eval_results['validation_1']['logloss'])

    y_pred_val_proba = model_fold.predict_proba(X_cv_val_fold)[:, 1]
    kfold_cv_val_metrics_list.append(
        _score_binary_predictions(y_cv_val_fold, y_pred_val_proba))

    y_pred_train_proba = model_fold.predict_proba(X_cv_train_combined_fold)[:, 1]
    kfold_cv_train_metrics_list.append(
        _score_binary_predictions(y_cv_train_combined_fold, y_pred_train_proba))

    print(f"Fold {fold_idx + 1} - Val AUC: {kfold_cv_val_metrics_list[-1]['AUC']:.4f} | Train AUC: {kfold_cv_train_metrics_list[-1]['AUC']:.4f}")

# --- Step 3.3: analysis and visualisation of the cross-validation results ---
avg_kfold_cv_val_metrics_df = pd.DataFrame(kfold_cv_val_metrics_list)
avg_kfold_cv_train_metrics_df = pd.DataFrame(kfold_cv_train_metrics_list)

# Per-metric means over the folds, computed once and reused below.
avg_val_metrics = avg_kfold_cv_val_metrics_df.mean()
avg_train_metrics = avg_kfold_cv_train_metrics_df.mean()

print("\n--- K-折交叉验证平均性能 (SMOTE动态增强) ---")
print("--- 平均验证集性能 ---")
print(avg_val_metrics)
print("\n--- 平均训练集性能 ---")
print(avg_train_metrics)

metrics_to_plot = ['Accuracy', 'F1 Score', 'AUC']
x_axis = np.arange(len(metrics_to_plot))

# Grouped bar chart: train vs. validation average for each metric.
plt.figure(figsize=(10, 6))
plt.bar(x_axis - 0.2, avg_train_metrics[metrics_to_plot], width=0.4, label='CV Train Avg.', align='center')
plt.bar(x_axis + 0.2, avg_val_metrics[metrics_to_plot], width=0.4, label='CV Validation Avg.', align='center')
plt.xticks(x_axis, metrics_to_plot)
plt.ylabel('Score')
plt.title('Average K-Fold CV Train vs. Validation Metrics (SMOTE Augmented)')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
# Keep the zoomed-in 0.8-1.0 view when every bar fits into it; otherwise
# lower the axis floor so that a score below 0.8 does not silently vanish.
lowest_avg_score = float(min(avg_train_metrics[metrics_to_plot].min(),
                             avg_val_metrics[metrics_to_plot].min()))
y_axis_floor = 0.8 if lowest_avg_score >= 0.8 else max(0.0, lowest_avg_score - 0.05)
plt.ylim(y_axis_floor, 1.0)
plt.savefig(OUTPUT_PLOT_PATH / "kfold_avg_eval_metrics_smote.png", dpi=300)
plt.show()

# Log-loss curves of every fold (dashed = train, solid = validation).
# Iterating over the recorded curves themselves avoids an IndexError if
# fewer than N_SPLITS_KFOLD curves were collected.
plt.figure(figsize=(12, 7))
for fold_no, (train_curve, val_curve) in enumerate(
        zip(cv_train_loss_curves, cv_val_loss_curves), start=1):
    plt.plot(train_curve, label=f'Train Fold {fold_no}', linestyle='--')
    plt.plot(val_curve, label=f'Validation Fold {fold_no}', linestyle='-')
plt.xlabel('Boosting Round')
plt.ylabel('Log Loss')
plt.title('Training and Validation Log Loss per Fold (SMOTE Augmented)')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.savefig(OUTPUT_PLOT_PATH / "kfold_logloss_per_fold_smote.png", dpi=300)
plt.show()

# --- Step 4: train the final model ---
print("\n--- [步骤 4] 训练最终模型 (使用SMOTE增强) ---")
print("为最终模型生成开发集的完整SMOTE增强版本...")
# Augment the complete development set; the synthetic rows are also written
# to an Excel file.
final_augmented_dev_df = augment_with_smote(
    input_original_df=development_df_original.copy(),
    fold_num_for_logging="Final_Model_Augmentation",
    output_augmented_data_path=AUGMENTED_DATA_OUTPUT_FOLDER / "augmented_for_final_model_smote.xlsx"
)

if not final_augmented_dev_df.empty:
    X_final_augmented_dev = final_augmented_dev_df.drop(columns=[TARGET_COLUMN])
    y_final_augmented_dev = final_augmented_dev_df[TARGET_COLUMN]
    X_train_final_model = pd.concat([X_dev_original, X_final_augmented_dev], ignore_index=True)
    y_train_final_model = pd.concat([y_dev_original, y_final_augmented_dev], ignore_index=True)
else:
    print("最终模型训练阶段数据增强未生成样本，仅使用原始数据进行训练。")
    X_train_final_model = X_dev_original
    y_train_final_model = y_dev_original

print(f"用于训练最终模型的总数据形状: {X_train_final_model.shape}")

# Same estimator configuration as in cross-validation, using the
# hyper-parameters selected in step 3.1.
final_model = xgb.XGBClassifier(
    **best_overall_xgboost_params, objective='binary:logistic', eval_metric='logloss',
    random_state=RANDOM_STATE, use_label_encoder=False,
    tree_method='gpu_hist' if device_type == 'cuda' else 'auto'
)
print("开始训练最终模型...")
final_model.fit(X_train_final_model, y_train_final_model)
print("最终模型训练完成。")

final_model_path = MODEL_OUTPUT_PATH / "final_xgboost_smote_model.joblib"
joblib.dump(final_model, final_model_path)
print(f"最终模型已保存到: {final_model_path}")

# plot_importance() opens a figure of its own unless it is handed an Axes,
# which would leave a pre-created figure empty and ignore its figsize.
# Create the Axes explicitly and draw into it.
fig_importance, ax_importance = plt.subplots(figsize=(10, 8))
xgb.plot_importance(final_model, ax=ax_importance, max_num_features=20, height=0.8,
                    title="Feature Importance (Final Model with SMOTE)")
fig_importance.tight_layout()
fig_importance.savefig(OUTPUT_PLOT_PATH / "final_model_feature_importances_smote.png", dpi=300)
plt.show()

# --- Step 5: evaluation on the final test set ---
print("\n--- [步骤 5] 在最终测试集上进行无偏评估 (SMOTE模型) ---")
y_pred_proba_final = final_model.predict_proba(X_final_test)[:, 1]
# Strict '>' matches the threshold used for the cross-validation metrics, so
# CV and test numbers are produced by the same decision rule.
y_pred_final = (y_pred_proba_final > 0.5).astype(int)

print("--- 最终模型在最终测试集上的性能 (SMOTE) ---")
print(f"Accuracy : {accuracy_score(y_final_test, y_pred_final):.4f}")
print(f"Precision: {precision_score(y_final_test, y_pred_final):.4f}")
print(f"Recall   : {recall_score(y_final_test, y_pred_final):.4f}")
print(f"F1 Score : {f1_score(y_final_test, y_pred_final):.4f}")
print(f"AUC      : {roc_auc_score(y_final_test, y_pred_proba_final):.4f}")

# Passing the label set explicitly keeps the matrix 2x2 (and in line with
# the two display labels) even if one class is absent from both the test
# labels and the predictions.
fig, ax = plt.subplots(figsize=(8, 8))
ConfusionMatrixDisplay.from_predictions(y_final_test, y_pred_final,
                                        labels=CATEGORY_MAPPINGS[TARGET_COLUMN],
                                        ax=ax,
                                        display_labels=['Absence', 'Presence'],
                                        cmap='Blues')
ax.set_title('Confusion Matrix on Final Test Set (SMOTE Model)')
fig.savefig(OUTPUT_PLOT_PATH / "final_model_confusion_matrix_smote.png", dpi=300)
plt.show()

print("\n--- SMOTE版本整体流程执行完毕 ---")
