In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, StackingClassifier, RandomForestRegressor
from xgboost import XGBClassifier, XGBRegressor
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    roc_curve, auc, accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, mean_squared_error, mean_absolute_error, confusion_matrix
)
from sklearn.calibration import calibration_curve
from sklearn.linear_model import LogisticRegression
import shap

file_path = ''
data = pd.read_excel(file_path)

# 数据预处理
data["浓缩红细胞"] = np.log1p(data["浓缩红细胞"])
data["新鲜血浆"] = np.log1p(data["新鲜血浆"])

# 分割特征和目标
X = data.drop(columns=["是否输血", "浓缩红细胞", "新鲜血浆", "单采血小板", "冷沉淀"]).values
y_class = data["是否输血"].values
y_reg = data[["浓缩红细胞", "新鲜血浆", "单采血小板", "冷沉淀"]].values

# SMOTE过采样
smote = SMOTE(random_state=42)
X_resampled, y_class_resampled = smote.fit_resample(X, y_class)

# 回归目标处理 - 不需要输血的样本回归值为0
y_reg_resampled = np.zeros((len(y_class_resampled), y_reg.shape[1]))
y_reg_resampled[:len(y_reg)] = y_reg
y_reg_resampled[y_class_resampled == 0] = 0

# 标准化
scaler = StandardScaler()
X_resampled = scaler.fit_transform(X_resampled)

# 划分训练集和验证集
X_train, X_val, y_class_train, y_class_val, y_reg_train, y_reg_val = train_test_split(
    X_resampled, y_class_resampled, y_reg_resampled, test_size=0.2, random_state=42
)

# ======================== 模型定义 ========================
# 定义基础模型
rf_clf = RandomForestClassifier(n_estimators=100, random_state=42)
xgb_clf = XGBClassifier(n_estimators=500, max_depth=8, learning_rate=0.02, 
                       objective="binary:logistic", eval_metric="logloss", 
                       random_state=42)
mlp_clf = MLPClassifier(hidden_layer_sizes=(128, 64, 32), max_iter=1000, 
                        activation='relu', random_state=42)
bp_clf = MLPClassifier(random_state=42, hidden_layer_sizes=(128, 64, 32), max_iter=1000, activation='relu')
stacked_model = StackingClassifier(
    estimators=[
        ("rf", rf_clf),
        ("xgb", xgb_clf)
    ],
    final_estimator=LogisticRegression(),
    stack_method='predict_proba'
)

# 模型集合
models = {
    "Random Forest": rf_clf,
    "XGBoost": xgb_clf,
    "MLP": mlp_clf,
    "BP": bp_clf,
    "HGBVE": stacked_model
}

# ======================== 分层交叉验证 ========================
def stratified_cross_validation(model, X, y, n_splits=5):
    """执行分层交叉验证并返回结果"""
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    results = {
        'accuracy': [],
        'precision': [],
        'recall': [],
        'f1': [],
        'roc_auc': [],
        'fprs': [],
        'tprs': [],
        'aucs': []
    }
    
    for train_idx, test_idx in skf.split(X, y):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        
        # 计算指标
        results['accuracy'].append(accuracy_score(y_test, y_pred))
        results['precision'].append(precision_score(y_test, y_pred))
        results['recall'].append(recall_score(y_test, y_pred))
        results['f1'].append(f1_score(y_test, y_pred))
        results['roc_auc'].append(roc_auc_score(y_test, y_prob))
        
        # 计算ROC曲线
        fpr, tpr, _ = roc_curve(y_test, y_prob)
        results['fprs'].append(fpr)
        results['tprs'].append(tpr)
        results['aucs'].append(auc(fpr, tpr))
    
    return results

# 存储所有模型的CV结果
cv_results = {}
for name, model in models.items():
    print(f"正在进行 {name} 的分层交叉验证...")
    cv_results[name] = stratified_cross_validation(model, X_resampled, y_class_resampled)

# ======================== ROC曲线 ========================
plt.figure(figsize=(10, 8))
plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier (AUC = 0.5)')

for name, results in cv_results.items():
    mean_fpr = np.linspace(0, 1, 100)
    tprs = []
    
    for i in range(len(results['fprs'])):
        fpr = results['fprs'][i]
        tpr = results['tprs'][i]
        tprs.append(np.interp(mean_fpr, fpr, tpr))
        tprs[-1][0] = 0.0
    
    mean_tpr = np.mean(tprs, axis=0)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    std_auc = np.std(results['aucs'])
    
    plt.plot(mean_fpr, mean_tpr, label=f'{name} (AUC = {mean_auc:.3f} ± {std_auc:.3f})')
    plt.fill_between(mean_fpr, 
                     np.maximum(mean_tpr - np.std(tprs, axis=0), 0), 
                     np.minimum(mean_tpr + np.std(tprs, axis=0), 1), 
                     alpha=0.2)

plt.title('ROC Curves with Cross-Validation (5-fold)')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend(loc='lower right')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
#plt.savefig('ROC_Curves_with_CV.png', dpi=300)
plt.show()

In [None]:
best_model = stacked_model
best_model.fit(X_train, y_class_train)

explainer = shap.Explainer(best_model.named_estimators_['xgb'], X_train)
shap_values = explainer(X_val)

plt.figure(figsize=(10, 8))
shap.summary_plot(shap_values.values, X_val, feature_names=data.columns.drop(
    ["是否输血", "浓缩红细胞", "新鲜血浆", "单采血小板", "冷沉淀"]), show=False)
plt.title('SHAP Feature Importance Summary')
plt.tight_layout()
#plt.savefig('SHAP_Summary.png', dpi=300)
plt.show()

In [None]:
feature_names = data.columns.drop(["是否输血", "浓缩红细胞", "新鲜血浆", "单采血小板", "冷沉淀"])
most_important_feature = feature_names[np.argmax(np.abs(shap_values.values).mean(0))]
most_important_idx = np.where(feature_names == most_important_feature)[0][0]

plt.figure(figsize=(10, 6))
shap.dependence_plot(
    most_important_idx, 
    shap_values.values, 
    X_val, 
    feature_names=feature_names,
    interaction_index='auto',
    show=False
)
plt.title(f'SHAP Dependence Plot for {most_important_feature}')
plt.tight_layout()
#plt.savefig(f'SHAP_Dependence_{most_important_feature}.png', dpi=300)
plt.show()

In [None]:
# 个体预测解释
sample_idx = 10  # 选择一个样本
plt.figure(figsize=(10, 6))
shap.plots.waterfall(shap_values[sample_idx], max_display=15, show=False)
plt.title(f'Individual Prediction Explanation (Sample {sample_idx})')
plt.tight_layout()
#plt.savefig('SHAP_Individual_Explanation.png', dpi=300)
plt.show()

In [None]:
plt.figure(figsize=(10, 8))
plt.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')

for name, model in models.items():
    y_prob = model.predict_proba(X_val)[:, 1]
    fraction_of_positives, mean_predicted_value = calibration_curve(y_class_val, y_prob, n_bins=10)
    
    plt.plot(mean_predicted_value, fraction_of_positives, 's-', label=f'{name}')

plt.title('Calibration Curves (Reliability Curves)')
plt.xlabel('Mean Predicted Value')
plt.ylabel('Fraction of Positives')
plt.legend(loc='upper left')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
#plt.savefig('Calibration_Curves.png', dpi=300)
plt.show()

In [None]:
# ======================== DCA分析 ========================
def decision_curve_analysis(model, X, y, model_name):
    """决策曲线分析"""
    y_prob = model.predict_proba(X)[:, 1]
    thresholds = np.linspace(0.01, 0.99, 100)
    
    # 计算净收益
    net_benefit = []
    for threshold in thresholds:
        y_pred = (y_prob >= threshold).astype(int)
        tn, fp, fn, tp = confusion_matrix(y, y_pred).ravel()
        n = len(y)
        net_benefit.append(tp/n - (fp/n) * (threshold/(1-threshold)))
    
    # 计算"全部治疗"和"全不治疗"的净收益
    treat_all = np.array([y.mean() - (1-y.mean()) * (t/(1-t)) for t in thresholds])
    treat_none = np.zeros_like(thresholds)
    
    return thresholds, net_benefit, treat_all, treat_none

for name, model in models.items():
    model.fit(X_train, y_class_train)

# 绘制DCA曲线
plt.figure(figsize=(10, 8))

# 添加参考线
thresholds = np.linspace(0.01, 0.99, 100)
plt.plot(thresholds, np.zeros_like(thresholds), 'k-', label='Treat None')
plt.plot(thresholds, [y_class_val.mean()]*len(thresholds), 'r-', label='Treat All')

# 添加模型曲线
for name, model in models.items():
    thresholds, net_benefit, _, _ = decision_curve_analysis(model, X_val, y_class_val, name)
    plt.plot(thresholds, net_benefit, label=f'{name}')

plt.title('Decision Curve Analysis')
plt.xlabel('Threshold Probability')
plt.ylabel('Net Benefit')
plt.ylim([-0.05, 0.6])
plt.legend(loc='upper right')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
#plt.savefig('Decision_Curve_Analysis.png', dpi=300)
plt.show()

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

def feature_ablation_study(model,
                           X_val: np.ndarray,
                           y_val: np.ndarray,
                           important_features: list,
                           n_top: int = 3):

    feature_names = data.drop(columns=["是否输血", "浓缩红细胞", "新鲜血浆", "单采血小板", "冷沉淀"]).columns
    X_df = pd.DataFrame(X_val, columns=feature_names)
    base_performance = evaluate_model(model, X_df.values, y_val)

    ablation_results = []
    for feature in important_features[:n_top]:
        X_ablated = X_df.copy()
        X_ablated[feature] = X_ablated[feature].mean()
        ablated_perf = evaluate_model(model, X_ablated.values, y_val)

        perf_drop = (base_performance['roc_auc'] - ablated_perf['roc_auc']) / base_performance['roc_auc'] * 100
        ablation_results.append((feature, perf_drop))
        
    plt.figure(figsize=(10, 6))
    features, drops = zip(*ablation_results)
    colors = plt.cm.viridis(np.linspace(0.2, 0.8, len(features)))
    ax = sns.barplot(x=list(drops), y=list(features), palette=colors, orient='h')
    plt.title('Feature Ablation Study: Impact on Model Performance',
              fontsize=16, fontweight='bold', pad=20)
    plt.xlabel('AUC Reduction (%)', fontsize=12, labelpad=10)
    plt.ylabel('')
    for i, v in enumerate(drops):
        ax.text(v + 0.5, i, f"{v:.2f}%", color='dimgrey', fontsize=10, va='center')
    plt.grid(axis='x', linestyle='--', alpha=0.3, color='lightgray')
    sns.despine(left=False, bottom=True)
    ax.spines['bottom'].set_visible(False)
    plt.tight_layout()
    #plt.savefig('Feature_Ablation_Study.png', dpi=300, bbox_inches='tight')
    plt.show()

    return ablation_results

important_features = ['Hb', 'HCT', 'TP','ALB','INR']
feature_ablation_study(classification_model, X_val, y_class_val, important_features)