## Installation


In [None]:
pip install "numpy>=1.23" "pandas>=1.5" python-dateutil six "matplotlib>=3.7" "seaborn>=0.12" "opencv-python>=4.8" "Pillow>=9.5" "scikit-image>=0.21" "nibabel>=5.1.0" "SimpleITK>=2.3.0" "scikit-learn>=1.3" "tqdm>=4.66" "tensorflow>=2.13" "keras>=2.13" keras-unet

## Utils File


In [None]:
import os
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
plt.style.use('ggplot')

import cv2
from tensorflow.keras import backend as K

# Image with Mask Image
def plot_from_img_path(rows, columns, list_img_path, list_mask_path):
    fig = plt.figure(figsize=(12,12))
    rnge = rows * columns + 1

    for i in range(1, rnge):
        fig.add_subplot(rows, columns, i)
        img_path = list_img_path[i]
        mask_path = list_mask_path[i]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(mask_path)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)

        plt.imshow(image)
        plt.imshow(mask, alpha=0.4)

    plt.show()

# Image and Mask Image side by sideimport matplotlib.pyplot as plt
def show_img_mask_rows(n, list_img_path, list_mask_path):
    fig = plt.figure(figsize=(6, 3 * n))

    plot_idx = 1

    for i in range(n):
        img_path = list_img_path[i]
        mask_path = list_mask_path[i]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_path, 0)  # grayscale mask

        # --- IMAGE ---
        fig.add_subplot(n, 2, plot_idx)
        plt.imshow(image)
        plt.title("Image")
        plt.axis("off")
        plot_idx += 1

        # --- MASK ---
        fig.add_subplot(n, 2, plot_idx)
        plt.imshow(mask, cmap='gray')
        plt.title("Mask")
        plt.axis("off")
        plot_idx += 1

    plt.tight_layout()
    plt.show()

# The Dice Coefficient (also called Dice Score or Dice Similarity Coefficient - DSC) is a metric used to measure overlap between two regions.
def dice_coefficients(y_true, y_pred, smooth=100):
    y_true_flatten = K.flatten(y_true)
    y_pred_flatten = K.flatten(y_pred)

    intersection = K.sum(y_true_flatten * y_pred_flatten)
    union = K.sum(y_true_flatten) + K.sum(y_pred_flatten)
    return (2 * intersection + smooth) / (union + smooth)

def dice_coefficients_loss(y_true, y_pred, smooth=100):
    return -dice_coefficients(y_true, y_pred, smooth) # Loss Metric

# Intersection over Union (IoU) also known as the Jaccard Index or Jaccard similarity coefficient, is a fundamental metric used to measure the similarity and overlap between two sets. In computer vision, it is the standard for evaluating how accurately a model localizes objects.
def iou(y_true_flatten, y_pred_flatten, smooth=100):
    intersection = K.sum(y_true_flatten * y_pred_flatten)
    add = K.sum(y_true_flatten + y_pred_flatten)
    iou = (intersection + smooth) / (add - intersection + smooth) # Jaccard IOU
    return iou

# Jaccard distance is a measure of dissimilarity between two sets, derived directly from Jaccard similarity. While Jaccard similarity measures how much two sets overlap, Jaccard distance measures how different they are. It is defined as one minus the Jaccard similarity.
def jaccard_distance(y_true, y_pred):
    y_true_flatten = K.flatten(y_true)
    y_pred_flatten = K.flatten(y_pred)

    return -iou(y_true_flatten, y_pred_flatten) # Loss Metric


## model.compile(optimizer=opt, loss=dice_coefficients_loss, metrics=['binary_accuracy', iou, dice_coefficients])

# The Loss Function is the goal, and the Optimizer is the strategy to reach it.

## UNet File

In [None]:
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras.layers import (
    Input,
    Activation,
    BatchNormalization,
    Dropout,
    Lambda,
    Conv2D,
    Conv2DTranspose,
    MaxPooling2D,
    concatenate,
)

from tensorflow.keras import backend as K


def unet(input_size=(256,256,3)):
    inputs = Input(input_size)

    ## Encoding Layer
    # Layer 1
    conv1 = Conv2D(filters=64, kernel_size=(3, 3), padding="same")(inputs)
    bn1 = Activation("relu")(conv1)
    conv1 = Conv2D(filters=64, kernel_size=(3, 3), padding="same")(bn1)
    bn1 = BatchNormalization(axis=3)(conv1)
    bn1 = Activation("relu")(bn1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(bn1)

    # Layer 2
    conv2 = Conv2D(filters=128, kernel_size=(3, 3), padding="same")(pool1)
    bn2 = Activation("relu")(conv2)
    conv2 = Conv2D(filters=128, kernel_size=(3, 3), padding="same")(bn2)
    bn2 = BatchNormalization(axis=3)(conv2)
    bn2 = Activation("relu")(bn2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(bn2)

    # Layer 3
    conv3 = Conv2D(filters=256, kernel_size=(3, 3), padding="same")(pool2)
    bn3 = Activation("relu")(conv3)
    conv3 = Conv2D(filters=256, kernel_size=(3, 3), padding="same")(bn3)
    bn3 = BatchNormalization(axis=3)(conv3)
    bn3 = Activation("relu")(bn3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(bn3)

    # Layer 4
    conv4 = Conv2D(filters=512, kernel_size=(3, 3), padding="same")(pool3)
    bn4 = Activation("relu")(conv4)
    conv4 = Conv2D(filters=512, kernel_size=(3, 3), padding="same")(bn4)
    bn4 = BatchNormalization(axis=3)(conv4)
    bn4 = Activation("relu")(bn4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(bn4)

    # Layer 5
    conv5 = Conv2D(filters=1024, kernel_size=(3, 3), padding="same")(pool4)
    bn5 = Activation("relu")(conv5)
    conv5 = Conv2D(filters=1024, kernel_size=(3, 3), padding="same")(bn5)
    bn5 = BatchNormalization(axis=3)(conv5)
    bn5 = Activation("relu")(bn5)

    ## UpConvolutional
    # Layer 6
    up6 = concatenate([
        Conv2DTranspose(512, kernel_size=(2,2), padding='same', strides=(2,2))(bn5), conv4], axis=3)
    """ After every concatenation we again apply two consecutive regular convolutions so that the model can learn to assemble a more precise output """
    conv6 = Conv2D(filters=512, kernel_size=(3, 3), padding="same")(up6)
    bn6 = Activation("relu")(conv6)
    conv6 = Conv2D(filters=512, kernel_size=(3, 3), padding="same")(bn6)
    bn6 = BatchNormalization(axis=3)(conv6)
    bn6 = Activation("relu")(bn6)

    # Layer 7
    up7 = concatenate([
        Conv2DTranspose(256, kernel_size=(2,2), padding='same', strides=(2,2))(bn6), conv3], axis=3)
    conv7 = Conv2D(filters=256, kernel_size=(3, 3), padding="same")(up7)
    bn7 = Activation("relu")(conv7)
    conv7= Conv2D(filters=512, kernel_size=(3, 3), padding="same")(bn7)
    bn7 = BatchNormalization(axis=3)(conv7)
    bn7 = Activation("relu")(bn7)

    # Layer 8
    up8 = concatenate([
        Conv2DTranspose(128, kernel_size=(2,2), padding='same', strides=(2,2))(bn7), conv2], axis=3)
    conv8 = Conv2D(filters=128, kernel_size=(3, 3), padding="same")(up8)
    bn8 = Activation("relu")(conv8)
    conv8 = Conv2D(filters=128, kernel_size=(3, 3), padding="same")(bn8)
    bn8 = BatchNormalization(axis=3)(conv8)
    bn8 = Activation("relu")(bn8)

    # Layer 9
    up9 = concatenate([
        Conv2DTranspose(64, kernel_size=(2,2), padding='same', strides=(2,2))(bn8), conv1], axis=3)
    conv9 = Conv2D(filters=64, kernel_size=(3, 3), padding="same")(up9)
    bn9 = Activation("relu")(conv9)
    conv9 = Conv2D(filters=64, kernel_size=(3, 3), padding="same")(bn9)
    bn9 = BatchNormalization(axis=3)(conv9)
    bn9 = Activation("relu")(bn9)

    ## Final Layer
    # Layer 10
    conv10 = Conv2D(filters=1, kernel_size=(1,1), activation='sigmoid')(bn9)

    return Model(inputs=[inputs], outputs=[conv10])

## Main File

## Imports

In [None]:
## Imports

import os
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline

import cv2
from tqdm import tqdm_notebook, tnrange
from glob import glob
from itertools import chain

# from skimage.io import imread, imshow, concatenate_images
# from skimage.transform import resize
# from skimage.morphology import label
# from skimage.color import rgb2gray

import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras.layers import Input, Activation, BatchNormalization, Dropout, Lambda, Conv2D, Conv2DTranspose, MaxPooling2D, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from sklearn.model_selection import train_test_split

In [None]:
# Setting Size Parameters

im_width = 256
im_height = 256

## Mount Drive


In [None]:
from google.colab import drive
from glob import glob
import os

# 1. Mount Google Drive
drive.mount('/content/drive')

# 2. Updated path for Google Colab
mask_files = glob(r'/content/drive/MyDrive/Colab Notebooks/Medical Computer Vision/datasets/kaggle_3m/*/*_mask*')

print(f"Found {len(mask_files)} mask files.")

In [None]:
image_filenames_train = []

for i in mask_files:
    image_filenames_train.append(i.replace('_mask',''))

print(image_filenames_train[:10])

In [None]:
len(image_filenames_train)

## Plot Images and Mask

In [None]:
row = 3
column = 3

plot_from_img_path(row, column, image_filenames_train, mask_files)

In [None]:
show_img_mask_rows(3, image_filenames_train, mask_files)

## DataFrame and Train_Test_Split

In [None]:
df = pd.DataFrame(data= {
    'image_filenames_train':image_filenames_train,
    'mask':mask_files
})

In [None]:
# Train and Test Split
df_train, df_test = train_test_split(df, test_size=0.1)

# Train and VAl Split
df_train, df_val = train_test_split(df_train, test_size=0.2)

In [None]:
df_train.shape, df_val.shape, df_test.shape

## Data Generator : Passing my data in batch to Model

In [None]:


# After Mask Normalization value is less than or equal to 0.5. Then we will skip that mask.
def normalize_and_diagnose(img,mask):
    img = img / 255
    mask = mask / 255
    mask[mask > 0.5] = 1 # Its Tumor
    mask[mask <= 0.5] = 0 # Not a Tumor
    return (img, mask)

# Referring Code from : https://github.com/zhixuhao/unet/blob/master/data.py
# Using 'train_generator' for Load and process data in batches while training, instead of loading everything into memory.
def train_generator(
    data_frame,
    batch_size,
    augmentation_dict,
    image_color_mode="rgb",
    mask_color_mode="grayscale",
    image_save_prefix="image",
    mask_save_prefix="mask",
    save_to_dir=None,
    target_size=(256, 256),
    seed=1,
):
    """
    can generate image and mask at the same time use the same seed for
    image_datagen and mask_datagen to ensure the transformation for image
    and mask is the same if you want to visualize the results of generator,
    set save_to_dir = "your path"
    """
    image_datagen = ImageDataGenerator(**augmentation_dict)
    mask_datagen = ImageDataGenerator(**augmentation_dict)

    image_generator = image_datagen.flow_from_dataframe(
        data_frame,                    # Pandas DataFrame containing file paths
        x_col="image_filenames_train",# Column name with image file paths
        class_mode=None,              # No labels returned (augmentation only)
        color_mode=image_color_mode,  # 'rgb' or 'grayscale' image mode
        target_size=target_size,      # Resize images to (height, width)
        batch_size=batch_size,        # Images per batch
        save_to_dir=save_to_dir,      # Directory to save augmented images
        save_prefix=image_save_prefix,# Prefix for saved filenames
        seed=seed,                    # Random seed for reproducibility
    )


    mask_generator = mask_datagen.flow_from_dataframe(
        data_frame,
        x_col="mask",
        class_mode=None,
        color_mode=mask_color_mode,
        target_size=target_size,
        batch_size=batch_size,
        save_to_dir=save_to_dir,
        save_prefix=mask_save_prefix,
        seed=seed,
    )

    train_gen = zip(image_generator, mask_generator)

    # Final return Tuple after image Normalization and Diagnostics
    for (img, mask) in train_gen:
        img, mask = normalize_and_diagnose(img, mask)
        yield (img, mask)

In [None]:
# HyperParameters

EPOCHS = 100
BATCH_SIZE = 32
LEARNING_RATE = 1e-4
SMOOTH = 100

In [None]:
from tensorflow.keras.optimizers.legacy import Adam # Using Legacy 'Adam' Optimizer for 'decay'.

train_generator_param = dict(
    rotation_range=0.2,          # Randomly rotate images up to ±20% degrees
    width_shift_range=0.05,      # Random horizontal shift up to 5% of image width
    height_shift_range=0.05,     # Random vertical shift up to 5% of image height
    shear_range=0.05,            # Apply small shear transformations
    zoom_range=0.05,             # Random zoom in/out up to 5%
    horizontal_flip=True,        # Randomly flip images horizontally
    fill_mode='nearest'          # Fill missing pixels after transform using nearest values
)

train_gen = train_generator(
    df_train,                    # DataFrame containing training image paths and labels
    BATCH_SIZE,                 # Number of samples per training batch
    train_generator_param,      # Augmentation configuration dictionary
    target_size=(im_height, im_width)  # Resize images to model input size
)

# Not applying Augmentation on Test Set.
test_gen = train_generator(df_test,
                            BATCH_SIZE,
                            dict(), # Empty dict = no augmentation
                            target_size=(im_height, im_width))

model = unet(input_size=(im_height, im_width, 3)) # Input shape (Height, Width, RGB channels)


In [None]:
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

# Decay rate gradually reduces the learning rate during training so the model "Learns fast at the beginning. Fine-tunes carefully later".

# Learning rate schedule (modern replacement for decay)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=LEARNING_RATE,
    decay_steps= 1000,           # Safe default, prevents extremely fast decay
    decay_rate= LEARNING_RATE / EPOCHS ,
    staircase= True
)

optimizer = Adam(
    learning_rate=lr_schedule,  # Replaces deprecated lr + decay
    beta_1=0.9,                 # Explicit to avoid legacy behavior differences
    beta_2=0.999,
    epsilon=1e-7,               # Required in Keras 3 (None can cause issues)
    amsgrad=False
)

In [None]:

model.compile(
    optimizer=optimizer,        # Optimization algorithm
    loss=dice_coefficients_loss,# Loss function (Dice loss for segmentation)
    metrics=[
        'binary_accuracy',      # Pixel-wise classification accuracy
        iou,                    # Intersection over Union metric
        dice_coefficients       # Dice similarity score
    ]
)

callbacks = [
    ModelCheckpoint(
        'unet.keras',            # File path to save best model
        verbose=1,              # Print message when saving
        save_best_only=True     # Save only when validation improves
    )
]


history = model.fit(
    train_gen,                                 # Training data generator
    steps_per_epoch=len(df_train) // BATCH_SIZE,# Batches per epoch
    epochs=EPOCHS,                             # Total number of training epochs
    callbacks=callbacks,                       # List of callbacks
    validation_data=test_gen,                  # Validation generator
    validation_steps=len(df_test) // BATCH_SIZE# Validation batches per epoch
)


Found 2827 validated image filenames.




Found 2852 validated image filenames.
[1m85/89[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m1:45[0m 26s/step - binary_accuracy: 0.7804 - dice_coefficients: 0.0495 - iou: 0.0256 - loss: -0.0495

In [None]:
# Check History objects

import pprint # Printing objects

pprint.pprint(history.history)


In [None]:
history_post_training = history.history
train_dice_coeff_list = history_post_training['dice_coefficients']
test_dice_coeff_list = history_post_training['val_dice_coefficients']

train_jaccard_list = history_post_training['iou']
test_jaccard_list = history_post_training['val_iou']

train_loss_list = history_post_training['loss']
test_loss_list = history_post_training['val_loss']

plt.figure(1)
plt.plot(test_loss_list, 'b-')
plt.plot(train_loss_list, 'r-')
plt.xlabel('iterations')
plt.ylabel('loss')
plt.title('loss graph', fontsize=12)

plt.figure(2)
plt.plot(train_dice_coeff_list, 'b-')
plt.plot(test_dice_coeff_list, 'r-')
plt.xlabel('iterations')
plt.ylabel('accuracy')
plt.title('accuracy graph', fontsize=12)

plt.show()


In [None]:
# Load Previouly Trained Model

model = load_model("unet.hdf5", custom_objects = {"dice_coefficient_loss": dice_coefficients_loss, 'iou': iou,
                                                 "dice_coefficient" : dice_coefficients})

test_gen = train_generator(df_test, BATCH_SIZE, dict(), target_size = (im_height, im_width))

results = model.evaluate(test_gen, steps = len(df_test)/BATCH_SIZE)

print("TEST LOSS", results[0])
print("TEST IOU", results[1])
print("TEST DICE COEFFICIENT", results[2])



## Plotting Predicted Mask Segmentations Results from Test Image Set.

In [None]:
for i in range(20):
    index = np.random.randint(1, len(df_test.index))
    img = cv2.imread(df_test['image_filename_train'].iloc[index]) # Original Image NOt Mask
    img = cv2.resize(img, (im_height, im_width))
    img = img / 255
    #print(img.shape) (256, 256, 3)
    img = img[np.newaxis, :, :, :] # 3d array will become 4d array
    #print(img.shape) (1, 256, 256, 3)
    predicted_img = model.predict(img)

    plt.figure(figsize=(12,12))
    # 3 columns original image, mask, predicted image
    plt.subplot(1,3,1)
    plt.imshow(np.squeeze(img))
    plt.title('Original IMage')

    plt.subplot(1,3,2)
    plt.imshow(np.squeeze(cv2.imread(df_test['mask'].iloc[index])))
    plt.title("Mask Image")

    plt.subplot(1,3,3)
    plt.imshow(np.squeeze(predicted_img) > 0.5) # Checking Probabilities
    plt.title('Predicted Image')

    plt.show()