# 性能

In [None]:
import pandas as pd
import numpy as np
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize

# Locations of the per-task cross-validation result tables
mdd_path = 'C:/Users/74101/Desktop/成人抑郁症/result/12.5/result/mdd_all/cv_allrna.csv'
bd_path = 'C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_bp/cv_allrna.csv'
hc_path = 'C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_mdd/cv_allrna.csv'
hamd_path = 'C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_hmad/cv_allrna.csv'

# Read each table and collapse it to one row per model: the mean of every
# numeric column over all rows sharing the same 'Model' value.
MDD = pd.read_csv(mdd_path).groupby('Model').mean(numeric_only=True).reset_index()
BD = pd.read_csv(bd_path).groupby('Model').mean(numeric_only=True).reset_index()
HC = pd.read_csv(hc_path).groupby('Model').mean(numeric_only=True).reset_index()
HAMD = pd.read_csv(hamd_path).groupby('Model').mean(numeric_only=True).reset_index()

# Task name -> averaged metric table
data_dict = {"MDD": MDD, "BD": BD, "HC": HC, "HAMD": HAMD}

# Metric columns used as optimization objectives (a fixed list, not derived
# from the tables)
objectives = ["AUC", "F1 Score", "Accuracy", 'Sensitivity', 'Specificity']
print("目标列:", objectives)

# Candidate model names, taken from the MDD table
all_models = list(MDD["Model"].unique())

# Multi-objective model-selection problem
class MultiObjectiveModelSelection(ElementwiseProblem):
    """Score a subset of models by its average metrics across datasets.

    Each decision variable lies in [0, 1] and is rounded to 0/1; a 1 means
    the model at that position of ``all_models`` is part of the subset.
    Every objective is the negated mean metric of the subset (so that
    minimizing it maximizes the metric), averaged over all datasets.
    """

    def __init__(self, data_dict, all_models, objectives):
        """
        :param data_dict: mapping of dataset name -> DataFrame with a
            "Model" column and one column per objective
        :param all_models: sequence of model names, one per decision variable
        :param objectives: names of the metric columns to optimize
        """
        self.data_dict = data_dict
        self.all_models = all_models
        self.objectives = objectives
        super().__init__(
            n_var=len(all_models),   # one variable per candidate model
            n_obj=len(objectives),   # one objective per metric
            xl=0,
            xu=1,
            type_var=float,
        )

    def _evaluate(self, x, out, *args, **kwargs):
        """Write the objective vector for decision vector ``x`` to ``out["F"]``."""
        mask = np.round(x).astype(int)
        chosen = [model for model, flag in zip(self.all_models, mask) if flag == 1]

        if len(chosen) == 0:
            # An empty subset gets 1.0 on every objective; real subsets score
            # the negated metric instead.
            out["F"] = [1.0] * len(self.objectives)
            return

        # Rows of the chosen models, computed once per dataset.
        subsets = [
            frame[frame["Model"].isin(chosen)] for frame in self.data_dict.values()
        ]

        objective_values = []
        for metric in self.objectives:
            per_dataset = [
                # 1.0 when none of the chosen models appears in this dataset
                1.0 if subset.empty else -subset[metric].mean()
                for subset in subsets
            ]
            objective_values.append(np.mean(per_dataset))
        out["F"] = objective_values

# Build the problem from the averaged cross-validation tables
problem = MultiObjectiveModelSelection(data_dict, all_models, objectives)

# NSGA-III with Das-Dennis reference directions, one dimension per objective
ref_dirs = get_reference_directions(
    "das-dennis",
    n_dim=len(objectives),
    n_partitions=3,
)
algorithm = NSGA3(pop_size=100, ref_dirs=ref_dirs)

# Run 100 generations with a fixed seed
res = minimize(problem, algorithm, ('n_gen', 100), seed=42, verbose=True)

# Report, for every returned solution, the model with the largest decision value.
# NOTE(review): evaluation rounds the vector to a 0/1 subset mask, whereas this
# report keeps only the single largest component - confirm this is intended.
for i, solution in enumerate(res.X):
    selected_model = all_models[solution.argmax()]
    print(f"Solution {i + 1}: Selected model type: {selected_model}")

# Same report for the first returned solution only.
# NOTE(review): the first row of res.X is not ranked above the others here.
optimal_solution = res.X[0]
max_index = optimal_solution.argmax()
selected_model = all_models[max_index]
print(f"Best solution selected model type: {selected_model}")

# 外部筛选

In [None]:
import pandas as pd

# Load the full sample table (GBK-encoded CSV)
data_row = pd.read_csv('C:/Users/74101/Desktop/成人抑郁症/data/all.csv', encoding='GBK')

# Split rows by the prefix of the value in the first column
split_key = data_row.iloc[:, 0]
train_data = data_row.loc[split_key.str.startswith('train')]
test_data = data_row.loc[split_key.str.startswith('test')]

# Generic helper for group-based filtering
def process_data(data, group_filter, replace_map, drop_columns):
    """
    Select rows by their ``group`` value and split them into features and labels.

    :param data: source DataFrame; must contain a 'group' column. It is not modified.
    :param group_filter: iterable of 'group' values to keep
    :param replace_map: mapping applied to the kept 'group' values (old -> new)
    :param drop_columns: columns removed from the kept rows to form the features
    :return: tuple ``(features, group)`` - the feature DataFrame with every NaN
        replaced by 0, and the relabelled 'group' Series
    """
    # Work on an explicit copy: assigning into a boolean-filtered selection of
    # ``data`` raises SettingWithCopyWarning and is ambiguous about whether the
    # caller's frame is touched.
    filtered_data = data[data['group'].isin(group_filter)].copy()
    filtered_data['group'] = filtered_data['group'].replace(replace_map)
    group = filtered_data['group']

    # Drop the non-feature columns, then replace every NaN with 0
    features = filtered_data.drop(columns=drop_columns).fillna(0)

    return features, group

# Columns that are never used as features
common_drop_columns = ['allRNA', 'Hospital', 'Sample_id', 'Company', 'Batch', 'group', 'Age', 'HAMD', 'Diagnosis', 'Gender']

# MDD task: groups 3, 1 and 0, with group 3 relabelled as 0
train_mdd_feature, train_mdd_group = process_data(train_data, [3, 1, 0], {3: 0}, common_drop_columns)
test_mdd_feature, test_mdd_group = process_data(test_data, [3, 1, 0], {3: 0}, common_drop_columns)

# BD task: groups 2 and 1, with group 2 relabelled as 0
train_bd_feature, train_bd_group = process_data(train_data, [2, 1], {2: 0}, common_drop_columns)
test_bd_feature, test_bd_group = process_data(test_data, [2, 1], {2: 0}, common_drop_columns)

# HC task: groups 1 and 0, labels unchanged
train_hc_feature, train_hc_group = process_data(train_data, [1, 0], {}, common_drop_columns)
test_hc_feature, test_hc_group = process_data(test_data, [1, 0], {}, common_drop_columns)

# "Other" task: groups 3 and 1, with group 3 relabelled as 0
train_other_feature, train_other_group = process_data(train_data, [3, 1], {3: 0}, common_drop_columns)
test_other_feature, test_other_group = process_data(test_data, [3, 1], {3: 0}, common_drop_columns)

# HAMD task: start from the rows of groups 1 and 0
train_hamd = train_data[train_data['group'].isin([1, 0])]
test_hamd = test_data[test_data['group'].isin([1, 0])]

# Keep only rows that have a HAMD value
train_hamd_filtered = train_hamd.dropna(subset=['HAMD']).copy()
test_hamd_filtered = test_hamd.dropna(subset=['HAMD']).copy()

# Binary label from the HAMD value: 0 when it is below 20, otherwise 1
train_hamd_filtered['HAMD_Group'] = train_hamd_filtered['HAMD'].apply(lambda score: int(not score < 20))
test_hamd_filtered['HAMD_Group'] = test_hamd_filtered['HAMD'].apply(lambda score: int(not score < 20))

# Features are everything except the common non-feature columns and the label
hamd_drop_columns = common_drop_columns + ['HAMD_Group']

train_hamd_feature = train_hamd_filtered.drop(columns=hamd_drop_columns)
train_hamd_group = train_hamd_filtered['HAMD_Group']

test_hamd_feature = test_hamd_filtered.drop(columns=hamd_drop_columns)
test_hamd_group = test_hamd_filtered['HAMD_Group']


In [None]:
# 导入库
import joblib

# Names of the models saved for every task
model_names = ['AdaBoost', 'CatBoost', 'GBDT', 'LightGBM', 'Logistic Regression', 
               'MLP', 'Random Forest', 'SVM', 'XGBoost']


def _load_models(model_dir, label):
    """Load ``<model_dir>/<name>_model.pkl`` for every name in ``model_names``.

    Loading is best-effort: a model that cannot be loaded is reported on
    stdout and left out of the result instead of aborting the run.

    :param model_dir: directory containing the pickled models
    :param label: task label used in the progress messages
    :return: dict mapping model name -> loaded model (successful loads only)
    """
    loaded = {}
    for name in model_names:
        model_path = f'{model_dir}/{name}_model.pkl'
        try:
            # SECURITY: joblib.load unpickles the file and can execute arbitrary
            # code - only point this at model files you created yourself.
            model = joblib.load(model_path)
        except Exception as e:
            print(f"Failed to load {name} model ({label}): {e}")
        else:
            loaded[name] = model
            print(f"{name} model ({label}) loaded successfully.")
    return loaded


# MDD models
mdd_models = _load_models('C:/Users/74101/Desktop/成人抑郁症/result/12.5/result/mdd_all/model', 'MDD')

# BD models
bd_models = _load_models('C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_bp/model', 'BD')

# HC models
hc_models = _load_models('C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_mdd/model', 'HC')

# Other models
other_models = _load_models('C:/Users/74101/Desktop/成人抑郁症/result/12.5/result/other_all/model', 'Other')

# HAMD models
hamd_models = _load_models('C:/Users/74101/Desktop/成人抑郁症/result/10.22/result/all_hmad/model', 'HAMD')


In [None]:
# Task names shared by the model registry and the test-set registry
_task_names = ("MDD", "BD", "HC", "Other", "HAMD")

# Task name -> {model name: loaded model}
models_dict = dict(zip(
    _task_names,
    (mdd_models, bd_models, hc_models, other_models, hamd_models),
))

# Task name -> (test features, test labels)
dataset = dict(zip(
    _task_names,
    (
        (test_mdd_feature, test_mdd_group),
        (test_bd_feature, test_bd_group),
        (test_hc_feature, test_hc_group),
        (test_other_feature, test_other_group),
        (test_hamd_feature, test_hamd_group),
    ),
))

In [None]:
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, average_precision_score, log_loss, matthews_corrcoef
import pandas as pd

# Performance evaluation
def evaluate_model_performance(models_dict, dataset):
    """
    Evaluate every model on the test set registered under the same task name.

    :param models_dict: dict mapping task name -> dict of {model name: model};
        each model must provide ``predict_proba`` and ``predict``
    :param dataset: dict mapping task name -> (features, labels)
    :return: DataFrame with one row per (task, model) and one column per metric
    """
    rows = []

    for task, task_models in models_dict.items():
        features, labels = dataset[task]

        for model_name, estimator in task_models.items():
            # Second probability column is taken as the positive-class score
            scores = estimator.predict_proba(features)[:, 1]
            predicted = estimator.predict(features)

            rows.append({
                "Dataset": task,
                "Model": model_name,
                "AUC": roc_auc_score(labels, scores),
                "Accuracy": accuracy_score(labels, predicted),
                "Sensitivity (SN)": recall_score(labels, predicted),
                "Specificity (SP)": specificity_score(labels, predicted),
                "F1 Score": f1_score(labels, predicted),
                "AUPRC": average_precision_score(labels, scores),
                "Log-Loss": log_loss(labels, scores),
                "MCC": matthews_corrcoef(labels, predicted),
            })

    return pd.DataFrame(rows)

# Helper: specificity (true-negative rate)
def specificity_score(y_true, y_pred):
    """
    Compute the specificity TN / (TN + FP) of binary predictions.

    :param y_true: ground-truth labels
    :param y_pred: predicted labels
    :return: specificity, or NaN when ``y_true`` contains no negative samples
    :raises ValueError: if the confusion matrix is not 2x2 (more than two classes)
    """
    matrix = confusion_matrix(y_true, y_pred)
    if matrix.shape == (1, 1):
        # Only one class occurs in both inputs, so the matrix collapses to 1x1
        # and cannot be unpacked into four cells. Rebuild it as 2x2, assuming
        # the 0 = negative / 1 = positive coding used by the callers in this file.
        matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

    tn, fp, _, _ = matrix.ravel()
    negatives = tn + fp
    if negatives == 0:
        # Specificity is undefined without negative samples; return NaN
        # explicitly instead of dividing 0 by 0.
        return float("nan")
    return tn / negatives

# Evaluate every loaded model on its task's test set
performance_results = evaluate_model_performance(models_dict, dataset)

# Show the table and write it to disk without the index column
performance_csv_path = 'C:/Users/74101/Desktop/成人抑郁症/result/12.5/result/model_performance1.csv'
print(performance_results)
performance_results.to_csv(performance_csv_path, index=False)


In [None]:
# NSGA3
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
import numpy as np

# Metrics to optimize, plus the task and model names present in the results
objectives = ["AUC", "AUPRC", "Log-Loss", "MCC"]
dataset_column = performance_results["Dataset"]
datasets = dataset_column.unique()
models = performance_results["Model"].unique()

# Task name -> rows of the result table belonging to that task
data_dict = {
    task: performance_results.loc[dataset_column == task]
    for task in datasets
}

# Multi-objective model-selection problem
class ModelSelectionProblem(ElementwiseProblem):
    """Score a subset of models by its average metrics across datasets.

    Each decision variable lies in [0, 1] and is rounded to 0/1; a 1 means the
    model at that position of ``models`` belongs to the subset. The optimizer
    minimizes every objective, so higher-is-better metrics are negated while
    lower-is-better metrics (``minimize_objectives``, "Log-Loss" by default)
    keep their sign. Negating Log-Loss as well would reward the worst
    calibrated models.
    """

    def __init__(self, data_dict, models, objectives, minimize_objectives=("Log-Loss",)):
        """
        :param data_dict: mapping of dataset name -> DataFrame with a "Model"
            column and one column per objective
        :param models: sequence of model names, one per decision variable
        :param objectives: names of the metric columns to optimize
        :param minimize_objectives: objective names where smaller is better
        """
        self.data_dict = data_dict
        self.models = models
        self.objectives = objectives
        self.minimize_objectives = frozenset(minimize_objectives)
        # Value used when nothing is selected; computed once per objective.
        self._penalties = {obj: self._worst_value(obj) for obj in objectives}
        super().__init__(
            n_var=len(models),      # one variable per candidate model
            n_obj=len(objectives),  # one objective per metric
            xl=0,
            xu=1,
            type_var=float,
        )

    def _worst_value(self, obj):
        """Return an objective value no real selection can be worse than."""
        if obj not in self.minimize_objectives:
            # Negated higher-is-better metrics: 1.0, as in the original penalty.
            return 1.0
        # Lower-is-better metrics are unbounded above (log-loss can exceed 1),
        # so use a value strictly above the largest one observed.
        observed = [data[obj].max() for data in self.data_dict.values() if not data.empty]
        finite = [float(value) for value in observed if np.isfinite(value)]
        return max([1.0] + [value + 1.0 for value in finite])

    def _evaluate(self, x, out, *args, **kwargs):
        """Write the objective vector for decision vector ``x`` to ``out["F"]``."""
        chosen = np.round(x).astype(int) == 1
        selected_models = [model for model, keep in zip(self.models, chosen) if keep]

        if not selected_models:
            # Empty subset: worst value on every objective
            out["F"] = [self._penalties[obj] for obj in self.objectives]
            return

        scores = []
        for obj in self.objectives:
            sign = 1.0 if obj in self.minimize_objectives else -1.0
            obj_scores = []
            for data in self.data_dict.values():
                selected_data = data[data["Model"].isin(selected_models)]
                if selected_data.empty:
                    # None of the selected models was evaluated on this dataset
                    obj_scores.append(self._penalties[obj])
                else:
                    obj_scores.append(sign * selected_data[obj].mean())
            scores.append(np.mean(obj_scores))  # average over datasets
        out["F"] = scores

# Build the problem from the test-set performance table
problem = ModelSelectionProblem(data_dict, models, objectives)

# NSGA-III with Das-Dennis reference directions, one dimension per objective
ref_dirs = get_reference_directions(
    "das-dennis",
    n_dim=len(objectives),
    n_partitions=6,
)
algorithm = NSGA3(pop_size=210, ref_dirs=ref_dirs)

# Run 100 generations with a fixed seed
res = minimize(problem, algorithm, ('n_gen', 100), seed=42, verbose=True)

# Print the model subset encoded by every returned solution
# (a model is selected when its decision value rounds to 1)
optimal_solutions = res.X
for i, solution in enumerate(optimal_solutions):
    selected_models = [name for name, flag in zip(models, np.round(solution)) if flag == 1]
    print(f"Solution {i + 1}: Selected models: {selected_models}")

# Subset encoded by the first returned solution.
# NOTE(review): the first row of res.X is not ranked above the others here.
best_solution = optimal_solutions[0]
selected_models = [name for name, flag in zip(models, np.round(best_solution)) if flag == 1]
print(f"Best solution: Selected models: {selected_models}")


In [None]:
# NSGA3
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
import numpy as np

# Metrics to optimize, plus the task and model names present in the results
objectives = ["AUC", "AUPRC", "Log-Loss", "MCC"]
dataset_column = performance_results["Dataset"]
datasets = dataset_column.unique()
models = performance_results["Model"].unique()

# Task name -> rows of the result table belonging to that task
data_dict = {
    task: performance_results.loc[dataset_column == task]
    for task in datasets
}

# Multi-objective model-selection problem
class ModelSelectionProblem(ElementwiseProblem):
    """Score a subset of models by its average metrics across datasets.

    Each decision variable lies in [0, 1] and is rounded to 0/1; a 1 means the
    model at that position of ``models`` belongs to the subset. The optimizer
    minimizes every objective, so higher-is-better metrics are negated while
    lower-is-better metrics (``minimize_objectives``, "Log-Loss" by default)
    keep their sign. Negating Log-Loss as well would reward the worst
    calibrated models.
    """

    def __init__(self, data_dict, models, objectives, minimize_objectives=("Log-Loss",)):
        """
        :param data_dict: mapping of dataset name -> DataFrame with a "Model"
            column and one column per objective
        :param models: sequence of model names, one per decision variable
        :param objectives: names of the metric columns to optimize
        :param minimize_objectives: objective names where smaller is better
        """
        self.data_dict = data_dict
        self.models = models
        self.objectives = objectives
        self.minimize_objectives = frozenset(minimize_objectives)
        # Value used when nothing is selected; computed once per objective.
        self._penalties = {obj: self._worst_value(obj) for obj in objectives}
        super().__init__(
            n_var=len(models),      # one variable per candidate model
            n_obj=len(objectives),  # one objective per metric
            xl=0,
            xu=1,
            type_var=float,
        )

    def _worst_value(self, obj):
        """Return an objective value no real selection can be worse than."""
        if obj not in self.minimize_objectives:
            # Negated higher-is-better metrics: 1.0, as in the original penalty.
            return 1.0
        # Lower-is-better metrics are unbounded above (log-loss can exceed 1),
        # so use a value strictly above the largest one observed.
        observed = [data[obj].max() for data in self.data_dict.values() if not data.empty]
        finite = [float(value) for value in observed if np.isfinite(value)]
        return max([1.0] + [value + 1.0 for value in finite])

    def _evaluate(self, x, out, *args, **kwargs):
        """Write the objective vector for decision vector ``x`` to ``out["F"]``."""
        chosen = np.round(x).astype(int) == 1
        selected_models = [model for model, keep in zip(self.models, chosen) if keep]

        if not selected_models:
            # Empty subset: worst value on every objective
            out["F"] = [self._penalties[obj] for obj in self.objectives]
            return

        scores = []
        for obj in self.objectives:
            sign = 1.0 if obj in self.minimize_objectives else -1.0
            obj_scores = []
            for data in self.data_dict.values():
                selected_data = data[data["Model"].isin(selected_models)]
                if selected_data.empty:
                    # None of the selected models was evaluated on this dataset
                    obj_scores.append(self._penalties[obj])
                else:
                    obj_scores.append(sign * selected_data[obj].mean())
            scores.append(np.mean(obj_scores))  # average over datasets
        out["F"] = scores

# Build the problem from the test-set performance table
problem = ModelSelectionProblem(data_dict, models, objectives)

# NSGA-III with Das-Dennis reference directions, one dimension per objective
ref_dirs = get_reference_directions(
    "das-dennis",
    n_dim=len(objectives),
    n_partitions=6,
)
algorithm = NSGA3(pop_size=100, ref_dirs=ref_dirs)

# Run 100 generations with a fixed seed
res = minimize(problem, algorithm, ('n_gen', 100), seed=42, verbose=True)

# Decision vectors and objective values of the returned solutions
optimal_solutions, optimal_objectives = res.X, res.F

# First objective column; with AUC listed first in `objectives` this is the
# negated AUC, so the smallest entry corresponds to the highest AUC.
auc_values = optimal_objectives[:, 0]
best_solution_index = auc_values.argmin()

# Decision vector of that solution
best_solution = optimal_solutions[best_solution_index]
print(f"Selected models based on maximum AUC: {best_solution}")

# Report the model with the largest decision value in that vector.
# NOTE(review): evaluation rounds the vector to a 0/1 subset mask, whereas this
# keeps only the single largest component - confirm this is intended.
max_prob_model_index = best_solution.argmax()
selected_model = models[max_prob_model_index]

print(f"Best model with highest probability: {selected_model}")



In [None]:
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
import numpy as np

# Metrics to optimize, plus the task and model names present in the results
objectives = ["AUC", "AUPRC", "Log-Loss"]
dataset_column = performance_results["Dataset"]
datasets = dataset_column.unique()
models = performance_results["Model"].unique()

# Task name -> rows of the result table belonging to that task
data_dict = {
    task: performance_results.loc[dataset_column == task]
    for task in datasets
}

# Multi-objective model-selection problem
class ModelSelectionProblem(ElementwiseProblem):
    """Score a subset of models by its average metrics across datasets.

    Each decision variable lies in [0, 1] and is rounded to 0/1; a 1 means the
    model at that position of ``models`` belongs to the subset. The optimizer
    minimizes every objective, so higher-is-better metrics are negated while
    lower-is-better metrics (``minimize_objectives``, "Log-Loss" by default)
    keep their sign.

    The value assigned when nothing is selected is chosen per objective: a
    flat 1.0 is not a worst case for log-loss, which can exceed 1, so an empty
    selection could otherwise look better than real subsets on that objective.
    """

    def __init__(self, data_dict, models, objectives, minimize_objectives=("Log-Loss",)):
        """
        :param data_dict: mapping of dataset name -> DataFrame with a "Model"
            column and one column per objective
        :param models: sequence of model names, one per decision variable
        :param objectives: names of the metric columns to optimize
        :param minimize_objectives: objective names where smaller is better
        """
        self.data_dict = data_dict
        self.models = models
        self.objectives = objectives
        self.minimize_objectives = frozenset(minimize_objectives)
        # Value used when nothing is selected; computed once per objective.
        self._penalties = {obj: self._worst_value(obj) for obj in objectives}
        super().__init__(
            n_var=len(models),      # one variable per candidate model
            n_obj=len(objectives),  # one objective per metric
            xl=0,
            xu=1,
            type_var=float,
        )

    def _worst_value(self, obj):
        """Return an objective value no real selection can be worse than."""
        if obj not in self.minimize_objectives:
            # Negated higher-is-better metrics: 1.0, as in the original penalty.
            return 1.0
        # Lower-is-better metrics are unbounded above, so use a value strictly
        # above the largest one observed.
        observed = [data[obj].max() for data in self.data_dict.values() if not data.empty]
        finite = [float(value) for value in observed if np.isfinite(value)]
        return max([1.0] + [value + 1.0 for value in finite])

    def _evaluate(self, x, out, *args, **kwargs):
        """Write the objective vector for decision vector ``x`` to ``out["F"]``."""
        chosen = np.round(x).astype(int) == 1
        selected_models = [model for model, keep in zip(self.models, chosen) if keep]

        if not selected_models:
            # Empty subset: worst value on every objective
            out["F"] = [self._penalties[obj] for obj in self.objectives]
            return

        scores = []
        for obj in self.objectives:
            # Keep the sign of lower-is-better metrics, negate the others
            sign = 1.0 if obj in self.minimize_objectives else -1.0
            obj_scores = []
            for data in self.data_dict.values():
                selected_data = data[data["Model"].isin(selected_models)]
                if selected_data.empty:
                    # None of the selected models was evaluated on this dataset
                    obj_scores.append(self._penalties[obj])
                else:
                    obj_scores.append(sign * selected_data[obj].mean())
            scores.append(np.mean(obj_scores))  # average over datasets
        out["F"] = scores

# Build the problem from the test-set performance table
problem = ModelSelectionProblem(data_dict, models, objectives)

# NSGA-III with Das-Dennis reference directions, one dimension per objective
ref_dirs = get_reference_directions(
    "das-dennis",
    n_dim=len(objectives),
    n_partitions=6,
)
algorithm = NSGA3(pop_size=100, ref_dirs=ref_dirs)

# Run 100 generations with a fixed seed
res = minimize(problem, algorithm, ('n_gen', 100), seed=42, verbose=True)

# Decision vectors and objective values of the returned solutions
optimal_solutions, optimal_objectives = res.X, res.F

# First objective column; with AUC listed first in `objectives` this is the
# negated AUC, so the smallest entry corresponds to the highest AUC.
auc_values = optimal_objectives[:, 0]
best_solution_index = auc_values.argmin()

# Decision vector of that solution
best_solution = optimal_solutions[best_solution_index]
print(f"Selected models based on maximum AUC: {best_solution}")

# Report the model with the largest decision value in that vector.
# NOTE(review): evaluation rounds the vector to a 0/1 subset mask, whereas this
# keeps only the single largest component - confirm this is intended.
max_prob_model_index = best_solution.argmax()
selected_model = models[max_prob_model_index]

print(f"Best model with highest probability: {selected_model}")
