In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_curve, auc, roc_auc_score
from sklearn.preprocessing import label_binarize
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
import matplotlib.pyplot as plt
from itertools import cycle
import xgboost as xgb

# 读取数据
data_features = pd.read_csv(r'')
data_labels = pd.read_csv(r'')

# 检查两个表格的ID名称是否一致
if not all(data_features['ID'] == data_labels['ID']):
    mismatched_ids = data_features['ID'][data_features['ID'] != data_labels['ID']]
    print("IDs in the two dataframes do not match. Mismatched IDs:")
    print(mismatched_ids)
    raise ValueError("IDs do not match.")
else:
    print("ID配对一致")

# 检查标签数据分布
print("Original label data distribution:")
print(data_labels['label'].value_counts())

In [None]:
# 合并数据
merged_data = pd.merge(data_features, data_labels, on='ID')

# 提取特征和标签
X = merged_data.drop(columns=['ID', 'label', 'group'])
y = merged_data['label']

# 确保标签的编码与原始标签一致
label_mapping = {0: 0, 1: 1, 2: 2}
y = y.map(label_mapping)

# 根据group列划分训练集、验证集和测试集
train_indices = merged_data[merged_data['group'] == 'train'].index
val_indices = merged_data[merged_data['group'] == 'val'].index
test_indices = merged_data[merged_data['group'] == 'test'].index

X_train, X_val, X_test = X.loc[train_indices], X.loc[val_indices], X.loc[test_indices]
y_train, y_val, y_test = y.loc[train_indices], y.loc[val_indices], y.loc[test_indices]

# 打印训练集、验证集和测试集的标签分布
print("Training data distribution:")
print(y_train.value_counts())
print("Validation data distribution:")
print(y_val.value_counts())
print("Test data distribution:")
print(y_test.value_counts())

In [None]:
# 使用SMOTE和RandomUnderSampler来处理类别不平衡
smote = SMOTE(random_state=42)
under_sampler = RandomUnderSampler(random_state=42)

# 重新采样
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
X_resampled, y_resampled = under_sampler.fit_resample(X_resampled, y_resampled)

# 打印重新采样后的数据分布
print("\nResampled training data:")
print(pd.Series(y_resampled).value_counts())
print("Number of samples:", len(y_resampled))


In [None]:
# 定义XGBoost模型和参数网格
xgb_params = {
    'xgb__n_estimators': [100, 200, 300],
    'xgb__max_depth': [3, 4, 5, 6],
    'xgb__learning_rate': [0.01, 0.05, 0.1],
    'xgb__subsample': [0.6, 0.8, 1.0],
    'xgb__colsample_bytree': [0.6, 0.8, 1.0]
}



# 创建XGBoost模型实例
xgb_clf = xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')

# 使用SMOTE和RandomUnderSampler来处理类别不平衡
resampling_pipeline = Pipeline(steps=[('over', smote), ('under', under_sampler), ('xgb', xgb_clf)])

# 创建超参数搜索实例
xgb_grid = GridSearchCV(resampling_pipeline, xgb_params, cv=2, scoring='roc_auc', verbose=1, n_jobs=-1)

# 训练模型
xgb_grid.fit(X_train, y_train)

# 输出最佳参数
print("Best parameters for XGBoost with SMOTE and RandomUnderSampler:", xgb_grid.best_params_)

# 使用最佳参数重新训练模型
xgb_best = xgb_grid.best_estimator_

# 预测训练集、验证集和测试集
y_train_pred_xgb = xgb_best.predict(X_train)
y_train_score_xgb = xgb_best.predict_proba(X_train)
y_val_pred_xgb = xgb_best.predict(X_val)
y_val_score_xgb = xgb_best.predict_proba(X_val)
y_test_pred_xgb = xgb_best.predict(X_test)
y_test_score_xgb = xgb_best.predict_proba(X_test)

#Best parameters for XGBoost with SMOTE and RandomUnderSampler: 
#{'xgb__colsample_bytree': 0.6, 'xgb__learning_rate': 0.01, 'xgb__max_depth': 3, 'xgb__n_estimators': 100, 'xgb__subsample': 0.6}

In [None]:
# 保存每个样本的预测概率
train_ids = data_features.loc[train_indices, 'ID'].reset_index(drop=True)
val_ids = data_features.loc[val_indices, 'ID'].reset_index(drop=True)
test_ids = data_features.loc[test_indices, 'ID'].reset_index(drop=True)

pd.concat([train_ids, pd.DataFrame(y_train_score_xgb, columns=['Class_0_Prob', 'Class_1_Prob', 'Class_2_Prob'])], axis=1).to_csv('table\XGBoost_Training_Predicted_Probabilities.csv', index=False)
pd.concat([val_ids, pd.DataFrame(y_val_score_xgb, columns=['Class_0_Prob', 'Class_1_Prob', 'Class_2_Prob'])], axis=1).to_csv('table\XGBoost_Validation_Predicted_Probabilities.csv', index=False)
pd.concat([test_ids, pd.DataFrame(y_test_score_xgb, columns=['Class_0_Prob', 'Class_1_Prob', 'Class_2_Prob'])], axis=1).to_csv('table\XGBoost_Test_Predicted_Probabilities.csv', index=False)

# SHAP图

全局SHAP图

In [None]:
import shap
import matplotlib.pyplot as plt
import numpy as np

# 假设 SHAP 值计算如下
explainer = shap.Explainer(xgb_best.named_steps['xgb'])
shap_values = explainer(X_train)

# 选择每个类别的 SHAP 值
shap_values_class_0 = shap_values[:, :, 0]
shap_values_class_1 = shap_values[:, :, 1]
shap_values_class_2 = shap_values[:, :, 2]

def save_shap_summary_plot(shap_values, class_name, file_path):
    # 生成一个新的图形
    plt.figure(figsize=(14, 10))
    
    # 绘制SHAP汇总图
    shap.summary_plot(shap_values, X_train, show=False, plot_type='dot', cmap='viridis')
    #cmap="?"配色viridis Spectral coolwar mRdYlGn RdYlBu RdBu RdGy PuOr BrBG PRGn PiYG
#     "viridis"：从黄色到蓝绿色。"Spectral"：从红色到蓝色，适用于有正负影响的特征。"coolwarm"：从冷到暖的颜色图。
#     "RdYlGn"：从红到绿的颜色图。"RdYlBu"：从红到蓝的颜色图。"RdBu"：红蓝双色图。"RdGy"：红灰双色图。
#     "PuOr"：从紫色到橙色的颜色图。"BrBG"：从棕色到蓝绿色的颜色图。"PRGn"：从紫色到绿色的颜色图。
#     "PiYG"：从粉红色到绿色的颜色图
    ax = plt.gca()
    
    # 获取SHAP值的范围
    shap_values_np = shap_values.values  # 将SHAP值转为numpy数组
    min_shap, max_shap = np.min(shap_values_np), np.max(shap_values_np)
    
    # 扩展SHAP值的范围以减少点的密集
    extended_min_shap = min_shap - (max_shap - min_shap) * 0.01  # 扩展范围的10%
    extended_max_shap = max_shap + (max_shap - min_shap) * 0.01
    
    # 设置SHAP值的范围
    ax.set_xlim(extended_min_shap, extended_max_shap)
    
    # 调整特征名称和SHAP值的字体大小
    for label in (ax.get_xticklabels() + ax.get_yticklabels()):
        label.set_fontsize(8)  # 调整特征名称的字体大小
    
    # 设置标题的字体大小
    plt.title(class_name, fontsize=12)
    
    # 获取当前图例
    legend = plt.gca().get_legend()
    
    if legend:
        # 设置图例字体大小
        for text in legend.get_texts():
            text.set_fontsize(10)  # 调整图例字体大小
    
    # 自动调整子图参数
    plt.tight_layout()

    # 保存SHAP图
    plt.savefig(file_path, format='png', dpi=300, bbox_inches='tight')  # 自动调整边界
    

# 保存SHAP图
save_shap_summary_plot(shap_values_class_0, 'HER2-zero', r'')
save_shap_summary_plot(shap_values_class_1, 'HER2-low', r'')
save_shap_summary_plot(shap_values_class_2, 'HER2-positive', r'')


分habitat的SHAP图

In [None]:
import shap
import matplotlib.pyplot as plt
import numpy as np

# 假设 SHAP 值计算如下
explainer = shap.Explainer(xgb_best.named_steps['xgb'])
shap_values = explainer(X_train)

# 选择每个类别的 SHAP 值
shap_values_class_0 = shap_values[:, :, 0]
shap_values_class_1 = shap_values[:, :, 1]
shap_values_class_2 = shap_values[:, :, 2]

# 获取特征列名
feature_names = X_train.columns

# 定义保存 SHAP summary 图的函数
def save_shap_summary_plot(features, shap_values, X_train, class_name, suffix):
    # 筛选出特定后缀的特征及其对应的SHAP值
    feature_indices = [i for i, feat in enumerate(features) if feat.endswith(suffix)]
    if not feature_indices:
        print(f"No features with suffix {suffix} for class {class_name}")
        return
    
    X_train_selected = X_train.iloc[:, feature_indices]
    shap_values_selected = shap_values.values[:, feature_indices]
    
    # 生成一个新的图形
    plt.figure(figsize=(14, 10))
    
    # 绘制SHAP汇总图
    shap.summary_plot(shap_values_selected, X_train_selected, show=False, plot_type='dot', cmap='viridis')
    ax = plt.gca()
    #     #cmap="?"配色viridis Spectral coolwar mRdYlGn RdYlBu RdBu RdGy PuOr BrBG PRGn PiYG
# #     "viridis"：从黄色到蓝绿色。"Spectral"：从红色到蓝色，适用于有正负影响的特征。"coolwarm"：从冷到暖的颜色图。
# #     "RdYlGn"：从红到绿的颜色图。"RdYlBu"：从红到蓝的颜色图。"RdBu"：红蓝双色图。"RdGy"：红灰双色图。
# #     "PuOr"：从紫色到橙色的颜色图。"BrBG"：从棕色到蓝绿色的颜色图。"PRGn"：从紫色到绿色的颜色图。
# #     "PiYG"：从粉红色到绿色的颜色图
    # 获取SHAP值的范围
    min_shap, max_shap = np.min(shap_values_selected), np.max(shap_values_selected)
    
    # 扩展SHAP值的范围以减少点的密集
    extended_min_shap = min_shap - (max_shap - min_shap) * 0.01  # 扩展范围的1%
    extended_max_shap = max_shap + (max_shap - min_shap) * 0.01
    
    # 设置SHAP值的范围
    ax.set_xlim(extended_min_shap, extended_max_shap)
    
    # 调整特征名称和SHAP值的字体大小
    for label in (ax.get_xticklabels() + ax.get_yticklabels()):
        label.set_fontsize(8)  # 调整特征名称的字体大小
    
    # 设置标题的字体大小
    plt.title(f'{class_name} - {suffix}', fontsize=12)
    
    # 获取当前图例
    legend = plt.gca().get_legend()
    
    if legend:
        # 设置图例字体大小
        for text in legend.get_texts():
            text.set_fontsize(10)  # 调整图例字体大小
    
    # 自动调整子图参数
    plt.tight_layout()

    # 保存SHAP图
    file_path = rf'shap_summary_plot_{class_name}_{suffix}.png'
    plt.savefig(file_path, format='png', dpi=300, bbox_inches='tight')  # 自动调整边界
    

# 保存SHAP图
suffixes = ['_h1', '_h2', '_h3', '_h4']
for class_name, shap_values_class in zip(['HER2-zero', 'HER2-low', 'HER2-positive'], [shap_values_class_0, shap_values_class_1, shap_values_class_2]):
    for suffix in suffixes:
        save_shap_summary_plot(feature_names, shap_values_class, X_train, class_name, suffix)


In [None]:
import shap
import matplotlib.pyplot as plt

# 选择一个观测值（这里选择训练集中的第一个实例）
single_observation = X_train.iloc[[312]] # 需要是DataFrame格式

# 创建SHAP解释器对象
explainer = shap.Explainer(xgb_best.named_steps['xgb'], X_train)

# 计算所选观测值的SHAP值
shap_values = explainer(single_observation)

# 选择每个类别的SHAP值
shap_values_class_0 = shap_values[:, :, 0]
shap_values_class_1 = shap_values[:, :, 1]
shap_values_class_2 = shap_values[:, :, 2]

# 绘制所选观测值的SHAP值瀑布图并保存为PNG格式
def save_and_show_shap_plot(shap_values, class_label, base_path):
    # 创建SHAP瀑布图，获取Axes对象
    shap_plot = shap.plots.waterfall(shap_values[0], show=False)
    
    # 获取Figure对象
    fig = shap_plot.figure
    
    # 保存图形为PNG格式
    fig.savefig(f'{base_path}/shap_waterfall_class_{class_label}.png', bbox_inches='tight', format='png', dpi=300)
    
    # 显示图形
    plt.show(fig)
   # 关闭图形以释放内存
    plt.close(fig)

# 设置保存路径
base_path = r''

# 保存并显示SHAP瀑布图
save_and_show_shap_plot(shap_values_class_0, 0, base_path)
save_and_show_shap_plot(shap_values_class_1, 1, base_path)
save_and_show_shap_plot(shap_values_class_2, 2, base_path)


In [None]:
# 评估模型
def evaluate_model(y_true, y_pred, y_score, label_mapping):
    accuracy = accuracy_score(y_true, y_pred)
    report = classification_report(y_true, y_pred, target_names=[str(i) for i in label_mapping.keys()], output_dict=True)
    conf_matrix = confusion_matrix(y_true, y_pred)
    
    def calculate_metrics(conf_matrix):
        sensitivity = np.diag(conf_matrix) / np.sum(conf_matrix, axis=1)
        precision = np.diag(conf_matrix) / np.sum(conf_matrix, axis=0)
        specificity = []
        for i in range(len(conf_matrix)):
            tn = np.sum(np.delete(np.delete(conf_matrix, i, axis=0), i, axis=1))
            fp = np.sum(np.delete(conf_matrix, i, axis=0)[:, i])
            specificity.append(tn / (tn + fp))
        f1_scores = 2 * (precision * sensitivity) / (precision + sensitivity)
        return sensitivity, specificity, precision, f1_scores

    sensitivity, specificity, precision, f1_scores = calculate_metrics(conf_matrix)

    y_true_binarized = label_binarize(y_true, classes=[0, 1, 2])
    n_classes = y_true_binarized.shape[1]

    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true_binarized[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    return accuracy, report, conf_matrix, sensitivity, specificity, precision, f1_scores, roc_auc, fpr, tpr

# 评估XGBoost模型
train_results_xgb = evaluate_model(y_train, y_train_pred_xgb, y_train_score_xgb, label_mapping)
val_results_xgb = evaluate_model(y_val, y_val_pred_xgb, y_val_score_xgb, label_mapping)
test_results_xgb = evaluate_model(y_test, y_test_pred_xgb, y_test_score_xgb, label_mapping)


In [None]:
# 打印评估结果并生成表格
def print_and_save_results(results, title):
    accuracy, report, conf_matrix, sensitivity, specificity, precision, f1_scores, roc_auc, fpr, tpr = results
    num_classes = conf_matrix.shape[1]
    print(f"{title} Results")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Number of classes: {num_classes}")
    # 分类报告
    report_df = pd.DataFrame(report).transpose()
    print("Classification Report:")
    print(report_df)
    
    # 混淆矩阵
    conf_matrix_df = pd.DataFrame(conf_matrix, index=label_mapping.keys(), columns=label_mapping.keys())
    print("Confusion Matrix:")
    print(conf_matrix_df)
    
    # 其他评估指标
    metrics_df = pd.DataFrame({
        "Sensitivity": sensitivity,
        "Specificity": specificity,
        "Precision": precision,
        "F1 Score": f1_scores
    }, index=label_mapping.keys())
    print("Metrics:")
    print(metrics_df)
    
    # 保存表格
    report_df.to_csv(f"table\{title}_classification_report.csv")
    conf_matrix_df.to_csv(f"table\{title}_confusion_matrix.csv")
    metrics_df.to_csv(f"table\{title}_metrics.csv")

print_and_save_results(train_results_xgb, "XGBoost_Training")
print_and_save_results(val_results_xgb, "XGBoost_Validation")
print_and_save_results(test_results_xgb, "XGBoost_Test")

In [None]:
def calculate_roc_auc_with_ci(y_true, y_score, n_bootstraps=1000, random_state=42):
    fpr, tpr, roc_auc = {}, {}, {}
    bootstrapped_aucs = {i: [] for i in range(y_score.shape[1])}
    
    for i in range(y_score.shape[1]):
        # 计算 ROC 曲线和 AUC
        fpr[i], tpr[i], _ = roc_curve(y_true, y_score[:, i], pos_label=i)
        roc_auc[i] = auc(fpr[i], tpr[i])
        
        rng = np.random.RandomState(random_state)
        for _ in range(n_bootstraps):
            # 使用引导法计算 AUC
            indices = rng.randint(0, len(y_true), len(y_true))
            if len(np.unique(y_true[indices])) < 2:
                continue
            fpr_boot, tpr_boot, _ = roc_curve(y_true[indices], y_score[indices, i], pos_label=i)
            roc_auc_boot = auc(fpr_boot, tpr_boot)
            bootstrapped_aucs[i].append(roc_auc_boot)
    
    auc_ci = {}
    for i in range(y_score.shape[1]):
        sorted_aucs = np.array(bootstrapped_aucs[i])
        sorted_aucs.sort()
        confidence_lower = np.percentile(sorted_aucs, 2.5)
        confidence_upper = np.percentile(sorted_aucs, 97.5)
        auc_ci[i] = (confidence_lower, confidence_upper)
    
    return fpr, tpr, roc_auc, auc_ci

In [None]:
def plot_roc_curve(fpr, tpr, roc_auc, auc_ci, title, class_labels):
    plt.figure()
    colors = ['aqua', 'darkorange', 'cornflowerblue']
    for i, color in enumerate(colors):
        plt.plot(fpr[i], tpr[i], color=color, lw=2,
                 label=f'{class_labels[i]} (AUC = {roc_auc[i]:0.3f}[{auc_ci[i][0]:0.3f}, {auc_ci[i][1]:0.3f}])')

    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.savefig(f'figures\{title.replace(" ", "_")}.tif', dpi=300)
    plt.show()

In [None]:
def compute_and_plot_roc_curve(y_true, y_score, n_classes, title):
    # 确保 y_true 是一维数组
    y_true = np.asarray(y_true).ravel()
    # 确保 y_score 是二维数组
    y_score = np.asarray(y_score)
    
    fpr, tpr, roc_auc, auc_ci = calculate_roc_auc_with_ci(y_true, y_score, n_bootstraps=1000)
    # 类别标签
    class_labels = ['Her2-zero', 'Her2-low', 'Her2-positive']
    plot_roc_curve(fpr, tpr, roc_auc, auc_ci, title, class_labels)

# 示例：使用训练集、验证集和测试集的预测结果绘制 ROC 曲线
compute_and_plot_roc_curve(y_train, y_train_score_xgb, n_classes=3, title='Training ROC Curve')
compute_and_plot_roc_curve(y_val, y_val_score_xgb, n_classes=3, title='Val ROC Curve')
compute_and_plot_roc_curve(y_test, y_test_score_xgb, n_classes=3, title='Test ROC Curve')