In [1]:
from pathlib import Path
from scipy.ndimage import zoom
from scipy.ndimage import find_objects
import torchio as tio
import os
import glob
import re
from configparser import ConfigParser
import nibabel as nib
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from typing import Dict, Tuple
import matplotlib.pyplot as plt
from collections import deque
from sklearn.model_selection import KFold
import math
from Fdataset import ACDCDataset, PairwiseAugmentor

# Configuration parameters
# Class-name -> integer class index (five classes, matching FCN3D's default
# num_classes=5).
CLASS_MAP = {
    'NOR': 0,
    'DCM': 1,
    'HCM': 2,
    'MINF': 3,
    'RV': 4,
}
# Volume shape; matches the (B, 1, 200, 200, 80) input noted in FCN3D.forward.
TARGET_SHAPE = (200, 200, 80)
TARGET_SPACING = 1.25  # mm
# NOTE(review): presumably an augmentation multiplier consumed by the dataset
# code -- it is not referenced in the visible cells; confirm.
AUG_FACTOR = 1

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class FCN3D(nn.Module):
    """
    3D fully convolutional network (FCN) for classifying 3D medical images.

    The design goal stated by the author is a simple, computationally cheap
    model suited to small-sample 3D medical image classification.

    Args:
        num_classes: Number of output logits.
        in_channels: Number of channels of the input volume.
        dropout_rate: Dropout probability applied before the final linear layer.
    """
    def __init__(self, num_classes=5, in_channels=1, dropout_rate=0.2):
        super().__init__()

        # Channel widths along the encoder: input -> 32 -> 64 -> 128.
        widths = (in_channels, 32, 64, 128)

        # Encoder: three identical down-sampling stages, each
        # Conv3d(3x3x3, padding 1) -> BatchNorm3d -> ReLU -> MaxPool3d(2).
        # Layers are appended in the same order as a hand-written Sequential,
        # so state_dict keys (encoder.0, encoder.1, encoder.4, ...) are unchanged.
        stages = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv3d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm3d(c_out),
                nn.ReLU(inplace=True),
                nn.MaxPool3d(kernel_size=2, stride=2),
            ])
        self.encoder = nn.Sequential(*stages)

        # Head: global average pooling, then dropout and a linear classifier.
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool3d((1, 1, 1)),
            nn.Flatten(),
            nn.Dropout(dropout_rate),
            nn.Linear(widths[-1], num_classes)
        )

    def forward(self, x):
        """Map a (B, C, D1, D2, D3) volume batch to (B, num_classes) logits.

        For the intended (B, 1, 200, 200, 80) input, the three stride-2
        poolings give encoder features of shape (B, 128, 25, 25, 10).
        """
        features = self.encoder(x)
        return self.classifier(features)

In [None]:
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
import torch.multiprocessing as mp
import json

if __name__ == '__main__':
    # Select the 'spawn' start method only when running as the main module.
    mp.set_start_method('spawn', force=True)

start_fold = 0  # fold index to start from (0-4); edit to resume a run
results_file = '新-3DFCN.json'
CUSTOM_PREFIX = "新-3DFCN"

# Load previously saved per-fold results, tolerating a missing, empty or
# corrupt results file (each of those cases starts from an empty list).
fold_results = []
if not os.path.exists(results_file):
    print("No existing results file found. Starting fresh.")
else:
    try:
        with open(results_file, 'r') as results_fh:
            raw_text = results_fh.read().strip()
        if not raw_text:
            print("Results file exists but is empty. Starting fresh.")
        else:
            fold_results = json.loads(raw_text)
            print(f"Loaded existing results: {fold_results}")
    except json.JSONDecodeError:
        print("Warning: Results file contains invalid JSON. Starting fresh.")
        fold_results = []

# Training / evaluation helpers
def train_epoch(model, loader, criterion, optimizer, device, max_grad_norm=1.0):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: Network to optimise; switched to training mode.
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        criterion: Loss callable ``criterion(outputs, labels)``.
            NOTE(review): the per-sample weighting below assumes the criterion
            returns a batch *mean* (e.g. default ``nn.CrossEntropyLoss``).
        optimizer: Optimiser stepping ``model``'s parameters.
        device: Device the batches are moved to.
        max_grad_norm: Gradient-norm clipping threshold (default 1.0, the value
            that used to be hard-coded). Pass ``None`` to disable clipping.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the per-sample average
        loss over the epoch and ``accuracy`` comes from ``accuracy_score``.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    n_seen = 0
    all_preds, all_labels = [], []

    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)
        batch_size = inputs.size(0)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        if max_grad_norm is not None:
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()

        # Weight each batch by its size so a smaller final batch does not skew
        # the epoch average (same convention as `evaluate`).
        running_loss += loss.item() * batch_size
        n_seen += batch_size

        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    if n_seen == 0:
        raise ValueError("train_epoch received an empty loader")

    return running_loss / n_seen, accuracy_score(all_labels, all_preds)

def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    Args:
        model: Network to evaluate; switched to eval mode (and left there).
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        criterion: Loss callable ``criterion(outputs, labels)``.
            NOTE(review): the per-sample weighting below assumes the criterion
            returns a batch *mean* (e.g. default ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the per-sample average
        loss and ``accuracy`` comes from ``accuracy_score``.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    val_loss = 0.0
    n_seen = 0
    val_preds, val_labels = [], []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            batch_size = inputs.size(0)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * batch_size
            n_seen += batch_size

            _, preds = torch.max(outputs, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(labels.cpu().numpy())

    # Normalise by the samples actually evaluated rather than
    # len(loader.dataset): the two differ when the loader uses a sampler or
    # drop_last, and plain iterables have no `.dataset` at all.
    if n_seen == 0:
        raise ValueError("evaluate received an empty loader")

    return val_loss / n_seen, accuracy_score(val_labels, val_preds)


# 5-fold stratified cross-validation setup
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Path.glob yields entries in filesystem order, which is not guaranteed to be
# stable across machines or runs. Sort each split so the index-based,
# seeded fold assignment always refers to the same patients.
all_cases = sorted(d for d in Path('心力衰竭/database/training').glob('patient*') if d.is_dir()) + \
            sorted(d for d in Path('心力衰竭/database/testing').glob('patient*') if d.is_dir())
if not all_cases:
    raise FileNotFoundError(
        "No patient* directories found under '心力衰竭/database/training' or "
        "'心力衰竭/database/testing'"
    )

# One label per case, used to stratify the splits.
all_labels = []
for case in all_cases:
    # Build a throw-away single-case dataset just to read the case's label.
    _, label = ACDCDataset([case], phase='train')[0]
    # NOTE(review): assumes the dataset returns a scalar class index (int or
    # single-element tensor); int() makes it a plain value for scikit-learn.
    all_labels.append(int(label))
# Resume support: keep the saved accuracies of the folds that are skipped and
# recompute everything from `start_fold` on. With start_fold == 0 this starts
# from an empty list. Skipping is only valid because the fold assignment is
# seeded (random_state=42).
if not isinstance(fold_results, list):
    fold_results = []
resume_from = max(0, min(start_fold, len(fold_results)))
if resume_from < start_fold:
    print(f"Warning: start_fold={start_fold} but only {len(fold_results)} saved fold result(s); "
          f"resuming from fold {resume_from + 1}.")
fold_results = fold_results[:resume_from]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for fold, (train_val_idx, test_idx) in enumerate(kf.split(all_cases, all_labels)):
    if fold < resume_from:
        print(f"\n=== Fold {fold+1}/5 === (skipped, saved accuracy: {fold_results[fold]})")
        continue

    print(f"\n=== Fold {fold+1}/5 ===")

    # Seed per fold so a resumed fold starts from the same RNG state it would
    # have had in an uninterrupted run.
    torch.manual_seed(42 + fold)

    # Outer split: train+validation cases vs. held-out test cases.
    train_val_cases = [all_cases[i] for i in train_val_idx]
    test_cases = [all_cases[i] for i in test_idx]

    # Labels of the train+validation cases, needed to stratify the inner split.
    train_val_labels = [all_labels[i] for i in train_val_idx]

    # Inner stratified split of train+validation: 75% train, 25% validation.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
    train_idx, val_idx = next(sss.split(train_val_cases, train_val_labels))
    train_cases = [train_val_cases[i] for i in train_idx]
    val_cases = [train_val_cases[i] for i in val_idx]

    # Datasets
    train_dataset = ACDCDataset(train_cases, phase='train')
    val_dataset = ACDCDataset(val_cases, phase='val')    # validation set carved out of the training data
    test_dataset = ACDCDataset(test_cases, phase='val')  # held-out test set

    # Data loaders
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, num_workers=1)
    test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

    # Fresh model, loss, optimiser and LR scheduler for every fold.
    model = FCN3D().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)
    # mode 'max' because the scheduler is stepped with validation accuracy.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', factor=0.5, patience=5)

    # Tracking state (re-initialised per fold).
    best_acc = -1.0             # below any real accuracy, so epoch 1 always writes a checkpoint
    best_loss = float('inf')    # any finite first validation loss counts as an improvement
    no_improve = 0              # epochs since the validation loss last improved
    best_model_path = f"{CUSTOM_PREFIX}_fold{fold}_best.pth"
    final_model_path = f"{CUSTOM_PREFIX}_fold{fold}_last.pth"

    for epoch in range(100):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        scheduler.step(val_acc)

        # Keep only the checkpoint with the best validation accuracy.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), best_model_path)

        # Early stopping on validation loss: stop after 10 epochs without improvement.
        if val_loss < best_loss:
            best_loss = val_loss
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= 10:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Save the last-epoch weights BEFORE the best checkpoint is reloaded;
    # saving afterwards would just write a second copy of the best model.
    torch.save(model.state_dict(), final_model_path)

    # Score the held-out test set with the best-validation-accuracy weights.
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    test_loss, test_acc = evaluate(model, test_loader, criterion, device)
    fold_results.append(float(test_acc))  # plain float keeps the list JSON-serialisable
    print(f"Fold {fold+1} Test Accuracy: {test_acc:.2%}")

    # Persist after every fold so an interrupted run can be resumed.
    with open(results_file, 'w') as f:
        json.dump(fold_results, f)
    print(f"\nCurrent 5-Fold CV Results: {fold_results}")
    print(f"Average Accuracy: {np.mean(fold_results):.2%} (±{np.std(fold_results):.2%})")

# Final report: prefer the persisted results, but fall back to the in-memory
# list so a missing results file does not leave `final_results` undefined.
if os.path.exists(results_file):
    with open(results_file, 'r') as f:
        final_results = json.load(f)
else:
    final_results = fold_results
print("\n=== Final Results ===")
print(f"5-Fold CV Results: {final_results}")
if final_results:
    print(f"Average Accuracy: {np.mean(final_results):.2%} (±{np.std(final_results):.2%})")
else:
    # np.mean([]) would only yield nan plus a RuntimeWarning.
    print("Average Accuracy: n/a (no completed folds)")

No existing results file found. Starting fresh.

=== Fold 1/5 ===
