In [None]:
import os
import pydicom
import numpy as np
import matplotlib.pyplot as plt
import random
import torch
from sklearn.model_selection import train_test_split
from albumentations import Rotate, HorizontalFlip, VerticalFlip, Affine
import albumentations as albu
from torch import nn
from torchsummary import summary
import pandas as pd
import nibabel as nib
import segmentation_models_pytorch as smp
import copy
import time
from tqdm import tqdm
import cv2

# Data Handling

In [None]:
def get_data():
    files_images = os.listdir('images')
    files_mask = os.listdir('GT')

    X = np.zeros((211, 512, 512, 7), dtype='float32')
    Y = np.zeros((211, 512, 512, 7), dtype='float32')

    index = 0
    for i in tqdm(range(len(files_images))):
        img = nib.load(f'images/M{i+1}.nii').get_fdata()
        mask = nib.load(f'GT/M{i+1}SEG.nii').get_fdata()
        
        if img.shape != (7, 512, 512):
            if img.shape == (512, 512, 7):
                X[index] = img
                Y[index] = mask
                index += 1
            continue
        
        X[index] = np.moveaxis(img, 0, -1)
        Y[index] = np.moveaxis(mask, 0, -1)

        index += 1
    return X, Y

In [None]:
X, Y = get_data()
X = X/np.max(X)
X = X * (X > 0)

Y = (Y > 0).astype('float32')

In [None]:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=42)

# Util Functions

In [None]:
def get_training_augmentation():
    train_transform = [
        albu.Rotate(p=0.5, limit=10, border_mode=0, mask_value=0),
        albu.HorizontalFlip(p=0.5),
        albu.VerticalFlip(p=0.5),
        albu.Affine(p=0.5, translate_px=20)
    ]
    return albu.Compose(train_transform)

In [None]:
class CustomTensorDataset(torch.utils.data.Dataset):
    def __init__(self, volumes, masks, transform=None):
        self.volumes = volumes
        self.masks = masks
        self.transform = transform

    def __getitem__(self, index):
        if self.transform:
            sample = self.transform(image=self.volumes[index, :, :, :], mask=self.masks[index, :, :, :]) 
            volume = np.moveaxis(sample['image'], -1, 0)
            mask = np.moveaxis(sample['mask'], -1, 0)
        else:
            volume = np.moveaxis(self.volumes[index, :, :, :], -1, 0)
            mask = np.moveaxis(self.masks[index, :, :, :], -1, 0)
        
        return torch.FloatTensor(volume).unsqueeze(1), torch.FloatTensor(mask).unsqueeze(1)

    def __len__(self):
        return self.volumes.shape[0]

In [None]:
def get_loaders(pred=False, batch_size=2):
    if pred:
        train_dataset = CustomTensorDataset(X_train, Y_train, transform=get_testing_augmentation())
    else:
        train_dataset = CustomTensorDataset(X_train, Y_train, transform=get_training_augmentation())

    test_dataset = CustomTensorDataset(X_test, Y_test)    
    
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
    
    return train_loader, test_loader

In [None]:
def dice_coef(y_true, y_pred):
    smooth = 1e-5
    y_true_f = torch.flatten(y_true)
    y_pred_f = torch.flatten(y_pred)
    intersection = torch.sum(y_true_f * y_pred_f)
    return (2.*intersection + smooth)/(torch.sum(y_true_f) + torch.sum(y_pred_f) + smooth)

def dice_coef_loss(y_true, y_pred):
    return 1-dice_coef(y_true, y_pred)

In [None]:
def iou(y_true, y_pred):
    smooth = 1e-5
    y_true_f = torch.flatten(y_true)
    y_pred_f = torch.flatten(y_pred)
    intersection = torch.sum(y_true_f * y_pred_f)
    return (intersection + smooth)/(torch.sum(y_true_f) + torch.sum(y_pred_f) - intersection + smooth)

# Experimenting with augmentation

In [None]:
aug = []
aug.append(Rotate(p=1.0, limit=30, border_mode=0, mask_value=0))
aug.append(HorizontalFlip(p=1.0))
aug.append(VerticalFlip(p=1.0))
aug.append(Affine(p=1.0, translate_px=10))

num_image = 0
num_aug = 3

augmented_1 = aug[num_aug](image=X[num_image], mask=Y[num_image])

In [None]:
fig, ax = plt.subplots(7, 4, figsize=(20, 70))

for i in range(7):
    ax[i, 0].imshow(X[num_image, :, :, i], cmap='gray')
    ax[i, 1].imshow(augmented_1['image'][:, :, i], cmap='gray')
    ax[i, 2].imshow(Y[num_image, :, :, i], cmap='gray')
    ax[i, 3].imshow(augmented_1['mask'][:, :, i], cmap='gray')
    
    
plt.show()

In [None]:
(augmented_1['image'] < 0).sum()

In [None]:
for i in range(7):
    print(np.histogram(augmented_1['image'][:, :, i]))

In [None]:
for i in range(7):
    print(np.histogram(augmented_1['image'][:, :, i]))

# Segmentation models pytorch - library

## Unet/Unet++

In [None]:
train_loader, test_loader = get_loaders(batch_size=1)

In [None]:
ENCODER = 'resnet34'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['tissue']
ACTIVATION = 'sigmoid'
DEVICE = 'cuda'

model = smp.UnetPlusPlus(
    encoder_name=ENCODER, 
    encoder_weights=ENCODER_WEIGHTS, 
    classes=len(CLASSES), 
    activation=ACTIVATION,
    in_channels=1,
    encoder_depth=5
)

In [None]:
print(f'The number of parameters : {sum(p.numel() for p in model.parameters())}')

In [None]:
loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

optimizer = torch.optim.Adam([ 
    dict(params=model.parameters(), lr=1e-4),
])

In [None]:
train_epoch = smp.utils.train.TrainEpoch(
    model, 
    loss=loss, 
    metrics=metrics, 
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model, 
    loss=loss, 
    metrics=metrics, 
    device=DEVICE,
    verbose=True,
)

In [None]:
max_score = 0
train_dice_losses = []
train_iou_scores = []
valid_dice_losses = []
valid_iou_scores = []
file = 'unetplusplus_mri_loc_resnet34_imagenet_1.pt'

epochs = 100

for i in range(epochs):
    
    print('\nEpoch: {}'.format(i))
    train_logs = train_epoch.run(train_loader)
    valid_logs = valid_epoch.run(test_loader)
    
    train_dice_losses.append(train_logs['dice_loss'])
    train_iou_scores.append(train_logs['iou_score'])
    
    valid_dice_losses.append(valid_logs['dice_loss'])
    valid_iou_scores.append(valid_logs['iou_score'])
    
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model.state_dict(), file)
        print('Model saved!')
        
    if i == 10:
        optimizer.param_groups[0]['lr'] = 5e-5

In [None]:
torch.save(model.state_dict(), file)

In [None]:
plt.plot(train_dice_losses)
plt.show()

In [None]:
plt.plot(valid_dice_losses)
plt.show()

In [None]:
plt.plot(train_iou_scores)
plt.show()

In [None]:
plt.plot(valid_iou_scores)
plt.show()

### Experiments Conducted

#### UnetPlusPlus Resnet34 imagenet

##### Experiment 1

Initial Learning rate: 1e-4

10: 5e-5

100 epochs

#### Unet Resnet 50 imagenet

##### Experiment 1

Initial Learning rate: 1e-4

20: 5e-5

100 epochs

#### Unet Resnet 34 without imagenet

##### Experiment 1

Initial Learning rate: 1e-4

50: 5e-5

120 epochs

#### Unet Resnet 34 imagenet

##### Experiment 1

Initial Learning rate: 1e-4

100 epochs

##### Experiment 2

Initial Learning rate: 1e-4

20: 5e-5

100 epochs


#### VGG16 imagenet

##### Experiment 1

Initial Learning rate: 1e-4

50 epochs

##### Experiment 2

Initial Learning rate: 1e-4

50: 5e-5

100 epochs





