In [None]:
#模块导入区域

import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.model_selection import (
    train_test_split, RandomizedSearchCV, cross_val_score, StratifiedKFold, cross_val_predict
)
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import (
    accuracy_score, f1_score, classification_report, confusion_matrix,
    recall_score, roc_auc_score, average_precision_score, precision_score, roc_curve
)
from sklearn.svm import SVC
from sklearn.linear_model import (
    LogisticRegression, PassiveAggressiveClassifier, RidgeClassifier, SGDClassifier
)
from sklearn.ensemble import (
    RandomForestClassifier, AdaBoostClassifier, BaggingClassifier,
    ExtraTreesClassifier, GradientBoostingClassifier, VotingClassifier
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.inspection import permutation_importance
from sklearn.decomposition import PCA
from matplotlib.colors import ListedColormap
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from joblib import Parallel, delayed
from imblearn.over_sampling import SMOTE
import optuna
from collections import defaultdict
import shap
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
from sklearn.neural_network import MLPClassifier

In [None]:
matplotlib.use('TkAgg')  # Switch the matplotlib backend to TkAgg

# Seaborn look-and-feel
sns.set_style("white")
sns.set_context("paper")

#------------------------------------------------------------------------------------------------------------------------------
# Dataset Y: read the CSV file
file_path = r'AZ31_ML.csv'
data = pd.read_csv(file_path)

X = data.drop(columns=['Twinned'])
y = data['Twinned'].astype(int)  # cast the label column to integers
#--------------------------------------------------------------------------------------------------------------

# # Dataset T: read the Excel file instead
# data = pd.read_excel(r'grain.xlsx')
#
# # Split into feature columns and target column
# X = data.iloc[:, 1:19]
# y = data.iloc[:, 19]

# 1. Fill missing values with the column mean ----------------------------------------------------------------------------------
# NOTE(review): the imputer and the scaler below are fitted on the full data set
# before any cross-validation split, so validation folds influence the fitted
# statistics. Wrapping both in a sklearn Pipeline inside each objective would
# remove that leakage — confirm whether this matters for the reported scores.
imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X)

# 2. Min-Max normalisation (rescale every feature to the 0-1 interval)
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X_imputed)

# The whole data set is used for training; there is no hold-out test split.
X_test = None
y_test = None

# Rebuild labelled pandas objects. Reusing X's index keeps the feature rows and
# the label Series aligned even if `data` does not carry a default RangeIndex.
X_train = pd.DataFrame(X_scaled, columns=X.columns, index=X.index)
y_train = pd.Series(y, name='Lable')


In [None]:
# 机器学习算法定义区域

# 1. Logistic Regression
# Search space: each value is a callable that draws one parameter from an Optuna trial.
# `verbose` is intentionally NOT part of the space: it only controls solver logging,
# so searching over it adds a meaningless dimension and floods the output.
param_space_lr = {
    'C': lambda trial: trial.suggest_float('C', 1e-4, 1e4, log=True),
    'penalty': lambda trial: trial.suggest_categorical('penalty', ['l2', None, 'l1', 'elasticnet']),  # None replaces the old 'none' string
    'solver': lambda trial: trial.suggest_categorical('solver', ['lbfgs', 'saga', 'newton-cg', 'liblinear', 'sag']),
    'max_iter': lambda trial: trial.suggest_int('max_iter', 200, 2000),
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    # Single-choice categorical so the seed is recorded in study.best_params.
    # Author's note: seeds 123 and 2024 behaved very badly.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'dual': lambda trial: trial.suggest_categorical('dual', [False, True]),
    'tol': lambda trial: trial.suggest_float('tol', 1e-8, 1e-1, log=True),
    'fit_intercept': lambda trial: trial.suggest_categorical('fit_intercept', [True, False]),
    'intercept_scaling': lambda trial: trial.suggest_float('intercept_scaling', 0.01, 100.0, log=True),
    # NOTE(review): `multi_class` is deprecated in recent scikit-learn releases — confirm
    # the installed version still accepts it, otherwise every trial will fail.
    'multi_class': lambda trial: trial.suggest_categorical('multi_class', ['auto', 'ovr', 'multinomial']),
    'warm_start': lambda trial: trial.suggest_categorical('warm_start', [False, True]),
    'n_jobs': lambda trial: trial.suggest_categorical('n_jobs', [-1]),
    # l1_ratio is only sampled when penalty == 'elasticnet' (see objective_lr)
}

def objective_lr(trial):
    """Optuna objective for LogisticRegression.

    Draws one configuration from ``param_space_lr`` and returns its mean
    10-fold cross-validated ``recall_macro`` on ``X_train`` / ``y_train``.

    Many penalty / solver / dual combinations in the space are invalid. Such
    trials score 0.0 instead of aborting the study, and the exception text is
    stored in the trial's user attribute ``'error'`` so failures can be
    inspected afterwards instead of disappearing silently.
    """
    params = {name: sample(trial) for name, sample in param_space_lr.items()}
    if params.get('penalty') == 'elasticnet':
        params['l1_ratio'] = trial.suggest_float('l1_ratio', 0.0, 1.0)
    try:
        model = LogisticRegression(**params)
        score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    except Exception as exc:
        # Deliberate best-effort: keep the study running, but leave a trace.
        trial.set_user_attr('error', repr(exc))
        return 0.0
    return 0.0 if np.isnan(score) else score

# 2. SVM (partly new, partly legacy search space)
param_space_svm = {
    'C': lambda trial: trial.suggest_float('C', 1e-4, 1e4, log=True),  # regularisation strength, wide log range
    'kernel': lambda trial: trial.suggest_categorical('kernel', ['linear', 'rbf', 'sigmoid', 'poly']),
    'gamma': lambda trial: trial.suggest_categorical('gamma', ['scale', 'auto', 1e-4, 1e-3, 1e-2, 1e-1, 1.0, 10.0]),  # named heuristics plus explicit values
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    # Registered as a single-choice categorical so the seed appears in
    # study.best_params; a bare constant would be dropped when the final
    # model is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'tol': lambda trial: trial.suggest_float('tol', 1e-5, 1e-2, log=True),  # convergence tolerance
    # Re-suggesting 'kernel' with identical choices returns the value already
    # drawn for this trial, so these two entries are conditional on the kernel.
    'coef0': lambda trial: trial.suggest_float('coef0', -1.0, 1.0) if trial.suggest_categorical('kernel', ['linear', 'rbf', 'sigmoid', 'poly']) in ['sigmoid', 'poly'] else 0.0,  # only sigmoid / poly use coef0
    'degree': lambda trial: trial.suggest_int('degree', 2, 4) if trial.suggest_categorical('kernel', ['linear', 'rbf', 'sigmoid', 'poly']) == 'poly' else 3,  # only poly uses degree
    'probability': lambda trial: True,  # train_and_evaluate re-adds this flag when it rebuilds the model
    'max_iter': lambda trial: trial.suggest_int('max_iter', 500, 5000),  # hard cap on solver iterations
}

def objective_svm(trial):
    """Optuna objective for SVC.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration. Any exception makes the trial score 0.0; the exception text
    is stored in the trial's ``'error'`` user attribute for later inspection.
    """
    try:
        params = {name: sample(trial) for name, sample in param_space_svm.items()}
        # The linear kernel does not use gamma. coef0 / degree already fall
        # back to their neutral values (0.0 / 3) in the search space itself.
        if params['kernel'] == 'linear':
            params.pop('gamma', None)
        params['cache_size'] = 500
        model = SVC(**params)
        score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
        return 0.0 if np.isnan(score) else score
    except Exception as exc:
        # Deliberate best-effort: keep the study alive but record what failed.
        trial.set_user_attr('error', repr(exc))
        return 0.0

# 3. Random Forest (legacy search space)
param_space_rf = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 500),
    'max_depth': lambda trial: trial.suggest_int('max_depth', 3, 20),
    'min_samples_split': lambda trial: trial.suggest_int('min_samples_split', 2, 20),
    'min_samples_leaf': lambda trial: trial.suggest_int('min_samples_leaf', 1, 10),
    'max_features': lambda trial: trial.suggest_categorical('max_features', ['sqrt', 'log2']),
    'bootstrap': lambda trial: trial.suggest_categorical('bootstrap', [True, False]),
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    # The seed is not a hyperparameter: searching over several seeds selects a
    # lucky seed rather than a better model. It is pinned to one value, still
    # registered with the trial so it is kept in study.best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
}
def objective_rf(trial):
    """Optuna objective for RandomForestClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_rf.items()}
    model = RandomForestClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 4. KNN (new search space)
param_space_knn = dict(
    # neighbourhood size
    n_neighbors=lambda t: t.suggest_int('n_neighbors', 1, 100),
    # how neighbours vote
    weights=lambda t: t.suggest_categorical('weights', ['uniform', 'distance']),
    # distance measure
    metric=lambda t: t.suggest_categorical('metric', ['euclidean', 'manhattan', 'minkowski']),
    # Minkowski exponent; discarded by objective_knn for the other metrics
    p=lambda t: t.suggest_int('p', 1, 5),
    # neighbour-search backend
    algorithm=lambda t: t.suggest_categorical('algorithm', ['auto', 'ball_tree', 'kd_tree', 'brute']),
    # leaf size of the tree-based backends
    leaf_size=lambda t: t.suggest_int('leaf_size', 10, 50),
)

def objective_knn(trial):
    """Optuna objective for KNeighborsClassifier.

    Samples every entry of ``param_space_knn``, drops ``p`` unless the metric
    is ``'minkowski'``, and returns the mean 10-fold cross-validated
    ``recall_macro`` (0.0 when that mean is NaN).
    """
    sampled = {}
    for key, draw in param_space_knn.items():
        sampled[key] = draw(trial)

    # p is only meaningful for the Minkowski metric
    if sampled['metric'] != 'minkowski' and 'p' in sampled:
        del sampled['p']

    fold_scores = cross_val_score(
        KNeighborsClassifier(**sampled),
        X_train,
        y_train,
        cv=10,
        scoring='recall_macro',
        n_jobs=-1,
    )
    mean_score = fold_scores.mean()
    if np.isnan(mean_score):
        return 0.0
    return mean_score

# 5. Naive Bayes (new search space)
param_space_nb = dict(
    # variance-smoothing term, searched on a log scale
    var_smoothing=lambda t: t.suggest_float('var_smoothing', 1e-12, 1e-3, log=True),
    # class priors: None (estimated from data) or one of three fixed two-class priors
    priors=lambda t: t.suggest_categorical('priors', [None, [0.5, 0.5], [0.3, 0.7], [0.7, 0.3]]),
)

def objective_nb(trial):
    """Optuna objective for GaussianNB.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration, or 0.0 when that mean is NaN.
    """
    sampled = {}
    for key, draw in param_space_nb.items():
        sampled[key] = draw(trial)

    fold_scores = cross_val_score(
        GaussianNB(**sampled),
        X_train,
        y_train,
        cv=10,
        scoring='recall_macro',
        n_jobs=-1,
    )
    mean_score = fold_scores.mean()
    if np.isnan(mean_score):
        return 0.0
    return mean_score

# 6. Decision Tree (new search space)
param_space_dt = {
    'max_depth': lambda trial: trial.suggest_categorical('max_depth', [None, 3, 5, 10, 15, 20, 30]),  # None = grow until leaves are pure
    'min_samples_split': lambda trial: trial.suggest_int('min_samples_split', 2, 50),
    'min_samples_leaf': lambda trial: trial.suggest_int('min_samples_leaf', 1, 20),
    'max_features': lambda trial: trial.suggest_float('max_features', 0.1, 1.0),  # fraction of features considered per split
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'criterion': lambda trial: trial.suggest_categorical('criterion', ['gini', 'entropy']),
    'ccp_alpha': lambda trial: trial.suggest_float('ccp_alpha', 1e-6, 0.1, log=True)  # cost-complexity pruning strength
}

def objective_dt(trial):
    """Optuna objective for DecisionTreeClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_dt.items()}
    model = DecisionTreeClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 7. Gradient Boosting
param_space_gb = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 1000),
    'learning_rate': lambda trial: trial.suggest_float('learning_rate', 1e-4, 1.0, log=True),
    'max_depth': lambda trial: trial.suggest_int('max_depth', 3, 15),
    'min_samples_split': lambda trial: trial.suggest_int('min_samples_split', 2, 50),
    'min_samples_leaf': lambda trial: trial.suggest_int('min_samples_leaf', 1, 20),
    'subsample': lambda trial: trial.suggest_float('subsample', 0.1, 1.0),  # values below 1.0 give stochastic boosting
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'criterion': lambda trial: trial.suggest_categorical('criterion', ['friedman_mse', 'squared_error']),
    'max_features': lambda trial: trial.suggest_float('max_features', 0.1, 1.0),  # fraction of features considered per split
    'ccp_alpha': lambda trial: trial.suggest_float('ccp_alpha', 1e-6, 0.1, log=True)  # cost-complexity pruning strength
}

def objective_gb(trial):
    """Optuna objective for GradientBoostingClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_gb.items()}
    model = GradientBoostingClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 8. XGBoost
param_space_xgb = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 1000),
    'learning_rate': lambda trial: trial.suggest_float('learning_rate', 1e-4, 1.0, log=True),
    'max_depth': lambda trial: trial.suggest_int('max_depth', 3, 15),
    'min_child_weight': lambda trial: trial.suggest_int('min_child_weight', 1, 20),
    'subsample': lambda trial: trial.suggest_float('subsample', 0.1, 1.0),  # row subsampling per tree
    'colsample_bytree': lambda trial: trial.suggest_float('colsample_bytree', 0.1, 1.0),  # feature subsampling per tree
    'gamma': lambda trial: trial.suggest_float('gamma', 0.0, 10.0),  # minimum loss reduction required to split
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'reg_alpha': lambda trial: trial.suggest_float('reg_alpha', 1e-5, 10.0, log=True),  # L1 regularisation
    'reg_lambda': lambda trial: trial.suggest_float('reg_lambda', 1e-5, 10.0, log=True),  # L2 regularisation
    'scale_pos_weight': lambda trial: trial.suggest_float('scale_pos_weight', 0.1, 10.0, log=True),  # weight on the positive class
}

def objective_xgb(trial):
    """Optuna objective for xgboost.XGBClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_xgb.items()}
    model = xgb.XGBClassifier(**params, use_label_encoder=False, eval_metric='logloss')
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 9. MLP
param_space_mlp = {
    'hidden_layer_sizes': lambda trial: trial.suggest_categorical('hidden_layer_sizes', [(50,), (100,), (150,), (50, 50), (100, 50), (100, 100)]),  # one- and two-layer architectures
    'activation': lambda trial: trial.suggest_categorical('activation', ['relu', 'tanh', 'logistic']),
    'solver': lambda trial: trial.suggest_categorical('solver', ['adam', 'sgd']),
    'alpha': lambda trial: trial.suggest_float('alpha', 1e-5, 1e-1, log=True),  # L2 penalty
    'learning_rate': lambda trial: trial.suggest_categorical('learning_rate', ['constant', 'adaptive']),  # learning-rate schedule
    'max_iter': lambda trial: trial.suggest_int('max_iter', 200, 5000),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'batch_size': lambda trial: trial.suggest_int('batch_size', 16, 256),  # mini-batch size
    'learning_rate_init': lambda trial: trial.suggest_float('learning_rate_init', 1e-4, 1e-2, log=True),  # initial step size
}

def objective_mlp(trial):
    """Optuna objective for MLPClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_mlp.items()}
    model = MLPClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 10. LightGBM
# The ranges are kept deliberately narrow: the author notes that large
# num_leaves values make the search hang.
param_space_lgbm = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 250),
    'learning_rate': lambda trial: trial.suggest_float('learning_rate', 1e-4, 1.0, log=True),
    'max_depth': lambda trial: trial.suggest_int('max_depth', 3, 10),
    'num_leaves': lambda trial: trial.suggest_int('num_leaves', 20, 32),  # larger values stall the run (author's note)
    'min_child_samples': lambda trial: trial.suggest_int('min_child_samples', 10, 50),
    # NOTE(review): LightGBM may ignore `subsample` unless a bagging frequency
    # (`subsample_freq`) is also set — confirm against the LightGBM docs.
    'subsample': lambda trial: trial.suggest_float('subsample', 0.1, 1.0),
    'colsample_bytree': lambda trial: trial.suggest_float('colsample_bytree', 0.1, 1.0),  # feature subsampling per tree
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'reg_alpha': lambda trial: trial.suggest_float('reg_alpha', 1e-5, 10.0, log=True),  # L1 regularisation
    'reg_lambda': lambda trial: trial.suggest_float('reg_lambda', 1e-5, 10.0, log=True),  # L2 regularisation
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
}

import concurrent.futures
import traceback

def objective_lgbm(trial):
    """Optuna objective for lightgbm.LGBMClassifier with a 30-second time limit.

    The cross-validation runs in a single worker thread. If it does not finish
    within 30 seconds, or raises, the trial scores 0.0 and a message is printed.

    The executor is shut down with ``wait=False``. Leaving a
    ``with ThreadPoolExecutor(...)`` block waits for the worker to finish, which
    would keep the caller blocked for the whole run even after the timeout
    fired. A Python thread cannot be killed, so a timed-out evaluation keeps
    running in the background until it completes; only its result is discarded.
    """
    def run_trial():
        params = {name: sample(trial) for name, sample in param_space_lgbm.items()}
        model = lgb.LGBMClassifier(**params)
        score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
        return 0.0 if np.isnan(score) else score

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(run_trial)
        return future.result(timeout=30)
    except concurrent.futures.TimeoutError:
        print(f"[WARN] Trial timeout! Trial number: {trial.number}")
        return 0.0
    except Exception as e:
        print(f"[ERROR] Trial exception at trial {trial.number}: {e}")
        # Optional: traceback.print_exc()
        return 0.0
    finally:
        # Never block here: return to Optuna immediately after a timeout.
        executor.shutdown(wait=False)

# 11. CatBoost (simplified search space)
param_space_cb = {
    'iterations': lambda trial: trial.suggest_categorical('iterations', [100, 200, 300]),
    'learning_rate': lambda trial: trial.suggest_categorical('learning_rate', [0.01, 0.1, 0.2]),
    'depth': lambda trial: trial.suggest_categorical('depth', [3, 5, 10]),
    'l2_leaf_reg': lambda trial: trial.suggest_categorical('l2_leaf_reg', [1, 3, 5]),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
}
def objective_cb(trial):
    """Optuna objective for catboost.CatBoostClassifier (training log silenced).

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_cb.items()}
    model = cb.CatBoostClassifier(**params, verbose=0)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 12. AdaBoost
param_space_ab = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 1000),
    'learning_rate': lambda trial: trial.suggest_float('learning_rate', 1e-4, 1.0, log=True),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'algorithm': lambda trial: trial.suggest_categorical('algorithm', ['SAMME'])
}

def objective_ab(trial):
    """Optuna objective for AdaBoostClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_ab.items()}
    model = AdaBoostClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 13. Bagging Classifier
param_space_bag = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 10, 200),  # number of base models
    'max_samples': lambda trial: trial.suggest_float('max_samples', 0.1, 1.0),  # fraction of rows per base model
    'max_features': lambda trial: trial.suggest_float('max_features', 0.1, 1.0),  # fraction of features per base model
    'bootstrap': lambda trial: trial.suggest_categorical('bootstrap', [True, False]),  # sample rows with replacement
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'bootstrap_features': lambda trial: trial.suggest_categorical('bootstrap_features', [True, False]),  # sample features with replacement
}

def objective_bag(trial):
    """Optuna objective for BaggingClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_bag.items()}
    model = BaggingClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 14. Extra Trees Classifier
param_space_et = {
    'n_estimators': lambda trial: trial.suggest_int('n_estimators', 50, 1000),
    'max_depth': lambda trial: trial.suggest_categorical('max_depth', [None, 3, 5, 10, 15, 20, 30]),  # None = grow until leaves are pure
    'min_samples_split': lambda trial: trial.suggest_int('min_samples_split', 2, 50),
    'min_samples_leaf': lambda trial: trial.suggest_int('min_samples_leaf', 1, 20),
    'max_features': lambda trial: trial.suggest_float('max_features', 0.1, 1.0),  # fraction of features considered per split
    'bootstrap': lambda trial: trial.suggest_categorical('bootstrap', [True, False]),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'criterion': lambda trial: trial.suggest_categorical('criterion', ['gini', 'entropy']),
    'ccp_alpha': lambda trial: trial.suggest_float('ccp_alpha', 1e-6, 0.1, log=True),  # cost-complexity pruning strength
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
}

def objective_et(trial):
    """Optuna objective for ExtraTreesClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_et.items()}
    model = ExtraTreesClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 15. Passive Aggressive Classifier
param_space_pa = {
    'C': lambda trial: trial.suggest_float('C', 1e-4, 1e4, log=True),  # maximum step size (regularisation)
    'max_iter': lambda trial: trial.suggest_int('max_iter', 200, 5000),
    'tol': lambda trial: trial.suggest_float('tol', 1e-8, 1e-1, log=True),  # stopping tolerance
    'loss': lambda trial: trial.suggest_categorical('loss', ['hinge', 'squared_hinge']),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    'fit_intercept': lambda trial: trial.suggest_categorical('fit_intercept', [True, False]),
}

def objective_pa(trial):
    """Optuna objective for PassiveAggressiveClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_pa.items()}
    model = PassiveAggressiveClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 16. Ridge Classifier
param_space_ridge = {
    'alpha': lambda trial: trial.suggest_float('alpha', 1e-4, 1e4, log=True),  # regularisation strength
    'max_iter': lambda trial: trial.suggest_int('max_iter', 200, 5000),
    'tol': lambda trial: trial.suggest_float('tol', 1e-8, 1e-1, log=True),  # solver precision
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'fit_intercept': lambda trial: trial.suggest_categorical('fit_intercept', [True, False]),
    'solver': lambda trial: trial.suggest_categorical('solver', ['auto', 'svd', 'cholesky', 'lsqr', 'sparse_cg']),
}

def objective_ridge(trial):
    """Optuna objective for RidgeClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_ridge.items()}
    model = RidgeClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 17. Linear Discriminant Analysis
# Author's note: highly correlated features should be removed from the data
# before running this one.
param_space_lda = dict(
    solver=lambda t: t.suggest_categorical('solver', ['svd', 'lsqr', 'eigen']),
    tol=lambda t: t.suggest_float('tol', 1e-8, 1e-1, log=True),
    priors=lambda t: t.suggest_categorical('priors', [None, [0.5, 0.5], [0.3, 0.7], [0.7, 0.3]]),
    # shrinkage is not part of this dict; objective_lda adds it conditionally
)

def objective_lda(trial):
    """Optuna objective for LinearDiscriminantAnalysis.

    Samples ``param_space_lda``; a ``shrinkage`` value is additionally sampled
    when the solver is ``'lsqr'`` or ``'eigen'``. Returns the mean 10-fold
    cross-validated ``recall_macro`` (0.0 when that mean is NaN).
    """
    sampled = {}
    for key, draw in param_space_lda.items():
        sampled[key] = draw(trial)

    # shrinkage is only sampled for the lsqr / eigen solvers
    if sampled['solver'] in ('lsqr', 'eigen'):
        sampled['shrinkage'] = trial.suggest_categorical('shrinkage', ['auto', 0.0, 0.5, 1.0])

    fold_scores = cross_val_score(
        LinearDiscriminantAnalysis(**sampled),
        X_train,
        y_train,
        cv=10,
        scoring='recall_macro',
        n_jobs=-1,
    )
    mean_score = fold_scores.mean()
    if np.isnan(mean_score):
        return 0.0
    return mean_score

# 18. Quadratic Discriminant Analysis
param_space_qda = dict(
    # covariance regularisation
    reg_param=lambda t: t.suggest_float('reg_param', 0.0, 1.0),
    # tolerance, searched on a log scale
    tol=lambda t: t.suggest_float('tol', 1e-8, 1e-1, log=True),
    # class priors: None (estimated from data) or one of three fixed two-class priors
    priors=lambda t: t.suggest_categorical('priors', [None, [0.5, 0.5], [0.3, 0.7], [0.7, 0.3]]),
)

def objective_qda(trial):
    """Optuna objective for QuadraticDiscriminantAnalysis.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration, or 0.0 when that mean is NaN.
    """
    sampled = {}
    for key, draw in param_space_qda.items():
        sampled[key] = draw(trial)

    fold_scores = cross_val_score(
        QuadraticDiscriminantAnalysis(**sampled),
        X_train,
        y_train,
        cv=10,
        scoring='recall_macro',
        n_jobs=-1,
    )
    mean_score = fold_scores.mean()
    if np.isnan(mean_score):
        return 0.0
    return mean_score

# 19. Stochastic Gradient Descent
param_space_sgd = {
    'loss': lambda trial: trial.suggest_categorical('loss', ['hinge', 'log_loss', 'modified_huber']),  # 'log_loss' is the current name of the former 'log' loss
    'penalty': lambda trial: trial.suggest_categorical('penalty', ['l2', 'l1', 'elasticnet']),
    'alpha': lambda trial: trial.suggest_float('alpha', 1e-6, 1e-1, log=True),  # regularisation strength
    'max_iter': lambda trial: trial.suggest_int('max_iter', 200, 5000),
    'tol': lambda trial: trial.suggest_float('tol', 1e-8, 1e-1, log=True),  # stopping tolerance
    # Fixed seed, registered as a single-choice categorical so that it is kept
    # in study.best_params; a bare constant would be lost when the final model
    # is rebuilt from best_params.
    'random_state': lambda trial: trial.suggest_categorical('random_state', [42]),
    'l1_ratio': lambda trial: trial.suggest_float('l1_ratio', 0.0, 1.0),  # elastic-net mixing; always sampled, whatever the penalty
    'learning_rate': lambda trial: trial.suggest_categorical('learning_rate', ['constant', 'optimal', 'invscaling', 'adaptive']),  # step-size schedule
    'eta0': lambda trial: trial.suggest_float('eta0', 1e-4, 1.0, log=True),  # initial learning rate
    'class_weight': lambda trial: trial.suggest_categorical('class_weight', [None, 'balanced']),
}

def objective_sgd(trial):
    """Optuna objective for SGDClassifier.

    Returns the mean 10-fold cross-validated ``recall_macro`` of one sampled
    configuration on ``X_train`` / ``y_train`` (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_sgd.items()}
    model = SGDClassifier(**params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

# 20. Voting Classifier
param_space_voting = {
    'voting': lambda trial: trial.suggest_categorical('voting', ['hard', 'soft']),
}
def objective_voting(trial):
    """Optuna objective for a VotingClassifier over five default-parameter models.

    Only the voting mode (``'hard'`` / ``'soft'``) is searched. Returns the mean
    10-fold cross-validated ``recall_macro`` on ``X_train`` / ``y_train``
    (0.0 if the mean is NaN).
    """
    params = {name: sample(trial) for name, sample in param_space_voting.items()}
    # Base estimators use default hyperparameters. Every stochastic member is
    # seeded so that two trials with the same voting mode give the same score;
    # the decision tree was previously unseeded.
    estimators = [
        ('lr', LogisticRegression(random_state=42)),
        ('rf', RandomForestClassifier(random_state=42)),
        ('svm', SVC(probability=True, random_state=42)),
        ('knn', KNeighborsClassifier()),
        ('dt', DecisionTreeClassifier(random_state=42))
    ]
    model = VotingClassifier(estimators=estimators, **params)
    score = cross_val_score(model, X_train, y_train, cv=10, scoring='recall_macro', n_jobs=-1).mean()
    return 0.0 if np.isnan(score) else score

In [None]:
# Algorithm name -> Optuna objective function that tunes it
ALGORITHM_OBJECTIVES = dict(
    LogisticRegression=objective_lr,
    SVM=objective_svm,
    RandomForest=objective_rf,
    KNN=objective_knn,
    NaiveBayes=objective_nb,
    DecisionTree=objective_dt,
    GradientBoosting=objective_gb,
    XGBoost=objective_xgb,
    MLP=objective_mlp,
    LightGBM=objective_lgbm,
    CatBoost=objective_cb,
    AdaBoost=objective_ab,
    Bagging=objective_bag,
    ExtraTrees=objective_et,
    PassiveAggressive=objective_pa,
    Ridge=objective_ridge,
    LDA=objective_lda,
    QDA=objective_qda,
    SGD=objective_sgd,
    Voting=objective_voting,
)

# Algorithm name -> estimator class, used to instantiate the tuned model
MODEL_CLASSES = dict(
    LogisticRegression=LogisticRegression,
    SVM=SVC,
    RandomForest=RandomForestClassifier,
    KNN=KNeighborsClassifier,
    NaiveBayes=GaussianNB,
    DecisionTree=DecisionTreeClassifier,
    GradientBoosting=GradientBoostingClassifier,
    XGBoost=xgb.XGBClassifier,
    MLP=MLPClassifier,
    LightGBM=lgb.LGBMClassifier,
    CatBoost=cb.CatBoostClassifier,
    AdaBoost=AdaBoostClassifier,
    Bagging=BaggingClassifier,
    ExtraTrees=ExtraTreesClassifier,
    PassiveAggressive=PassiveAggressiveClassifier,
    Ridge=RidgeClassifier,
    LDA=LinearDiscriminantAnalysis,
    QDA=QuadraticDiscriminantAnalysis,
    SGD=SGDClassifier,
    Voting=VotingClassifier,
)

def _voting_base_estimators():
    """Return fresh base estimators for VotingClassifier.

    Uses the same five estimator types that objective_voting tunes over, so the
    evaluated model is the one that was optimised.
    """
    return [
        ('lr', LogisticRegression(random_state=42)),
        ('rf', RandomForestClassifier(random_state=42)),
        ('svm', SVC(probability=True, random_state=42)),
        ('knn', KNeighborsClassifier()),
        ('dt', DecisionTreeClassifier(random_state=42)),
    ]


def _instantiate_model(algo_name, model_class, params):
    """Build an unfitted estimator for ``algo_name`` from constructor ``params``."""
    if algo_name == 'Voting':
        return model_class(estimators=_voting_base_estimators(), **params)
    return model_class(**params)


def _positive_class_scores(fitted_model, X):
    """Return continuous scores for ranking metrics, or None if unavailable.

    Prefers ``predict_proba`` (second column); falls back to
    ``decision_function`` so AUC / AUPRC can also be computed for models that
    expose only a decision function.
    """
    if hasattr(fitted_model, 'predict_proba'):
        return fitted_model.predict_proba(X)[:, 1]
    if hasattr(fitted_model, 'decision_function'):
        return fitted_model.decision_function(X)
    return None


def train_and_evaluate(
    algorithms='all',  # 'all' or a list of algorithm names, e.g. ['LogisticRegression', 'RandomForest']
    scoring='recall_macro',  # scorer used for the post-tuning cross-validation
    random_trials=200,  # number of random-search trials
    tpe_trials=200,  # number of TPE trials
    cv=10,  # number of cross-validation folds
    output_metrics=['recall', 'auc', 'auprc', 'recall0', 'recall1', 'precision0', 'precision1', 'accuracy', 'f1']  # metrics to report
):
    """
    Tune, cross-validate and fit the requested classifiers.

    For each algorithm an Optuna study runs ``random_trials`` random-search
    trials followed by ``tpe_trials`` TPE trials. The best parameters are then
    evaluated by cross-validation on ``X_train`` / ``y_train``, and a model
    fitted on all of ``X_train`` is stored.

    Parameters:
        algorithms: 'all', a single algorithm name, or a list of names (keys of
            ALGORITHM_OBJECTIVES).
        scoring: scikit-learn scorer name used for the ``cv_<scoring>`` entry.
            The Optuna objective functions define their own optimisation metric
            and do not read this argument.
        random_trials: int, number of random-search trials.
        tpe_trials: int, number of TPE trials.
        cv: int, number of cross-validation folds.
        output_metrics: list of metric names to compute per fold; unknown names
            are ignored. 'auc' and 'auprc' are skipped for models that expose
            neither ``predict_proba`` nor ``decision_function``.

    Returns:
        results: dict mapping algorithm name to a dict with ``best_params``,
        ``best_score`` (best objective value), ``cv_<scoring>``, the averaged
        per-fold metrics and the fitted ``model`` (for later SHAP analysis).
    """
    results = defaultdict(dict)

    # Resolve the algorithm list; a bare name is treated as a one-element list
    # instead of being iterated character by character.
    if algorithms == 'all':
        algo_list = list(ALGORITHM_OBJECTIVES.keys())
    elif isinstance(algorithms, str):
        algo_list = [algorithms]
    else:
        algo_list = list(algorithms)

    # Metrics computed from hard predictions: request name -> (result key, function)
    label_metrics = {
        'recall': ('cv_recall_macro', lambda yt, yp: recall_score(yt, yp, average='macro')),
        'recall0': ('cv_recall0', lambda yt, yp: recall_score(yt, yp, pos_label=0)),
        'recall1': ('cv_recall1', lambda yt, yp: recall_score(yt, yp, pos_label=1)),
        'precision0': ('cv_precision0', lambda yt, yp: precision_score(yt, yp, pos_label=0)),
        'precision1': ('cv_precision1', lambda yt, yp: precision_score(yt, yp, pos_label=1)),
        'accuracy': ('cv_accuracy', accuracy_score),
        'f1': ('cv_f1_macro', lambda yt, yp: f1_score(yt, yp, average='macro')),
    }
    # Metrics computed from continuous scores
    ranking_metrics = {
        'auc': ('cv_auc', roc_auc_score),
        'auprc': ('cv_auprc', average_precision_score),
    }

    for algo_name in algo_list:
        print(f"Training {algo_name}...")

        objective = ALGORITHM_OBJECTIVES.get(algo_name)
        if objective is None:
            print(f"Objective function for {algo_name} not found. Skipping...")
            continue

        # Phase 1: random search
        study = optuna.create_study(direction='maximize', sampler=optuna.samplers.RandomSampler())
        study.optimize(objective, n_trials=random_trials, n_jobs=-1)

        # Phase 2: continue the same study with the TPE sampler
        study.sampler = optuna.samplers.TPESampler()
        study.optimize(objective, n_trials=tpe_trials, n_jobs=-1)

        algo_results = results[algo_name]
        best_params = study.best_params
        algo_results['best_params'] = best_params
        algo_results['best_score'] = study.best_value

        model_class = MODEL_CLASSES.get(algo_name)
        if model_class is None:
            print(f"Model class for {algo_name} not found. Skipping...")
            continue

        # Constructor arguments = tuned parameters + fixed per-library extras.
        # Work on a copy so the reported best_params stay exactly what Optuna found.
        model_params = dict(best_params)
        if algo_name == 'XGBoost':
            model_params['use_label_encoder'] = False
            model_params['eval_metric'] = 'logloss'
        if algo_name == 'CatBoost':
            model_params['verbose'] = 0  # silence the training log
        if algo_name == 'SVM':
            model_params['probability'] = True  # needed for predict_proba
        model = _instantiate_model(algo_name, model_class, model_params)

        # Cross-validated score with the caller's scorer
        cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring=scoring, n_jobs=-1)
        algo_results[f'cv_{scoring}'] = cv_scores.mean()

        # Manual cross-validation loop to collect every requested metric
        skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
        metric_scores = defaultdict(list)

        for train_idx, val_idx in skf.split(X_train, y_train):
            X_train_fold, X_val_fold = X_train.iloc[train_idx], X_train.iloc[val_idx]
            y_train_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx]

            fold_model = _instantiate_model(algo_name, model_class, model_params)
            fold_model.fit(X_train_fold, y_train_fold)

            y_pred = fold_model.predict(X_val_fold)
            y_score = _positive_class_scores(fold_model, X_val_fold)

            for metric in output_metrics:
                if metric in label_metrics:
                    key, metric_fn = label_metrics[metric]
                    metric_scores[key].append(metric_fn(y_val_fold, y_pred))
                elif metric in ranking_metrics and y_score is not None:
                    key, metric_fn = ranking_metrics[metric]
                    metric_scores[key].append(metric_fn(y_val_fold, y_score))

        # Average each metric over the folds
        for metric_name, scores in metric_scores.items():
            algo_results[metric_name] = np.mean(scores)

        # Fit on the full training data and keep the model (for SHAP analysis)
        model.fit(X_train, y_train)
        algo_results['model'] = model

        # Sanity check: macro recall should equal the mean of the per-class recalls.
        # Only possible when all three metrics were requested.
        if all(key in algo_results for key in ('cv_recall_macro', 'cv_recall0', 'cv_recall1')):
            expected_recall_macro = (algo_results['cv_recall0'] + algo_results['cv_recall1']) / 2
            if not np.isclose(algo_results['cv_recall_macro'], expected_recall_macro, atol=1e-6):
                print(f"Warning: cv_recall_macro ({algo_results['cv_recall_macro']}) does not match "
                      f"(cv_recall0 + cv_recall1) / 2 ({expected_recall_macro}) for {algo_name}")

        # study.best_value is the objective's own metric, which is not
        # necessarily `scoring`, so the two numbers are reported separately.
        print(f"{algo_name} completed. Best objective value: {study.best_value:.4f} | "
              f"CV {scoring}: {cv_scores.mean():.4f}")

    return results

# Report one chosen metric across every algorithm.
def print_metric_for_all_algorithms(results, metric_name):
    """Print the value of ``metric_name`` for each algorithm in ``results``.

    Args:
        results: Mapping of algorithm name to a dict holding that algorithm's
            metrics.
        metric_name: Key looked up in each metrics dict (e.g. ``'cv_auc'``).

    Output:
        A header line, then one line per algorithm: the value formatted to
        four decimal places, or a "metric not available" message when the key
        is missing or its value is ``None``.
    """
    print(f"\n所有算法的 {metric_name} 得分：")
    for algo, metrics in results.items():
        score = metrics.get(metric_name)
        if score is None:
            print(f"{algo}: 无此指标")
            continue
        print(f"{algo}: {score:.4f}")

In [None]:
if __name__ == "__main__":
    # Example run: train and evaluate every algorithm listed below.
    # `results` maps each algorithm name to a dict of its metrics (plus the
    # fitted estimator under the 'model' key).
    results = train_and_evaluate(
        algorithms=[
            'LogisticRegression', 'SVM', 'RandomForest', 'KNN', 'NaiveBayes',
            'DecisionTree', 'GradientBoosting', 'XGBoost', 'MLP',
            'LightGBM',  # very slow here; needs optimisation
            'AdaBoost', 'Bagging', 'ExtraTrees', 'PassiveAggressive', 'Ridge',
            'QDA', 'SGD', 'Voting',
        ],
        scoring='roc_auc',  # other options used: precision_macro, recall_macro
        random_trials=500,
        tpe_trials=500,
        cv=10,
        # Optional override, currently disabled:
        # output_metrics=['recall', 'auc', 'auprc', 'recall0', 'recall1', 'precision0', 'precision1', 'accuracy']
    )

    # Dump every score for each algorithm in turn; the fitted estimator stored
    # under 'model' is not a score, so it is skipped.
    for algo, metrics in results.items():
        print(f"\nResults for {algo}:")
        for key, value in metrics.items():
            if key == 'model':
                continue
            print(f"{key}: {value}")


In [None]:
# Print each cross-validated metric for all algorithms, one table per metric,
# in a fixed order.
for metric_key in (
    'cv_recall_macro',  # macro-averaged recall
    'cv_auc',           # ROC AUC
    'cv_recall0',       # recall of class 0
    'cv_recall1',       # recall of class 1
    'cv_precision0',    # precision of class 0
    'cv_precision1',    # precision of class 1
    'cv_accuracy',      # accuracy
    'cv_f1_macro',      # macro-averaged F1
):
    print_metric_for_all_algorithms(results, metric_key)

In [None]:
# Name of the algorithm to explain; must be a key of `results`
# (e.g. 'LogisticRegression', 'SVM', 'RandomForest', 'XGBoost').
algo_name = 'LogisticRegression'

# Fitted estimator that train_and_evaluate stored for this algorithm.
model = results[algo_name]['model']

# Feature names, in column order, used to label the SHAP plots.
feature_names = X_train.columns.tolist()

# Choose a SHAP explainer that matches the model family.
if algo_name in ['RandomForest', 'GradientBoosting', 'XGBoost', 'LightGBM', 'CatBoost', 'ExtraTrees']:
    explainer = shap.TreeExplainer(model)
elif algo_name in ['LogisticRegression', 'Ridge', 'SGD']:
    explainer = shap.LinearExplainer(model, X_train)
else:
    # Model-agnostic fallback. Not every estimator exposes predict_proba
    # (e.g. PassiveAggressive never does, SVC only with probability=True), so
    # fall back to decision_function and finally predict instead of raising
    # AttributeError.
    if hasattr(model, 'predict_proba'):
        predict_fn = model.predict_proba
    elif hasattr(model, 'decision_function'):
        predict_fn = model.decision_function
    else:
        predict_fn = model.predict
    # NOTE(review): the full X_train is used as the background set, which can
    # be very slow with KernelExplainer; consider shap.sample / shap.kmeans.
    explainer = shap.KernelExplainer(predict_fn, X_train)

# Compute SHAP values for every training sample.
shap_values = explainer.shap_values(X_train)

In [None]:

# SHAP summary plot.
# For a binary classifier the per-class SHAP values may arrive in one of two
# layouts, depending on the installed SHAP version and explainer:
#   * a list with one 2-D array per class (older SHAP releases), or
#   * a single 3-D array whose last axis indexes the class (newer releases,
#     0.45+ per the SHAP release notes).
# In both cases only the class-1 values are plotted; a plain 2-D array
# (single-output explainer) is passed through unchanged.
if isinstance(shap_values, list):
    shap_values_to_plot = shap_values[1]
elif np.ndim(shap_values) == 3:
    shap_values_to_plot = shap_values[:, :, 1]
else:
    shap_values_to_plot = shap_values

shap.summary_plot(shap_values_to_plot, X_train, feature_names=feature_names, max_display=200)