In [2]:
# Standard library imports
import os
import time
import copy
import pickle

# Third-party library imports
import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score, f1_score, precision_score, recall_score,
    roc_auc_score, average_precision_score, cohen_kappa_score
)
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import MinMaxScaler, label_binarize

# PyTorch imports
import torch
import torch.nn.functional as F

# Project-specific imports (MoGoNet)
from models.models_later_FNN import init_model_dict, init_optim
from models.train_test_FNN import (
    prepare_trte_data, gen_trte_adj_mat, train_epoch, test_epoch
)
from models.utils import (
    save_model_dict, one_hot_tensor, cal_sample_weight,
    gen_adj_mat_tensor, gen_test_adj_mat_tensor, cal_adj_mat_parameter
)

In [3]:
import optuna
import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score

from datetime import datetime, timedelta
from tqdm import tqdm

In [4]:
import importlib

# Reload the project modules so that edits made to their source files take
# effect without restarting the kernel.
# `init_model_dict` / `init_optim` are imported from models.models_later_FNN,
# so that module has to be reloaded as well; reloading only models.models_FNN
# leaves the functions used below stale.
# NOTE(review): the models.models_FNN reload is kept from the original cell —
# presumably models_later_FNN builds on it, hence it is reloaded first.
# TODO confirm; drop it if it is not a dependency.
importlib.reload(importlib.import_module("models.models_FNN"))
importlib.reload(importlib.import_module("models.models_later_FNN"))
importlib.reload(importlib.import_module("models.train_test_FNN"))

# Re-bind the names so they point at the freshly reloaded module objects.
from models.models_later_FNN import init_model_dict, init_optim
from models.train_test_FNN import prepare_trte_data, gen_trte_adj_mat, train_epoch, test_epoch

In [27]:
def objective(trial):
    """
    Optuna objective: train the multi-view model and return its mean test accuracy.

    For each simulated batch (sim1 and sim2) the three omics matrices are
    loaded and split with a 5-fold StratifiedKFold, of which only the first
    four folds are evaluated. The training part of every fold is split again
    3:1 (stratified) into train/validation. The model is pre-trained, then
    trained with early stopping on validation accuracy, and the best snapshot
    is scored on the held-out fold.

    Args:
        trial: Optuna trial used to sample ``lr_e``, ``lr_c`` and
            ``dropout_rate`` from categorical grids.

    Returns:
        Mean accuracy over all evaluated folds. 0.0 is returned when no fold
        was evaluated or when any exception is raised inside the trial (the
        error message is printed).
    """
    # ---- Search space (categorical grids) ----
    lr_e = trial.suggest_categorical('lr_e', [1e-4, 5e-4])
    lr_c = trial.suggest_categorical('lr_c', [1e-2, 2e-2, 5e-2, 1e-1])
    dropout_rate = trial.suggest_categorical('dropout_rate', [0.5, 0.7, 0.8])

    # ---- Fixed settings ----
    dim_he_list = [300, 100, 100]   # hidden sizes passed to init_model_dict
    lr_e_pretrain = 1e-3
    num_view = 3
    scenario = 2
    ch_sig_prop = "高"              # directory-name fragments (Chinese) ...
    ch_sig_level = "中"
    signal_prop = "high"            # ... and file-name fragments (English)
    signal_level = "mid"
    num_epoch_pretrain = 20
    num_epoch = 250
    num_class = 3
    dim_hvcdn = 100
    patience = 20                   # epochs without improvement before stopping
    val_interval = 2                # validate every `val_interval` epochs
    max_folds = 4                   # evaluate only the first 4 of the 5 folds
    min_fold_accuracy = 0.3         # below this, skip the batch's remaining folds

    # Pick the device once; the tensors below follow it instead of calling
    # .cuda() unconditionally (which fails on a CPU-only host).
    cuda = torch.cuda.is_available()
    device = torch.device("cuda" if cuda else "cpu")

    # Test accuracy of every evaluated fold (across batches).
    fold_accuracies = []

    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=2023110400)

    # NOTE(review): hard-coded absolute Windows path; consider a configurable
    # data directory.
    data_root = f"F:/r-env/中期/方法/模拟试验-模拟数据/情景{scenario}/k-3"

    try:
        # The label file depends only on the scenario, so load it once.
        labels = np.loadtxt(f"{data_root}/label-k3.csv", delimiter=',')

        for batch_num in range(1, 3):  # batches sim1 and sim2
            sim_path = f"{data_root}/信号比例-{ch_sig_prop}/信号水平-{ch_sig_level}/sim{batch_num}"
            file_prefix = f"{sim_path}/s{scenario}-k3-{signal_prop}-{signal_level}-batch{batch_num}"

            # One matrix per view, in the order mrna, meth, mutate.
            omics_data = [
                np.loadtxt(f"{file_prefix}-{view_name}.csv", delimiter=',')
                for view_name in ("mrna", "meth", "mutate")
            ]

            for fold_num, (train_idx_raw, test_idx) in enumerate(kfold.split(omics_data[0], labels)):
                if fold_num >= max_folds:
                    break

                # Split the fold's training indices 3:1 into train / validation,
                # keeping the label distribution.
                train_idx, val_idx = train_test_split(
                    train_idx_raw,
                    test_size=0.25,
                    random_state=42,
                    stratify=labels[train_idx_raw]
                )

                # Per-view tensors: train only, train+validation, train+test.
                data_tr_list = []
                data_trval_list = []
                data_trte_list = []
                for omic in omics_data:
                    train_X = omic[train_idx]
                    val_X = omic[val_idx]
                    test_X = omic[test_idx]
                    data_tr_list.append(torch.FloatTensor(train_X).to(device))
                    data_trval_list.append(
                        torch.FloatTensor(np.concatenate((train_X, val_X), axis=0)).to(device))
                    data_trte_list.append(
                        torch.FloatTensor(np.concatenate((train_X, test_X), axis=0)).to(device))

                train_y, val_y, test_y = labels[train_idx], labels[val_idx], labels[test_idx]

                num_tr = data_tr_list[0].shape[0]
                # Row positions of the validation / test samples inside the
                # concatenated train+validation and train+test tensors.
                val_positions = list(range(num_tr, data_trval_list[0].shape[0]))
                test_positions = list(range(num_tr, data_trte_list[0].shape[0]))

                # Training targets in the three forms train_epoch receives.
                labels_tr_tensor = torch.LongTensor(train_y)
                onehot_labels_tr_tensor = one_hot_tensor(labels_tr_tensor, num_class)
                sample_weight_tr = torch.FloatTensor(cal_sample_weight(train_y, num_class))

                labels_tr_tensor = labels_tr_tensor.to(device)
                onehot_labels_tr_tensor = onehot_labels_tr_tensor.to(device)
                sample_weight_tr = sample_weight_tr.to(device)

                # Build the models and move them to the device BEFORE creating
                # the optimizers, so the optimizers are constructed over the
                # parameters that are actually trained.
                # (assumes init_optim builds torch optimizers over the models'
                # parameters — TODO confirm)
                dim_list = [x.shape[1] for x in data_tr_list]
                model_dict = init_model_dict(num_view, num_class, dim_list, dim_he_list,
                                             dim_hvcdn, dropout_rate, dropout_vcdn=0)
                if cuda:
                    for model in model_dict.values():
                        model.cuda()

                # Pre-training phase (train_VCDN=False) with its own learning rate.
                optim_dict = init_optim(num_view, model_dict, lr_e_pretrain, lr_c)
                for epoch in range(num_epoch_pretrain):
                    train_epoch(data_tr_list, labels_tr_tensor,
                                onehot_labels_tr_tensor, sample_weight_tr,
                                model_dict, optim_dict, train_VCDN=False)

                # Main training phase: fresh optimizers with the sampled lr_e.
                optim_dict = init_optim(num_view, model_dict, lr_e, lr_c)

                # Early stopping on validation accuracy.
                best_accuracy = 0.0
                no_improvement_count = 0
                best_model = None

                for epoch in range(num_epoch):
                    train_epoch(data_tr_list, labels_tr_tensor,
                                onehot_labels_tr_tensor, sample_weight_tr,
                                model_dict, optim_dict)

                    if epoch % val_interval != 0:
                        continue

                    val_prob = test_epoch(data_trval_list, val_positions, model_dict)
                    accuracy = accuracy_score(val_y, np.argmax(val_prob, axis=1))

                    if accuracy > best_accuracy:
                        best_accuracy = accuracy
                        no_improvement_count = 0
                        best_model = copy.deepcopy(model_dict)
                    else:
                        # Count in epochs, so `patience` stays in epochs.
                        no_improvement_count += val_interval

                    if no_improvement_count >= patience:
                        break

                # If validation accuracy never rose above 0.0 no snapshot was
                # taken; fall back to the last trained weights instead of
                # passing None to test_epoch.
                final_model = best_model if best_model is not None else model_dict

                te_prob = test_epoch(data_trte_list, test_positions, final_model)
                fold_accuracy = accuracy_score(test_y, np.argmax(te_prob, axis=1))
                fold_accuracies.append(fold_accuracy)

                # Speed-up: a very poor fold ends the fold loop of the current
                # batch (the next batch is still evaluated).
                if fold_accuracy < min_fold_accuracy:
                    break

    except Exception as e:
        # Deliberate best-effort: a failing trial is reported as accuracy 0.0.
        # NOTE(review): Optuna then records it as a completed trial with value
        # 0.0 rather than as a failed trial.
        print(f"Trial failed with error: {e}")
        return 0.0

    if not fold_accuracies:
        return 0.0

    return np.mean(fold_accuracies)


In [28]:
def optimize_hyperparameters(n_trials=20, timeout=3000, n_jobs=1):
    """
    Create an in-memory Optuna study and run the hyperparameter search.

    Args:
        n_trials: maximum number of trials to run.
        timeout: passed to ``study.optimize`` as its ``timeout`` argument
            (a time budget in seconds).
        n_jobs: number of parallel jobs passed to ``study.optimize``.

    Returns:
        study: the Optuna study holding the results of the search.
    """
    # Seeded TPE sampler.
    tpe_sampler = optuna.samplers.TPESampler(seed=42)

    # Median pruner: no pruning during the first 10 trials nor the first 30
    # steps of a trial, then a check every 10 steps.
    # NOTE(review): `objective` in this notebook never calls trial.report(),
    # so this pruner currently has nothing to act on.
    median_pruner = optuna.pruners.MedianPruner(
        n_startup_trials=10,
        n_warmup_steps=30,
        interval_steps=10,
    )

    # The objective returns an accuracy, hence 'maximize'.
    study = optuna.create_study(
        direction='maximize',
        sampler=tpe_sampler,
        pruner=median_pruner,
    )

    study.optimize(objective, n_trials=n_trials, timeout=timeout, n_jobs=n_jobs)
    return study

In [29]:
def print_optimization_results(study):
    """
    Print the best trial of a finished Optuna study.

    Prints the best trial's number, its objective value (formatted to four
    decimals) and every best hyperparameter as ``name: value``.

    Args:
        study: Optuna study object; only ``best_trial.number``,
            ``best_value`` and ``best_params`` are read.

    Returns:
        None.
    """
    print("优化完成!")
    print(f"最佳试验编号: {study.best_trial.number}")
    print(f"最佳准确率: {study.best_value:.4f}")
    print("最佳超参数:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")

    # The former `df = study.trials_dataframe()` was removed: the frame was
    # built and then discarded because the CSV export was disabled. To save
    # the history, call study.trials_dataframe().to_csv(...) explicitly.


In [30]:
if __name__ == "__main__":
    # Timing bookkeeping: a float timestamp for the duration and a datetime
    # for the human-readable log lines.
    start_time = time.time()
    start_datetime = datetime.now()
    start_line = f"开始时间: {start_datetime.strftime('%Y-%m-%d %H:%M:%S')}"

    print("开始超参数优化...")
    print(start_line)

    # Run the search; `study` stays at module level for the cells that follow.
    study = optimize_hyperparameters(n_trials=25, timeout=3000, n_jobs=1)

    end_time = time.time()
    end_datetime = datetime.now()
    elapsed_time = end_time - start_time

    # End time plus the elapsed time in seconds and as H:MM:SS; reported once
    # right after the run and once more in the summary below.
    end_lines = "\n".join([
        f"结束时间: {end_datetime.strftime('%Y-%m-%d %H:%M:%S')}",
        f"总耗时: {elapsed_time:.2f} 秒",
        f"总耗时: {str(timedelta(seconds=int(elapsed_time)))}",
    ])
    print(end_lines)

    # Best trial and its hyperparameters.
    print_optimization_results(study)

    # Timing summary.
    separator = "="*50
    print("\n" + separator)
    print("时间统计:")
    print(start_line)
    print(end_lines)
    if study.trials:
        # Guarded so an empty study does not divide by zero.
        print(f"平均每个trial耗时: {elapsed_time/len(study.trials):.2f} 秒")
    print(separator)

开始超参数优化...
开始时间: 2025-08-20 19:08:07


[I 2025-08-20 19:08:07,926] A new study created in memory with name: no-name-11a539e0-c121-4095-9bfc-f091535f18b9
[I 2025-08-20 19:08:17,354] Trial 0 finished with value: 0.855 and parameters: {'lr_e': 0.0005, 'lr_c': 0.01, 'dropout_rate': 0.7}. Best is trial 0 with value: 0.855.
[I 2025-08-20 19:08:31,625] Trial 1 finished with value: 0.8412499999999999 and parameters: {'lr_e': 0.0001, 'lr_c': 0.01, 'dropout_rate': 0.8}. Best is trial 0 with value: 0.855.
[I 2025-08-20 19:08:42,449] Trial 2 finished with value: 0.84 and parameters: {'lr_e': 0.0001, 'lr_c': 0.01, 'dropout_rate': 0.7}. Best is trial 0 with value: 0.855.
[I 2025-08-20 19:08:51,599] Trial 3 finished with value: 0.84625 and parameters: {'lr_e': 0.0005, 'lr_c': 0.02, 'dropout_rate': 0.7}. Best is trial 0 with value: 0.855.
[I 2025-08-20 19:09:02,529] Trial 4 finished with value: 0.8425 and parameters: {'lr_e': 0.0001, 'lr_c': 0.01, 'dropout_rate': 0.7}. Best is trial 0 with value: 0.855.
[I 2025-08-20 19:09:13,160] Trial 5 

结束时间: 2025-08-20 19:12:32
总耗时: 264.37 秒
总耗时: 0:04:24
优化完成!
最佳试验编号: 0
最佳准确率: 0.8550
最佳超参数:
  lr_e: 0.0005
  lr_c: 0.01
  dropout_rate: 0.7

时间统计:
开始时间: 2025-08-20 19:08:07
结束时间: 2025-08-20 19:12:32
总耗时: 264.37 秒
总耗时: 0:04:24
平均每个trial耗时: 10.57 秒


In [31]:
# Plot the optimization history of `study` (the study produced by the optimization cell).
optuna.visualization.plot_optimization_history(study)


In [32]:
# Plot the hyperparameter importances estimated from the trials of `study`.
optuna.visualization.plot_param_importances(study)

In [33]:
# Slice plot: one panel per sampled hyperparameter of `study`.
optuna.visualization.plot_slice(study)

In [34]:
import pickle
import optuna

# Assumes `study` has already been created and optimized in an earlier cell, e.g.:
# study = optuna.create_study(direction="maximize")
# study.optimize(objective, n_trials=50)  # run the optimization

# Persist the study object to a pickle file (path is relative to the working directory)
with open("optuna_study_later_FNN-2.pkl", "wb") as f:
    pickle.dump(study, f)

print("Study 保存成功！")

Study 保存成功！


In [31]:
# Re-print the best trial of the current `study`.
# NOTE(review): the output stored with this cell lists parameters (lr_e=0.001,
# dim_hvcdn, dropout_vcdn, dim_he_1..3) that the current objective() does not
# sample, and the cell's execution count (31) precedes the preceding cell's (34) —
# it looks like stale output from an earlier run; re-run the notebook top to
# bottom to refresh it.
print_optimization_results(study)

优化完成!
最佳试验编号: 22
最佳准确率: 0.8013
最佳超参数:
  lr_e: 0.001
  lr_c: 0.01
  dropout_rate: 0.7
  dim_hvcdn: 300
  dropout_vcdn: 0.0
  dim_he_1: 400
  dim_he_2: 300
  dim_he_3: 100
