In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import FunctionTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, log_loss, confusion_matrix
)
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
import lightgbm as lgb

# ==========================================
# 1. 基础配置：定义列索引
# ==========================================
cat_indices = [1,3,4,5,6,7,8,9]        
cont_indices = [2]                
item_indices = list(range(10, 80))      

# 目标变量索引
target_index = 91 

# 常用集合
demographics = cat_indices + cont_indices
all_items = item_indices                 

# ==========================================
# 2. 模型特征配置
# ==========================================
model_feature_config = {
    'LASSO_LR': [2,71,80,72,73,76,78,79,24,29,30,35,37,
                12,43,14,16,18,62,47,65,67,68],           
    'SVM': [16,43,37,68,2,76,75,80,24,7,62,21,72,78,57,
                20,67,18],                                
    'XGBoost': [62,80,76,68,67,16,73,65,60,72,75,78,43,
               77,47,71,79,70,37,23,21,64,22,50,12,74,
               61,18,63,14,19,28],                        
    'LightGBM': [76,80,62,16,43,2,72,68],                 
    'Random Forest': [80,76,68,79,78,77,16,67,72,43,60,
                     62,75,70,71,73,50,47,64,12,2,65]
}

In [None]:
# ==========================================
# 3. 数据读取与智能清洗
# ==========================================
file_path = "患者筛选数据0110.xlsx"
df = pd.read_excel(file_path, header=2)

if target_index < len(df.columns):
    df = df.dropna(subset=[df.columns[target_index]])
else:
    raise ValueError(f"目标列索引 {target_index} 超出了数据列数范围 (总列数: {len(df.columns)})")

cat_names = [df.columns[i] for i in cat_indices if i < len(df.columns)]
cont_names = [df.columns[i] for i in cont_indices if i < len(df.columns)]
item_names = [df.columns[i] for i in item_indices if i < len(df.columns)]
all_cont_names = cont_names + item_names 


for col in all_cont_names:
    df[col] = pd.to_numeric(df[col], errors='coerce')

In [None]:
# ==========================================
# 4. 准备建模数据
# ==========================================
all_needed_indices = set()
for indices in model_feature_config.values():
    all_needed_indices.update(indices)
all_needed_indices.add(target_index)

# 将索引转为列名
all_needed_cols = [df.columns[i] for i in all_needed_indices if i < len(df.columns)]

# 剔除缺失值 
df_clean = df.dropna(subset=[df.columns[target_index]]).copy()
print(f"原始样本量: {len(df)}")
print(f"清洗后有效样本量 (仅剔除无标签样本): {len(df_clean)}")

# 4.1 如果样本量为0，打印错误信息
if len(df_clean) == 0:
    print("\n【严重错误】有效样本量为0！请检查以下缺失值情况：")
    print("(显示缺失值最多的前10个列)")
    na_counts = df[all_needed_cols].isna().sum().sort_values(ascending=False)
    print(na_counts.head(10))
    raise ValueError("数据清洗后没有剩余样本，请检查上述列的数据质量或索引配置。")

# 1. 划分索引 (Train/Test)
train_idx, test_idx = train_test_split(df_clean.index, test_size=0.2, random_state=42)

# 2. 计算分类阈值
y_score_train = df_clean.loc[train_idx, df.columns[target_index]].astype(float)
y_score_test = df_clean.loc[test_idx, df.columns[target_index]].astype(float)

# 3. 在训练集上计算阈值
q1 = y_score_train.quantile(1/3)
q2 = y_score_train.quantile(2/3)
bins = [-np.inf, q1, q2, np.inf]

print(f"Label cut thresholds (calculated on Train only): Q1={q1:.2f}, Q2={q2:.2f}")

# 4. 应用阈值生成标签 (0:Low, 1:Medium, 2:High)
y_train = pd.cut(y_score_train, bins=bins, labels=[0, 1, 2], include_lowest=True).astype(int)
y_test = pd.cut(y_score_test, bins=bins, labels=[0, 1, 2], include_lowest=True).astype(int)

print("Train class counts:", y_train.value_counts().sort_index().values)
print("Test class counts: ", y_test.value_counts().sort_index().values)

In [None]:
import warnings
warnings.filterwarnings("ignore", message="X does not have valid feature names")

In [None]:
# ==========================================
# 5. 网格搜索与模型训练
# ==========================================
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import make_scorer, roc_auc_score, accuracy_score

models = {
    'LASSO_LR': LogisticRegression(solver='saga', penalty='elasticnet', max_iter=10000, random_state=42),
    
    'SVM': SVC(probability=True, random_state=42),
    
    'Random Forest': RandomForestClassifier(random_state=42),
    
    'XGBoost': XGBClassifier(eval_metric='logloss', random_state=42),
    
    'LightGBM': lgb.LGBMClassifier(random_state=42, verbose=-1)
}
# 5.1 定义超参数网格
param_grids = {
    'LASSO_LR': {
        'classifier__l1_ratio': [0.1, 0.5, 0.9], 
        'classifier__C': [0.01, 0.1, 1, 4, 10] 
    },
    
    'Random Forest': {
        'classifier__n_estimators': [100, 500, 1000], 
        'classifier__max_features': ['sqrt', None], 
        'classifier__max_depth': [None, 10, 20],   
        'classifier__min_samples_split': [2, 5]
    },
    
    'XGBoost': {
        'classifier__max_depth': [3, 6, 11], 
        'classifier__learning_rate': [0.01, 0.1, 0.3], 
        'classifier__n_estimators': [200,300,500, 1000],
        'classifier__reg_alpha': [0, 0.1, 1],     
        'classifier__reg_lambda': [1, 5]         
    },
    
    'SVM': {
        'classifier__C': [0.1, 1, 4, 10], 
        'classifier__gamma': ['scale', 0.135],    
        'classifier__kernel': ['rbf']             
    },
    
    'LightGBM': {
        'classifier__learning_rate': [0.01, 0.05],
        'classifier__n_estimators': [500, 1000],
        'classifier__num_leaves': [31, 50],
        'classifier__reg_alpha': [0.1, 1]
    }
}

# 5.2 定义优化目标：Macro-AUC (One-vs-Rest)
scoring_metric = 'roc_auc_ovr'

# 存储最佳模型
best_models = {} 

print(f"\n开始 5折交叉验证 Grid Search (优化目标: {scoring_metric})...")
print("-" * 80)

for name, model_base in models.items():
    print(f"正在优化模型: {name} ...")
    
    # --- A. 准备数据特征 ---
    feature_indices = model_feature_config[name]
    valid_indices = [i for i in feature_indices if i < len(df.columns)]
    feature_names = [df.columns[i] for i in valid_indices]
    
    current_cat = [f for f in feature_names if f in cat_names]
    current_cont = [f for f in feature_names if f not in cat_names]
    
    X_train_curr = df_clean.loc[train_idx, feature_names]
    
    # --- B. 构建 Pipeline ---
    transformers = []
    if current_cont:
        transformers.append(('num', Pipeline([
            ('imputer', SimpleImputer(strategy='mean')), 
            ('scaler', StandardScaler())
        ]), current_cont))
    if current_cat:
        transformers.append(('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')), 
            ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ]), current_cat))
        
    pipeline = Pipeline([
        ('preprocessor', ColumnTransformer(transformers=transformers)),
        ('classifier', model_base)
    ])
    
    # --- C. 执行网格搜索 ---
    current_grid = param_grids.get(name, {})
    
    grid = GridSearchCV(
        estimator=pipeline,
        param_grid=current_grid,
        cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
        scoring=scoring_metric,
        n_jobs=-1,
        verbose=0
    )
    
    # 训练
    grid.fit(X_train_curr, y_train)
    
    # --- D. 保存与记录 ---
    best_models[name] = grid.best_estimator_
    
    print(f"  > 最佳 AUC (CV平均): {grid.best_score_:.4f}")
    print(f"  > 最佳参数: {grid.best_params_}")
    
    # 简单测试
    X_test_curr = df_clean.loc[test_idx, feature_names]
    y_test_pred = grid.best_estimator_.predict(X_test_curr)
    test_acc = accuracy_score(y_test, y_test_pred)
    print(f"  > 独立测试集 Accuracy: {test_acc:.4f}")
    print("-" * 80)

print("\n网格搜索完成，最佳模型已保存至 best_models 字典中。")

In [None]:
# ==========================================
# SHAP 可解释性分析
# ==========================================
import shap
import matplotlib.pyplot as plt
import numpy as np
import warnings
import os

warnings.filterwarnings("ignore")

# --- A. 绘图风格设置 ---
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']
plt.rcParams['axes.titlesize'] = 14
plt.rcParams['axes.labelsize'] = 12

# 定义所有类别
CLASS_MAPPING = {
    0: "Low Trust", 
    1: "Medium Trust", 
    2: "High Trust"
}

print(f"开始执行全量 SHAP 分析...")
print(f"计划生成: {len(best_models)} 个模型 x 3 个类别 x 2 种图表 = {len(best_models)*6} 张图片")
print("-" * 60)

# --- B. 辅助函数 ---
def get_transformed_feature_names(preprocessor):
    output_features = []
    for name, pipe, features in preprocessor.transformers_:
        if name == 'remainder': continue
        if hasattr(pipe, 'get_feature_names_out'):
            output_features.extend(pipe.get_feature_names_out(features))
        else:
            output_features.extend(features)
    return output_features

# --- C. 主循环：遍历每一个模型 ---
for name, model_pipeline in best_models.items():
    print(f"\n>> 正在处理模型: {name} ...")
    
    try:
        # 1. 拆解 Pipeline & 准备数据
        preprocessor = model_pipeline.named_steps['preprocessor']
        classifier = model_pipeline.named_steps['classifier']
        feature_indices = model_feature_config[name]
        feature_names_raw = [df.columns[i] for i in feature_indices]
        
        # 提取数据
        X_test_raw = df_clean.loc[test_idx, feature_names_raw]
        X_train_raw = df_clean.loc[train_idx, feature_names_raw]
        
        # 转换数据
        X_test_trans = preprocessor.transform(X_test_raw)
        X_train_trans = preprocessor.transform(X_train_raw)
        feature_names_final = get_transformed_feature_names(preprocessor)
        
        # 2.SHAP Values
        explainer = None
        shap_values_all = None
        
        # (a) 树模型
        if "XGB" in name or "Random" in name or "LightGBM" in name or "Tree" in name:
            explainer = shap.TreeExplainer(classifier)
            shap_values_all = explainer.shap_values(X_test_trans) # 通常返回 list of arrays
            
        # (b) 线性模型
        elif "LR" in name or "Logistic" in name or "Lasso" in name:
            masker = shap.maskers.Independent(data=X_train_trans)
            explainer = shap.LinearExplainer(classifier, masker=masker)
            shap_values_all = explainer.shap_values(X_test_trans)
            
        # (c) 核模型 (SVM等)
        else:
            background_summary = shap.kmeans(X_train_trans, 30) 
            def predict_proba_wrapper(x):
                return classifier.predict_proba(x) 
            
            explainer = shap.KernelExplainer(predict_proba_wrapper, background_summary)
            X_test_sample = X_test_trans[:50] 
            shap_values_all = explainer.shap_values(X_test_sample)
            X_test_trans = X_test_sample 

        # 3. 内层循环：遍历所有类别 (Low/Medium/High)
        if not isinstance(shap_values_all, list):
            if len(shap_values_all.shape) == 3: 
                shap_values_all = [shap_values_all[:,:,i] for i in range(shap_values_all.shape[2])]
            else:
                shap_values_all = [1-shap_values_all, shap_values_all] 

        for class_idx, class_label in CLASS_MAPPING.items():
            if class_idx >= len(shap_values_all): continue
            
            shap_val_target = shap_values_all[class_idx]
            
            # --- Beeswarm ---
            plt.figure(figsize=(10, 8), dpi=300)
            shap.summary_plot(shap_val_target, X_test_trans, feature_names=feature_names_final, show=False)
            plt.title(f"SHAP Summary: {name} - {class_label}\n(Derivation Cohort)", fontsize=16, pad=20)
            plt.tight_layout()
            
            fname_bee = f"SHAP_Beeswarm_{name.replace(' ', '')}_{class_label.replace(' ', '')}"
            plt.savefig(f"{fname_bee}.png", dpi=300, bbox_inches='tight')
            plt.savefig(f"{fname_bee}.pdf", bbox_inches='tight')
            plt.close()
            
            # --- Importance---
            plt.figure(figsize=(10, 8), dpi=300)
            shap.summary_plot(shap_val_target, X_test_trans, feature_names=feature_names_final, plot_type="bar", show=False)
            plt.title(f"Feature Importance: {name} - {class_label}\n(Derivation Cohort)", fontsize=16, pad=20)
            plt.tight_layout()
            
            fname_bar = f"SHAP_Bar_{name.replace(' ', '')}_{class_label.replace(' ', '')}"
            plt.savefig(f"{fname_bar}.png", dpi=300, bbox_inches='tight')
            plt.savefig(f"{fname_bar}.pdf", bbox_inches='tight')
            plt.close()
            
            print(f"   -> 已生成: {class_label} 的图表")

    except Exception as e:
        print(f"!! 模型 {name} SHAP 生成出错: {e}")
        import traceback
        traceback.print_exc()

print("\n 所有 SHAP 图片生成完毕。")