Import libraries

In [11]:
import os 
import torch 
import cv2 
import shutil
import zipfile 
import copy
import torch.nn as nn 
import torch.optim as optim 
import torch.nn.functional as F 
import numpy as np 
from numpy import linalg as LA 
from math import atan2, cos, sin, sqrt, pi, log
from PIL import Image 
from tqdm import tqdm  
from torchvision import transforms 
from typing import List, Tuple
from os import PathLike
from torch.utils.data import DataLoader 
from torch.utils.data import Dataset
from torch.utils.tensorboard import SummaryWriter 

Define preprocessing functions for the dataset images and masks. The original image size is 4054x3040 pixels.

The images and masks will be resized to the following square sizes: 128x128, 256x256, 512x512, 1024x1024 and 2048x2048.

In [8]:
def unzip_data(zip_archive: PathLike, output_dir: str = "unzipped-dataset") -> None:
    """Extract every member of a zip archive into ``output_dir``.

    Args:
        zip_archive: Path to the ``.zip`` file to extract.
        output_dir: Directory that receives the extracted files. Defaults to
            ``"unzipped-dataset"`` (relative to the current working
            directory), the location that used to be hard-coded here.

    Returns:
        None. The function is called for its side effect on the file system.
    """
    with zipfile.ZipFile(zip_archive, "r") as archive:
        archive.extractall(output_dir)

    return None

def move_files(source: PathLike, target_images: PathLike, target_masks: PathLike) -> None:
    """Sort the files directly inside ``source`` into image and mask folders.

    ``.jpg`` files are moved to ``target_images`` and ``.png`` files to
    ``target_masks``; the extension check is case-insensitive. Any other
    regular file is left in place and reported on stdout.

    Args:
        source: Directory whose direct children are sorted.
        target_images: Destination directory for ``.jpg`` files.
        target_masks: Destination directory for ``.png`` files.

    Returns:
        None.
    """
    # Make sure both destinations exist so shutil.move cannot fail on a
    # missing parent directory.
    os.makedirs(target_images, exist_ok=True)
    os.makedirs(target_masks, exist_ok=True)

    for name in os.listdir(source):
        file_path = os.path.join(source, name)

        # organize_data creates the target folders *inside* source, so the
        # listing contains directories; they are not files to be sorted.
        if not os.path.isfile(file_path):
            continue

        lowered = name.lower()
        if lowered.endswith(".jpg"):
            shutil.move(file_path, os.path.join(target_images, name))
        elif lowered.endswith(".png"):
            shutil.move(file_path, os.path.join(target_masks, name))
        else:
            print(f"Unexpected file format found: Neither JPG nor PNG ({name})")

    return None


def organize_data(unzipped_data: PathLike) -> None:
    """Create ``images``/``masks`` sub-folders for both splits and fill them.

    For each of ``<unzipped_data>/train`` and ``<unzipped_data>/val`` the
    sub-folders ``images`` and ``masks`` are created (if missing) and
    ``move_files`` is called to sort the split's files into them.

    Args:
        unzipped_data: Directory that contains the ``train`` and ``val`` folders.

    Returns:
        None.
    """
    # (split root, images folder, masks folder) for train first, then val.
    layout = []
    for split in ("train", "val"):
        split_root = os.path.join(unzipped_data, split)
        layout.append(
            (
                split_root,
                os.path.join(split_root, "images"),
                os.path.join(split_root, "masks"),
            )
        )

    # Create every destination folder before any file is moved.
    for _, images_dir, masks_dir in layout:
        os.makedirs(images_dir, exist_ok=True)
        os.makedirs(masks_dir, exist_ok=True)

    for split_root, images_dir, masks_dir in layout:
        move_files(split_root, images_dir, masks_dir)

    print("Files have been successfully moved")

    return None



class SegmentationDataset(Dataset):
    """Dataset of (image, mask) pairs read from two parallel directories.

    Every file in ``images_dir`` is paired with the file at the same position
    in ``masks_dir`` after both listings have been sorted by name.

    Args:
        images_dir: Directory containing the input images.
        masks_dir: Directory containing the segmentation masks.
        size: Side length in pixels; images and masks are resized to
            ``(size, size)``.

    Raises:
        ValueError: If the two directories hold a different number of entries.
    """

    def __init__(self, images_dir: PathLike, masks_dir: PathLike, size: int):
        super().__init__()
        # os.listdir returns entries in arbitrary order, so both listings are
        # sorted to make index i refer to the same sample in both lists.
        # NOTE(review): this assumes an image and its mask share a base name
        # (e.g. "x.jpg" / "x.png") — confirm against the dataset.
        self.images_paths = [os.path.join(images_dir, file) for file in sorted(os.listdir(images_dir))]
        self.masks_paths = [os.path.join(masks_dir, file) for file in sorted(os.listdir(masks_dir))]

        if len(self.images_paths) != len(self.masks_paths):
            raise ValueError(
                f"Number of images ({len(self.images_paths)}) does not match "
                f"number of masks ({len(self.masks_paths)})"
            )

        self.size = size

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.images_paths)

    def __getitem__(self, idx):
        """Load, resize and convert the sample at position ``idx``.

        Returns:
            A tuple ``(image, mask)``: ``image`` is a float tensor of shape
            ``(3, size, size)`` scaled to ``[0, 1]``; ``mask`` is a long
            tensor of shape ``(size, size)`` holding the grayscale mask values.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        # Get image and corresponding mask paths
        image_path = self.images_paths[idx]
        mask_path = self.masks_paths[idx]

        # Read image and mask; cv2.imread returns None instead of raising
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        mask = cv2.imread(mask_path)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")

        # Convert BGR to RGB for the image and to a single channel for the mask
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)

        # The third positional argument of cv2.resize is `dst`, so the
        # interpolation flag has to be passed by keyword. Nearest-neighbour
        # keeps the mask free of interpolated in-between values.
        target_size = (self.size, self.size)
        image = cv2.resize(image, target_size, interpolation=cv2.INTER_LINEAR)
        mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)

        # OpenCV yields (H, W, C); nn.Conv2d expects channels first (C, H, W)
        image = torch.from_numpy(image).permute(2, 0, 1).contiguous().float() / 255.0
        mask = torch.from_numpy(mask).long()

        return image, mask

Define U-Net Network

In [6]:
class DoubleConvolution(nn.Module):
    """Two 3x3 convolutions with padding 1, each followed by an in-place ReLU.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by both convolutions.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        first_conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        second_conv = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        # Layer order (conv, relu, conv, relu) fixes the state_dict keys
        # conv_op.0.* and conv_op.2.*.
        self.conv_op = nn.Sequential(
            first_conv,
            nn.ReLU(inplace=True),
            second_conv,
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Apply both convolution + ReLU stages to ``x``."""
        features = self.conv_op(x)
        return features

class DownSample(nn.Module):
    """Encoder stage: a ``DoubleConvolution`` followed by 2x2 max pooling.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the convolution block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = DoubleConvolution(in_channels, out_channels)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return ``(features, pooled)``.

        ``features`` is the convolution output before pooling (used as the
        skip connection by ``UNet.forward``); ``pooled`` is the same tensor
        after max pooling.
        """
        features = self.conv(x)
        return features, self.pool(features)


class UpSample(nn.Module):
    """Decoder stage: transposed-conv upsampling, skip concatenation, double conv.

    Args:
        in_channels: Number of channels of the tensor coming from the deeper
            stage. The transposed convolution halves it, and the concatenation
            with the skip tensor is expected to have ``in_channels`` channels again.
        out_channels: Number of channels produced by the convolution block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
        self.conv = DoubleConvolution(in_channels, out_channels)

    def forward(self, x1, x2):
        """Upsample ``x1``, concatenate it with the skip tensor ``x2`` and convolve.

        Args:
            x1: Tensor from the deeper stage, laid out as (N, C, H, W).
            x2: Skip-connection tensor from the matching encoder stage.

        Returns:
            The output of the convolution block applied to the concatenation.
        """
        x1 = self.up(x1)

        # Max pooling floors odd sizes, so the upsampled tensor can be one
        # pixel smaller than the skip tensor. Pad it to the skip tensor's
        # size so torch.cat works for inputs not divisible by 16. When the
        # sizes already match nothing changes.
        diff_h = x2.size(2) - x1.size(2)
        diff_w = x2.size(3) - x1.size(3)
        if diff_h or diff_w:
            # F.pad order for the last two dims: (left, right, top, bottom)
            x1 = F.pad(
                x1,
                [diff_w // 2, diff_w - diff_w // 2, diff_h // 2, diff_h - diff_h // 2],
            )

        x = torch.cat([x1, x2], dim=1)
        return self.conv(x)

class UNet(nn.Module):
    """U-Net with four encoder stages, a bottleneck and four decoder stages.

    Args:
        in_channels: Number of channels of the input image tensor.
        num_classes: Number of output channels of the final 1x1 convolution.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        widths = (64, 128, 256, 512)

        # Encoder: down_convolution_1 .. down_convolution_4
        previous = in_channels
        for stage, width in enumerate(widths, start=1):
            setattr(self, f"down_convolution_{stage}", DownSample(previous, width))
            previous = width

        self.bottle_neck = DoubleConvolution(widths[-1], widths[-1] * 2)

        # Decoder: up_convolution_1 .. up_convolution_4, mirroring the encoder
        previous = widths[-1] * 2
        for stage, width in enumerate(reversed(widths), start=1):
            setattr(self, f"up_convolution_{stage}", UpSample(previous, width))
            previous = width

        self.out = nn.Conv2d(widths[0], num_classes, kernel_size=1)

    def forward(self, x):
        """Run the encoder, bottleneck and decoder and return the output map."""
        skips = []
        for stage in range(1, 5):
            skip, x = getattr(self, f"down_convolution_{stage}")(x)
            skips.append(skip)

        x = self.bottle_neck(x)

        # The deepest skip connection feeds the first decoder stage.
        for stage, skip in enumerate(reversed(skips), start=1):
            x = getattr(self, f"up_convolution_{stage}")(x, skip)

        return self.out(x)

Metrics: DICE and Intersection over Union

In [12]:
def dice_coefficient(prediction, target, epsilon=1e-07) -> float:
    """Return the smoothed Dice score between a thresholded prediction and a target.

    Negative prediction values are mapped to 0 and positive values to 1; the
    input tensor itself is not modified. The score is computed over all
    elements at once as ``(2 * intersection + epsilon) / (sum + epsilon)``.

    Args:
        prediction: Tensor of raw prediction values.
        target: Tensor that is multiplied element-wise with the prediction.
        epsilon: Smoothing term added to numerator and denominator.

    Returns:
        The Dice score as a Python float.
    """
    # Out-of-place fills leave the caller's tensor untouched.
    binary = prediction.masked_fill(prediction < 0, 0).masked_fill(prediction > 0, 1)

    overlap = (binary * target).sum().abs()
    total = (binary.sum() + target.sum()).abs()

    score = (2. * overlap + epsilon) / (total + epsilon)
    return score.item()

def iou_coefficient(prediction, target, epsilon=1e-07) -> float:
    """Return the mean per-sample Intersection over Union.

    The prediction is binarized at 0, the same boundary ``dice_coefficient``
    uses. Intersection and union are reduced over every dimension except the
    first (batch) one, and ``epsilon`` smooths the ratio so that an empty
    prediction with an empty target scores 1.0 instead of producing 0/0 = NaN.

    Args:
        prediction: Tensor of raw prediction values with a leading batch dimension.
        target: Tensor of 0/1 values with the same shape as ``prediction``.
        epsilon: Smoothing term added to numerator and denominator.

    Returns:
        The IoU averaged over the batch as a Python float.

    Raises:
        ValueError: If ``prediction`` has fewer than two dimensions.
    """
    if prediction.dim() < 2:
        raise ValueError("prediction must have a batch dimension followed by at least one more dimension")

    prediction = (prediction > 0).float()
    target = target.float()

    # Reduce over everything but the batch axis so (N, H, W) and
    # (N, C, H, W) inputs both give one score per sample.
    reduce_dims = tuple(range(1, prediction.dim()))
    intersection = (prediction * target).sum(dim=reduce_dims)
    union = (prediction + target).clamp(0, 1).sum(dim=reduce_dims)

    iou_score = (intersection + epsilon) / (union + epsilon)

    return iou_score.mean().item()

Hyperparameters and training loop

In [14]:
def _run_epoch(model, dataloader, criterion, device, optimizer=None):
    """Run one pass over ``dataloader``; train when ``optimizer`` is given.

    Returns:
        ``(loss, dice, iou, last_mask, last_logits)`` where the three metrics
        are averaged over the batches and the last two entries are the tensors
        of the final batch (``None`` when the loader yielded no batches).
    """
    is_training = optimizer is not None
    running_loss = 0.0
    running_dice = 0.0
    running_iou = 0.0
    num_batches = 0
    last_mask = None
    last_logits = None

    for img, mask in tqdm(dataloader, position=0, leave=True):
        img = img.float().to(device)
        # BCEWithLogitsLoss needs targets in [0, 1].
        # NOTE(review): assumes binary masks where any non-zero pixel is
        # foreground (e.g. 0/255 or 0/1) — confirm against the dataset.
        mask = (mask.to(device) > 0).float()

        # Gradients are only tracked while training.
        with torch.set_grad_enabled(is_training):
            logits = model(img)
            # A single-channel output (N, 1, H, W) is aligned with an
            # (N, H, W) mask so loss and metrics see equal shapes.
            if logits.dim() == mask.dim() + 1 and logits.size(1) == 1:
                logits = logits.squeeze(1)
            loss = criterion(logits, mask)

        if is_training:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        logits = logits.detach()
        running_loss += loss.item()
        # Both metric functions already return Python floats.
        running_dice += dice_coefficient(logits, mask)
        running_iou += iou_coefficient(logits, mask)
        num_batches += 1
        last_mask, last_logits = mask, logits

    # Avoid dividing by zero when the loader is empty.
    divisor = max(num_batches, 1)
    return (
        running_loss / divisor,
        running_dice / divisor,
        running_iou / divisor,
        last_mask,
        last_logits,
    )


def _as_image_batch(tensor, limit):
    """Return at most ``limit`` samples as a CPU float batch with a channel axis."""
    batch = tensor[:limit].float()
    if batch.dim() == 3:
        # add_images expects (N, C, H, W); masks arrive as (N, H, W).
        batch = batch.unsqueeze(1)
    return batch.cpu()


def _log_epoch(tb_writer, split, mask_tag, segmentation_tag, results, step, max_images=8):
    """Write the scalars and example masks of one epoch to TensorBoard."""
    loss, dice, iou, mask, logits = results
    tb_writer.add_scalar(f"Loss/{split}", loss, step)
    tb_writer.add_scalar(f"DICE/{split}", dice, step)
    tb_writer.add_scalar(f"IOU/{split}", iou, step)

    if mask is None:
        return

    # Log the ground-truth mask and the sigmoid probability map of the last
    # batch; only a few samples are written to keep the event files small.
    tb_writer.add_images(mask_tag, _as_image_batch(mask, max_images), step)
    tb_writer.add_images(segmentation_tag, _as_image_batch(torch.sigmoid(logits), max_images), step)


def train_model(model: nn.Module, optimizer: optim.Optimizer, trainingdataloader: DataLoader, validationdataloader: DataLoader, epochs: int, experiment_name: str, device: torch.device, tag: str = "") -> None:
    """Train and validate ``model``, logging to TensorBoard and saving checkpoints.

    Each epoch runs one training pass and one validation pass with
    ``nn.BCEWithLogitsLoss``, logs loss, Dice and IoU for both, and writes
    ``last_<tag>.pt`` every epoch plus ``best_<tag>.pt`` whenever the
    validation loss improves.

    Args:
        model: Network to train; it must already live on ``device``.
        optimizer: Optimizer that updates ``model``'s parameters.
        trainingdataloader: Loader yielding ``(image, mask)`` training batches.
        validationdataloader: Loader yielding ``(image, mask)`` validation batches.
        epochs: Number of epochs to run.
        experiment_name: Name used for the ``unet-runs/<experiment_name>`` log directory.
        device: Device the batches are moved to.
        tag: Suffix of the checkpoint file names. Falls back to
            ``experiment_name`` when empty, so callers may omit it.

    Returns:
        None.
    """
    # Paths to checkpoints and the logging directory, plus the loss function
    logging_directory: str = f"unet-runs/{experiment_name}"
    checkpoints: str = os.path.join(logging_directory, experiment_name, "ckpt")
    # torch.save does not create missing directories.
    os.makedirs(checkpoints, exist_ok=True)

    tag = tag or experiment_name
    save_path_best: str = os.path.join(checkpoints, f"best_{tag}.pt")
    save_path_last: str = os.path.join(checkpoints, f"last_{tag}.pt")
    tb_writer: SummaryWriter = SummaryWriter(log_dir=logging_directory)
    criterion: nn.Module = nn.BCEWithLogitsLoss()
    best_loss: float = float("inf")

    try:
        for epoch in tqdm(range(epochs)):
            step = epoch + 1

            # Model training
            model.train()
            train_results = _run_epoch(model, trainingdataloader, criterion, device, optimizer)
            _log_epoch(tb_writer, "Train", "Original mask", "Segmentation mask", train_results, step)

            # Model validation (no optimizer -> no gradient tracking, no updates)
            model.eval()
            val_results = _run_epoch(model, validationdataloader, criterion, device)
            _log_epoch(
                tb_writer,
                "Validation",
                "Validation original mask",
                "Validation segmentation mask",
                val_results,
                step,
            )

            # Save best.pt and last.pt checkpoints
            validation_loss = val_results[0]
            if validation_loss < best_loss:
                best_loss = validation_loss
                torch.save(model.state_dict(), save_path_best)

            torch.save(model.state_dict(), save_path_last)
    finally:
        # Flush and close the writer even if training is interrupted.
        tb_writer.close()
        

Actual Training Process

In [None]:
data_path: str = "" # path to GlobalLogic's Google Drive 

unzip_data(data_path) # Unzip zip-archive to get train/val folders called dataset-splitted/train for train_images and masks and dataset-splitted/val for validation images and masks 

unzipped_path: str = ""  # path to unzipped folder called dataset-splitted 

organize_data(unzipped_path) # Sort images and masks into subfolder images and masks inside dataset-splitted folder 

# Side length in pixels. SegmentationDataset resizes to (size, size), so the
# value must be the edge length, not the pixel count (128*128 would request
# a 16384x16384 image).
size_128 = 128
size_256 = 256
size_512 = 512
size_1024 = 1024

path_to_train_images = "" # path to training images 
path_to_train_masks = "" # path to training masks 
path_to_val_images = "" # path to validation images 
path_to_val_masks = "" # path to validation masks 

num_workers = 4
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
epochs = 300
# NOTE(review): 500 samples per batch is kept from the original script; at
# 512x512 and 1024x1024 this is likely to exceed GPU memory — confirm or lower.
batch_size = 500

# Datasets prepared for work 

train_dataset_128 = SegmentationDataset(path_to_train_images, path_to_train_masks, size_128)
# Validation images must be paired with validation masks.
val_dataset_128 = SegmentationDataset(path_to_val_images, path_to_val_masks, size_128)

train_dataset_256 = SegmentationDataset(path_to_train_images, path_to_train_masks, size_256)
val_dataset_256 = SegmentationDataset(path_to_val_images, path_to_val_masks, size_256)

train_dataset_512 = SegmentationDataset(path_to_train_images, path_to_train_masks, size_512)
val_dataset_512 = SegmentationDataset(path_to_val_images, path_to_val_masks, size_512)

train_dataset_1024 = SegmentationDataset(path_to_train_images, path_to_train_masks, size_1024)
val_dataset_1024 = SegmentationDataset(path_to_val_images, path_to_val_masks, size_1024)


# DataLoaders (validation loaders are not shuffled: the order does not affect
# the averaged metrics and a fixed order keeps the logged examples stable)

train_dataloader_128 = DataLoader(train_dataset_128, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=True)
val_dataloader_128 = DataLoader(val_dataset_128, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=False)

train_dataloader_256 = DataLoader(train_dataset_256, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=True)
val_dataloader_256 = DataLoader(val_dataset_256, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=False)

train_dataloader_512 = DataLoader(train_dataset_512, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=True)
val_dataloader_512 = DataLoader(val_dataset_512, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=False)

train_dataloader_1024 = DataLoader(train_dataset_1024, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=True)
val_dataloader_1024 = DataLoader(val_dataset_1024, num_workers=num_workers, pin_memory=False, batch_size=batch_size, shuffle=False)

# train_model uses nn.BCEWithLogitsLoss against a single-channel mask, so the
# network has to emit exactly one logit per pixel.
model = UNet(in_channels=3, num_classes=1).to(device)
optimizer = optim.AdamW(model.parameters(), lr=3e-4)

# Start from 128x128 
experiment_name_128: str = f"experiment_{size_128}"
experiment_name_256: str = f"experiment_{size_256}"
experiment_name_512: str = f"experiment_{size_512}"
experiment_name_1024: str = f"experiment_{size_1024}"

# Training: the same model and optimizer are carried over from one resolution
# to the next; `tag` names the checkpoint files of each run.
train_model(model, optimizer, train_dataloader_128, val_dataloader_128, epochs, experiment_name_128, device, tag=str(size_128))
train_model(model, optimizer, train_dataloader_256, val_dataloader_256, epochs, experiment_name_256, device, tag=str(size_256))
train_model(model, optimizer, train_dataloader_512, val_dataloader_512, epochs, experiment_name_512, device, tag=str(size_512))
train_model(model, optimizer, train_dataloader_1024, val_dataloader_1024, epochs, experiment_name_1024, device, tag=str(size_1024))