### 1. 导入依赖

In [None]:
# 忽略警告
import warnings
warnings.filterwarnings("ignore")

import copy

# 数据处理
import numpy as np
import pandas as pd

pd.set_option('display.float_format', '{:.2f}'.format)
from sklearn.preprocessing import StandardScaler

# 采样
from imblearn.over_sampling import SMOTE

# 绘图
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib as mpl

# 设置中文字体
mpl.rcParams['font.sans-serif'] = ['Heiti TC']

# 单模型
from sklearn.linear_model import LogisticRegression  # 逻辑回归 & 弹性网络
from sklearn.neural_network import MLPClassifier  # ANN
from sklearn.svm import SVC  # SVM
from sklearn.ensemble import GradientBoostingClassifier  # 梯度提升决策树
from sklearn.ensemble import RandomForestClassifier
# 集成模型
from sklearn.ensemble import VotingClassifier  # Voting 方法
from sklearn.ensemble import StackingClassifier  # Stacking 方法

# 数据集划分 & 网格搜索 & 交叉验证
from sklearn.model_selection import GridSearchCV

# 评估指标
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, roc_curve, auc
from sklearn.metrics import make_scorer
from sklearn.inspection import permutation_importance

### 2. 设置可调节超参数

In [None]:
# 定义不同的模型和超参数
RANDOM_NUMBER = 62964
model_params = {
    'LogisticRegression': {
        'model': LogisticRegression(random_state=RANDOM_NUMBER, C=0.1),
        'params': {},
    },
    'RandomForest': {
        'model': RandomForestClassifier(random_state=RANDOM_NUMBER),
        'params': {'n_estimators': [15, 20, 25], 'max_depth': [1, 3, 5], 'min_samples_split': [1, 3, 5]},
    },
    'SVM': {
        'model': SVC(kernel='linear', probability=True, random_state=RANDOM_NUMBER),
        'params': {'C': [0.1, 1, 5]},
    },
    'GradientBoosting': {
        'model': GradientBoostingClassifier(random_state=RANDOM_NUMBER),
        'params': {'n_estimators': [7, 11, 17], 'max_depth': [1, 3, 5], 'min_samples_split': [1, 3, 5]},
    },
    'ElasticNet': {
        'model': LogisticRegression(penalty='elasticnet', solver='saga', random_state=RANDOM_NUMBER),
        'params': {'C': [0.1, 1, 5], 'l1_ratio': [0.3, 0.5, 0.7]},
    },
    'ANN': {
        'model': MLPClassifier(random_state=RANDOM_NUMBER),
        'params': {'hidden_layer_sizes': [(20,), (20, 20)], 'activation': ['tanh', 'relu'],
                   'max_iter': [200]},
    }
}

### 3. 数据预处理

In [None]:
# 加载数据
def get_data(path_inner, path_outer, test=False, balance_outer=False, sample=False):
    df_inner = pd.read_excel(path_inner)
    df_outer = pd.read_excel(path_outer)
    df_inner['Results'] = df_inner['Results'] - 1
    df_outer['Results'] = df_outer['Results'] - 1
    # 对连续变量进行标准化处理
    scaler = StandardScaler()
    continuous_variable = ['Age', 'Lymphocyte']
    df_inner[continuous_variable] = scaler.fit_transform(df_inner[continuous_variable])
    df_outer[continuous_variable] = scaler.fit_transform(df_outer[continuous_variable])

    if balance_outer:
        # 将外部验证集数据配平
        df_outer_positive = df_outer[df_outer['Results'] == 1].sample(30)
        df_outer_negative = df_outer[df_outer['Results'] == 2].sample(30)
        df_outer = pd.concat([df_outer_positive, df_outer_negative], axis=0)
        print(
            f"随机抽取后外部验证集数据比例: {len(df_outer_sample[df_outer_sample['Results'] == 1])}(1, 阴性) : {len(df_outer_sample[df_outer_sample['Results'] == 0])}(0, 阳性)")
    else:
        print(
            f"外部验证集数据比例: {len(df_outer[df_outer['Results'] == 1])}(1, 阴性) : {len(df_outer[df_outer['Results'] == 0])}(0, 阳性)")

    if test:
        # 划分训练集和测试集 8:2
        df_test, df_train = train_test_split(df_inner, test_size=0.8, random_state=RANDOM_NUMBER)
        print(
            f"训练集数据比例: {len(df_train[df_train['Results'] == 1])}(1, 阴性) : {len(df_train[df_train['Results'] == 0])}(0, 阳性)")
        print(
            f"测试集数据比例: {len(df_test[df_test['Results'] == 1])}(1, 阴性) : {len(df_test[df_test['Results'] == 0])}(0, 阳性)")
        if sample:
            # SMOTE过采样: 1. 随机选择一个少数类样本 2. 从最近邻中找到样本 3. 生成合成样本
            smote = SMOTE(sampling_strategy='auto', random_state=RANDOM_NUMBER)
            df_train_resampling = pd.concat(smote.fit_resample(df_train.drop('Results', axis=1), df_train['Results']),
                                            axis=1)
            print(
                f"增强后训练集数据比例: {len(df_train_resampling[df_train_resampling['Results'] == 1])}(1, 阴性) : {len(df_train_resampling[df_train_resampling['Results'] == 0])}(0, 阳性)")
            return df_test, df_train, df_train_resampling, df_outer
        else:
            return df_test, df_train, df_outer
    else:
        # 不划分训练集和测试集
        print(
            f"训练集数据比例: {len(df_inner[df_inner['Results'] == 1])}(1, 阴性) : {len(df_inner[df_inner['Results'] == 0])}(0, 阳性)")
        if sample:
            smote = SMOTE(sampling_strategy='auto', random_state=RANDOM_NUMBER)
            df_inner_resampling = pd.concat(smote.fit_resample(df_inner.drop('Results', axis=1), df_inner['Results']),
                                            axis=1)
            print(
                f"增强后训练集数据比例: {len(df_inner_resampling[df_inner_resampling['Results'] == 1])}(1, 阴性) : {len(df_inner_resampling[df_inner_resampling['Results'] == 0])}(0, 阳性)")
            return df_inner, df_inner_resampling, df_outer
        else:
            return df_inner, df_outer


file_path_inner = "Dataset/1-训练集+内部验证集.xls"
file_path_outer = "Dataset/1-外部测试集-1.xls"
df_train, df_train_resampling, df_outer = get_data(file_path_inner, file_path_outer, test=False, balance_outer=False, sample=True)

In [None]:
# 划分自变量和因变量
X_train, y_train = df_train.drop('Results', axis=1), df_train['Results']
X_train_resampling, y_train_resampling = df_train_resampling.drop('Results', axis=1), df_train_resampling['Results']
X_valid, y_valid = df_outer.drop('Results', axis=1), df_outer['Results']

### 4. 模型训练

In [None]:
def get_estimator(X, y):
    # 1. 使用GridSearchCV进行超参数调整和模型选择
    best_estimators = {}
    for model_name, mp in model_params.items():
        clf = GridSearchCV(mp['model'], mp['params'], scoring='roc_auc', cv=3)
        clf.fit(X, y)
        best_estimators[model_name] = clf.best_estimator_

    ensemble_models = copy.deepcopy(best_estimators)
    # 2. 采用 Stacking 方法进行集成学习
    # 2.1 定义第二层学习器为 Logistics 模型
    final_estimator = LogisticRegression(random_state=RANDOM_NUMBER)
    # 2.2 创建 StackingClassifier
    stacking_model = StackingClassifier(estimators=[(k, v) for k, v in ensemble_models.items()],
                                        final_estimator=final_estimator)

    # 2.3 训练模型
    stacking_model.fit(X, y)

    # 2.4 存储模型
    best_estimators["Stacking"] = stacking_model

    # 3. 采用 Voting 方法进行集成学习
    # 3.1 创建 VotingClassifier
    # rf, svm, gb, ann
    print(ensemble_models.keys())
    voting_clf = VotingClassifier(estimators=[(k, v) for k, v in ensemble_models.items()], voting='soft')#, weights=[3, 2, 1, 2])

    # 3.2 训练模型
    voting_clf.fit(X, y)

    # 3.3 存储模型
    best_estimators["Voting"] = voting_clf

    return best_estimators

In [None]:
best_estimators = get_estimator(X_train_resampling, y_train_resampling)

In [None]:
best_estimators

 ### 5. 模型评估

In [None]:
# 1. 定义计算指标的函数
# 1.1 定义计算特异度（Specificity）的函数
def get_specificity(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tn / (tn + fp + 0.0001)


# 1.2 定义计算灵敏度（Sensitivity）的函数
def get_sensitivity(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tp / (tp + fn + 0.0001)


# 1.3 定义计算阳性预测值（PPV）的函数
def get_ppv(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tp / (tp + fp + 0.0001)


# 1.4 定义计算阴性预测值（NPV）的函数
def get_npv(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tn / (tn + fn + 0.0001)


# 1.5 定义计算 AUC 值的函数
def get_auc(y_true, y_pred_prob):
    # 计算 ROC 曲线
    fpr, tpr, thresholds = roc_curve(y_true, y_pred_prob)
    # 计算 AUC 值
    auc_value = auc(fpr, tpr)
    return auc_value

In [None]:
# 2. 定义计算95%置信区间的方法（使用Bootstrap法）
def bootstrap_metric(y_true, y_pred, metric_func, n_bootstrap=500, random_state=RANDOM_NUMBER):
    np.random.seed(random_state)
    bootstrapped_scores = []
    indices = np.arange(len(y_pred))
    y_true = y_true.to_numpy()

    for _ in range(n_bootstrap):
        # 随机抽样，允许重复
        bootstrap_indices = np.random.choice(indices, size=len(indices), replace=True)
        bootstrap_true = y_true[bootstrap_indices]
        bootstrap_pred = y_pred[bootstrap_indices]
        score = metric_func(bootstrap_true, bootstrap_pred)
        bootstrapped_scores.append(score)

    sorted_scores = np.sort(bootstrapped_scores)
    lower = sorted_scores[int(0.025 * len(sorted_scores))]
    upper = sorted_scores[int(0.975 * len(sorted_scores))]

    return lower, upper


# 3. 定义一个函数，用于将结果转换为字符串
def get_str_res(score, ci):
    # 保留两位小数
    score = np.round(score, 2)
    lower = np.round(ci[0], 2)
    upper = np.round(ci[1], 2)
    # 转换为字符串并拼接
    return f"{score:.2f} ({lower:.2f} - {upper:.2f})"

In [None]:
# 4. 评估模型性能
def evaluate_model_withCI(model, X, y, threshold):
    # y_pred = model.predict(X_test)
    y_pred_prob = model.predict_proba(X)[:, 1]  # 获取分类概率值
    y_pred = (y_pred_prob >= threshold).astype(int)

    # 计算混淆矩阵
    # tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()

    # 计算准确率
    accuracy = accuracy_score(y, y_pred)
    accuracy_ci = bootstrap_metric(y, y_pred, accuracy_score)
    accuracy_res = get_str_res(accuracy, accuracy_ci)

    # 计算灵敏度（Sensitivity）
    sensitivity = get_sensitivity(y, y_pred)
    sensitivity_ci = bootstrap_metric(y, y_pred, get_sensitivity)
    sensitivity_res = get_str_res(sensitivity, sensitivity_ci)

    # 计算特异度（Specificity）
    specificity = get_specificity(y, y_pred)
    specificity_ci = bootstrap_metric(y, y_pred, get_specificity)
    specificity_res = get_str_res(specificity, specificity_ci)

    # 计算阳性预测值（PPV）
    ppv = get_ppv(y, y_pred)
    ppv_ci = bootstrap_metric(y, y_pred, get_ppv)
    ppv_res = get_str_res(ppv, ppv_ci)

    # 计算阴性预测值（NPV）
    npv = get_npv(y, y_pred)
    npv_ci = bootstrap_metric(y, y_pred, get_npv)
    npv_res = get_str_res(npv, npv_ci)

    # 计算F1值
    f1 = f1_score(y, y_pred)
    f1_ci = bootstrap_metric(y, y_pred, f1_score)
    f1_res = get_str_res(f1, f1_ci)

    # 计算ROC曲线及AUC值
    auc_value = get_auc(y, y_pred_prob)
    auc_ci = bootstrap_metric(y, y_pred_prob, get_auc)
    auc_res = get_str_res(auc_value, auc_ci)

    return auc_res, accuracy_res, sensitivity_res, specificity_res, ppv_res, npv_res, f1_res


def evaluate_model(model, X, y, threshold):
    # y_pred = model.predict(X_test)
    y_pred_prob = model.predict_proba(X)[:, 1]  # 获取分类概率值
    y_pred = (y_pred_prob >= threshold).astype(int)

    # 计算混淆矩阵
    tn, fp, fn, tp = confusion_matrix(y, y_pred).ravel()

    # 计算准确率
    accuracy = accuracy_score(y, y_pred)

    # 计算灵敏度（Sensitivity）
    sensitivity = tp / (tp + fn + 0.0001)

    # 计算特异度（Specificity）
    specificity = tn / (tn + fp + 0.0001)

    # 计算阳性预测值（PPV）
    ppv = tp / (tp + fp + 0.0001)

    # 计算阴性预测值（NPV）
    npv = tn / (tn + fn + 0.0001)

    # 计算F1值
    f1 = f1_score(y, y_pred)

    # 计算ROC曲线及AUC值
    # 计算 ROC 曲线
    fpr, tpr, thresholds = roc_curve(y, y_pred_prob)
    # 计算 AUC 值
    auc_value = auc(fpr, tpr)

    return auc_value, accuracy, sensitivity, specificity, ppv, npv, f1
    # return auc_res, accuracy_res, sensitivity_res, specificity_res, ppv_res, npv_res, f1_res

In [None]:
# 5. 评估模型性能
# 5.1 创建一个DataFrame用于存储结果｜
results_train = pd.DataFrame(columns=['AUC', 'Accuracy', 'Sensitivity', 'Specificity', 'PPV', 'NPV', 'F1'], index=best_estimators.keys())
results_valid = pd.DataFrame(columns=['AUC', 'Accuracy', 'Sensitivity', 'Specificity', 'PPV', 'NPV', 'F1'], index=best_estimators.keys())
# 5.2 遍历每个模型，评估其性能并保存结果
for model_name, estimator in best_estimators.items():
    results_train.loc[model_name] = evaluate_model_withCI(estimator, X_train, y_train, threshold=0.5)
    results_valid.loc[model_name] = evaluate_model_withCI(estimator, X_valid, y_valid, threshold=0.5)

In [None]:
# 6. 查看评估结果
results_train

In [None]:
results_valid

### 6. 变量重要性

In [None]:
lr_coef = pd.DataFrame(abs(best_estimators['LogisticRegression'].coef_.flatten()), index=X_train.columns, columns=['LogisticRegression'])
rf_coef = pd.DataFrame(abs(best_estimators['RandomForest'].feature_importances_.flatten()), index=X_train.columns, columns=['RandomForest'])
svm_coef = pd.DataFrame(abs(best_estimators['SVM'].coef_.flatten()), index=X_train.columns, columns=['SVM'])
gb_coef = pd.DataFrame(abs(best_estimators['GradientBoosting'].feature_importances_.flatten()), index=X_train.columns, columns=['GradientBoosting'])
en_coef = pd.DataFrame(abs(best_estimators['ElasticNet'].coef_.flatten()), index=X_train.columns, columns=['ElasticNet'])
ann_coef = abs(pd.DataFrame(permutation_importance(best_estimators['ANN'], X_valid, y_valid, n_repeats=10, random_state=RANDOM_NUMBER)['importances_mean'], index=X_train.columns, columns=['ANN']))
reverent_important = pd.concat([lr_coef, rf_coef, svm_coef, gb_coef, en_coef, ann_coef], axis=1)
reverent_important = reverent_important.div(reverent_important.sum(axis=0)) * 100
reverent_important

### 7. 绘图

In [None]:
plt.figure(figsize=(10, 8))

for name, model in best_estimators.items():
    y_pred_prob = model.predict_proba(X_valid)[:, 1]  # 获取分类概率值
    y_pred = (y_pred_prob >= 0.5).astype(int)
    
    # 计算ROC曲线的特异度（FPR）和灵敏度（TPR）
    fpr, tpr, thresholds = roc_curve(y_valid, y_pred_prob)
    # 计算AUC值
    roc_auc = auc(fpr, tpr)
    auc_ci = bootstrap_metric(y_valid, y_pred_prob, get_auc)
    auc_res = get_str_res(roc_auc, auc_ci)
    plt.plot(fpr, tpr, label=f'{name} (AUC = {auc_res})')

plt.plot([0, 1], [0, 1], 'k--')  # 添加对角虚线
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('1-特异度 (Specificity)')
plt.ylabel('灵敏度 (Sensitivity)')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")

In [None]:
plt.figure(figsize=(10, 8))

for name, model in best_estimators.items():
    y_pred_prob = model.predict_proba(X_train)[:, 1]  # 获取分类概率值
    y_pred = (y_pred_prob >= 0.5).astype(int)
    
    # 计算ROC曲线的特异度（FPR）和灵敏度（TPR）
    fpr, tpr, thresholds = roc_curve(y_train, y_pred_prob)
    # 计算AUC值
    roc_auc = auc(fpr, tpr)
    auc_ci = bootstrap_metric(y_train, y_pred_prob, get_auc)
    auc_res = get_str_res(roc_auc, auc_ci)
    plt.plot(fpr, tpr, label=f'{name} (AUC = {auc_res})')

plt.plot([0, 1], [0, 1], 'k--')  # 添加对角虚线
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('1-特异度 (Specificity)')
plt.ylabel('灵敏度 (Sensitivity)')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")