**3. Tumour Segmentation**
=

***Advanced Methods of Artificial Vision - Final Project***

**Authors:** *Alejandro Araque Robles, Ander Bodegas Díez, Lucía Gonzalez Ratón y Gonzalo Sabando Alonso*

# 3.1. Data Loading
First we need to import all the packages that we are going to need for the project.

In [28]:
from google.colab import drive
from PIL import Image

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import torch
import torchvision
import time
import random
import pandas as pd

Then, we mount the drive to get access to the dataset.

In [29]:
drive.mount('/content/drive')
pathRoot = '/content/drive/MyDrive/Colab Notebooks/Admeav/FinalProjectAdmeav'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# 3.2. PyTorch Dataset

In the last section, we preprocessed and structured the data as follows:
- DATASET
  - TRAIN
    - IMAGES
    - MASKS
  - VALIDATION
    - IMAGES
    - MASKS
  - TEST
    - IMAGES
    - MASKS

In order to train, validate and test a segmentation model we first need to create a PyTorch dataset to read each image and its mask.

In [30]:
class BreastCancerDataset(torch.utils.data.Dataset):
  def __init__(self, fol: str, tra = None):
    super(BreastCancerDataset, self).__init__()
    self.imgFol = os.path.join(fol, 'IMAGES')
    self.masFol = os.path.join(fol, 'MASKS')
    self.tra = tra
    self.imgLis = os.listdir(self.imgFol)

  def __getitem__(self, idx):

    # Read image
    img = cv2.imread(
      filename = os.path.join(self.imgFol, self.imgLis[idx]),
      flags = cv2.IMREAD_UNCHANGED
    )
    img = Image.fromarray(img)

    # Read mask
    mas = cv2.imread(
      filename = os.path.join(self.masFol, self.imgLis[idx]),
      flags = cv2.IMREAD_UNCHANGED
    )
    mas = torch.from_numpy(mas / 255).to(dtype = torch.long)

    # Apply transformation to image if given
    if self.tra:
      img = self.tra(img)

    return img, mas

  def __len__(self):
      return len(self.imgLis)

# 3.3. Model Architecture

Now we need to define the segmentation models.

In [31]:
def convBlock(inChannels: int, outChannels: int, k: int):
  return torch.nn.Sequential(
    torch.nn.Conv2d(
      in_channels = inChannels,
      out_channels = outChannels,
      kernel_size = (k, k),
      stride = 1,
      padding = 'same'
    ),
    torch.nn.BatchNorm2d(outChannels),
    torch.nn.ReLU(),
    torch.nn.Conv2d(
      in_channels = outChannels,
      out_channels = outChannels,
      kernel_size = (3, 3),
      stride = 1,
      padding = 'same'
    ),
    torch.nn.BatchNorm2d(outChannels),
    torch.nn.ReLU()
  )

#------------------------------------------------------------------------------#

def up(inChannels: int, outChannels: int, k: int):
  return torch.nn.Sequential(
    torch.nn.ConvTranspose2d(
      in_channels = inChannels,
      out_channels = outChannels,
      kernel_size = (k, k),
      stride = k
    )
  )

#------------------------------------------------------------------------------#

class Encoder(torch.nn.Module):
  def __init__(
    self,
    inChannels: int = 1,
    initChannels: int = 16,
    depthLevels: int = 4
  ):
    super(Encoder, self).__init__()
    self.depthLevels = depthLevels

    # Initialize list of modules
    self.layers = torch.nn.ModuleList()

    # Initialize out channels
    outChannels = initChannels

    # Loop to generate all layers
    for i in range(depthLevels):

      # Convolutional block
      self.layers.append(
        convBlock(
          inChannels = inChannels,
          outChannels = outChannels,
          k = 3
        )
      )

      # Max pooling
      self.layers.append(
        torch.nn.MaxPool2d(kernel_size = (2, 2), stride = 2)
      )

      # Update in and out channels
      inChannels = outChannels
      outChannels = outChannels * 2

    # Bottle neck (last layer of the encoder)
    self.layers.append(
      convBlock(
        inChannels = inChannels,
        outChannels = outChannels,
        k = 3
      )
    )

    # Save feature dimensions
    self.featureDim = outChannels

  def forward(self, x: torch.tensor):
    features = list()

    # Forward loop
    for i in range(self.depthLevels):
      x = self.layers[2 * i](x)
      features.append(x)
      x = self.layers[2 * i + 1](x)
    x = torch.nn.Dropout(0.2)(self.layers[-1](x))
    features.append(x)
    return features

#------------------------------------------------------------------------------#

class Decoder(torch.nn.Module):
  def __init__(
    self,
    inChannels: int,
    nClasses: int = 1,
    depthLevels: int = 4,
    skipConnections: bool = True
  ):
    super(Decoder, self).__init__()
    self.depthLevels = depthLevels
    self.skipConnections = skipConnections

    # Initialize list of modules
    self.layers = torch.nn.ModuleList()

    # Initialize out channels
    outChannels = inChannels // 2

    # Loop to generate all layers
    for _ in range(depthLevels):

      # Up block
      self.layers.append(
        up(
          inChannels = inChannels,
          outChannels = outChannels,
          k = 2
        )
      )

      # Convolutional block
      self.layers.append(
        convBlock(
          inChannels = outChannels * 2 if self.skipConnections else outChannels,
          outChannels = outChannels,
          k = 3
        )
      )

      # Update in and out channels
      inChannels = outChannels
      outChannels = inChannels // 2

    # Final (last layer of the decoder)
    self.layers.append(
      torch.nn.Conv2d(
        in_channels = inChannels,
        out_channels = nClasses,
        kernel_size = (3, 3),
        stride = 1,
        padding = 'same'
      )
    )

  def forward(self, features: list):

    # Get bottleneck features
    x = features[-1]

    # Forward loop with or without skip-connections
    for i in range(self.depthLevels):
      x = self.layers[2 * i](x)
      if self.skipConnections:
        x = torch.cat([x, features[-(i + 2)]], dim = 1)
      x = self.layers[2 * i + 1](x)
    return self.layers[-1](x)

#-----------------------------------------------------------------------------#

class UNet(torch.nn.Module):
  def __init__(
    self,
    inChannels: int = 3,
    nClasses: int = 1,
    initChannels: int = 16,
    depthLevels: int = 4,
    skipConnections: bool = True
  ):
    super(UNet, self).__init__()

    # Encoder
    self.encoder = Encoder(
      inChannels = inChannels,
      initChannels = initChannels,
      depthLevels = depthLevels
    )

    # Decoder
    self.decoder = Decoder(
      inChannels = self.encoder.featureDim,
      nClasses = nClasses,
      depthLevels = depthLevels,
      skipConnections = skipConnections
    )

  def forward(self, x: torch.tensor):
    features = self.encoder(x)
    return self.decoder(features)


# 3.4. Training model


In [32]:
def diceCELoss(preds: torch.Tensor, targs: torch.Tensor):

  # Compute probabilities with a sigmoid function
  preds = torch.sigmoid(preds[:, 0])

  # Compute binary cross entropy
  bce = torch.nn.functional.binary_cross_entropy_with_logits(preds, targs.float())

  # Compute intersection and union
  inter = (preds * targs).sum()
  union = preds.sum() + targs.sum()

  # Compute dice and return 1 - dice
  dice = (2 * inter + 1e-6) / (union + 1e-6)

  return 0.8 * (1 - dice) + 0.4 * bce

In [33]:
def train(
  model: torch.nn.Module,
  optimizer,
  criterion,
  epochs: int,
  dlT: torch.utils.data.DataLoader,
  dlV: torch.utils.data.DataLoader,
  dev: torch.DeviceObjType
):

  # Number of steps for the training and validation dataset
  stepsT = len(dlT)
  stepsV = len(dlV)

  # Logs dataframe
  logs = {
    'epoch': list(),
    'time': list(),
    'lossT': list(),
    'lossV': list()
  }

  # Move model to device
  model.to(device = dev)

  # Traing loop
  for epoch in range(epochs):

    # Initialize time
    t0 = time.time()

    # Update logs
    logs['epoch'].append(epoch)
    logs['lossT'].append(0)
    logs['lossV'].append(0)

    # Set model in training mode
    model.train()

    # Training batch loop
    for batch in dlT:

      # Read batch and move to device
      img, mas = batch
      img = img.to(device = dev)
      mas = mas.to(device = dev)

      # Reset gradient
      optimizer.zero_grad()

      # Forward pass
      preds = model(img)

      # Compute loss
      loss = criterion(preds, mas)

      # Backward pass
      loss.backward()
      optimizer.step()

      # Update logs
      logs['lossT'][-1] += loss.item()

    # Set model in evaluation mode
    model.eval()

    # Disable gradient calculations
    with torch.no_grad():

      # Validation batch loop
      for batch in dlV:

        # Read batch and move to device
        img, mas = batch
        img = img.to(device = dev)
        mas = mas.to(device = dev)

        # Forward pass
        preds = model(img)

        # Compute loss
        loss = criterion(preds, mas)

        # Update logs
        logs['lossV'][-1] += loss.item()

      # Update logs based on the number of steps
      logs['lossT'][-1] /= stepsT
      logs['lossV'][-1] /= stepsV

    # Calculate time
    t1 = time.time()
    t = (t1 - t0) / 60.0
    logs['time'].append(t)

    # Print progress
    print((
      f'Epoch [{epoch} / {epochs}] | '
      f'Time: {logs['time'][-1]: .2f} | '
      f'T. Loss: {logs['lossT'][-1]: .2f} | '
      f'V. Loss: {logs['lossV'][-1]: .2f} | '
    ))

  return logs

In [34]:
# Check device
dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {dev}')

# Model initialization
model = UNet(inChannels = 3, nClasses = 1, initChannels = 16, depthLevels = 4)

# Base dataset
dsT = BreastCancerDataset(
  fol = os.path.join(pathRoot, 'DATASET/TRAIN'),
  tra = torchvision.transforms.ToTensor()
)
dsV = BreastCancerDataset(
  fol = os.path.join(pathRoot, 'DATASET/VALIDATION'),
  tra = torchvision.transforms.ToTensor()
)

# Set seed for reproducibility
torch.manual_seed(42)

# Dataloaders
dlT = torch.utils.data.DataLoader(
  dataset = dsT,
  batch_size = 16,
  shuffle = True
)
dlV = torch.utils.data.DataLoader(
  dataset = dsV,
  batch_size = 16,
  shuffle = False
)

# Define criterion and optimizer
criterion = diceCELoss
optimizer = torch.optim.AdamW(
  params = model.parameters(),
  lr = 1e-3,
  weight_decay = 1e-4
)

# Train
logs = train(
  model = model,
  optimizer = optimizer,
  criterion = criterion,
  epochs = 100,
  dlT = dlT,
  dlV = dlV,
  dev = dev
)

Device: cuda
Epoch [0 / 100] | Time:  0.58 | T. Loss:  0.99 | V. Loss:  0.98 | 
Epoch [1 / 100] | Time:  0.57 | T. Loss:  0.87 | V. Loss:  1.00 | 
Epoch [2 / 100] | Time:  0.56 | T. Loss:  0.82 | V. Loss:  1.04 | 
Epoch [3 / 100] | Time:  0.56 | T. Loss:  0.75 | V. Loss:  0.91 | 
Epoch [4 / 100] | Time:  0.56 | T. Loss:  0.72 | V. Loss:  0.73 | 
Epoch [5 / 100] | Time:  0.56 | T. Loss:  0.69 | V. Loss:  0.66 | 
Epoch [6 / 100] | Time:  0.56 | T. Loss:  0.67 | V. Loss:  0.86 | 
Epoch [7 / 100] | Time:  0.56 | T. Loss:  0.69 | V. Loss:  0.81 | 
Epoch [8 / 100] | Time:  0.56 | T. Loss:  0.67 | V. Loss:  0.62 | 
Epoch [9 / 100] | Time:  0.57 | T. Loss:  0.66 | V. Loss:  0.64 | 
Epoch [10 / 100] | Time:  0.56 | T. Loss:  0.66 | V. Loss:  0.61 | 
Epoch [11 / 100] | Time:  0.57 | T. Loss:  0.64 | V. Loss:  0.72 | 
Epoch [12 / 100] | Time:  0.55 | T. Loss:  0.64 | V. Loss:  0.62 | 
Epoch [13 / 100] | Time:  0.56 | T. Loss:  0.65 | V. Loss:  0.71 | 
Epoch [14 / 100] | Time:  0.56 | T. Loss:  0.