In [2]:
import os
import json
import time
import csv
import numpy as np
from multiprocessing import freeze_support
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from sklearn.metrics import (
    roc_auc_score, accuracy_score, f1_score,
    precision_score, recall_score, confusion_matrix
)
from sklearn.model_selection import train_test_split, StratifiedKFold
from monai.data import Dataset
from models import resnet
from datasets.ADNI import ADNI, ADNI_transform
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

# 加载配置文件
def load_config(path="config/config.json"):
    """Read a JSON configuration file and return its parsed content.

    Args:
        path: Location of the JSON file. Defaults to ``config/config.json``.

    Returns:
        The decoded JSON document (a ``dict`` for the configs used here).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale codec (which is not
    # UTF-8 on many Windows setups). "utf-8-sig" also accepts files that start
    # with a byte-order mark, which json.load() would otherwise reject.
    with open(path, encoding="utf-8-sig") as f:
        return json.load(f)

# 配置类
class Config:
    """Attribute-style view of a configuration mapping.

    Every key of the mapping passed to the constructor becomes an instance
    attribute. ``weight_decay``, ``dropout_rate`` and ``n_splits`` receive a
    default value when the mapping does not provide them. The final set of
    attributes is printed once construction is complete.
    """

    def __init__(self, d):
        # Mirror the mapping onto the instance, one attribute per key.
        for name in d:
            setattr(self, name, d[name])

        # Optional hyper-parameters and the value used when they are absent.
        fallbacks = (
            ('weight_decay', 1e-4),
            ('dropout_rate', 0.5),
            ('n_splits', 5),
        )
        for name, default in fallbacks:
            setattr(self, name, getattr(self, name, default))

        self.print_config()

    def print_config(self):
        """Print every attribute as ``name: value`` between two rule lines."""
        rule = "=" * 40
        print("Configuration Parameters:\n" + rule)
        for name in vars(self):
            print(f"{name}: {getattr(self, name)}")
        print(rule)

In [3]:
def generate_model(model_type='resnet', model_depth=50,
                   input_W=224, input_H=224, input_D=224,
                   resnet_shortcut='B',
                   pretrain_path='config/pretrain/resnet_50_23dataset.pth',
                   nb_class=2,
                   dropout_rate=0.5,
                   device=torch.device('cpu')):
    """Build a 3D ResNet classifier and optionally load pretrained weights.

    The backbone comes from the project's ``resnet`` module; its ``conv_seg``
    head is replaced by global average pooling, flatten, dropout and a linear
    layer with ``nb_class`` outputs.

    Args:
        model_type: Must be ``'resnet'``.
        model_depth: One of 10, 18, 34, 50, 101, 152, 200.
        input_W, input_H, input_D: Sample size forwarded to the backbone
            constructor.
        resnet_shortcut: Shortcut type forwarded to the backbone constructor.
        pretrain_path: Checkpoint file; when it does not exist the model is
            returned with its freshly initialised weights.
        nb_class: Number of output logits of the classification head.
        dropout_rate: Dropout probability applied before the linear layer.
        device: Device the model is moved to and the checkpoint is mapped to.

    Returns:
        The model, placed on ``device``.

    Raises:
        AssertionError: If ``model_type`` or ``model_depth`` is unsupported.
    """
    assert model_type == 'resnet'
    assert model_depth in [10, 18, 34, 50, 101, 152, 200]

    fn = {
        10: resnet.resnet10, 18: resnet.resnet18, 34: resnet.resnet34,
        50: resnet.resnet50, 101: resnet.resnet101,
        152: resnet.resnet152, 200: resnet.resnet200
    }[model_depth]

    # NOTE(review): no_cuda=True is passed regardless of `device`; placement is
    # handled by net.to(device) below — confirm the flag has no other effect.
    net = fn(
        sample_input_W=input_W, sample_input_H=input_H, sample_input_D=input_D,
        shortcut_type=resnet_shortcut, no_cuda=True, num_seg_classes=1
    )

    # Channel count expected at the backbone output for each depth.
    # NOTE(review): 256 for depth 10 differs from the 512 used for depths 18
    # and 34 — confirm against models.resnet.
    fc_in = {10: 256, 18: 512, 34: 512, 50: 2048, 101: 2048, 152: 2048, 200: 2048}[model_depth]
    net.conv_seg = nn.Sequential(
        nn.AdaptiveAvgPool3d((1, 1, 1)),
        nn.Flatten(),
        nn.Dropout(p=dropout_rate),
        nn.Linear(fc_in, nb_class)  # output width follows nb_class
    )

    net.to(device)

    if not os.path.isfile(pretrain_path):
        print(f"[Warning] no pretrained file at {pretrain_path}")
        return net

    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # from a trusted source.
    ckpt = torch.load(pretrain_path, map_location=device)
    state = ckpt.get('state_dict', ckpt)
    sd = net.state_dict()

    prefix = 'module.'
    matched = {}
    for key, value in state.items():
        # Checkpoints saved from an nn.DataParallel wrapper prefix every key
        # with "module."; try the raw key first, then the un-prefixed one.
        candidates = [key]
        if key.startswith(prefix):
            candidates.append(key[len(prefix):])
        name = next((c for c in candidates if c in sd), None)
        if name is None:
            continue
        # Skip same-named tensors of a different shape (e.g. a replaced head);
        # loading them would make load_state_dict raise.
        if getattr(value, 'shape', None) != sd[name].shape:
            continue
        matched[name] = value

    if matched:
        sd.update(matched)
        net.load_state_dict(sd)
        print("Loaded pretrained weights.")
        print(f"Matched {len(matched)}/{len(sd)} tensors from {pretrain_path}")
    else:
        # Previously this case still reported success although nothing was loaded.
        print(f"[Warning] no tensor in {pretrain_path} matches the model; "
              "weights left at their initial values")
    return net

In [4]:
def calculate_metrics(y_true, y_pred, y_score):
    """Compute binary-classification metrics for one set of predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_score: Scores passed to ``roc_auc_score``.

    Returns:
        A dict with the keys ``acc``, ``auc``, ``f1``, ``precision``,
        ``recall`` and ``cm`` (confusion matrix), inserted in that order.
    """
    results = {}
    results['acc'] = accuracy_score(y_true, y_pred)
    results['auc'] = roc_auc_score(y_true, y_score)

    # These three share the same call shape; zero_division=0 yields 0 instead
    # of a warning when a metric's denominator is empty.
    label_scorers = (
        ('f1', f1_score),
        ('precision', precision_score),
        ('recall', recall_score),
    )
    for key, scorer in label_scorers:
        results[key] = scorer(y_true, y_pred, zero_division=0)

    results['cm'] = confusion_matrix(y_true, y_pred)
    return results

# Load the data (run on first execution; may be commented out while debugging later cells)
cfg = Config(load_config())
dataset = ADNI(
    cfg.label_file,
    cfg.mri_dir,
    cfg.task,
    cfg.augment,
).data_dict
# Hold out 20% as a test set, stratified on the label of each sample.
tr_val, test_data = train_test_split(
    dataset,
    test_size=0.2,
    random_state=42,
    stratify=[sample['label'] for sample in dataset],
)
# Labels of the train/validation pool, used for stratified cross-validation.
labels = [sample['label'] for sample in tr_val]

Configuration Parameters:
dataroot: C:\Users\dongz\Desktop\adni_dataset
label_file: C:\Users\dongz\Desktop\adni_dataset\ADNI_902.csv
mri_dir: C:\Users\dongz\Desktop\adni_dataset\MRI_GM_113_137_113
task: ADCN
augment: False
split_ratio: 0.2
seed: 42
num_epochs: 50
batch_size: 16
lr: 1e-05
checkpoint_dir: ./checkpoints
log_file: training_log1.csv
model_type: resnet
model_depth: 50
input_W: 80
input_H: 98
input_D: 80
resnet_shortcut: B
pretrain_path: config/pretrain/resnet_50_23dataset.pth
nb_class: 2
n_splits: 2
dropout_rate: 0.5
weight_decay: 0.0001


In [5]:
def train():
    """Run stratified K-fold cross-validation training.

    Reads the configuration via ``load_config``, and relies on the
    module-level ``tr_val`` (train/validation samples) and ``labels`` (their
    labels) prepared by the data-loading cell.

    For every fold a fresh model is built, trained for ``cfg.num_epochs``
    epochs with a class-weighted cross-entropy loss and a warm-up + cosine
    learning-rate schedule, and evaluated on the fold's validation split after
    each epoch.

    Side effects:
        * appends one row per epoch to ``<checkpoint_dir>/cv_results.csv``
          (columns: fold, epoch, tr_acc, tr_auc, tr_loss, vl_acc, vl_auc,
          vl_loss, lr);
        * writes the training accuracy to TensorBoard;
        * saves ``<checkpoint_dir>/best_fold{fold}.pth`` whenever
          ``0.3 * val_auc + 0.7 * val_acc`` improves. The checkpoint is a dict
          whose ``'model_state_dict'`` entry holds the model weights.
    """
    torch.manual_seed(42)
    np.random.seed(42)
    cfg = Config(load_config())
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print("Using device:", device)

    # Logging set-up: the CSV header is written only when the file is new,
    # later runs append to it.
    os.makedirs(cfg.checkpoint_dir, exist_ok=True)
    writer = SummaryWriter(cfg.checkpoint_dir)
    csv_path = os.path.join(cfg.checkpoint_dir, 'cv_results.csv')

    try:
        if not os.path.exists(csv_path):
            with open(csv_path, 'w', newline='') as f:
                csv.writer(f).writerow(
                    ['fold', 'epoch',
                     'tr_acc', 'tr_auc', 'tr_loss',
                     'vl_acc', 'vl_auc', 'vl_loss', 'lr']
                )

        # Stratified cross-validation over the train/validation pool.
        kf = StratifiedKFold(n_splits=cfg.n_splits, shuffle=True, random_state=42)
        for fold, (train_idx, val_idx) in enumerate(kf.split(tr_val, labels), 1):
            print(f"\n=== Fold {fold}/{cfg.n_splits} ===")
            train_data = [tr_val[i] for i in train_idx]
            val_data = [tr_val[i] for i in val_idx]

            # Data pipeline for this fold.
            tf_tr, tf_vt = ADNI_transform(augment=cfg.augment)
            ds_tr = Dataset(data=train_data, transform=tf_tr)
            ds_vl = Dataset(data=val_data, transform=tf_vt)
            loader_tr = DataLoader(ds_tr, batch_size=cfg.batch_size, shuffle=True, num_workers=4, pin_memory=True)
            loader_vl = DataLoader(ds_vl, batch_size=cfg.batch_size, shuffle=False, num_workers=2, pin_memory=True)

            # Fresh model and optimizer per fold.
            model = generate_model(
                model_type=cfg.model_type, model_depth=cfg.model_depth,
                input_W=cfg.input_W, input_H=cfg.input_H, input_D=cfg.input_D,
                resnet_shortcut=cfg.resnet_shortcut,
                pretrain_path=cfg.pretrain_path,
                nb_class=2,
                dropout_rate=cfg.dropout_rate,
                device=device
            )

            # Inverse-frequency class weights for the training loss.
            # minlength/maximum keep the weight vector at two finite entries
            # even if a class is missing from the fold.
            class_counts = np.bincount([d['label'] for d in train_data], minlength=2)
            class_weights = torch.tensor(
                1.0 / np.maximum(class_counts, 1), dtype=torch.float32
            ).to(device)
            criterion = nn.CrossEntropyLoss(weight=class_weights)
            # Validation loss stays unweighted, as before; built once per fold
            # instead of once per batch.
            val_criterion = nn.CrossEntropyLoss()
            optimizer = torch.optim.Adam(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)

            # Learning-rate schedule: linear warm-up followed by cosine decay.
            warmup_epochs = max(1, min(10, int(cfg.num_epochs * 0.1)))
            total_epochs = cfg.num_epochs
            # At least 1 so CosineAnnealingLR never receives T_max == 0.
            cosine_epochs = max(1, total_epochs - warmup_epochs)
            min_lr = cfg.lr * 1e-4
            warmup_sched = LinearLR(optimizer, start_factor=0.1, end_factor=1.0, total_iters=warmup_epochs)
            cosine_sched = CosineAnnealingLR(optimizer, T_max=cosine_epochs, eta_min=min_lr)
            scheduler = SequentialLR(optimizer, schedulers=[warmup_sched, cosine_sched], milestones=[warmup_epochs])

            best_metric = -np.inf
            for epoch in range(1, cfg.num_epochs + 1):
                tr_metrics, tr_loss = _run_epoch(model, loader_tr, criterion, device, optimizer)
                vl_metrics, vl_loss = _run_epoch(model, loader_vl, val_criterion, device)

                # Record the rate that was in effect during this epoch.
                lr_now = scheduler.get_last_lr()[0]
                scheduler.step()

                writer.add_scalar(f'fold{fold}/train/acc', tr_metrics['acc'], epoch)
                # Write exactly the columns named in the header; the metric
                # dicts also hold f1/precision/recall and a confusion matrix.
                with open(csv_path, 'a', newline='') as f:
                    csv.writer(f).writerow([
                        fold, epoch,
                        tr_metrics['acc'], tr_metrics['auc'], tr_loss,
                        vl_metrics['acc'], vl_metrics['auc'], vl_loss,
                        lr_now,
                    ])

                print(f"Fold{fold} Ep{epoch:03d} | tr_acc={tr_metrics['acc']:.4f} vl_acc={vl_metrics['acc']:.4f} lr={lr_now:.7f}")

                # Keep the checkpoint with the best blended validation score.
                current_metric = 0.3 * vl_metrics['auc'] + 0.7 * vl_metrics['acc']
                if current_metric > best_metric:
                    best_metric = current_metric
                    torch.save(
                        {
                            'fold': fold,
                            'epoch': epoch,
                            'model_state_dict': model.state_dict(),
                            'optimizer_state_dict': optimizer.state_dict(),
                            'best_metric': best_metric,
                        },
                        os.path.join(cfg.checkpoint_dir, f"best_fold{fold}.pth"),
                    )
    finally:
        # Flush and release the TensorBoard writer even if training fails.
        writer.close()


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns:
        ``(metrics, mean_loss)`` where ``metrics`` is the dict produced by
        ``calculate_metrics`` and ``mean_loss`` is the loss averaged over batches.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum = 0.0
    y_true, y_pred, y_score = [], [], []

    with torch.set_grad_enabled(training):
        for batch in loader:
            x = batch['MRI'].to(device)
            # reshape(-1) rather than squeeze(): the target stays 1-D even
            # when a batch holds a single sample.
            # assumes labels arrive as (B,) or (B, 1) — TODO confirm
            y = batch['label'].to(device).reshape(-1).long()
            out = model(x)
            loss = criterion(out, y)
            loss_sum += loss.item()

            if training:
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            # Column 1 of the softmax is used as the positive-class score.
            logits = out.detach()
            y_score.extend(torch.softmax(logits, 1)[:, 1].cpu().numpy())
            y_pred.extend(logits.argmax(1).cpu().numpy())
            y_true.extend(y.cpu().numpy())

    return calculate_metrics(y_true, y_pred, y_score), loss_sum / len(loader)

In [None]:
def test_models(checkpoint_dir, test_data):
    """Evaluate every fold's best checkpoint on the held-out test set.

    For each fold ``1..cfg.n_splits`` the model is rebuilt with the same
    arguments used in ``train``, ``best_fold{fold}.pth`` is loaded from
    ``checkpoint_dir`` and the positive-class probabilities on ``test_data``
    are collected. The predictions of all folds are pooled into a single ROC
    curve, which is saved as ``test_roc_curves.png`` in ``checkpoint_dir``.

    Args:
        checkpoint_dir: Directory holding the ``best_fold{fold}.pth`` files;
            the figure is written there as well.
        test_data: Samples passed to the MONAI ``Dataset``.

    Returns:
        The area under the pooled ROC curve.

    Raises:
        FileNotFoundError: If a fold checkpoint is missing.
    """
    cfg = Config(load_config())
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    test_transforms = ADNI_transform(augment=False)[1]
    test_ds = Dataset(data=test_data, transform=test_transforms)
    test_loader = DataLoader(test_ds, batch_size=cfg.batch_size, shuffle=False)

    all_fold_probs = []
    all_fold_labels = []
    for fold in range(1, cfg.n_splits + 1):
        # Same construction arguments as in train(), so the checkpoint's
        # state dict matches the architecture.
        model = generate_model(
            model_type=cfg.model_type, model_depth=cfg.model_depth,
            input_W=cfg.input_W, input_H=cfg.input_H, input_D=cfg.input_D,
            resnet_shortcut=cfg.resnet_shortcut,
            pretrain_path=cfg.pretrain_path,
            nb_class=2,
            dropout_rate=cfg.dropout_rate,
            device=device
        )
        checkpoint_path = os.path.join(checkpoint_dir, f"best_fold{fold}.pth")
        # map_location lets a checkpoint saved on GPU be restored on CPU.
        checkpoint = torch.load(checkpoint_path, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()

        with torch.no_grad():
            for batch in test_loader:
                x = batch['MRI'].to(device)
                # reshape(-1) rather than squeeze(): stays 1-D (and therefore
                # iterable) when the last batch holds a single sample.
                y = batch['label'].reshape(-1).cpu().numpy()
                out = model(x)
                probs = torch.softmax(out, dim=1)[:, 1].cpu().numpy()
                all_fold_probs.extend(probs)
                all_fold_labels.extend(y)

    # ROC over the predictions of all folds pooled together.
    # NOTE(review): the legend text says "Mean ROC" although the curve is
    # computed from pooled predictions — confirm the intended wording.
    fpr, tpr, _ = roc_curve(all_fold_labels, all_fold_probs)
    roc_auc = auc(fpr, tpr)

    fig, ax = plt.subplots(figsize=(10, 8))
    ax.plot(fpr, tpr, lw=2, label=f'Mean ROC (AUC={roc_auc:.2f})')
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    # The curve's label is only visible once a legend is drawn.
    ax.legend(loc='lower right')
    fig.savefig(os.path.join(checkpoint_dir, 'test_roc_curves.png'))
    plt.close(fig)
    return roc_auc

# Entry point. The stages can be run one at a time while debugging: comment
# out the calls below and invoke the functions manually instead.
if __name__ == '__main__':
    # Required on Windows; may be commented out on Linux/macOS.
    freeze_support()
    # Training stage (run on first execution).
    train()
    # Test stage (run once training has finished):
    # test_models(cfg.checkpoint_dir, test_data)

Configuration Parameters:
dataroot: C:\Users\dongz\Desktop\adni_dataset
label_file: C:\Users\dongz\Desktop\adni_dataset\ADNI_902.csv
mri_dir: C:\Users\dongz\Desktop\adni_dataset\MRI_GM_113_137_113
task: ADCN
augment: False
split_ratio: 0.2
seed: 42
num_epochs: 50
batch_size: 16
lr: 1e-05
checkpoint_dir: ./checkpoints
log_file: training_log1.csv
model_type: resnet
model_depth: 50
input_W: 80
input_H: 98
input_D: 80
resnet_shortcut: B
pretrain_path: config/pretrain/resnet_50_23dataset.pth
nb_class: 2
n_splits: 2
dropout_rate: 0.5
weight_decay: 0.0001
Using device: cpu

=== Fold 1/2 ===


  m.weight = nn.init.kaiming_normal(m.weight, mode='fan_out')
  ckpt = torch.load(pretrain_path, map_location=device)


Loaded pretrained weights.
