# UNet

Training, testing, and evaluating the UNet architecture on image segmentation

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import os

import matplotlib.pyplot as plt
from torchvision import transforms, utils
import numpy as np
import random
import time

from UNet import UNetNoSkip, UNet, UNetPadded, TGSCustomDataset   

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
    
device = "cpu"    
device = torch.device(device)
print("Using {} device".format(device))


# Hyperparameters
EPOCH = 10

In [None]:
# Load Dataset
root_dir = os.path.expanduser("~/Developer/Datasets/tgs_comp/competition_data/train")
image_folder = "images/"
mask_folder = "masks/"

transform = transforms.Compose(
    [
        transforms.ToTensor(),
        # Change size to 100x100
        transforms.Resize((224, 224), antialias=True),
    ]
)
custom_dataset = TGSCustomDataset(root_dir, image_folder, mask_folder, transform)


print("Train Dataset Size:", len(custom_dataset))

# Image has 4 channels: 3 for RGB and 1 for alpha
print("Train Image Type", type(custom_dataset[0][0]), custom_dataset[0][0].shape) 
print("Train Mask Type", type(custom_dataset[0][1]), custom_dataset[0][1].shape)

# Print alpha channel
# print("Alpha Channel", custom_dataset[0][1][0][0])

# Print Mask
print("Mask", custom_dataset[3999][1])

In [None]:
fig, axes = plt.subplots(5, 2, figsize=(10, 20))
visualize_number = 5

for i in range(visualize_number):
    random_int = np.random.randint(0, len(custom_dataset))
    sample = custom_dataset[random_int]
    ax1 = axes[i, 0]
    ax2 = axes[i, 1]
    ax1.set_title(f'Img {custom_dataset.images[i]}')
    ax2.set_title(f'Mask {custom_dataset.masks[i]}')
    ax1.axis('off')
    ax2.axis('off')
        
    ax1.imshow(sample[0].permute(1, 2, 0).numpy())  # Convert tensor to numpy array and permute dimensions
    ax2.imshow(sample[1].permute(1, 2, 0).numpy(), cmap="gray")  # Convert tensor to numpy array and permute dimensions

plt.tight_layout()
plt.show()


In [None]:
# Data Loader
batch_size = 32

# Split the dataset into training and validation sets
train_size = int(0.8 * len(custom_dataset))
val_size = len(custom_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(custom_dataset, [train_size, val_size])

train_loader = torch.utils.data.DataLoader(train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True, 
                                           num_workers=4)
test_loader = torch.utils.data.DataLoader(val_dataset,
                                            batch_size=batch_size, 
                                            shuffle=False, 
                                            num_workers=4)

print("Train Loader Size:", len(train_loader))  # 100 = 3200 / 32
print("Test Loader Size:", len(test_loader))    # 25 = 800 / 32




In [None]:
# 加载模型
# model = UNetNoSkip(in_channels=3, out_channels=1)
model = UNetPadded(in_channels=3, out_channels=1, device=device)
model = model.to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

## 训练
我们使用`dice_coeff`函数来评估图像分割。Dice coefficient测量预测分割和真实分割之间的重叠。Dice为1表示预测分与真实分割完全一致。0表示完全没有重叠。

In [None]:
# dice_coeff函数
def dice_coeff(pred, target):
    smooth = 1.0
    pred = pred.contiguous()
    target = target.contiguous()
    intersection = (pred * target).sum(dim=2).sum(dim=2)
    loss = (1 - ((2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)))
    return loss.mean()

In [None]:
# 训练模型
def train_step(model: nn.Module, data_loader: DataLoader, criterion: nn.Module, optimizer: torch.optim.Optimizer, device: torch.device):
    model.train()
    train_loss, train_acc = 0.0, 0.0
    for i, (images, masks) in enumerate(data_loader):
        images = images.to(device)
        masks = masks.to(device)
        
        # Forward Pass
        optimizer.zero_grad()
        outputs = model(images)
        
        outputs = torch.sigmoid(outputs)
                
        # Backward Pass
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        # # Threshold: Binary Mask
        # threshold = 0.1
        # outputs = (outputs >= threshold).float()
        
        # Loss
        train_loss += loss.item()
        train_acc += dice_coeff(outputs, masks)
        
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    return train_loss, train_acc

def test_step(model: nn.Module, data_loader: DataLoader, criterion: nn.Module, device: torch.device):
    model.eval()
    train_loss, train_acc = 0.0, 0.0
    with torch.inference_mode():
        for i, (images, masks) in enumerate(data_loader):
            images = images.to(device)
            masks = masks.to(device)
            optimizer.zero_grad()
            
            outputs = model(images)
            outputs = torch.sigmoid(outputs)   
            loss = criterion(outputs, masks)
            train_loss += loss.item()
            train_acc += dice_coeff(outputs, masks)

        train_loss /= len(data_loader)
        train_acc /= len(data_loader)
    
    torch.cuda.empty_cache()
    return train_loss, train_acc
        
def training(model: nn.Module, 
                data_loader: DataLoader,
                criterion: nn.Module,
                optimizer: torch.optim.Optimizer,
                device: torch.device,
                epochs: int
            ):
    
    history = {
        "train_loss": [],
        "val_loss": [],
        "train_acc": [],
        "val_acc": []
    }
    
    for epoch in range(epochs):
        train_start_time = time.time()
        train_loss, train_acc = train_step(model, data_loader, criterion, optimizer, device)
        train_end_time = time.time()
        
        test_start_time = time.time()
        val_loss, val_acc = test_step(model, data_loader, criterion, device)
        test_end_time = time.time()
        
        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        
        print(f"Epoch: {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        print(f"Train Time: {train_end_time - train_start_time:.4f}, Test Time: {test_end_time - test_start_time:.4f}")
    
    return history

In [None]:
results = training(model, train_loader, criterion, optimizer, device, EPOCH)

In [None]:
# 显示训练结果


In [None]:
# Evaluate



## IoU Intersection over Union Metric at different Thresholds
threshould = 0.5


## Precision and Recall at different Thresholds
