# EDA

In [None]:
import os
import numpy as np
import pandas as pd

datapath = '/kaggle/input/lgg-mri-segmentation/kaggle_3m/'

DATA = []

# Reading all file directories into DataFrame of columns [scanID, scanpath, maskpath]
for scanID in os.listdir(datapath):
    if scanID not in ['README.md', 'data.csv']:
        for sliceID in os.listdir(datapath + scanID):
            if '_mask' not in sliceID:
                DATA.append([scanID, datapath + scanID + '/' + sliceID, datapath + scanID + '/' + sliceID[:-4] + '_mask' + sliceID[-4:] ])
                
DATA = pd.DataFrame(DATA, columns = ['scanID', 'scanpath', 'maskpath'])
SCANID_LIST = DATA['scanID'].unique()
DATA.head(10)

In [None]:
def show(im):
    '''
        Takes in np array of shape H*W*3 or Tensor of
        shape 3*H*W, displays it.
    '''
    if im.shape[0] == 3:
        im = np.moveaxis(im.cpu().detach().numpy(), 0, -1)
    plt.figure()
    plt.imshow(im)

import re
LAST_NUMBER_RE = re.compile('.*_(\d+).*')
def last_number(imname):
    '''
        Takes in string of image name, returns identifying
        number at the end of the image name.
    '''
    search = LAST_NUMBER_RE.search(imname)
    return int(search.groups()[0])

def order_filenames(filenames):
    '''
        Sorts image names by identifying number at the end.
    '''
    return sorted(
        filenames,
        key = lambda slice : last_number(slice[1]['scanpath'])
    )

def compare_segmentation(im, mask):
    '''
        Takes in Image and Mask, displays them side by side.
    '''
    fig, (ax1, ax2) = plt.subplots(1, 2)
    ax1.imshow(im)
    ax2.imshow(mask)
    ax1.set_title('Image')
    ax2.set_title('Mask')

In [None]:
import matplotlib.pyplot as plt
import cv2

images = []
masks  = []

# Index of patients' images to look at
SCANID_IDX = 6

# Select all images with this patient's index
for index, slice in order_filenames(
    DATA[DATA['scanID'] == SCANID_LIST[SCANID_IDX] ].iterrows() ):
    im   = cv2.imread(slice['scanpath'])
    mask = cv2.imread(slice['maskpath'])
    images.append(im)
    masks.append(mask)

# Loading in Data

In [None]:
from sklearn.decomposition import PCA
np.seterr(divide='ignore', invalid='ignore')

pca_eigenvector_project = PCA() # So we don't keep remaking our PCA object
def eigenvector_project(image, channel_first = False):
    '''
        Args:
            image:
                (H, W, 3) numpy array
        Desc:
            Orthogonal projection from image RGB point cloud to 
            their eigenvectors after centering
        Returns:
            (256, 256, 3) numpy array
    '''
    global pca_eigenvector_project
    if image.shape == (3, 256, 256):
        image = np.moveaxis(image, 0, -1)
    if channel_first:
        return np.moveaxis(pca_eigenvector_project.fit_transform(image.reshape(-1, 3) ).reshape(256, 256, 3), -1, 0)
    else:
        return pca_eigenvector_project.fit_transform(image.reshape(-1, 3) ).reshape(256, 256, 3)

In [None]:
images = []
masks  = []
for index, row in DATA.iterrows():
    images.append( eigenvector_project(cv2.imread(row['scanpath']), channel_first = True ) )
    masks.append( eigenvector_project(cv2.imread(row['maskpath']), channel_first = True ) )

In [None]:
import torch
def CUDA_load(dataset, transpose_data = False, dtype_X = torch.float, dtype_y = torch.float):
    '''
        Pass in iterable in form [ (X0, y0), ..., (Xn, yn) ] (transpose_data = False) 
        or [ (X0, ..., Xn), (y0, ..., yn) ] (transpose_data = True), as well as 
        data types to convert X and y to if defaults are not desired.
        
        Returns object to pass into DataLoader as 'dataset' argument. Done this way to
        allow for custom DataLoader to be used.
    '''
    if not transpose_data:
        return list([torch.tensor(X, device = 'cuda', dtype = dtype_X), torch.tensor(y, device = 'cuda', dtype = dtype_y)] for X, y in dataset)
    else:
        return list([torch.tensor(X, device = 'cuda', dtype = dtype_X), torch.tensor(y, device = 'cuda', dtype = dtype_y)] for X, y in zip(*dataset) )

In [None]:
from torch.utils.data import Dataset as DSMixin, DataLoader

class ShuffleLoader(DSMixin):
    '''
        Standard torch.utils.data.DataLoader, with reassigned batches every
        run-through to prevent overfitting to one batch distribution.
    '''
    def __init__(self, dataset, batch_size, shuffle = True):
        super().__init__()
        self.loader = torch.utils.data.DataLoader(
            dataset = dataset,
            batch_size = batch_size,
            shuffle = shuffle
        )
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
    
    def __iter__(self):
        def next_generator(loader_instance):
            for X in loader_instance.loader:
                yield X
            loader_instance.loader = torch.utils.data.DataLoader(
                dataset = loader_instance.dataset,
                batch_size = loader_instance.batch_size,
                shuffle = loader_instance.shuffle
            )
        return next_generator(self)

In [None]:
from sklearn.model_selection import train_test_split
# We will use 70-15-15 train/val/test

image_train, image_test, mask_train, mask_test = train_test_split(
    images, masks,
    train_size = .6
)

image_val, image_test, mask_val, mask_test = train_test_split(
    image_test, mask_test,
    train_size = .5
)

In [None]:
train_dataset_CUDA = CUDA_load([image_train, mask_train], transpose_data= True)
val_dataset_CUDA = CUDA_load([image_val, mask_val], transpose_data= True)
test_dataset_CUDA = CUDA_load([image_test, mask_test], transpose_data= True)

In [None]:
BATCH_SIZE = 64
train_loader = ShuffleLoader(dataset = train_dataset_CUDA, batch_size = BATCH_SIZE)
val_loader = ShuffleLoader(dataset = val_dataset_CUDA, batch_size = BATCH_SIZE)
test_loader = ShuffleLoader(dataset = test_dataset_CUDA, batch_size = BATCH_SIZE)

# LOSS FUNCTIONS

In [None]:
def tversky_loss(input, target, eps= 1, alpha= .5, beta= .5):
    '''
        Args:
            input:
                Predicted Tensor to gauge accuracy of. Same size as target.
            target:
                Target Tensor to use as ground truth. Same size as input.
            eps:
                Smoothing value to ensure no division by zero.
            alpha:
                Weight to put on False Positives. Higher value penalizes more.
                Value of .5 for alpha & beta results in standard DICE loss.
            beta:
                Weight to put on False Negatives. Higher value penalizes more.
                Value of .5 for alpha & beta results in standard DICE loss.
        Desc:
            Tversky Loss is DICE Loss (IoU) with separate weights put on
            False Positives and False Negatives. The Union calculation for
            the denominator is framed as:
                Union = True Positive + False Positive + False Negative
            This allows us to put separate weights on False Positives and
            False Negatives, leading to the calculation:
                Union = True Positive + alpha * False Positive + beta * False Negative
            Values of .5 for both parameters create the standard DICE loss.
            Values lie in domain (0, inf).
        Returns:
            Tensor containing 1 - Tversky coefficient, optimal when minimized @ 0.
    '''
    # Flattens mask to single binary image since all 3 channels are the same
    # for all masks in the batch
    target = target[:,0,:,:].reshape(-1)
    input  = input.reshape(-1)

    true_pos = (input * target).sum()    
    false_pos = ( (1-target) * input).sum()
    false_neg = (target * (1-input) ).sum()
    tversky_coef = (true_pos + eps) / (true_pos + alpha*false_pos + beta*false_neg + eps)  

    return 1 - tversky_coef

In [None]:
def DICE_loss(input, target, eps= 1e-5):
    '''
        Args:
            input:
                Predicted Tensor to gauge accuracy of. Same size as target.
            target:
                Target Tensor to use as ground truth. Same size as input.
            eps:
                Smoothing value to ensure no division by zero.
        Desc:
            DICE Loss function computes 1 - DICE coefficient. DICE coefficient
            is representation of Intersection over Union. Formula is:
                2 * |Input && Target| / ( |Input| + |Target| )
            For |...| sybolizing cardinality of a set.
            Since input can include soft probabilities as well as hard 1/0,
            the cardinality of an input is the sum.
        Returns:
            Tensor containing 1 - DICE coefficient, optimal when minimized @ 0
    '''
    intersection = (input * target).view(input.shape[0], -1).sum(axis= -1)
    union = input.view(input.shape[0], -1).sum(axis= -1) + target.view(target.shape[0], -1).sum(axis= -1)
    return (1 - 2*intersection/(union + eps) ).sum()

# Network

In [None]:
import torch
import torch.nn as nn

In [None]:
class UpConv2d_2x2(nn.Module):
    '''
        Up-Convolution operation that consists of Bilinear upsampling +
        2x2 Convolution
    '''
    def __init__(self, in_channels, out_channels, padding= 1):
        super().__init__()
        self.pipeline= nn.Sequential(
            nn.Upsample(scale_factor= 2, mode= 'bilinear'),   
            # Top left of 2x2 Conv filter is output location, so we may pad bottom+right by 1
            nn.ReflectionPad2d(padding= (0, padding, 0, padding) ),
            nn.Conv2d(in_channels= in_channels, out_channels= out_channels, kernel_size= 2)
        )
    
    def forward(self, X):
        return self.pipeline(X)

class SingleImageSegmenter5(nn.Module):
    '''
        Convolutional Autoencoder with Residual Connections (U-Net with modifications). 
        Generates binary segmentation mask for single image, given single image as input.
        In contrast to a multi-scan model, which this model can be used in a pipeline of.
    '''
    def __init__(self):
        super().__init__()

        self.encoder_block_1 = nn.Sequential(
            nn.BatchNorm2d(3),
            nn.Conv2d(in_channels= 3, out_channels= 16, kernel_size= 3, padding= 1), # 256 * 256 * 3 -> 256 * 256 * 16
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 16, out_channels= 16, kernel_size= 3, padding= 1), # 256 * 256 * 16 -> 256 * 256 * 16
            nn.LeakyReLU(),
            nn.BatchNorm2d(16)
        )

        self.encoder_block_2 = nn.Sequential(
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 256 * 256 * 16 -> 128 * 128 * 16
            nn.Conv2d(in_channels= 16, out_channels= 32, kernel_size= 3, padding= 1), # 128 * 128 * 16 -> 128 * 128 * 32
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 32, out_channels= 32, kernel_size= 3, padding= 1), # 128 * 128 * 32 -> 128 * 128 * 32
            nn.LeakyReLU(),
            nn.BatchNorm2d(32)
        )

        self.encoder_block_3 = nn.Sequential(
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 128 * 128 * 32 -> 64 * 64 * 32
            nn.Conv2d(in_channels= 32, out_channels= 64, kernel_size= 3, padding= 1), # 64 * 64 * 32 -> 64 * 64 * 64
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 64, out_channels= 64, kernel_size= 3, padding= 1), # 64 * 64 * 64 -> 64 * 64 * 64
            nn.LeakyReLU(),
            nn.BatchNorm2d(64)
        )

        self.latent_block = nn.Sequential(
            nn.MaxPool2d(kernel_size= 2, stride= 2), # 64 * 64 * 64 -> 32 * 32 * 64
            nn.Conv2d(in_channels= 64, out_channels= 128, kernel_size= 3, padding= 1), # 32 * 32 * 64 -> 32 * 32 * 128
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 128, out_channels= 64, kernel_size= 3, padding= 1), # 32 * 32 * 128 -> 32 * 32 * 64
            nn.LeakyReLU(),
            UpConv2d_2x2(in_channels= 64, out_channels= 64), # 32 * 32 * 64 -> 64 * 64 * 64
            nn.BatchNorm2d(64)
        )

        self.decoder_block_1 = nn.Sequential(
            nn.Conv2d(in_channels= 128, out_channels= 64, kernel_size= 3, padding= 1), # 64 * 64 * 128 -> 64 * 64 * 64
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 64, out_channels= 32, kernel_size= 3, padding= 1), # 64 * 64 * 64 -> 64 * 64 * 32
            nn.LeakyReLU(),
            UpConv2d_2x2(in_channels= 32, out_channels= 32), # 64 * 64 * 32 -> 128 * 128 * 32
            nn.BatchNorm2d(32)
        )

        self.decoder_block_2 = nn.Sequential(
            nn.Conv2d(in_channels= 64, out_channels= 32, kernel_size= 3, padding= 1), # 128 * 128 * 64 -> 128 * 128 * 32
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 32, out_channels= 16, kernel_size= 3, padding= 1), # 128 * 128 * 32 -> 128 * 128 * 16
            nn.LeakyReLU(),
            UpConv2d_2x2(in_channels= 16, out_channels= 16), # 128 * 128 * 16 -> 256 * 256 * 16
            nn.BatchNorm2d(16)
        )

        self.decoder_block_3 = nn.Sequential(
            nn.Conv2d(in_channels= 32, out_channels= 16, kernel_size= 3, padding= 1), # 256 * 256 * 32 -> 256 * 256 * 16
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 16, out_channels= 8, kernel_size= 3, padding= 1), # 256 * 256 * 16 -> 256 * 256 * 8
            nn.LeakyReLU(),
            nn.Conv2d(in_channels= 8, out_channels= 1, kernel_size= 1), # 256 * 256 * 8 -> 256 * 256 * 1
            nn.Sigmoid()
        )
    
    def forward(self, X):
        X_resout_1 = self.encoder_block_1(X)
        X_resout_2 = self.encoder_block_2(X_resout_1)
        X_resout_3 = self.encoder_block_3(X_resout_2)
        X_resin_1 = self.latent_block(X_resout_3)
        X_resin_1 = torch.cat([
            X_resin_1,
            X_resout_3
        ],
        dim= 1)

        X_resin_2 = self.decoder_block_1(X_resin_1)
        X_resin_2 = torch.cat([
            X_resin_2,
            X_resout_2
        ],
        dim= 1)

        X_resin_3 = self.decoder_block_2(X_resin_2)
        X_resin_3 = torch.cat([
            X_resin_3,
            X_resout_1
        ],
        dim= 1)

        return self.decoder_block_3(X_resin_3)


# Training Loop

In [None]:
from torch.optim import Adam

def train(net, train_loader, val_loader, learning_rate, loss_function, epochs):
    optimizer = Adam(params = net.parameters(), lr = learning_rate)
    
    for epoch in range(epochs):
        training_loss = 0
        
        net.train()
        for batchnum, (image, mask) in enumerate(train_loader):
            optimizer.zero_grad()
            
            pred = net(image/255)
            loss = loss_function(pred, mask/255)
            
            loss.backward()
            optimizer.step()
            
            training_loss += loss.item()
        
        print("Training loss for epoch {} is {}".format(epoch, training_loss/(batchnum+1) ) )
        
        net.eval()
        val_loss = 0
        with torch.no_grad():
            for batchnum, (image, mask) in enumerate(val_loader):
                pred = net(image/255)
                val_loss += loss_function(pred, mask/255).item()
        
        print("Val loss for epoch {} is {}".format(epoch, val_loss/(batchnum+1) ) )

In [None]:
net = SingleImageSegmenter5().cuda()

In [None]:
train(net, train_loader, val_loader, learning_rate = .0001,
      loss_function = DICE_loss, epochs = 2)
# Both DICE_loss and tversky_loss are provided here, can use native PyTorch losses as well. 
# Toggle loss & tversky coefficients periodically depending on model output

In [None]:
# Optional
torch.save(net.state_dict(), "model.pt")