# 0. Change the Runtime to GPU

In the Colab menu, select: Runtime -> Change runtime type -> "T4 GPU" -> Save

# 1. Importing the Segmentation Dataset

In [None]:
# Mount Google Drive at /content/drive (presumably where the dataset is stored — confirm).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# View the content of the dataset.
# `!ls` is a notebook shell escape; replace the placeholder with the path of
# the dataset folder (presumably below the mounted /content/drive — confirm).

### Start Code Here
!ls >>>path_to_dataset<<<
### End

In [None]:
import os
import glob
import random

import numpy as np
import pandas as pd

import cv2
import matplotlib.pyplot as plt

# Use matplotlib's "dark_background" style for the figures created in this notebook
plt.style.use("dark_background")

In [None]:
### Start Code Here
# Root folder of the dataset. The glob pattern used later,
# f'{ROOT_DIR}/*/*_mask*', looks for mask files exactly one directory below it.
ROOT_DIR = >>>path_to_dataset<<<
### End

In [None]:
# Collect the mask files and derive the matching scan path for each of them
import glob

# Masks are searched one directory below ROOT_DIR; their names contain "_mask"
mask_files = glob.glob(f'{ROOT_DIR}/*/*_mask*')

# Stripping "_mask" from a mask path yields the scan path
# (assumes scans are named like their mask minus that marker)
brain_scans = [mask_path.replace('_mask', '') for mask_path in mask_files]

# Show the first 10 scan paths followed by the first 10 mask paths
print(brain_scans[:10])
print(mask_files[:10])

In [None]:
# Make a dataframe with the images and their corresponding masks
import pandas as pd

### Start Code Here
# One row per scan/mask pair. Column order matters: BrainMRIDataset reads the
# image path and mask path by position (iloc columns 0 and 1).
img_df = pd.DataFrame({
    "image_path":>>>brain_scans<<<,
    "mask_path":>>>mask_files<<<
})
### End
# Preview the first five rows
print(img_df.head(5))

In [None]:
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm

# Adding A/B column for diagnosis
def positiv_negativ_diagnosis(mask_path):
    """Label a mask file as tumor (1) or no tumor (0).

    Args:
        mask_path: Path of the mask image to inspect.

    Returns:
        1 if the largest pixel value in the mask is greater than 0, else 0.

    Raises:
        FileNotFoundError: If OpenCV cannot read an image from ``mask_path``.
    """
    mask = cv2.imread(mask_path)
    if mask is None:
        # cv2.imread does not raise on a missing/unreadable file; it returns
        # None, which would otherwise surface as an unrelated TypeError below.
        raise FileNotFoundError(f"Could not read mask image: {mask_path}")
    return 1 if np.max(mask) > 0 else 0

def process_images(mask_paths):
    """Compute the diagnosis label of every mask with a thread pool.

    Args:
        mask_paths: Sized sequence of mask file paths; ``len()`` of it is used
            as the progress bar total.

    Returns:
        List holding one ``positiv_negativ_diagnosis`` result per path, in the
        order of ``mask_paths``.
    """
    with ThreadPoolExecutor() as pool:
        # map() yields results lazily in input order; tqdm ticks as they arrive
        label_stream = pool.map(positiv_negativ_diagnosis, mask_paths)
        progress = tqdm(label_stream, total=len(mask_paths))
        return list(progress)

# Label every mask and store the result as a new "diagnosis" column:
# 0 indicates no tumor, 1 indicates a tumor
img_df["diagnosis"] = process_images(img_df["mask_path"].tolist())
# Last expression of the cell, so the notebook displays the first rows
img_df.head()

# 2. Data Visualization

In [None]:
# Count how many rows are labelled non-tumor (0) and tumor (1)
img_df['diagnosis'].value_counts()

### Plot the MRI, Mask and overlay image

In [None]:
from mpl_toolkits.axes_grid1.axes_grid import ImageGrid

# Data: five random tumor-positive rows. `.values` turns each row into an
# array laid out as (image_path, mask_path, diagnosis).
sample_df = img_df[img_df["diagnosis"] == 1].sample(5).values
sample_imgs = []

for data in sample_df:
    # cv2.imread returns BGR, but matplotlib's imshow interprets arrays as RGB.
    # Convert once here so the scans are not displayed with red/blue swapped.
    img = cv2.cvtColor(cv2.resize(cv2.imread(data[0]), (256, 256)), cv2.COLOR_BGR2RGB)
    mask = cv2.resize(cv2.imread(data[1]), (256, 256))

    # Overlay: fill the mask contours on a copy of the scan.
    main = img.copy()
    sample = np.array(np.squeeze(mask), dtype=np.uint8)
    # findContours needs a single-channel image, so use the mask's first channel
    contours, hier = cv2.findContours(sample[:, :, 0], cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    # thickness=-1 fills the contours; [255, 0, 0] is red now that `main` is RGB
    sample_over_gt = cv2.drawContours(main, contours, -1, [255, 0, 0], thickness=-1)

    # Stored interleaved: image, mask, overlay, image, mask, overlay, ...
    sample_imgs.extend([img, mask, sample_over_gt])

# Every third entry belongs to the same kind; stack each kind side by side
sample_imgs_arr = np.hstack(np.array(sample_imgs[::3]))
sample_masks_arr = np.hstack(np.array(sample_imgs[1::3]))
sample_over_gt_arr = np.hstack(np.array(sample_imgs[2::3]))

# Plot
fig = plt.figure(figsize=(25., 25.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(3, 1),  # creates 3x1 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

grid[0].imshow(sample_imgs_arr)
grid[0].set_title("Images", fontsize=15)
grid[0].axis("off")

grid[1].imshow(sample_masks_arr)
grid[1].set_title("Masks", fontsize=15, y=0.9)
grid[1].axis("off")

grid[2].imshow(sample_over_gt_arr)
grid[2].set_title("MRI Brain with highlighted Tumor", fontsize=15)
grid[2].axis("off")

plt.show()


# 3. Dataset split and DataLoaders

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from sklearn.model_selection import train_test_split

class BrainMRIDataset(Dataset):
    """Dataset yielding (image, mask) pairs read from paths stored in a dataframe.

    Column 0 of ``df`` is read as the image path and column 1 as the mask path.
    """

    def __init__(self, df, transforms):
        """Store the dataframe and the transform pipeline.

        Args:
            df: Dataframe whose first two columns hold image path and mask path.
            transforms: Callable applied to every pair in ``__getitem__``; its
                result is indexed with the keys ``'image'`` and ``'mask'``.
        """

        self.df = df
        self.transforms = transforms

    def __len__(self):
        """Return the number of rows in the dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, normalize and transform the pair stored in row ``idx``."""
        # Columns are addressed by position: 0 = image path, 1 = mask path
        # NOTE(review): cv2.imread yields BGR channel order and nothing in this
        # method converts it to RGB — confirm that this is intended.
        image = cv2.imread(self.df.iloc[idx, 0])
        # Flag 0 makes OpenCV load the mask as a single-channel grayscale image
        mask = cv2.imread(self.df.iloc[idx, 1], 0)

        # Normalize mask from 0-255 to 0-1
        ### Start Code Here
        mask = >>>image_nomrmalization<<<

        # Placeholders: pass the image and the mask to the transform pipeline
        augmented = self.transforms(
            >>>image<<<,
            >>>mask<<<
            )
        ### End

        # The pipeline result is read back by key
        image = augmented['image']
        mask = augmented['mask']

        return image, mask

## Image transformation

In [None]:
### Start Code Here
# Augmentation and preprocessing pipeline handed to BrainMRIDataset
transforms = A.Compose([
    # Placeholders: flip, 90-degree rotation and transpose augmentations
    >>>horizontal_flip<<<,
    >>>vertical_flip<<<,
    >>>random_rotate_90<<<,
    >>>transpose<<<,
    # rotate_limit=0 leaves only a small shift/scale, applied with probability 0.25
    A.ShiftScaleRotate(shift_limit=0.01, scale_limit=0.04, rotate_limit=0, p=0.25),
    # Always applied (p=1.0); relies on the library's default normalization parameters
    A.Normalize(p=1.0),
    # Converts the result to torch tensors (presumably channel-first images — confirm in the albumentations docs)
    ToTensorV2(),
])
### End

## Train, validation and test data split

In [None]:
### Start Code Here
# Split the full dataframe (img_df) into train_df and val_df
train_df, val_df = >>>train_val_split<<<
# Replace the inherited row labels with a fresh 0..n-1 index
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

# Split train_df again into train_df and test_df
train_df, test_df = >>>train_test_split<<<
train_df = train_df.reset_index(drop=True)

test_df = test_df.reset_index(drop=True)
### End
# Optional: uncomment the next line to keep only the first 1000 training rows
#train_df = train_df[:1000]
# Report (rows, columns) of each split
print(f"Train: {train_df.shape} \nVal: {val_df.shape} \nTest: {test_df.shape}")

## Dataloader

In [None]:
# train
import multiprocessing
# One loader worker per CPU core reported by the machine
num_workers = multiprocessing.cpu_count()
# inference_pipeline draws a figure with 4 columns, which matches this batch size
batch_size = 4

### Start Code Here
train_dataset = BrainMRIDataset(df=train_df, transforms=transforms)
# Placeholder: DataLoader wrapping train_dataset
train_dataloader = >>>train_dataloader<<<

# val
val_dataset = >>>val_dataset<<<
# NOTE(review): shuffle=True is set for the validation loader; shuffling is
# normally unnecessary for evaluation — confirm that this is intended.
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, num_workers=num_workers, shuffle=True)

#test
test_dataset = >>>test_dataset<<<
test_dataloader = >>>test_dataloader<<<
### End

# 4. UNet Model

In [None]:
import torch
from torch import nn

# Device the model and the batches are moved to. Placeholder: select CUDA when
# it is available (presumably falling back to the CPU otherwise — confirm).
DEVICE = >>>assign_to_cuda_if_available<<<


In [None]:
class VanillaUNet(nn.Module):
    """U-Net style encoder/decoder with three skip connections.

    The encoder applies four double-convolution blocks separated by 2x max
    pooling. The decoder upsamples 2x (bilinear), concatenates the matching
    encoder output along the channel axis and applies a double convolution.
    A final 1x1 convolution maps 64 channels to ``n_classes``.
    """

    def __init__(self, n_classes):
        """Create the layers.

        Args:
            n_classes: Number of output channels of the final 1x1 convolution.
        """
        super().__init__()

        ### Start Code Here
        # Encoder. The decoder definitions below fix the expected widths: the
        # skip connections carry 64, 128 and 256 channels, the bottleneck 512.
        self.conv_down1 = self.double_conv(3, >>>output_channel<<<)
        self.conv_down2 = self.double_conv(>>>in_channel<<<, 128)
        self.conv_down3 = self.double_conv(128, 256)
        self.conv_down4 = self.double_conv(256,>>>out_channel<<<)
        ### End

        # Shared, parameter-free resampling layers reused at every level
        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder. Input width = skip-connection channels + upsampled channels
        self.conv_up3 = self.double_conv(256 + 512, 256)
        self.conv_up2 = self.double_conv(128 + 256, 128)
        self.conv_up1 = self.double_conv(128 + 64, 64)

        # 1x1 convolution producing one channel per class
        self.last_conv = nn.Conv2d(64, n_classes, kernel_size=1)

    def double_conv(self, in_channels, out_channels):
        """Return a block of two convolutions, each followed by a ReLU.

        The kernel and padding placeholders must keep the spatial size
        unchanged; otherwise the concatenations in ``forward`` fail.
        """
        ### Start Code Here
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, >>>kernel_size<<<, padding=>>>padding_size<<<),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, >>>kernel_size<<<, padding=>>>padding_size<<<),
            >>>relu operation<<<<,
            )
        ### End

    def forward(self, x):
        """Run the network on a batch of images and return ``out``."""
        # x is (B, C, H, W) = (batch, channels, height, width),
        # for example x.shape == (10, 3, 256, 256).
        # The shape comments below assume double_conv preserves H and W and
        # that H and W are divisible by 8 — confirm once the placeholders are filled.
        ### Start Code Here
        conv1 = >>>output_of_the_first_double_convolution<<<    # (B, 3, H, W)       -> (B, 64, H, W)
        x     = self.maxpool(conv1)                             # (B, 64, H, W)      -> (B, 64, H/2, W/2)
        conv2 = self.conv_down2(x)                              # (B, 64, H/2, W/2)  -> (B, 128, H/2, W/2)
        x     = self.maxpool(conv2)                             # (B, 128, H/2, W/2) -> (B, 128, H/4, W/4)
        conv3 = self.conv_down3(x)                              # (B, 128, H/4, W/4) -> (B, 256, H/4, W/4)
        x     = >>>max_pooling_operation<<<<                    # (B, 256, H/4, W/4) -> (B, 256, H/8, W/8)
        x     = self.conv_down4(x)                              # (B, 256, H/8, W/8) -> (B, 512, H/8, W/8)
        x     = self.upsample(x)                                # (B, 512, H/8, W/8) -> (B, 512, H/4, W/4)

        # Skip connections: after upsampling, x has the same H and W as the
        # encoder output it is concatenated with along the channel axis (dim=1).
        x    = >>>concatenation_of_the_two_feature<<<<         # (B, 512, ...) + conv3 (B, 256, ...) -> (B, 768, H/4, W/4)

        x    = self.conv_up3(x)                                 # (B, 768, H/4, W/4) -> (B, 256, H/4, W/4)
        x    = self.upsample(x)                                 # (B, 256, H/4, W/4) -> (B, 256, H/2, W/2)
        x    = torch.cat([x, conv2], dim=1)                     # (B, 256, ...) + conv2 (B, 128, ...) -> (B, 384, H/2, W/2)

        x    = self.conv_up2(x)                                 # (B, 384, H/2, W/2) -> (B, 128, H/2, W/2)
        x    = self.upsample(x)                                 # (B, 128, H/2, W/2) -> (B, 128, H, W)
        x    = torch.cat([x, conv1], dim=1)                     # (B, 128, ...) + conv1 (B, 64, ...) -> (B, 192, H, W)

        x    = self.conv_up1(x)                                 # (B, 192, H, W) -> (B, 64, H, W)

        out  = self.last_conv(x)                                # (B, 64, H, W) -> (B, n_classes, H, W)


        # Placeholder: apply a sigmoid to the logits. `out` is the value that
        # is returned, so the result has to end up in `out`.
        >>>sigmoid_of_the_output_logits<<<
        ### End
        return out

## Model visualization

In [None]:
# Build the network with a single output channel and move it to DEVICE
model = VanillaUNet(n_classes=1).to(DEVICE)
### Start Code Here
# Smoke test: push one random tensor through the model and print the output shape
output = model(torch.randn([>>>input_shape<<<]).to(DEVICE))
print(>>>output_shape<<<)
### End

In [None]:
from torchsummary import summary
### Start Code Here
# Placeholder: call `summary` on the model to get an overview of its layers
>>>summary_of_the_model<<<
### End

In [None]:
def dice_coef_loss(inputs, target):
    smooth = 1.0
    intersection = 2.0 * ((target * inputs).sum()) + smooth
    union = target.sum() + inputs.sum() + smooth

    return 1 - (intersection / union)

### Start Code Here
def bce_dice_loss(inputs, target):
    """Return the sum of a binary cross-entropy term and the Dice loss.

    Args:
        inputs: Predictions, passed unchanged to both loss terms.
        target: Ground truth, passed unchanged to both loss terms.
    """
    # Despite the variable name, dice_coef_loss returns a loss (1 - Dice ratio)
    dicescore = dice_coef_loss(inputs, target)
    # Placeholder: a PyTorch BCE loss object; it is called with (inputs, target) below
    bcescore = >>>PyTorch_BCE_loss<<<
    bceloss = bcescore(inputs, target)

    return bceloss + dicescore
### End

# Alias under which the loss is meant to be handed to train_model / evaluate_model
loss_fn = bce_dice_loss

## Model training

In [None]:
from tqdm import tqdm

def train_model(model, loss_fn, optimizer, train_dataloader):
    """Run one training pass over ``train_dataloader``.

    Args:
        model: Network to train; switched to training mode first.
        loss_fn: Callable invoked as ``loss_fn(predictions, labels)``.
        optimizer: Optimizer whose gradients are zeroed and stepped per batch.
        train_dataloader: Sized iterable yielding ``(data, labels)`` batches.

    Returns:
        The accumulated ``losses`` divided by the number of batches.
    """
    model.train()
    losses = 0

    print("Training Phase ...")
    # Manually driven progress bar: one tick per batch (see pbar.update() below)
    pbar = tqdm(range(len(train_dataloader)))
    for data, labels in train_dataloader:
        ### Start Code Here
        data = data.to(>>>device<<<)
        labels = labels.to(>>>device<<<)  # Move labels to the device; the float cast happens at the loss call below
        optimizer.zero_grad()

        >>>pass_the_data_through_the model<<<
        logits = >>>model<<<

        # squeeze(1) drops dimension 1 of the output when it has size 1;
        # labels are cast to float for the loss
        loss = loss_fn(logits.squeeze(1), labels.float())
        # Placeholder: accumulate this batch's loss into `losses`
        >>>add_loss<<<<

        >>>back_propagation<<<
        ### End

        optimizer.step()

        pbar.update()

    return losses / len(train_dataloader)

def evaluate_model(model, loss_fn, val_dataloader):
    """Compute the mean per-batch loss over ``val_dataloader``.

    Args:
        model: Network to evaluate.
        loss_fn: Loss callable, to be used by the loss placeholder below.
        val_dataloader: Sized iterable yielding ``(data, labels)`` batches.

    Returns:
        The accumulated ``losses`` divided by the number of batches.
    """
    ### Start Code Here
    >>>change_the_model_to_eval_mode<<<<
    losses = 0

    print("Validation Phase ...")
    # Manually driven progress bar: one tick per batch
    qbar = tqdm(range(len(val_dataloader)))
    # NOTE(review): the visible code does not disable gradient tracking here,
    # unlike inference_pipeline, which wraps its loop in torch.no_grad().
    for data, labels in val_dataloader:
        data = data.to(DEVICE)
        labels = labels.to(DEVICE)  # Move labels to the device (no dtype change on this line)

        logits = model(data)

        # Placeholder: compute the batch loss and add it to `losses`
        >>>loss_calculation_and_add_to_losses<<<<
        ### End
        qbar.update()

    return losses / len(val_dataloader)

## Define optimizer

In [None]:
### Start Code Here
# Placeholder: create the optimizer for the model's parameters. train_model
# receives it as its `optimizer` argument.
>>>define_the_optimizer <<<
### End

## Train the model

In [None]:
epochs = 10
checkpoint_dir = "checkpoints"
# exist_ok=True: do not fail when the folder already exists
os.makedirs(checkpoint_dir, exist_ok=True)

# Epochs are numbered from 1 so that log lines and checkpoint names match
for epoch in range(1, epochs + 1):
    print("-" * 10)
    print(f"Epoch {epoch}/{epochs}")
    print("-" * 10)
    ### Start Code Here
    # Placeholders: run one training pass and one validation pass, keeping
    # their returned losses for the log line below
    >>>train_the_model<<<
    >>>validate_the_model<<<

    print(f"Epoch: {epoch}, Train loss: {>>>train_loss <<< :.3f}, Val loss: {>>>validation_loss<<<:.3f}")

    # Save model checkpoint: the state_dict only, one file per epoch
    torch.save(model.state_dict(), f'{checkpoint_dir}/unet_model_epoch_{epoch}.pth')

# Placeholder, outside the loop: save the model once after the last epoch
>>>save_the_final_model<<<
### End

### Visualize the prediction

In [None]:
import matplotlib.pyplot as plt

### Start Code Here
# Placeholder: load the trained model. This rebinds the global `model`.
model = >>>load_the_model <<<<
### End

def inference_pipeline(model, test_dataloader, device, threshold=0.3):
    """Plot images, ground-truth masks and predictions for the first test batch.

    Args:
        model: Network used for the predictions.
        test_dataloader: Iterable yielding ``(data, labels)`` batches.
        device: Device the batch tensors are moved to.
        threshold: Cut-off meant for the "Apply threshold" step (default 0.3).
    """
    ### Start Code Here
    >>> convert_the_model_to_eval_mode <<<
    # Gradients are not needed for inference
    with torch.no_grad():
        for data, labels in test_dataloader:
            data = data.to(device)
            labels = labels.to(device)

            # Get predictions
            >>> run the inference <<<
            >>> convert the prediction to numpy <<<<

            # Apply threshold
            # The plotting code below indexes preds_t as preds_t[i, 0, :, :],
            # i.e. it expects a (batch, channel, height, width) layout.
            preds_t = >>> process the prediction <<<)
            ### End

            # Plot results for the first batch
            # 3 rows (image, ground truth, prediction) x 4 columns (samples)
            fig, ax = plt.subplots(nrows=3, ncols=4, figsize=(15, 10))
            for i in range(len(data)):
                # Original image
                # Channel-first tensor -> channel-last array for imshow
                img = data[i].permute(1, 2, 0).cpu().numpy()
                # NOTE(review): divides by zero if every value of the image is identical
                img = (img - img.min()) / (img.max() - img.min())  # Normalize to [0, 1]
                ax[0, i].imshow(img)
                ax[0, i].set_title("Image")
                ax[0, i].axis("off")

                # Ground truth mask
                ax[1, i].imshow(labels[i].cpu().numpy(), cmap='gray')
                ax[1, i].set_title("Mask (GT)")
                ax[1, i].axis("off")

                # Prediction
                ax[2, i].imshow(preds_t[i, 0, :, :], cmap='gray')
                ax[2, i].set_title("Prediction")
                ax[2, i].axis("off")
                # Only 4 columns exist, so stop after the fourth sample
                if i == 3:
                    break

            plt.show()
            # Only the first batch is visualized
            break

## Visualize the inference

In [None]:
### Start Code Here
# Placeholders: load the saved model, then call inference_pipeline with it,
# the test dataloader and the device
>>> Load the saved  model <<<
>>> run the inference <<<
### End