In [1]:
import torch
from models.UNet import UNet
from data_prep.preprocess import *
import torch.nn as nn
import torch.nn.functional as F
from pathlib import Path
from torch.utils.data import DataLoader, random_split
from torch import Tensor
from torch import optim


# Load data

In [2]:
train_loader, test_loader = Preprocess().get_data_loaders(subset=True)

input_height = 600
input_width = 400
input_features = 3

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Define the model

In [3]:
""" Parts of the U-Net model """


class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)
    

In [4]:
""" Full assembly of the parts to form the complete network """


class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=False):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = (DoubleConv(n_channels, 64))
        self.down1 = (Down(64, 128))
        self.down2 = (Down(128, 256))
        self.down3 = (Down(256, 512))
        factor = 2 if bilinear else 1
        self.down4 = (Down(512, 1024 // factor))
        self.up1 = (Up(1024, 512 // factor, bilinear))
        self.up2 = (Up(512, 256 // factor, bilinear))
        self.up3 = (Up(256, 128 // factor, bilinear))
        self.up4 = (Up(128, 64, bilinear))
        self.outc = (OutConv(64, n_classes))

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

    def use_checkpointing(self):
        self.inc = torch.utils.checkpoint(self.inc)
        self.down1 = torch.utils.checkpoint(self.down1)
        self.down2 = torch.utils.checkpoint(self.down2)
        self.down3 = torch.utils.checkpoint(self.down3)
        self.down4 = torch.utils.checkpoint(self.down4)
        self.up1 = torch.utils.checkpoint(self.up1)
        self.up2 = torch.utils.checkpoint(self.up2)
        self.up3 = torch.utils.checkpoint(self.up3)
        self.up4 = torch.utils.checkpoint(self.up4)
        self.outc = torch.utils.checkpoint(self.outc)
        

In [10]:

def dice_coeff(input: Tensor, target: Tensor, reduce_batch_first: bool = False, epsilon: float = 1e-6):
    # Average of Dice coefficient for all batches, or for a single mask
    assert input.size() == target.size()
    assert input.dim() == 3 or not reduce_batch_first

    sum_dim = (-1, -2) if input.dim() == 2 or not reduce_batch_first else (-1, -2, -3)

    inter = 2 * (input * target).sum(dim=sum_dim)
    sets_sum = input.sum(dim=sum_dim) + target.sum(dim=sum_dim)
    sets_sum = torch.where(sets_sum == 0, inter, sets_sum)

    dice = (inter + epsilon) / (sets_sum + epsilon)
    return dice.mean()


def multiclass_dice_coeff(input: Tensor, target: Tensor, reduce_batch_first: bool = False, epsilon: float = 1e-6):
    # Average of Dice coefficient for all classes
    return dice_coeff(input.flatten(0, 1), target.flatten(0, 1), reduce_batch_first, epsilon)


def dice_loss(input: Tensor, target: Tensor, multiclass: bool = False):
    # Dice loss (objective to minimize) between 0 and 1
    fn = multiclass_dice_coeff if multiclass else dice_coeff
    return 1 - fn(input, target, reduce_batch_first=True)

def compute_iou(mask_pred, target_mask):
    mask_pred = mask_pred.squeeze(1)
    mask_pred = (mask_pred > 0.5).float().bool()
    true_mask = target_mask.bool()

    intersection = (mask_pred & true_mask).float().sum((1, 2))
    union = (mask_pred | true_mask).float().sum((1, 2))
    iou = (intersection + 1e-6) / (union + 1e-6)
    
    return iou.mean()


In [14]:

dir_img = Path('./data/imgs/')
dir_mask = Path('./data/masks/')
dir_checkpoint = Path('./checkpoints/')


def train_model(model, epochs=5, learning_rate=1e-5, amp=False, weight_decay=1e-8, gradient_clipping=1.0,):

    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    grad_scaler = torch.cuda.amp.GradScaler(enabled=amp)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        model.train()
        losses, accuracy = [], 0
        
        for images, true_mask in train_loader:

            optimizer.zero_grad()
            images = images.to(device=device, memory_format=torch.channels_last)
            true_mask = true_mask.to(device=device, dtype=torch.long)
            pred_mask = model(images).to(device=device)
            
            print(f'pred_mask shape: {pred_mask.shape}')            
            print(f'true_mask shape: {true_mask.shape}')
            
            loss = criterion(pred_mask, true_mask.squeeze(1))
            loss += dice_loss(F.softmax(pred_mask, dim=1).float(), 
                              F.one_hot(true_mask.squeeze(1), model.n_classes).permute(0, 3, 1, 2).float(), 
                              multiclass=True)
            
            iou = compute_iou(pred_mask, true_mask)
            accuracy += iou
            
            grad_scaler.scale(loss).backward()
            grad_scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), gradient_clipping)
            grad_scaler.step(optimizer)
            grad_scaler.update()

            losses.append(loss.item())

        print(f'Epoch: {epoch}, Loss: {losses[-1] / len(train_loader)}, Accuracy: {accuracy / len(train_loader)}')


unet = UNet(n_channels=3, n_classes=1)
unet = unet.to(device=device, memory_format=torch.channels_last)
train_model(model=unet, epochs=1)


pred_mask shape: torch.Size([4, 1, 600, 400])
true_mask shape: torch.Size([4, 1, 600, 400])


KeyboardInterrupt: 

In [7]:
for i, (image, mask) in enumerate(train_loader):
    print(image.shape)
    print(mask.shape)
    break

torch.Size([4, 3, 600, 400])
torch.Size([4, 1, 600, 400])


# Train

In [8]:
epochs = 10

for epoch in range(epochs):
    print(f"Epoch {epoch+1}")
    unet.train()  # Set the model to training mode
    running_loss = 0.0
    
    for inputs, masks in train_loader:  # Assuming your DataLoader is named train_loader
        #inputs, labels = inputs.to(device), labels.to(device)
        print(inputs.shape)
        print(masks.shape)
        masks = masks.squeeze(1)
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        # TODO: Softmax + One-hot encoding of the outputs maybe?
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, masks)
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    print(f"Loss: {running_loss}")

Epoch 1
torch.Size([4, 3, 600, 400])
torch.Size([4, 1, 600, 400])


NameError: name 'optimizer' is not defined

In [None]:
tensor = torch.tensor([[[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[1, 2, 3], [4, 5, 6], [7, 8, 9]]])
tensor.shape

In [None]:
tensor.squeeze(1)

In [None]:
tensor