<a href="https://colab.research.google.com/github/VladMorarK19032334/Project24/blob/main/CTR_Segmentation_ExperimentalEnvi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!nvidia-smi

Libraries

In [None]:
if MODEL_AUGMENTATION == 'complex': # due to piecewise affine => different import is required
  !python -m pip install --upgrade opencv-contrib-python
  !pip uninstall opencv-python
  !pip install git+https://github.com/albumentations-team/albumentations
  !pip install opencv-python
else:
  !pip install albumentations==0.4.6

  import albumentations as A
  from albumentations.pytorch import ToTensorV2

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF
import numpy as np
import os
from torch.utils.data import Dataset
from PIL import Image
import torch
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import torch
import torchvision
from torch.utils.data import DataLoader

import cv2

import statistics as stat
import math

from IPython.display import clear_output
import matplotlib.pyplot as plt

import warnings

Experimental Parameters - to decide what experiment to run for

In [None]:

# True - the loss has weights on classes, False - runs the segmentation for UNet with no weights
# available for multi-learning as well
WEIGHTED_UNET = False 


'''
none			No augmentation
full			The full augmentation used in the latest models (more complex with no piecewise)
complex		Full augmentation with Piecewise Affine transformation
'''
MODEL_AUGMENTATION = 'full'


# Hyperparameters
LEARNING_RATE = 1e-4 #
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 5
NUM_EPOCHS = 100
NUM_WORKERS = 2
IMAGE_HEIGHT = 480  # original size: 480px
IMAGE_WIDTH = 640  # original size: 640px
PIN_MEMORY = True
LOAD_MODEL = False



# weight for class segmentation in cross entropy loss
CLASS_WEIGHTS = torch.tensor([0.5, 2.0, 1.0, 1.0]) # [bg, tip, middle, base]


# form keypoint normalization
KEYPOINT_NORM = False 


# training dataset for segmentation only (faster 8times less data)
TRAIN_IMG_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Experimental_Env/imgs_doubleheaded_extraset_v24Mar_train.npy"
TRAIN_MASK_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Experimental_Env/masks_doubleheaded_extraset_v24Mar_train.npy"
TRAIN_LOCAL_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Experimental_Env/keypoints_doubleheaded_extraset_v24Mar_train.npy"


VAL_IMAGE_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Multi-Learning Experiments Localization/imgs_doubleheaded_x5dataset_val.npy"
VAL_MASK_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Multi-Learning Experiments Localization/masks_doubleheaded_x5dataset_val.npy"
VAL_LOCAL_DIR = "/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Multi-Learning Experiments Localization/keypoints_doubleheaded_x5dataset_val.npy"

# datasets for saving all necessary visual results
GLOBAL_FOLDER = '/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/Experimental_Env/UNetSeg' # location of envi savings
EXPERIMENT_NAME = 'Non-WeightedSegmentation_12Apr' # name of experiment
SAVE_FOLDER = f'{GLOBAL_FOLDER}/{EXPERIMENT_NAME}'


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Main Runner

In [None]:
# main algorithm run function

def main():
  _saved_image_index = 0
  # select segmentation properties
  if MODEL_AUGMENTATION == 'full': # full augmentation
      train_transform = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Rotate(limit=90, p=1.0),
              A.HorizontalFlip(p=0.5),
              A.VerticalFlip(p=0.25),
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0,
              ),
              ToTensorV2(),
          ]
      )

      val_transforms = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0, # divide by 255
              ),
              ToTensorV2(),
          ]
      )
  elif MODEL_AUGMENTATION == 'complex': # complex augmentation
    train_transform = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Rotate(limit=90, p=1.0),
              A.HorizontalFlip(p=0.5),
              A.VerticalFlip(p=0.25),
              A.augmentations.geometric.transforms.PiecewiseAffine(scale=(0.03, 0.05), nb_rows=8, nb_cols=8, 
                                interpolation=1, mask_interpolation=0, cval=0, 
                                cval_mask=0, mode='constant', absolute_scale=False, 
                                always_apply=False, keypoints_threshold=0.01, p=0.2), # bilinear interpolation image, nearest neighbour for mask
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0,
              ),
              ToTensorV2(),
          ]
      )

    val_transforms = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0, # divide by 255
              ),
              ToTensorV2(),
          ]
      )
  else: # none augmentation
    train_transform = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0,
              ),
              ToTensorV2(),
          ]
      )

    val_transforms = A.Compose(
          [
              A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
              A.Normalize(
                  mean=[0.0, 0.0, 0.0],
                  std=[1.0, 1.0, 1.0],
                  max_pixel_value=255.0, # divide by 255
              ),
              ToTensorV2(),
          ]
      )
    

  model = UNET(in_channels=3, out_channels=4).to(DEVICE)
  
  # select segmentation properties
  if WEIGHTED_UNET: # weighted classes
    loss_fn = nn.CrossEntropyLoss(weight=CLASS_WEIGHTS.to(DEVICE))
  else:
    loss_fn = nn.CrossEntropyLoss()

  optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

  train_loader, val_loader = get_loaders(
      TRAIN_IMG_DIR,
      TRAIN_MASK_DIR,
      VAL_IMAGE_DIR,
      VAL_MASK_DIR,
      BATCH_SIZE,
      train_transform,
      val_transforms,
      NUM_WORKERS,
      PIN_MEMORY,
  )

  if LOAD_MODEL:
      load_checkpoint(torch.load("my_checkpoint.pth.tar"), model)
  
  accuracy(val_loader, model, device=DEVICE)
  scaler = torch.cuda.amp.GradScaler()


  for epoch in range(NUM_EPOCHS):

      print(f'EPOCH: {epoch}')
      torch.cuda.empty_cache() # empty cuda cache
      train_fn(train_loader, model, optimizer, loss_fn, scaler)

      # save model
      checkpoint = {
          "state_dict": model.state_dict(),
          "optimizer": optimizer.state_dict(),
      }
      save_checkpoint(checkpoint)

      # check accuracy
      accuracy(val_loader, model, device=DEVICE)

      # print examples every 5 epochs
      if _saved_image_index%10 == 0 or _saved_image_index == 0 or _saved_image_index == 99:
        # print some examples to a folder
        save_predictions_as_imgs(
            val_loader, model, _saved_image_index, SAVE_FOLDER, device=DEVICE
        )

      _saved_image_index += 1
        

#torch.backends.cudnn.enabled = False
# train model
main()


Training

In [None]:
# does 1 Epoch of training
def train_fn(loader, model, optimizer, loss_fn, scaler):
    loop = tqdm(loader) # progress bar

    for batch_idx, (data, targets) in enumerate(loop):
        data = data.to(device=DEVICE)
        targets = targets.long().to(device=DEVICE)

        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            #targets = targets.long().to(device=DEVICE)
            loss = loss_fn(predictions, targets)

        # backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # update tqdm loop
        loop.set_postfix(loss=loss.item())

Save loss from experiment run - MAY BE USED COMPARING LOSS PROPAGATION BETWEEN Networks

In [None]:
def save_loss_propagation(loss_arr, title='Test Loss',save_folder=SAVE_FOLDER):
  # make loss array as numpy
  loss = np.asarray(loss_arr)
  # y is the number of epochs
  y = np.arange(100)

  # plot settings
  plt.title(f"{title}") 
  plt.xlabel("loss") 
  plt.ylabel("Epoch") 
  plt.plot(loss,y) 

  # show plot of loss
  plt.show()

  # save plot of loss to folder
  plt.savefig(f'{SAVE_FOLDER}/loss_propagation/{title}.png')

Segmentation Algorithm and Multi-learning Algorithm

In [None]:

class DoubleConv(nn.Module):
  def __init__(self, in_channels, out_channels):
      super(DoubleConv, self).__init__()
      self.conv = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
          nn.BatchNorm2d(out_channels),
          nn.ReLU(inplace=True),
          nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
          nn.BatchNorm2d(out_channels),
          nn.ReLU(inplace=True)
      )

  def forward(self, x):
      return self.conv(x)


class UNET(nn.Module):
    # output 3 channels (3 tubes)
    def __init__(
            self, in_channels=3, out_channels=4, features=[64, 128, 256, 512]):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET (downsampling)
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET (upsampling)
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        # final 2d layer
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1] # reverse list

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2] # integer division

            # check if their shapes does not match => makes it generalize
            if x.shape != skip_connection.shape:
                # do resizing
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)


Dataset

In [None]:
class CTRDatasetNpy(Dataset):
  def __init__(self, images_file, masks_file, transform=None):
      self.images_file = images_file
      self.masks_file = masks_file
      self.transform = transform
      self.images = np.load(images_file) # load numpy data of all images
      self.masks = np.load(masks_file) # load numpy data of all masks

  def __len__(self):
      return len(self.images)

  def __getitem__(self, index):
      image = self.images[index]
      mask = self.masks[index] # mask data is already normalized

      if self.transform is not None: # do data augmentation
          augmentations = self.transform(image=image, mask=mask)
          image = augmentations["image"]
          mask = augmentations["mask"]

      return image, mask


Utils

In [None]:
KEYPOINT_COLOR = (255, 0, 0) # Green

def vis_keypoints(image, keypoints, color=KEYPOINT_COLOR, diameter=5): #(BATCH, C)
    image = image.copy()

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # remove normalization from image
    image = image*255

    '''
    for (x, y) in keypoints:
        cv2.circle(image, (int(x), int(y)), diameter, (0, 255, 0), -1)
    '''

    for i in range(4):
      cv2.circle(image, (int(keypoints[i*2]), int(keypoints[i*2+1])), diameter, KEYPOINT_COLOR, -1)
      
    return image


def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])


def get_loaders(
    train_dir,
    train_maskdir,
    val_dir,
    val_maskdir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True,
):
    train_ds = CTRDatasetNpy(
        train_dir,
        train_maskdir,
        train_transform,
    )

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=True,
    )

    val_ds = CTRDatasetNpy(
        val_dir,
        val_maskdir,
        val_transform,
    )

    val_loader = DataLoader(
        val_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=False,
    )

    return train_loader, val_loader



def save_predictions_as_imgs(
    loader, model, saved_image_index, folder="/content/drive/MyDrive/Biomed 3rd Year/BEng Project/Training Datasets/CTRvBG_Multichannel_v01/UNet_Aug_OutputSizeColor_v02/", device="cuda"):
    model.eval()
    for idx, (x, y) in enumerate(loader):
        x = x.to(device=device)
        with torch.no_grad():
            preds = model(x)

            # *** NEW *** set colors based on probabilities

            print(preds.shape)
            preds1 = preds.detach().cpu().numpy()

            for out_mask_idx in range(len(preds)): # save mask
                preds2 = preds1[out_mask_idx] # get last prediction from batch

                #print(preds2.shape)

                preds3 = preds2

                seg = np.moveaxis(preds3, 0, -1)

                #print(seg.shape)
                
                #print(f"Unique values of seg: {np.unique(seg)}")

                ## coloring
                my_seg = seg2img(seg)

                #print(f"Unique values of seg: {np.unique(my_seg)}")

                im = Image.fromarray(my_seg)
                im.save(f"{folder}/mask_pred_epoch{saved_image_index}_batch{idx}_img{out_mask_idx}.png")
            
            print(f"Saved prediction: {saved_image_index}")
        # ERROR LINE
        if saved_image_index == 0: # save ground truth for one epoch only
          #torchvision.utils.save_image(x.unsqueeze(1), f"{folder}/image_{saved_image_index}_{idx}.png", normalize=True) # save image
          torchvision.utils.save_image(y.float().unsqueeze(1), f"{folder}/mask_{saved_image_index}_{idx}.png", normalize=True) # save mask
          print(f"Saved image: {saved_image_index}")

    model.train()
  



def seg2img(seg: np.array) -> np.array:
    colours = np.array(  # Colour triplets in cv2 convention (BGR instead of RGB)
        [[0, 0, 0],      # Black
         [0, 0, 255],    # Red
         [0, 255, 0],    # Green
         [255, 0, 0]],   # Blue
        dtype='uint8'
    )

    '''
    print("The segmentation array")
    print(seg.ndim)
    '''
    
    if seg.ndim == 2:  # assuming [H, W] containing class indices
        if np.min(seg) < 0 or np.max(seg) > 3:
            raise ValueError("Incorrect number of classes in seg array")
        return colours[seg]

    elif seg.ndim == 3:  # assuming [H, W, C] with C containing class probabilities (logits)
        img = seg2img(np.argmax(seg, axis=2))

        # Convert image to HSV colour space to get direct access to the saturation
        hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

        logits = np.copy(seg)  # Copy to avoid accidentally changing a mutable array outside the function

        max_vals = np.max(logits, axis=-1)  # Probability of the dominant class at each location

        ind = tuple(np.indices(logits.shape[:-1])) + (logits.argmax(axis=-1),)

        logits[ind] = 0  # Set all probabilities of dominant classes to 0

        sec_vals = np.max(logits, axis=-1)

        # Maximum probability is now that of the second-most-dominant class at each location
        sat_vals = (max_vals - sec_vals) / max_vals

        # Saturation is the lower, the closer dominant prob is to second-most-dominant prob
        hsv_img[..., 1] = np.uint8(sat_vals * 255)  # Update image with the saturation values
        hsv_img[..., 2] = 255
        img = cv2.cvtColor(hsv_img, cv2.COLOR_HSV2BGR)

        # Now fix the black values - make gray value equal to 255-sat_val
        indices = np.argmax(seg, axis=2) == 0
        img[indices] = 255 * (1 - sat_vals[indices])[..., np.newaxis]

        return img

Accuracy Utils

In [None]:
# segmentation accuracy (dice + good pixels)
def accuracy(loader, model, device="cuda"):
    num_correct = 1
    num_pixels = 1
    dice_score = 0
    conf_dice_score = 0
    model.eval()

    softmax = nn.Softmax(dim=1)

    with torch.no_grad():
        absolute_error_epoch = 0 # absolute error recorded for all epoch
        for x, y in loader: # image, mask, keypoints
            x = x.to(device)
            y = y.to(device)

            #print("Pre prediction accuracy")
            # apply softmax on output
            output_mask = softmax(model(x))

            images = x.detach().cpu()
            mask = y.detach().cpu()
            
            #print(output_mask.shape)

            output_mask = torch.transpose(output_mask, 1, 2)
            output_mask = torch.transpose(output_mask, 2, 3)

            x, y = dice_coef2(mask.numpy(), output_mask.detach().cpu().numpy())
            dice_score += x
            conf_dice_score += y


    print(
        f"Got {num_correct}/{num_pixels} with acc {num_correct/num_pixels*100:.2f}"
    )
    print(f"Dice score: {dice_score*100/len(loader):.2f} %")
    print(f"CONFIDENCE Dice score: {conf_dice_score*100/len(loader):.2f} %")
    model.train()


def dice_coef(y_true, y_pred, index=0):
    
    '''
    # y_true = torch.Tensor(y_true)
    y_pred = torch.Tensor(y_pred)


    # make 0 - 1 probabilities for specific index in output
    y_pred = (y_pred > 0.5).float().numpy()
    '''

    # flatten arrays
    y_true_f = y_true
    y_pred_f = y_pred


    # calculate score
    intersection = np.sum(y_true_f * y_pred_f)
    smooth = 0.0001
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)


def dice_coef_multilabel(y_true, y_pred, numLabels):
    dice=0

    print(y_true.size)
    print(y_pred.size)

    for index in range(numLabels):
        dice += dice_coef(y_true, y_pred, index)
    return dice/numLabels # taking average


def dice_coef2(y_true, y_pred, no_classes=4):
    dice=0
    conf_dice = 0

    #print(np.unique(y_true))
    #print(np.unique(y_pred))

    # arg max from output => returns max indecise from class comp
    max_indecies = np.argmax(y_pred, axis=-1)
    probs = np.copy(y_pred)
    max_values = np.max(probs, axis=-1)  # Probability of the dominant class at each location

    # comparing for each class
    for class_index in range(no_classes):
      # copy arrays for accidental mutations
      logits = np.copy(max_indecies)
      gt = np.copy(y_true)

      # logical operation to filter for given class
      pred_arr = (logits == class_index)
      # set all values from all other classes to 0
      gt_arr = (gt == class_index)

      # two binary arrays for a specific class filtered
      dice += dice_coef(gt_arr, pred_arr)

      # confidence dice score
      conf_dice += confidence_dice(gt_arr, pred_arr, max_values)

      #print(dice)

    # average dice score
    dice = dice/no_classes
    conf_dice = conf_dice/no_classes

    return dice, conf_dice


# implements the idea of how sure the model is about the class in the score
def confidence_dice(y_true, indecies, logits):
    # flatten arrays
    y_true_f = y_true
    y_pred_f = indecies

    # calculate score
    intersection = np.sum(y_true_f * y_pred_f * logits)
    smooth = 0.0001
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)
