In [1]:
import nibabel as nib
import matplotlib.pyplot as plt

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
import torchvision.transforms as transforms
import numpy as np
import torchvision.models as models

In [3]:
# 自定义数据集类
class SpineWeb15(Dataset):
    def __init__(self, data_path, transform=None):
        self.data_path = data_path
        self.transform = transform
        
    def __len__(self):
        return len(self.data_path)
    
    def __getitem__(self, index):
        metadata = np.load(self.data_path+str(index)+".npy")
        image = metadata[0]
        label = metadata[1]

        image_data = torch.from_numpy(image).float()
        label_data = torch.from_numpy(label).float()
        
        if self.transform:
            image_data = self.transform(image_data)
        return image_data, label_data



In [4]:
# U-Net structure

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        return self.double_conv(x)


class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        
        # 编码器部分
        self.conv1 = DoubleConv(in_channels, 64)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)   #64
        self.conv2 = DoubleConv(64, 128)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)   #32
        self.conv3 = DoubleConv(128, 256)
        self.maxpool3 = nn.MaxPool2d(kernel_size=2, stride=2)   #16
        self.conv4 = DoubleConv(256, 512)
        self.maxpool4 = nn.MaxPool2d(kernel_size=2, stride=2)   #8
        
        # 解码器部分
        self.conv5 = DoubleConv(512, 1024)
        self.upconv6 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)   #16
        self.conv6 = DoubleConv(1024, 512)
        self.upconv7 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)    #32
        self.conv7 = DoubleConv(512, 256)
        self.upconv8 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)    #64
        self.conv8 = DoubleConv(256, 128)
        self.upconv9 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)     #128 feature map size
        self.conv9 = DoubleConv(64, 64)
        
        # 输出层
        self.output = nn.Conv2d(64, out_channels, kernel_size=1)
    
    def forward(self, x):
        # 编码器
        c1 = self.conv1(x)
        p1 = self.maxpool1(c1)
        c2 = self.conv2(p1)
        p2 = self.maxpool2(c2)
        c3 = self.conv3(p2)
        p3 = self.maxpool3(c3)
        c4 = self.conv4(p3)
        p4 = self.maxpool4(c4)
        
        # 解码器
        u5 = self.conv5(p4)
        u5 = self.upconv6(u5)
        u5 = torch.cat((u5, c4), dim=1)
        c6 = self.conv6(u5)
        u6 = self.upconv7(c6)
        u6 = torch.cat((u6, c3), dim=1)
        c7 = self.conv7(u6)
        u7 = self.upconv8(c7)
        u7 = torch.cat((u7, c2), dim=1)
        c8 = self.conv8(u7)
        u8 = self.upconv9(c8)
        c9 = self.conv9(u8)
        
        # 输出层
        output = self.output(c9)
        return output

# 创建UNet模型实例
model = UNet(in_channels=1, out_channels=1)

In [7]:
# -----------------------------
# UNet All Set
# -----------------------------

# empty the cache for model and training process
torch.cuda.empty_cache()

# set the hyper parameters and the paths 设置超参数和路径
data_path = "./dataset/Spineweb_dataset15/processed/"

#hyper parameters set
batch_size = 32
num_epochs = 200
learning_rate = 0.0001
confidence = 0.5
in_channels = 1  # according to demand to adjust the number of channels 根据实际情况修改通道数
out_channels = 1  # according to demand to adjust the number of channels 根据实际情况修改通道数

# 创建数据集和数据加载器
transform = transforms.Compose([
    transforms.ToPILImage(),  # 转换为 PIL 图像对象
    transforms.Resize((128, 128)),  # 调整大小为 128x128
    transforms.ToTensor()  # 转换为张量
])
dataset = SpineWeb15(data_path=data_path,transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 创建模型和优化器
model = UNet(in_channels, out_channels)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()  # 二分类任务可以使用BCEWithLogitsLoss

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# check the parameters of the model could be trained 检查模型的参数是否设置为可训练
# for name, param in model.named_parameters():
#     print(f"Parameter: {name}, Requires Grad: {param.requires_grad}")

best_loss = 100.0
best_IoU = 0.0
best_Dice = 0.0

# 训练模型
for epoch in range(num_epochs):
    # torch.cuda.empty_cache()
    model.train()
    train_loss = 0
    total_iou = 0
    total_dice = 0

    for images, labels in dataloader:
        images = images.to(device)
        labels = labels.to(device)
    
        optimizer.zero_grad()

        outputs = model(images)
        # squeeze the dimension that pytorch add for channels
        outputs = outputs.squeeze(1)     # tensor size ([16,128,128])
        # process the result, bigger than 0.5 is reliable
        predicted = (outputs > confidence).float()     # tensor size ([16,128,128])

        intersection = torch.logical_and(predicted, labels).sum((1, 2))
        union = torch.logical_or(predicted, labels).sum((1, 2))
        iou = (intersection / (union + 1e-7)).mean()  # 平均每个样本的IoU
        dice = (2 * intersection / (predicted.sum((1, 2)) + labels.sum((1, 2)) + 1e-7)).mean()  # 平均每个样本的Dice Coefficient

        loss = criterion(outputs, labels)

        train_loss += loss.item()
        total_iou += iou.item()
        total_dice += dice.item()

        loss.backward()
        optimizer.step()

    avg_loss = train_loss / len(dataloader)
    avg_iou = total_iou / len(dataloader)
    avg_dice = total_dice / len(dataloader)
    
    # get the best IoU and Dice
    best_loss = best_loss if best_loss < avg_loss else avg_loss
    best_IoU = best_IoU if best_IoU > avg_iou else avg_iou
    best_Dice = best_Dice if best_Dice > avg_dice else avg_dice

    print(f"Epoch [{epoch+1}/{num_epochs}], Avg Loss: {avg_loss:.4f}, Avg IoU: {avg_iou:.4f}, Avg Dice: {avg_dice:.4f}")
print("-----------------------------------------")
print(f"Best Loss: {best_loss:.4f}, Best IoU: {best_IoU:.4f}, Best Dice: {best_Dice:.4f}")
# empty the cache for model and training process
torch.cuda.empty_cache()

Epoch [1/200], Avg Loss: 0.6508, Avg IoU: 0.0154, Avg Dice: 0.0296
Epoch [2/200], Avg Loss: 0.6330, Avg IoU: 0.0323, Avg Dice: 0.0592
Epoch [3/200], Avg Loss: 0.6157, Avg IoU: 0.0612, Avg Dice: 0.1043
Epoch [4/200], Avg Loss: 0.5860, Avg IoU: 0.1917, Avg Dice: 0.2688
Epoch [5/200], Avg Loss: 0.5525, Avg IoU: 0.2349, Avg Dice: 0.3161
Epoch [6/200], Avg Loss: 0.5164, Avg IoU: 0.2903, Avg Dice: 0.3766
Epoch [7/200], Avg Loss: 0.4816, Avg IoU: 0.3183, Avg Dice: 0.4107
Epoch [8/200], Avg Loss: 0.4594, Avg IoU: 0.3096, Avg Dice: 0.3928
Epoch [9/200], Avg Loss: 0.4259, Avg IoU: 0.2971, Avg Dice: 0.3559
Epoch [10/200], Avg Loss: 0.4053, Avg IoU: 0.2733, Avg Dice: 0.3252
Epoch [11/200], Avg Loss: 0.3861, Avg IoU: 0.3533, Avg Dice: 0.4245
Epoch [12/200], Avg Loss: 0.3717, Avg IoU: 0.4178, Avg Dice: 0.4885
Epoch [13/200], Avg Loss: 0.3581, Avg IoU: 0.4352, Avg Dice: 0.5008
Epoch [14/200], Avg Loss: 0.3517, Avg IoU: 0.2999, Avg Dice: 0.3499
Epoch [15/200], Avg Loss: 0.3415, Avg IoU: 0.4234, Avg Di

In [8]:
# -----------------------------
# UNet Train Set
# -----------------------------

def calculate_iou(prediction, target):
    intersection = torch.logical_and(prediction, target).sum()
    union = torch.logical_or(prediction, target).sum()
    iou = intersection / union
    return iou.item()

def calculate_dice(prediction, target):
    intersection = torch.logical_and(prediction, target).sum()
    dice = (2 * intersection) / (prediction.sum() + target.sum())
    return dice.item()

# empty the cache for model and training process
torch.cuda.empty_cache()

# set the hyper parameters and the paths 设置超参数和路径
data_path = "./dataset/Spineweb_dataset15/train/"
test_path = "./dataset/Spineweb_dataset15/test/"

#hyper parameters set
batch_size = 32
num_epochs = 10
learning_rate = 0.0001
confidence = 0.5
in_channels = 1  # according to demand to adjust the number of channels 根据实际情况修改通道数
out_channels = 1  # according to demand to adjust the number of channels 根据实际情况修改通道数

# 创建数据集和数据加载器
transform = transforms.Compose([
    transforms.ToPILImage(),  # 转换为 PIL 图像对象
    transforms.Resize((128, 128)),  # 调整大小为 128x128
    transforms.ToTensor()  # 转换为张量
])
dataset = SpineWeb15(data_path=data_path, transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
testset = SpineWeb15(data_path=test_path, transform=transform)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=True)

# 创建模型和优化器
model = UNet(in_channels, out_channels)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()  # 二分类任务可以使用BCEWithLogitsLoss

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# check the parameters of the model could be trained 检查模型的参数是否设置为可训练
# for name, param in model.named_parameters():
#     print(f"Parameter: {name}, Requires Grad: {param.requires_grad}")

best_loss_train = 100.0
best_IoU_train = 0.0
best_Dice_train = 0.0

best_IoU_test = 0.0
best_Dice_test = 0.0

# 训练模型
for epoch in range(num_epochs):
    # torch.cuda.empty_cache()
    model.train()
    train_loss = 0
    total_iou = 0
    total_dice = 0
    total_iou_test = 0
    total_dice_test = 0

    for images, labels in dataloader:
        images = images.to(device)
        labels = labels.to(device)
    
        optimizer.zero_grad()

        outputs = model(images)
        # squeeze the dimension that pytorch add for channels
        outputs = outputs.squeeze(1)     # tensor size ([16,128,128])
        # process the result, bigger than 0.5 is reliable
        predicted = (outputs > confidence).float()     # tensor size ([16,128,128])

        intersection = torch.logical_and(predicted, labels).sum((1, 2))
        union = torch.logical_or(predicted, labels).sum((1, 2))
        iou = (intersection / (union + 1e-7)).mean()  # 平均每个样本的IoU
        dice = (2 * intersection / (predicted.sum((1, 2)) + labels.sum((1, 2)) + 1e-7)).mean()  # 平均每个样本的Dice Coefficient

        loss = criterion(outputs, labels)

        train_loss += loss.item()
        total_iou += iou.item()
        total_dice += dice.item()

        loss.backward()
        optimizer.step()

    # train set results
    avg_loss = train_loss / len(dataloader)
    avg_iou = total_iou / len(dataloader)
    avg_dice = total_dice / len(dataloader)

    # test set results
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            # squeeze the dimension that pytorch add for channels
            outputs = outputs.squeeze(1)     # tensor size ([16,128,128])
            # process the result, bigger than 0.5 is reliable
            predicted = (outputs > confidence).float()     # tensor size ([16,128,128])
            
            intersection = torch.logical_and(predicted, targets).sum((1, 2))
            union = torch.logical_or(predicted, targets).sum((1, 2))
            iou = (intersection / (union + 1e-7)).mean()  # 平均每个样本的IoU
            dice = (2 * intersection / (predicted.sum((1, 2)) + targets.sum((1, 2)) + 1e-7)).mean()  # 平均每个样本的Dice Coefficient

            total_iou_test += iou.item()
            total_dice_test += dice.item()

    test_iou = total_iou_test / len(test_loader)
    test_dice = total_dice_test / len(test_loader)

    # get the best IoU and Dice
    best_loss_train = best_loss_train if best_loss_train < avg_loss else avg_loss
    best_IoU_train = best_IoU_train if best_IoU_train > avg_iou else avg_iou
    best_Dice_train = best_Dice_train if best_Dice_train > avg_dice else avg_dice
    best_IoU_test = best_IoU_test if best_IoU_test > test_iou else test_iou
    # best_Dice_test = best_Dice_test if best_Dice_test > test_dice else test_dice

    if(best_Dice_test <= test_dice):
        best_Dice_test = test_dice
        torch.save(model.state_dict(), f"./model/UNet_checkpoint.pth")

    print(f"Epoch [{epoch+1}/{num_epochs}], Avg Loss: {avg_loss:.4f}, Avg IoU: {avg_iou:.4f}, Avg Dice: {avg_dice:.4f},test IoU: {test_iou:.4f}, test Dice: {test_dice:.4f}")
    
print("-----------------------------------------")
print(f"Best Loss: {best_loss_train:.4f}, Best Train IoU: {best_IoU_train:.4f}, Best Train Dice: {best_Dice_train:.4f}, Best Test IoU: {best_IoU_test:.4f}, Best Test Dice: {best_Dice_test:.4f}")
# empty the cache for model and training process
torch.cuda.empty_cache()

Epoch [1/10], Avg Loss: 0.9541, Avg IoU: 0.0422, Avg Dice: 0.0767,test IoU: 0.0860, test Dice: 0.1501
Epoch [2/10], Avg Loss: 0.9186, Avg IoU: 0.0680, Avg Dice: 0.1190,test IoU: 0.0721, test Dice: 0.1222
Epoch [3/10], Avg Loss: 0.8902, Avg IoU: 0.0366, Avg Dice: 0.0626,test IoU: 0.2500, test Dice: 0.3759
Epoch [4/10], Avg Loss: 0.8125, Avg IoU: 0.1527, Avg Dice: 0.2295,test IoU: 0.2162, test Dice: 0.3053
Epoch [5/10], Avg Loss: 0.7737, Avg IoU: 0.0803, Avg Dice: 0.1264,test IoU: 0.2096, test Dice: 0.2987
Epoch [6/10], Avg Loss: 0.7393, Avg IoU: 0.0919, Avg Dice: 0.1444,test IoU: 0.2605, test Dice: 0.3929
Epoch [7/10], Avg Loss: 0.6677, Avg IoU: 0.1425, Avg Dice: 0.2072,test IoU: 0.1596, test Dice: 0.2435
Epoch [8/10], Avg Loss: 0.6466, Avg IoU: 0.1463, Avg Dice: 0.2104,test IoU: 0.1751, test Dice: 0.2625
Epoch [9/10], Avg Loss: 0.6101, Avg IoU: 0.2690, Avg Dice: 0.3614,test IoU: 0.2128, test Dice: 0.3276
Epoch [10/10], Avg Loss: 0.5707, Avg IoU: 0.2046, Avg Dice: 0.2755,test IoU: 0.249