In [2]:
import keras
import json
import numpy as np
import pandas as pd
import nibabel as nib
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras import backend as K
from keras import layers

import util
from public_tests import *
from test_utils import *

import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [3]:
Data_Dir = "/content/drive/MyDrive/MRI_CVision_Data/imagesTr/"
Labels_Dir = "/content/drive/MyDrive/MRI_CVision_Data/labelsTr/"

# Patches

In [4]:
# due to limited processing power, "patches" or "sub-volumes" if shape (160, 160, 16) are required
# patches have to be at leadt 5% tumor

def get_sub_volume(image, label,
                   orig_x = 240, orig_y = 240, orig_z = 155,
                   output_x = 160, output_y = 160, output_z = 16,
                   num_classes = 4, max_tries = 1000,
                   background_threshold=0.95):

    X = None
    y = None
    tries = 0

    while tries < max_tries:
        start_x = np.random.randint(orig_x-output_x+1)
        start_y = np.random.randint(orig_y-output_y+1)
        start_z = np.random.randint(orig_z-output_z+1)

        y = label[start_x: start_x + output_x,
                  start_y: start_y + output_y,
                  start_z: start_z + output_z]
        y = keras.utils.to_categorical(y, num_classes=num_classes)

        bgrd_ratio = np.sum(y[:, :, :, 0])/(output_x * output_y * output_z)
        tries += 1
        if bgrd_ratio < background_threshold:
            X = np.copy(image[start_x: start_x + output_x,
                              start_y: start_y + output_y,
                              start_z: start_z + output_z, :])
            X =  np.transpose(X, (3, 0, 1, 2))
            y =  np.transpose(y, (3, 0, 1, 2))
            y = y[1:, :, :, :]

            return X, y

    print(f"Tried {tries} times to find a sub-volume. Giving up...")

# Standardization

In [5]:
def standardize(image):
  standardized_image = np.zeros(image.shape)
  for c in range(image.shape[0]):
    for z in range(image.shape[3]):
      image_slice = image[c,:,:,z]
      centered = image_slice - np.mean(image_slice)
      if np.std(centered) != 0:
        centered_scaled = image_slice/np.std(image_slice)
      standardized_image[c, :, :, z] = centered_scaled

    return standardized_image

# 3D U-Net Model Creation

In [6]:

def dice_coefficient(y_true, y_pred, axis=(1, 2, 3), epsilon=0.00001):
    intersection = K.sum(y_true * y_pred, axis=axis)
    sum_true = K.sum(y_true, axis=axis)
    sum_pred = K.sum(y_pred, axis=axis)
    dice_numerator = (2 * intersection) + epsilon
    dice_denominator = sum_true + sum_pred + epsilon
    dice_coefficient = (K.mean((dice_numerator) / (dice_denominator)))
    return dice_coefficient


def soft_dice_loss(y_true, y_pred, axis=(1, 2, 3), epsilon=0.00001):
    intersection = K.sum(y_true * y_pred, axis=axis)
    sum_true = K.sum(y_true * y_true, axis=axis)
    sum_pred = K.sum(y_pred * y_pred, axis=axis)
    dice_numerator = (2 * intersection) + epsilon
    dice_denominator = sum_true + sum_pred + epsilon
    dice_loss = 1 - (K.mean((dice_numerator) / (dice_denominator)))
    return dice_loss



def create_convolution_block(input_layer, n_filters, batch_normalization=False, kernel=(3, 3, 3), activation=None, padding='same',
                             strides=(1, 1, 1), instance_normalization=False):
    layer = tf.keras.layers.Conv3D(n_filters, kernel, padding=padding, strides=strides)(
        input_layer)
    if activation is None:
        return keras.layers.Activation('relu')(layer)
    else:
        return activation()(layer)


def get_up_convolution(n_filters, pool_size, kernel_size=(2, 2, 2), strides=(2, 2, 2), deconvolution=False):
    if deconvolution:
        return tf.keras.layers.Deconvolution3D(filters=n_filters, kernel_size=kernel_size, strides=strides)
    else:
        return tf.keras.layers.UpSampling3D(size=pool_size)



def unet_model_3d(loss_function, input_shape=(4, 160, 160, 16), pool_size=(2, 2, 2), n_labels=3, initial_learning_rate=0.00001, deconvolution=False,
                  depth=4, n_base_filters=32, include_label_wise_dice_coefficients=False, metrics=[], batch_normalization=False, activation_name="sigmoid"):

    inputs = tf.keras.layers.Input(input_shape)
    current_layer = inputs
    levels = list()

    # add levels with max pooling
    for layer_depth in range(depth):
        layer1 = create_convolution_block(input_layer=current_layer, n_filters=n_base_filters * (2 ** layer_depth), batch_normalization=batch_normalization)
        layer2 = create_convolution_block(input_layer=layer1, n_filters=n_base_filters * (2 ** layer_depth) * 2, batch_normalization=batch_normalization)
        if layer_depth < depth - 1:
            current_layer = keras.layers.MaxPooling3D(pool_size=pool_size)(layer2)
            levels.append([layer1, layer2, current_layer])
        else:
            current_layer = layer2
            levels.append([layer1, layer2])

    # add levels with up-convolution or up-sampling
    for layer_depth in range(depth - 2, -1, -1):
        up_convolution = get_up_convolution(pool_size=pool_size, deconvolution=deconvolution, n_filters=current_layer.shape[1])(current_layer)
        concat = tf.concat([up_convolution, levels[layer_depth][1]], axis=1)
        current_layer = create_convolution_block(n_filters=levels[layer_depth][1].shape[1], input_layer=concat, batch_normalization=batch_normalization)
        current_layer = create_convolution_block(n_filters=levels[layer_depth][1].shape[1], input_layer=current_layer, batch_normalization=batch_normalization)

    final_convolution = tf.keras.layers.Conv3D(n_labels, (1, 1, 1))(current_layer)
    act = keras.layers.Activation(activation_name)(final_convolution)
    model = tf.keras.Model(inputs=inputs, outputs=act)

    if not isinstance(metrics, list):
        metrics = [metrics]

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=initial_learning_rate), loss=loss_function, metrics=metrics)
    return model


model = unet_model_3d(loss_function=soft_dice_loss, metrics=[dice_coefficient])

In [None]:
base_dir = "/content/drive/MyDrive/MRI_CVision_Data/processed/"

with open(base_dir + "config.json") as json_file:
    config = json.load(json_file)

train_generator = util.VolumeDataGenerator(config["train"], base_dir + "train/", batch_size=3, dim=(160, 160, 16), verbose=0)
valid_generator = util.VolumeDataGenerator(config["valid"], base_dir + "valid/", batch_size=3, dim=(160, 160, 16), verbose=0)


model.fit_generator(generator=train_generator, steps_per_epoch=20, epochs=10, use_multiprocessing=True,
                    validation_data=valid_generator, validation_steps=20)

# 3D U-Net Model Training Alternative

In [None]:
'''
with open("/content/drive/MyDrive/MRI_CVision_Data/dataset.json") as json_file:
    config = json.load(json_file)


# Function to load and preprocess image and label data
def load_and_preprocess_data(image_path, label_path):
    image_data = nib.load(image_path).get_fdata()
    label_data = nib.load(label_path).get_fdata()

    # Perform preprocessing as needed

    return image_data, label_data



# Prepare training and testing data
train_data = []
test_data = []
for sample in config['training']:
    image_path = "/content/drive/MyDrive/MRI_CVision_Data" + sample['image'][1:]
    label_path = "/content/drive/MyDrive/MRI_CVision_Data" + sample['label'][1:]
    image_data, label_data = load_and_preprocess_data(image_path, label_path)

    # Split data into training and testing sets
    train_image, test_image, train_label, test_label = train_test_split(image_data, label_data, test_size=0.2, random_state=42)

    train_data.append((train_image, train_label))
    test_data.append((test_image, test_label))

# Convert training and testing data to NumPy arrays
train_images = np.array([sample[0] for sample in train_data])
train_labels = np.array([sample[1] for sample in train_data])
test_images = np.array([sample[0] for sample in test_data])
test_labels = np.array([sample[1] for sample in test_data])

# Train the model
model.fit(x=train_images, y=train_labels, epochs=10, batch_size=10)

# Evaluate the model on the testing data
test_loss, test_accuracy = model.evaluate(x=test_images, y=test_labels)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)
'''