## UNETR model Configration

In [2]:
""" UNETR Configration """

from keras.optimizers import Adam

cf = {}
cf["image_size"] = 256
cf["num_layers"] = 12
cf["hidden_dim"] = 128
cf["mlp_dim"] = 32
cf["num_heads"] = 6
cf["dropout_rate"] = 0.1
cf["batch_size"] = 16
cf["lr"] = 1e-4
cf["optimizer"] = Adam(cf["lr"])
cf["patch_size"] = 16
cf["num_patches"] = (cf["image_size"] ** 2) // (cf["patch_size"] ** 2)
cf["num_channels"] = 3
cf["flat_patches_shape"] = (
    cf["num_patches"],
    cf["patch_size"] * cf["patch_size"] * cf["num_channels"]
)


2024-03-26 04:54:38.422043: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-03-26 04:54:38.422132: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-03-26 04:54:38.579703: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


## Create functions for loading dataset

In [3]:
from glob import glob
import os
from sklearn.model_selection import train_test_split
import cv2
import numpy as np
import tensorflow as tf
from patchify import patchify
from keras.preprocessing.image import ImageDataGenerator


def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)


def load_dataset(path, split=0.2):
    # Get paths of images and masks
    images_paths = sorted(glob(os.path.join(path, "images", "*.png")))
    masks_paths = sorted(glob(os.path.join(path, "masks", "*.png")))

    split_size = int(len(images_paths) * split)

    train_x, valid_x = train_test_split(
        images_paths, test_size=split_size, random_state=42)
    train_y, valid_y = train_test_split(
        masks_paths, test_size=split_size, random_state=42)

    train_x, test_x = train_test_split(
        train_x, test_size=split_size, random_state=42)
    train_y, test_y = train_test_split(
        train_y, test_size=split_size, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)
    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)


def augment_data(images_paths, masks_paths, num_aug_per_image=2, aug_save_path="/kaggle/working/Augmented_Dataset"):
    """
    Perform data augmentation on images and masks.

    Args:
        images_paths (list): List of file paths to original images.
        masks_paths (list): List of file paths to corresponding masks.
        num_aug_per_image (int, optional): Number of augmented versions to create per image. Defaults to 3.
        aug_save_path (str, optional): Directory to save augmented images and masks. Defaults to "Augmented_Dataset".

    Returns:
        tuple: A tuple containing lists of augmented image paths and mask paths.
    """
    # Create directories to save augmented images and masks
    images_aug_dir = os.path.join(aug_save_path, "images")
    masks_aug_dir = os.path.join(aug_save_path, "masks")

    create_dir(images_aug_dir)
    create_dir(masks_aug_dir)

    for (img_path, mask_path) in zip(images_paths, masks_paths):
        # Read the image and mask
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        # Initialize an ImageDataGenerator for augmentation
        aug = ImageDataGenerator(
            rotation_range=10,
            zoom_range=0.2,
            brightness_range=[0.7, 1.3],
            horizontal_flip=True,
            fill_mode="nearest"
        )

        # Extract the base filename (without extension) for saving augmented images
        id = os.path.splitext(os.path.basename(img_path))[0]

        # Augment the image
        num_aug = 0
        for _ in aug.flow(np.expand_dims(image, axis=0), batch_size=1, save_to_dir=images_aug_dir, save_prefix=f"{id}_aug", save_format="png", seed=42):
            num_aug += 1
            if num_aug >= num_aug_per_image:
                break

        # Augment the mask
        num_aug = 0
        for _ in aug.flow(np.expand_dims(mask, axis=(0, -1)), batch_size=1, save_to_dir=masks_aug_dir, save_prefix=f"{id}_aug", save_format="png", seed=42):
            num_aug += 1
            if num_aug >= num_aug_per_image:
                break

    # Get paths of augmented images and masks
    aug_images_paths = sorted(
        glob(os.path.join(aug_save_path, "images", "*.png")))
    aug_masks_paths = sorted(
        glob(os.path.join(aug_save_path, "masks", "*.png")))

    # Combine original and augmented paths
    images_paths.extend(aug_images_paths)
    masks_paths.extend(aug_masks_paths)

    return images_paths, masks_paths


def read_image(path):
    path = path.decode()
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (cf["image_size"], cf["image_size"]))
    image = image / 255.0

    """ Processing to patches """
    patch_shape = (cf["patch_size"], cf["patch_size"], cf["num_channels"])
    patches = patchify(image, patch_shape, cf["patch_size"])
    patches = np.reshape(patches, cf["flat_patches_shape"])
    patches = patches.astype(np.float32)

    return patches


def read_mask(path):
    path = path.decode()
    mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)  # (256, 256)
    mask = cv2.resize(mask, (cf["image_size"], cf["image_size"]))  # (256, 256)
    mask = mask / 255.0  # (256, 256)
    mask = mask.astype(np.float32)  # (256, 256)
    mask = np.expand_dims(mask, axis=-1)  # (256, 256, 1)
    return mask


def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape(cf["flat_patches_shape"])
    y.set_shape([cf["image_size"], cf["image_size"], 1])
    return x, y


def tf_dataset(X, Y, batch=2):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset


## Build the UNETR model

UNETR - Unet Transformer

In [None]:
import tensorflow as tf
import keras.layers as L
from keras.models import Model


def mlp(x, cf):
    x = L.Dense(cf["mlp_dim"], activation="gelu")(x)
    x = L.Dropout(cf["dropout_rate"])(x)
    x = L.Dense(cf["hidden_dim"])(x)
    x = L.Dropout(cf["dropout_rate"])(x)
    return x


def transformer_encoder(x, cf):
    skip_1 = x
    x = L.LayerNormalization()(x)
    x = L.MultiHeadAttention(
        num_heads=cf["num_heads"], key_dim=cf["hidden_dim"]
    )(x, x)
    x = L.Add()([x, skip_1])

    skip_2 = x
    x = L.LayerNormalization()(x)
    x = mlp(x, cf)
    x = L.Add()([x, skip_2])

    return x


def conv_block(x, num_filters, kernel_size=3):
    x = L.Conv2D(num_filters, kernel_size=kernel_size, padding="same")(x)
    x = L.BatchNormalization()(x)
    x = L.ReLU()(x)
    return x


def deconv_block(x, num_filters):
    x = L.Conv2DTranspose(num_filters, kernel_size=2,
                          padding="same", strides=2)(x)
    return x


def build_unetr(cf):
    """ Inputs """
    input_shape = (cf["num_patches"], cf["patch_size"]
                   * cf["patch_size"]*cf["num_channels"])
    inputs = L.Input(input_shape)  # (None, 256, 768)

    """ Patch + Position Embeddings """
    patch_embed = L.Dense(cf["hidden_dim"])(inputs)  # (None, 256, 768)

    positions = tf.range(start=0, limit=cf["num_patches"], delta=1)  # (256,)
    pos_embed = L.Embedding(input_dim=cf["num_patches"], output_dim=cf["hidden_dim"])(
        positions)  # (256, 768)
    x = patch_embed + pos_embed  # (None, 256, 768)

    """ Transformer Encoder """
    skip_connection_index = [3, 6, 9, 12]
    skip_connections = []

    for i in range(1, cf["num_layers"] + 1, 1):
        x = transformer_encoder(x, cf)

        if i in skip_connection_index:
            skip_connections.append(x)

    """ CNN Decoder """
    z3, z6, z9, z12 = skip_connections

    # Reshaping
    z0 = L.Reshape((cf["image_size"], cf["image_size"],cf["num_channels"]))(inputs)
    z3 = L.Reshape((cf["patch_size"], cf["patch_size"], cf["hidden_dim"]))(z3)
    z6 = L.Reshape((cf["patch_size"], cf["patch_size"], cf["hidden_dim"]))(z6)
    z9 = L.Reshape((cf["patch_size"], cf["patch_size"], cf["hidden_dim"]))(z9)
    z12 = L.Reshape((cf["patch_size"], cf["patch_size"], cf["hidden_dim"]))(z12)

    # Decoder 1
    x = deconv_block(z12, 512)

    s = deconv_block(z9, 512)
    s = conv_block(s, 512)
    x = L.Concatenate()([x, s])

    x = conv_block(x, 512)
    x = conv_block(x, 512)

    # Decoder 2
    x = deconv_block(x, 256)

    s = deconv_block(z6, 256)
    s = conv_block(s, 256)
    s = deconv_block(s, 256)
    s = conv_block(s, 256)

    x = L.Concatenate()([x, s])
    x = conv_block(x, 256)
    x = conv_block(x, 256)

    # Decoder 3
    x = deconv_block(x, 128)

    s = deconv_block(z3, 128)
    s = conv_block(s, 128)
    s = deconv_block(s, 128)
    s = conv_block(s, 128)
    s = deconv_block(s, 128)
    s = conv_block(s, 128)

    x = L.Concatenate()([x, s])
    x = conv_block(x, 128)
    x = conv_block(x, 128)

    # Decoder 4
    x = deconv_block(x, 64)

    s = conv_block(z0, 64)
    s = conv_block(s, 64)

    x = L.Concatenate()([x, s])
    x = conv_block(x, 64)
    x = conv_block(x, 64)

    """ Output """
    outputs = L.Conv2D(1, kernel_size=1, padding="same",
                       activation="sigmoid")(x)

    return Model(inputs, outputs, name="UNETR")


model = build_unetr(cf)
model.summary()


Model: "UNETR"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 768)]           0         []                            
                                                                                                  
 dense (Dense)               (None, 256, 128)             98432     ['input_1[0][0]']             
                                                                                                  
 tf.__operators__.add (TFOp  (None, 256, 128)             0         ['dense[0][0]']               
 Lambda)                                                                                          
                                                                                                  
 layer_normalization (Layer  (None, 256, 128)             256       ['tf.__operators__.add[0][

## Training the UNETR model

In [5]:
import tensorflow as tf
import numpy as np
import os
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping


smooth = 1e-15


def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)


def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)


""" Seeding """
np.random.seed(42)
tf.random.set_seed(42)

""" Directory for storing files """
create_dir("/kaggle/working/files")

""" Hyperparameters """
batch_size = cf["batch_size"]
optimizer = cf["optimizer"]
num_epochs = 30
model_path = os.path.join("/kaggle/working/files", "model.h5")
csv_path = os.path.join("/kaggle/working/files", "log.csv")

""" Dataset """
dataset_path = "/kaggle/input/brain-tumor-segmentation"
(train_x, train_y), (valid_x, valid_y), (test_x,
                                         test_y) = load_dataset(dataset_path)

print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")
print(f"Test : {len(test_x)} - {len(test_y)}")

train_x, train_y = augment_data(train_x, train_y, num_aug_per_image=2, aug_save_path="/kaggle/working/aug_brain-tumor-segmentation")

print(f"Train after augmented: {len(train_x)} - {len(train_y)}")

train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)



Train: 1840 - 1840
Valid: 612 - 612
Test : 612 - 612
Train after augmented: 5520 - 5520


In [6]:
""" Model """
model = build_unetr(cf)
model.compile(loss=dice_loss, optimizer=optimizer, metrics=[dice_coef])

callbacks = [
    ModelCheckpoint(model_path, verbose=1, save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1,
                      patience=5, min_lr=1e-7, verbose=1),
    CSVLogger(csv_path),
    EarlyStopping(monitor='val_loss', patience=20,
                  restore_best_weights=False),
]

model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=callbacks
)


Epoch 1/30


I0000 00:00:1711429808.288169     106 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Epoch 1: val_loss improved from inf to 0.93355, saving model to /kaggle/working/files/model.h5


  saving_api.save_model(


Epoch 2/30
Epoch 2: val_loss improved from 0.93355 to 0.75367, saving model to /kaggle/working/files/model.h5
Epoch 3/30
Epoch 3: val_loss improved from 0.75367 to 0.70675, saving model to /kaggle/working/files/model.h5
Epoch 4/30
Epoch 4: val_loss improved from 0.70675 to 0.60211, saving model to /kaggle/working/files/model.h5
Epoch 5/30
Epoch 5: val_loss improved from 0.60211 to 0.55414, saving model to /kaggle/working/files/model.h5
Epoch 6/30
Epoch 6: val_loss improved from 0.55414 to 0.53135, saving model to /kaggle/working/files/model.h5
Epoch 7/30
Epoch 7: val_loss did not improve from 0.53135
Epoch 8/30
Epoch 8: val_loss improved from 0.53135 to 0.47199, saving model to /kaggle/working/files/model.h5
Epoch 9/30
Epoch 9: val_loss improved from 0.47199 to 0.45198, saving model to /kaggle/working/files/model.h5
Epoch 10/30
Epoch 10: val_loss did not improve from 0.45198
Epoch 11/30
Epoch 11: val_loss improved from 0.45198 to 0.44983, saving model to /kaggle/working/files/model.h5


<keras.src.callbacks.History at 0x7fc296ccaf20>

## Testing & Evaluation

In [7]:
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score
import tensorflow as tf
from tqdm import tqdm
import pandas as pd
import cv2
import numpy as np
import os
from patchify import patchify

In [8]:
def save_results(image, mask, y_pred, save_image_path):
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    y_pred = y_pred * 255

    line = np.ones((cf["image_size"], 10, 3)) * 255
    
    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)

In [9]:
""" Seeding """
np.random.seed(42)
tf.random.set_seed(42)

""" Directory for storing files """
create_dir("/kaggle/working/results")

""" Load the model """
model_path = os.path.join("/kaggle/working/files", "model.h5")
model = tf.keras.models.load_model(model_path, custom_objects={"dice_loss": dice_loss, "dice_coef": dice_coef})

""" Dataset """
dataset_path = "/kaggle/input/brain-tumor-segmentation"
(train_x, train_y), (valid_x, valid_y), (test_x,
                                            test_y) = load_dataset(dataset_path)

""" Prediction and Evaluation """
SCORE = []
for x, y in tqdm(zip(test_x, test_y), total=len(test_y)):
    """ Extracting the name """
    name = x.replace("\\", "/").split("/")[-1]

    """ Reading the image """
    image = cv2.imread(x, cv2.IMREAD_COLOR)  # [512, 512, 3]
    image = cv2.resize(
        image, (cf["image_size"], cf["image_size"]))  # [256, 256, 3]
    x = image / 255.0  # [256, 256, 3]

    patch_shape = (cf["patch_size"], cf["patch_size"], cf["num_channels"]) # (16, 16, 3)
    patches = patchify(x, patch_shape, cf["patch_size"])
    patches = np.reshape(patches, cf["flat_patches_shape"]) # [256, 768]
    patches = patches.astype(np.float32) # [256, 768]
    patches = np.expand_dims(patches, axis=0) # [1, 256, 768]

    """ Reading the mask """
    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE) # [512, 512]
    mask = cv2.resize(mask, (cf["image_size"], cf["image_size"])) # [256, 256]

    """ Prediction """
    y_pred = model.predict(patches, verbose=0)[0] # [256, 256, 1]
    y_pred = np.squeeze(y_pred, axis=-1) # [256, 256]
    y_pred = y_pred >= 0.5 # [256, 256]
    y_pred = y_pred.astype(np.int32) # [256, 256]

    """ Saving the prediction """
    save_image_path = os.path.join("/kaggle/working/results", name)
    save_results(image, mask, y_pred, save_image_path)

    """ Flatten the array """
    mask = mask / 255.0
    mask = (mask > 0.5).astype(np.int32).flatten()
    y_pred = y_pred.flatten()

    """ Calculating the metrics values """
    f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary")
    jac_value = jaccard_score(mask, y_pred, labels=[
                                0, 1], average="binary")
    recall_value = recall_score(
        mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    precision_value = precision_score(
        mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    SCORE.append([name, f1_value, jac_value,
                    recall_value, precision_value])
    
""" Metrics values """
score = [s[1:]for s in SCORE]
score = np.mean(score, axis=0)
print(f"F1: {score[0]:0.5f}")
print(f"Jaccard: {score[1]:0.5f}")
print(f"Recall: {score[2]:0.5f}")
print(f"Precision: {score[3]:0.5f}")

df = pd.DataFrame(
    SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
df.to_csv("/kaggle/working/files/score.csv")

100%|██████████| 612/612 [02:57<00:00,  3.46it/s]

F1: 0.60161
Jaccard: 0.50582
Recall: 0.60382
Precision: 0.68353





In [None]:
import matplotlib.pyplot as plt

# Function to display an image with its mask and predicted mask
def display_image_with_masks(image, mask, predicted_mask):
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # Display the original image
    axes[0].imshow(image)
    axes[0].set_title('Original Image')
    
    # Display the ground truth mask
    axes[1].imshow(mask, cmap='gray')
    axes[1].set_title('Ground Truth Mask')
    
    # Display the predicted mask
    axes[2].imshow(predicted_mask, cmap='gray')
    axes[2].set_title('Predicted Mask')
    
    plt.show()

# Iterate over test images and display each image with its masks
for x, y in zip(test_x, test_y):
    # Read the image
    image = cv2.imread(x, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (cf["image_size"], cf["image_size"]))
    
    # Read the ground truth mask
    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, (cf["image_size"], cf["image_size"]))
    
    # Predict the mask
    patches = patchify(image / 255.0, patch_shape, cf["patch_size"])
    patches = np.reshape(patches, cf["flat_patches_shape"])
    patches = patches.astype(np.float32)
    patches = np.expand_dims(patches, axis=0)
    y_pred = model.predict(patches, verbose=0)[0]
    y_pred = np.squeeze(y_pred, axis=-1) >= 0.5
    
    # Display the image with its masks
    display_image_with_masks(image, mask, y_pred)
