In [12]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import cv2
import os
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import Adam, SGD
from sklearn.model_selection import train_test_split
from patchify import patchify
from tqdm import tqdm
from math import log2
import tensorflow.keras.layers as L
from tensorflow.keras.models import Model
from tensorflow.keras.utils import get_custom_objects

In [2]:
# !pip install patchify


In [9]:
m = 1e-12
def d_coef(y_actual, y_pred):
    y_actual = tf.keras.layers.Flatten()(y_actual)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    inter = tf.reduce_sum(y_actual * y_pred)
    return (2. * inter + m) / (tf.reduce_sum(y_actual) + tf.reduce_sum(y_pred) + m)

def d_loss(y_actual, y_pred):
    return 1.0 - d_coef(y_actual, y_pred)

In [4]:
def multilp(x, o):
    x = L.Dense(o["mp_dim"], activation="gelu")(x)
    x = L.Dropout(o["dropout_rate"])(x)
    x = L.Dense(o["hidden_dim"])(x)
    x = L.Dropout(o["dropout_rate"])(x)
    return x

def transformer_er(x, o):
    sk_1 = x
    x = L.LayerNormalization()(x)
    x = L.MultiHeadAttention(
        num_heads=o["n_heads"], key_dim=o["hidden_dim"]
    )(x, x)
    x = L.Add()([x, sk_1])

    sk_2 = x
    x = L.LayerNormalization()(x)
    x = multilp(x, o)
    x = L.Add()([x, sk_2])

    return x

def conv_bk(x, num_filters, kernel_size=3):
    x = L.Conv2D(num_filters, kernel_size=kernel_size, padding="same")(x)
    x = L.BatchNormalization()(x)
    x = L.ReLU()(x)
    return x

def deconv_bk(x, n_filters, std=2):
    x = L.Conv2DTranspose(n_filters, kernel_size=2, padding="same", strides=std)(x)
    return x

def build_unenr_2d(o):
    """ Inputs """
    input_s = (o["n_patches"], o["patch_s"]*o["patch_s"]*o["n_channels"])
    inp = L.Input(input_s) ## (None, 256, 3072)

    """ Patch + Position Embeddings """
    patch_e = L.Dense(o["hidden_dim"])(inp) ## (None, 256, 768)

    ps = tf.range(start=0, limit=o["n_patches"], delta=1) ## (256,)
    pos_e = L.Embedding(input_dim=o["n_patches"], output_dim=o["hidden_dim"])(ps) ## (256, 768)
    x = patch_e + pos_e ## (None, 256, 768)

    """ Transformer Encoder """
    sk_connection_i = [3, 6, 9, 12]
    sk_connection = []

    for i in range(1, o["n_layers"]+1, 1):
        x = transformer_er(x, o)

        if i in sk_connection_i:
            sk_connection.append(x)

    """ CNN Decoder """
    s3, s6, s9, s12 = sk_connection

    ## Reshaping
    s0 = L.Reshape((o["i_size"], o["i_size"], o["n_channels"]))(inp)

    shape = (
        o["i_size"]//o["patch_s"],
        o["i_size"]//o["patch_s"],
        o["hidden_dim"]
    )
    s3 = L.Reshape(shape)(s3)
    s6 = L.Reshape(shape)(s6)
    s9 = L.Reshape(shape)(s9)
    s12 = L.Reshape(shape)(s12)

    ## Additional layers for managing different patch sizes
    total_addi_factor = int(log2(o["patch_s"]))
    addi = total_addi_factor - 4

    if addi >= 2: ## Patch size 16 or greater
        s3 = deconv_bk(s3, s3.shape[-1], strides=2**addi)
        s6 = deconv_bk(s6, s6.shape[-1], strides=2**addi)
        s9 = deconv_bk(s9, s9.shape[-1], strides=2**addi)
        s12 = deconv_bk(s12, s12.shape[-1], strides=2**addi)
        # print(z3.shape, z6.shape, z9.shape, z12.shape)

    if addi < 0: ## Patch size less than 16
        p = 2**abs(addi)
        s3 = L.MaxPool2D((p, p))(s3)
        s6 = L.MaxPool2D((p, p))(s6)
        s9 = L.MaxPool2D((p, p))(s9)
        s12 = L.MaxPool2D((p, p))(s12)

    ## Decoder 1
    x = deconv_bk(s12, 128)

    s = deconv_bk(s9, 128)
    s = conv_bk(s, 128)

    x = L.Add()([0.7 * x, 0.3 * s])#L.Concatenate()([x, s])

    x = conv_bk(x, 128)
    x = conv_bk(x, 128)

    ## Decoder 2
    x = deconv_bk(x, 64)

    s = deconv_bk(s6, 64)
    s = conv_bk(s, 64)
    s = deconv_bk(s, 64)
    s = conv_bk(s, 64)

    x = L.Add()([0.7 * x, 0.3 * s])#L.Concatenate()([x, s])
    x = conv_bk(x, 64)
    x = conv_bk(x, 64)

    ## Decoder 3
    x = deconv_bk(x, 32)

    s = deconv_bk(s3, 32)
    s = conv_bk(s, 32)
    s = deconv_bk(s, 32)
    s = conv_bk(s, 32)
    s = deconv_bk(s, 32)
    s = conv_bk(s, 32)

    x = L.Add()([0.7 * x, 0.3 * s])#L.Concatenate()([x, s])
    x = conv_bk(x, 32)
    x = conv_bk(x, 32)

    ## Decoder 4
    x = deconv_bk(x, 16)

    s = conv_bk(s0, 16)
    s = conv_bk(s, 16)

    x = L.Add()([0.7 * x, 0.3 * s])#L.Concatenate()([x, s])
    x = conv_bk(x, 16)
    x = conv_bk(x, 16)

    """ Output """
    outputs = L.Conv2D(1, kernel_size=1, padding="same", activation="sigmoid")(x)

    return Model(inp, outputs, name="UNETR_2D")

if __name__ == "__main__":
    config = {}
    config["i_size"] = 512
    config["n_layers"] = 12
    config["hidden_dim"] = 64
    config["mp_dim"] = 128
    config["n_heads"] = 6
    config["dropout_rate"] = 0.1
    config["patch_s"] = 1
    config["n_patches"] = (config["i_size"]**2)//(config["patch_s"]**2)
    config["n_channels"] = 3

    model = build_unenr_2d(config)
    model.summary()

Model: "UNETR_2D"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 262144, 3)]          0         []                            
                                                                                                  
 dense (Dense)               (None, 262144, 64)           256       ['input_1[0][0]']             
                                                                                                  
 tf.__operators__.add (TFOp  (None, 262144, 64)           0         ['dense[0][0]']               
 Lambda)                                                                                          
                                                                                                  
 layer_normalization (Layer  (None, 262144, 64)           128       ['tf.__operators__.add[

In [5]:
""" UNETR  Configration """
o = {}
o["i_size"] = 256
o["n_channels"] = 3
o["n_layers"] = 12
o["hidden_dim"] = 128
o["mp_dim"] = 64
o["n_heads"] = 3
o["dropout_rate"] = 0.1
o["patch_s"] = 16
o["n_patches"] = (o["i_size"]**2)//(o["patch_s"]**2)
o["flat_patches_shape"] = (
    o["n_patches"],
    o["patch_s"]*o["patch_s"]*o["n_channels"]
)

def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def load_dataset(root_path, split=0.1):
    """
    Load images and masks from a directory structure where
    each subdirectory contains 'images' and 'masks' folders.

    Args:
        root_path (str): The root path containing the dataset (e.g., "Brain_HE").
        split (float): Proportion of data to use for validation and test splits.

    Returns:
        tuple: Train, validation, and test datasets (each as a tuple of image and mask paths).
    """
    images = []
    masks = []

    # Traverse the folder structure
    for subdir in os.listdir(root_path):
        subdir_path = os.path.join(root_path, subdir)
        if os.path.isdir(subdir_path):
            images_path = os.path.join(subdir_path, "image")
            masks_path = os.path.join(subdir_path, "mask")
            
            if os.path.exists(images_path) and os.path.exists(masks_path):
                images += sorted(glob(os.path.join(images_path, "*.jpg")))
                masks += sorted(glob(os.path.join(masks_path, "*.jpg")))

    # Ensure images and masks are matched
    if len(images) != len(masks):
        raise ValueError("Number of images and masks do not match!")

    # Split the data
    split_size = int(len(images) * split)
    train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
    train_y, valid_y = train_test_split(masks, test_size=split_size, random_state=42)

    train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)
    train_y, test_y = train_test_split(train_y, test_size=split_size, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

def read_image(path):
    path = path.decode()
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (o["i_size"], o["i_size"]))
    image = image / 255.0

    """ Processing to patches """
    patch_shape = (o["patch_s"], o["patch_s"], o["n_channels"])
    patches = patchify(image, patch_shape, o["patch_s"])
    patches = np.reshape(patches, o["flat_patches_shape"])
    patches = patches.astype(np.float32)

    return patches

def read_mask(path):
    path = path.decode()
    mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, (o["i_size"], o["i_size"]))
    mask = mask / 255.0
    mask = mask.astype(np.float32)
    mask = np.expand_dims(mask, axis=-1)
    return mask

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape(o["flat_patches_shape"])
    y.set_shape([o["i_size"], o["i_size"], 1])
    return x, y

def tf_dataset(X, Y, batch=2):
    ds = tf.data.Dataset.from_tensor_slices((X, Y))
    ds = ds.map(tf_parse).batch(batch).prefetch(10)
    return ds


if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir("files")

    """ Hyperparameters """
    batch_size = 8
    lr = 0.1
    n_epochs = 100
    model_path = os.path.join("files", "model.h5")
    csv_path = os.path.join("files", "log.csv")

    """ Dataset """
    dataset_path = "Brain HE"
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    print(f"Train: \t{len(train_x)} - {len(train_y)}")
    print(f"Valid: \t{len(valid_x)} - {len(valid_y)}")
    print(f"Test: \t{len(test_x)} - {len(test_y)}")

    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

    """ Model """
    model = build_unenr_2d(o)
    model.compile(loss=d_loss, optimizer=SGD(lr), metrics=[d_coef, "accuracy"])
    # model.summary()

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)
    ]

    model.fit(
        train_dataset,
        epochs=n_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks
    )

Train: 	256 - 256
Valid: 	31 - 31
Test: 	31 - 31
Epoch 1/100
Epoch 1: val_loss improved from inf to 0.98492, saving model to files\model.h5


  saving_api.save_model(


Epoch 2/100
Epoch 2: val_loss did not improve from 0.98492
Epoch 3/100
Epoch 3: val_loss did not improve from 0.98492
Epoch 4/100
Epoch 4: val_loss improved from 0.98492 to 0.98485, saving model to files\model.h5
Epoch 5/100
Epoch 5: val_loss improved from 0.98485 to 0.98446, saving model to files\model.h5
Epoch 6/100
Epoch 6: val_loss improved from 0.98446 to 0.98325, saving model to files\model.h5
Epoch 7/100
Epoch 7: val_loss improved from 0.98325 to 0.97991, saving model to files\model.h5
Epoch 8/100
Epoch 8: val_loss improved from 0.97991 to 0.97274, saving model to files\model.h5
Epoch 9/100
Epoch 9: val_loss improved from 0.97274 to 0.96497, saving model to files\model.h5
Epoch 10/100
Epoch 10: val_loss improved from 0.96497 to 0.95913, saving model to files\model.h5
Epoch 11/100
Epoch 11: val_loss improved from 0.95913 to 0.95545, saving model to files\model.h5
Epoch 12/100
Epoch 12: val_loss improved from 0.95545 to 0.95354, saving model to files\model.h5
Epoch 13/100
Epoch 13

### Test results and corresponding image saving in 3 grid form

In [15]:
""" UNETR  Configration """
# o = {}
# o["image_size"] = 256
# o["num_channels"] = 3
# o["num_layers"] = 12
# o["hidden_dim"] = 128
# o["mlp_dim"] = 32
# cf["num_heads"] = 2
# cf["dropout_rate"] = 0.1
# cf["patch_size"] = 16
# cf["num_patches"] = (cf["image_size"]**2)//(cf["patch_size"]**2)
# cf["flat_patches_shape"] = (
#     cf["num_patches"],
#     cf["patch_size"]*cf["patch_size"]*cf["num_channels"]
# )
o = {}
o["i_size"] = 256
o["n_channels"] = 3
o["n_layers"] = 12
o["hidden_dim"] = 128
o["mp_dim"] = 64
o["n_heads"] = 3
o["dropout_rate"] = 0.1
o["patch_s"] = 16
o["n_patches"] = (o["i_size"]**2)//(o["patch_s"]**2)
o["flat_patches_shape"] = (
    o["n_patches"],
    o["patch_s"]*o["patch_s"]*o["n_channels"]
)

get_custom_objects().update({'d_coef': d_coef})

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir(f"results1")

    """ Load the model """
    model_path = os.path.join("files", "model.h5")
    model = tf.keras.models.load_model(model_path, custom_objects={"d_loss": d_loss, "dice_coef": d_coef})

    """ Dataset """
    dataset_path = "Brain HE"
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    print(f"Train: \t{len(train_x)} - {len(train_y)}")
    print(f"Valid: \t{len(valid_x)} - {len(valid_y)}")
    print(f"Test: \t{len(test_x)} - {len(test_y)}")

    """ Prediction """
    for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
        """ Extracting the name """
        name = x.split("/")[-1]

        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (o["i_size"], o["i_size"]))
        x = image / 255.0

        patch_shape = (o["patch_s"], o["patch_s"], o["n_channels"])
        patches = patchify(x, patch_shape, o["patch_s"])
        patches = np.reshape(patches, o["flat_patches_shape"])
        patches = patches.astype(np.float32)
        patches = np.expand_dims(patches, axis=0)

        """ Read Mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (o["i_size"], o["i_size"]))
        mask = mask / 255.0
        mask = np.expand_dims(mask, axis=-1)
        mask = np.concatenate([mask, mask, mask], axis=-1)

        """ Prediction """
        pred = model.predict(patches, verbose=0)[0]
        pred = np.concatenate([pred, pred, pred], axis=-1)

        """ Save final mask """
        line = np.ones((o["i_size"], 10, 3)) * 255
        cat_images = np.concatenate([image, line, mask*255, line, pred*255], axis=1)
        save_image_path = os.path.join("results2",  name)
        cv2.imwrite(save_image_path, cat_images)

Train: 	256 - 256
Valid: 	31 - 31
Test: 	31 - 31


100%|██████████████████████████████████████████████████████████████████████████████████| 31/31 [00:07<00:00,  4.21it/s]


### Test results by showing it localization with color mapping

In [18]:
""" UNETR  Configration """
# cf = {}
# cf["image_size"] = 256
# cf["num_channels"] = 3
# cf["num_layers"] = 12
# cf["hidden_dim"] = 128
# cf["mlp_dim"] = 32
# cf["num_heads"] = 2
# cf["dropout_rate"] = 0.1
# cf["patch_size"] = 16
# cf["num_patches"] = (cf["image_size"]**2)//(cf["patch_size"]**2)
# cf["flat_patches_shape"] = (
#     cf["num_patches"],
#     cf["patch_size"]*cf["patch_size"]*cf["num_channels"]
# )

o = {}
o["i_size"] = 256
o["n_channels"] = 3
o["n_layers"] = 12
o["hidden_dim"] = 128
o["mp_dim"] = 64
o["n_heads"] = 2
o["dropout_rate"] = 0.1
o["patch_s"] = 16
o["n_patches"] = (o["i_size"]**2)//(o["patch_s"]**2)
o["flat_patches_shape"] = (
    o["n_patches"],
    o["patch_s"]*o["patch_s"]*o["n_channels"]
)

get_custom_objects().update({'d_coef': d_coef})

get_custom_objects().update({'d_loss': d_loss})

from matplotlib import cm

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir(f"results2")

    """ Load the model """
    model_path = os.path.join("files", "model.h5")
    model = tf.keras.models.load_model(model_path, custom_objects={"dice_loss": d_loss, "dice_coef": d_coef})

    """ Dataset """
    dataset_path = "Brain HE"
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    print(f"Train: \t{len(train_x)} - {len(train_y)}")
    print(f"Valid: \t{len(valid_x)} - {len(valid_y)}")
    print(f"Test: \t{len(test_x)} - {len(test_y)}")



        # Helper function to create colored masks
    def create_color_mask(mask, color):
        """
        Convert a binary mask into a specific color overlay.
        Args:
            mask: Binary mask (values between 0 and 1).
            color: Tuple of (R, G, B) values for the color.
        Returns:
            A mask in the specified color.
        """
        colored_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
        for i in range(3):  # Assign the color to respective channels
            colored_mask[:, :, i] = (mask * color[i]).astype(np.uint8)
        return colored_mask
    
    """ Prediction """
    for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
        """ Extracting the name """
        name = x.split("/")[-1]
    
        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (o["i_size"], o["i_size"]))
        x = image / 255.0
    
        patch_shape = (o["patch_s"], o["patch_s"], o["n_channels"])
        patches = patchify(x, patch_shape, o["patch_s"])
        patches = np.reshape(patches, o["flat_patches_shape"])
        patches = patches.astype(np.float32)
        patches = np.expand_dims(patches, axis=0)
    
        """ Read Mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (o["i_size"], o["i_size"]))
        mask = mask / 255.0
    
        """ Prediction """
        pred = model.predict(patches, verbose=0)[0]
        pred = pred.reshape((o["i_size"], o["i_size"]))
        pred = (pred > 0.5).astype(np.float32)  # Threshold prediction for binary mask
    
        """ Create  Masks """
        ground_truth_colored = create_color_mask(mask, (0, 0, 255))  # Red for Ground Truth
        prediction_colored = create_color_mask(pred, (0, 255, 0))   # Green for Prediction
    
    
        ground_truth_colored = cv2.addWeighted(image, 0.3, ground_truth_colored, 0.7, 0)
        prediction_colored = cv2.addWeighted(image, 0.3, prediction_colored, 0.7, 0)
    
        """ Combine  two masks """
        combined_overlay = cv2.addWeighted(ground_truth_colored, 0.5, prediction_colored, 0.5, 0)
    
        """ Save visualization """
        save_image_path = os.path.join("results2", name)
        cv2.imwrite(save_image_path, combined_overlay)




Train: 	256 - 256
Valid: 	31 - 31
Test: 	31 - 31


100%|██████████████████████████████████████████████████████████████████████████████████| 31/31 [00:06<00:00,  5.04it/s]


In [19]:
# print(len(train_x))
# print(len(valid_x))
# print(len(test_x))

# print(len(train_y))
# print(len(valid_y))
# print(len(test_y))

256
31
31
256
31
31
