In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install nibabel
!pip install nilearn
!pip install scikit-image

import sys
import os

import numpy as np
import tensorflow as tf
import pandas as pd
import nibabel as nib
import nilearn as nil
import scipy.ndimage as ndi
import matplotlib.pyplot as plt
from skimage.util import montage
from skimage.transform import rotate,resize
from sklearn.model_selection import train_test_split
from keras.src import Input
from keras.src.models import Model
from keras.src.callbacks import EarlyStopping,ModelCheckpoint,ReduceLROnPlateau
from keras.src.metrics import Precision,Recall
import cv2
import keras

Collecting nilearn
  Downloading nilearn-0.11.1-py3-none-any.whl.metadata (9.3 kB)
Downloading nilearn-0.11.1-py3-none-any.whl (10.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.5/10.5 MB[0m [31m99.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nilearn
Successfully installed nilearn-0.11.1


In [4]:
from keras.src.layers import Add,Multiply,ZeroPadding2D,Dropout,ZeroPadding3D, BatchNormalization, LeakyReLU, ReLU, Conv2D, MaxPooling2D, UpSampling2D,  Conv3D, MaxPooling3D, UpSampling3D,concatenate, Input

activations = {
    'relu': ReLU,
    'leaky_relu': LeakyReLU,
}


def Reshape3D(original_block,up):
    if up.shape[1] != original_block.shape[1] or up.shape[2] != original_block.shape[2] or up.shape[3] != original_block.shape[3]:
        diff_depth = original_block.shape[1] - up.shape[1]
        diff_height = original_block.shape[2] - up.shape[2]
        diff_width = original_block.shape[3] - up.shape[3]

        up = ZeroPadding3D(((diff_depth // 2, diff_depth - diff_depth // 2),
            (diff_height // 2, diff_height - diff_height // 2),
            (diff_width // 2, diff_width - diff_width // 2))
        )(up)
    return up

def Reshape2D(original_block, up):
    if up.shape[1] != original_block.shape[1] or up.shape[2] != original_block.shape[2]:
        diff_height = original_block.shape[1] - up.shape[1]
        diff_width = original_block.shape[2] - up.shape[2]

        up = ZeroPadding2D(((diff_height // 2, diff_height - diff_height // 2),
                            (diff_width // 2, diff_width - diff_width // 2)))(up)
    return up

def IdentityBlock(x):

    return x

def Normalize(x, normalize):

    return BatchNormalization()(x) if normalize else x

def DropOut(x, drop_out):

    return Dropout(drop_out)(x) if 0.0 < drop_out < 1.0 else x

def EncoderBlock(input_block, filter_size, kernel_size=3, padding="same",
                 activation="relu", slope=0.0, pool_size=(2,2),
                 batch_normalization=False, drop_out=0.0,kernel_regularizer=None):

    conv = Conv2D(filters=filter_size,kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(input_block)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    pool = MaxPooling2D(pool_size=pool_size)(conv)
    pool = DropOut(pool, drop_out)

    return conv, pool

def BottleneckBlock(input_block, filter_size, kernel_size=3, padding="same",
                    activation="relu", slope=0.0, batch_normalization=False,kernel_regularizer=None):

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(input_block)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv

def DecoderBlock(input_block, original_block, filter_size, kernel_size=3, padding="same",
                 activation="relu", slope=0.0, batch_normalization=False, drop_out=0.0,kernel_regularizer=None):

    up = UpSampling2D(size=(2,2))(input_block)
    up = Reshape2D(original_block=original_block,up = up)
    merge = concatenate([original_block, up], axis=3)

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(merge)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv

def OutputBlock(input_block,num_of_classes = 1,kernel_size=1,activation = "sigmoid",padding="same",kernel_regularizer=None):
    return Conv2D(filters=num_of_classes,kernel_size=kernel_size,activation=activation,padding=padding,kernel_regularizer=kernel_regularizer)(input_block)



def EncoderBlock3D(input_block, filter_size, kernel_size=3, padding="same",
                 activation="relu", slope=0.0, pool_size=(2, 2, 2),
                 batch_normalization=False, drop_out=0.0,kernel_regularizer=None):

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(input_block)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    pool = MaxPooling3D(pool_size=pool_size)(conv)
    pool = DropOut(pool, drop_out)

    return conv, pool

def BottleneckBlock3D(input_block, filter_size, kernel_size=3, padding="same",
                    activation="relu", slope=0.0, batch_normalization=False,kernel_regularizer=None):
    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(input_block)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv

def AttentionDecoder(input_block, original_block, filter_size, kernel_size=3, padding="same",
                    activation="relu", slope=0.0, batch_normalization=False,kernel_regularizer=None):

    up = UpSampling2D(size=(2, 2))(input_block)
    up = Reshape2D(original_block=original_block, up=up)

    theta_x = Conv2D(filter_size, kernel_size=kernel_size, strides=1, padding=padding)(original_block)
    phi_g = Conv2D(filter_size, kernel_size=kernel_size, strides=1, padding=padding)(up)

    attention = Add()([theta_x, phi_g])
    attention = Normalize(attention, batch_normalization)
    attention = activations.get(activation, ReLU)(negative_slope=slope)(attention)
    attention = Conv2D(1, kernel_size=kernel_size, activation='sigmoid', padding=padding,kernel_regularizer=kernel_regularizer)(attention)

    attention = Multiply()([original_block, attention])

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(attention)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv2D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv



def AttentionDecoder3D(input_block, original_block, filter_size, kernel_size=3, padding="same",
                    activation="relu", slope=0.0, batch_normalization=False,kernel_regularizer=None):

    up = UpSampling3D(size=(2, 2, 2))(input_block)
    up = Reshape3D(original_block=original_block, up=up)

    theta_x = Conv3D(filter_size, kernel_size=kernel_size, strides=1, padding=padding)(original_block)
    phi_g = Conv3D(filter_size, kernel_size=kernel_size, strides=1, padding=padding)(up)

    attention = Add()([theta_x, phi_g])
    attention = Normalize(attention, batch_normalization)
    attention = activations.get(activation, ReLU)(negative_slope=slope)(attention)
    attention = Conv3D(1, kernel_size=kernel_size, activation='sigmoid', padding=padding,kernel_regularizer=kernel_regularizer)(attention)

    attention = Multiply()([original_block, attention])

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(attention)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding,kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv


def DecoderBlock3D(input_block, original_block, filter_size, kernel_size=3, padding="same",
                 activation="relu", slope=0.0, batch_normalization=False, drop_out=0.0,kernel_regularizer=None):

    up = UpSampling3D(size=(2, 2, 2))(input_block)
    up = Reshape3D(original_block=original_block,up=up)
    merge = concatenate([original_block, up], axis=-1)

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding, kernel_regularizer=kernel_regularizer)(merge)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    conv = Conv3D(filters=filter_size, kernel_size=kernel_size, padding=padding, kernel_regularizer=kernel_regularizer)(conv)
    conv = Normalize(conv, batch_normalization)
    conv = activations.get(activation, ReLU)(negative_slope=slope)(conv)

    return conv

def OutputBlock3D(input_block, num_of_classes=1, kernel_size=1, activation="sigmoid", padding="same",kernel_regularizer=None):
    return Conv3D(filters=num_of_classes, kernel_size=kernel_size, activation=activation, padding=padding, kernel_regularizer=kernel_regularizer)(input_block)

def U_NET(inputs,outputs):
    return Model(inputs,outputs)

In [5]:

def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])

    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    union = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f)

    dice = (2. * intersection + smooth) / (union + smooth)
    return dice

def tversky_loss(y_true, y_pred, alpha=0.7, beta=0.3, smooth=1e-6):
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    true_pos = tf.reduce_sum(y_true * y_pred)
    false_neg = tf.reduce_sum(y_true * (1 - y_pred))
    false_pos = tf.reduce_sum((1 - y_true) * y_pred)
    return 1 - (true_pos + smooth) / (true_pos + alpha*false_neg + beta*false_pos + smooth)

def dice_coefficient_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)

def load_nii_file(filepath):
    nii_file = nib.load(filepath)
    data = np.array(nii_file.get_fdata(), dtype=np.float32)
    data = (data - np.min(data)) / (np.max(data) - np.min(data))
    data = data[56:-56, :, :]
    return data

def load_nii_dataset(input_filepaths, output_filepaths):
    def generator():
        for input_file, output_file in zip(input_filepaths, output_filepaths):
            input_nii = load_nii_file(input_file)
            output_nii = load_nii_file(output_file)

            output_nii = np.floor(output_nii * 4)
            output_nii = np.where(output_nii > 1, 1, output_nii)

            input_nii = tf.convert_to_tensor(input_nii, dtype=tf.float32)
            output_nii = tf.convert_to_tensor(output_nii, dtype=tf.float32)

            input_nii = tf.expand_dims(input_nii, axis=-1)
            output_nii = tf.expand_dims(output_nii, axis=-1)

            for i in range(input_nii.shape[0]):
                yield input_nii[i], output_nii[i]

    dataset = tf.data.Dataset.from_generator(
        generator=generator,
        output_signature=(
            tf.TensorSpec(shape=(240,155,1), dtype=tf.float32),
            tf.TensorSpec(shape=(240,155,1), dtype=tf.float32)
        )
    )

    return dataset


In [6]:
train_path = '/content/MICCAI_BraTS2020_TrainingData'
input_files = []
output_files = []

In [12]:
for folder in os.listdir(train_path):
     folder_path = os.path.join(train_path,folder)
     input_file,output_file = os.listdir(folder_path)

     if input_file[-5] == 'g':
         input_file,output_file = output_file,input_file

     input_files.append(os.path.join(folder_path,input_file))
     output_files.append(os.path.join(folder_path,output_file))

X_train,X_validation,y_train,y_validation = train_test_split(input_files,output_files,train_size=0.82,random_state=42)

train_dataset = load_nii_dataset(X_train,y_train)
val_dataset = load_nii_dataset(X_validation,y_validation)

train_dataset = train_dataset.shuffle(6400).repeat().batch(128).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(128).prefetch(tf.data.AUTOTUNE)

def augment(input_img, output_img):

    prob = np.random.uniform()
    if prob <=0.5:
        input_img = tf.image.flip_left_right(input_img)
        output_img = tf.image.flip_left_right(output_img)

    prob = np.random.uniform()
    if prob <=0.5:
        input_img = tf.image.flip_up_down(input_img)
        output_img = tf.image.flip_up_down(output_img)

    prob = np.random.uniform()
    if prob <=0.5:
        angle = tf.random.uniform([], -30, 30)
        input_img = tf.keras.preprocessing.image.apply_affine_transform(input_img, theta=angle)
        output_img = tf.keras.preprocessing.image.apply_affine_transform(output_img, theta=angle)

    return input_img, output_img

train_dataset = train_dataset.map(lambda x, y: (augment(x, y)), num_parallel_calls=tf.data.AUTOTUNE)

In [13]:
inputs = Input(shape=(240,155,1))

conv1,pool1 = EncoderBlock(inputs,filter_size=32,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
conv2,pool2 = EncoderBlock(pool1,filter_size=64,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
conv3,pool3 = EncoderBlock(pool2,filter_size=128,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
conv4,pool4 = EncoderBlock(pool3,filter_size=256,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))

neck = BottleneckBlock(pool4,filter_size=512,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))

up1 =  DecoderBlock(neck,conv4,filter_size=256,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
up2 =  DecoderBlock(up1,conv3,filter_size=128,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
up3 =  DecoderBlock(up2,conv2,filter_size=64,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))
up4 =  DecoderBlock(up3,conv1,filter_size=32,batch_normalization=True,
                           kernel_regularizer=keras.regularizers.l2(0.001))

outputs = OutputBlock(up4)

model = U_NET(inputs=inputs,outputs=outputs)

optimizer = tf.keras.optimizers.Adam(
    learning_rate=0.0001
)


# model.compile(
#         optimizer=optimizer,
#         loss=tversky_loss,
#         metrics=[dice_coefficient]
# )

model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy',
        metrics=[dice_coefficient,keras.metrics.Precision(),keras.metrics.Recall()],
)

model.summary()

In [14]:
early_stopping = EarlyStopping(monitor='val_loss', patience=13, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('/content/drive/MyDrive/Keras/brain_part_I.keras', save_best_only=True, monitor='val_loss')
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=4, min_lr=1e-12)

history = model.fit(
    train_dataset,
    epochs=100,
    steps_per_epoch=len(X_train),
    validation_data=val_dataset,
    callbacks=[early_stopping, model_checkpoint, reduce_lr],
    class_weight={0: 0.1, 1: 0.9}
)

Epoch 1/100
[1m1805/1805[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 454ms/step - dice_coefficient: 0.2577 - loss: 0.7241 - precision: 0.5422



[1m1805/1805[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1044s[0m 537ms/step - dice_coefficient: 0.2577 - loss: 0.7239 - precision: 0.5422 - val_dice_coefficient: 0.1991 - val_loss: 0.1459 - val_precision: 0.7683 - learning_rate: 1.0000e-04
Epoch 2/100
[1m1805/1805[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m965s[0m 535ms/step - dice_coefficient: 0.5805 - loss: 0.0142 - precision: 0.6991 - val_dice_coefficient: 0.4360 - val_loss: 0.0455 - val_precision: 0.9588 - learning_rate: 1.0000e-04
Epoch 3/100
[1m1805/1805[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m963s[0m 534ms/step - dice_coefficient: 0.7049 - loss: 0.0082 - precision: 0.7106 - val_dice_coefficient: 0.4994 - val_loss: 0.0723 - val_precision: 0.4195 - learning_rate: 1.0000e-04
Epoch 4/100
[1m1805/1805[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m964s[0m 534ms/step - dice_coefficient: 0.7515 - loss: 0.006

In [None]:
import os
import time

time.sleep(5)

os._exit(0)
