In [2]:
# Utilities
import os
import datetime
import sys
import signal
import random

# Algebra
import numpy as np

# Visualization
import matplotlib.pyplot as plt
from PIL import Image
import cv2

# Neural Networks
import tensorflow as tf
from tensorflow import keras as ks
from keras.layers import Input, SeparableConv2D, BatchNormalization, MaxPooling2D, Conv2DTranspose
from keras.layers import concatenate, Conv2D
from keras.models import Model
from keras.optimizers import Adam

from sklearn.model_selection import train_test_split
from scipy import ndimage

tf.data.experimental.enable_debug_mode()

2023-09-09 17:52:23.622847: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-09-09 17:52:23.860486: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-09-09 17:52:24.607932: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/angelo/anaconda3/lib/python3.9/site-packages/cv2/../../../../lib:
2023-09-09 17:52:24.608016: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugi

The following cell defines the functions used to load the data from the dataset. The dataset is composed of images and masks. The masks are one-hot encoded. Using PIL instead of pyplot to load the images, the resulting range of the images is [0, 6] instead of [0, 1].

In [2]:
def load_data_from_names(root_dir: str, fnames: list, shape=(1000, 1000), mask=False, resize_image=True) -> np.array:
    # Given the root path and a list of file names as input, return the dataset
    # array.
    images = []

    for idx, img_name in enumerate(fnames):
        x = Image.open(os.path.join(root_dir, img_name))
        if resize_image:
            x = x.resize(shape)
        if not mask:
            x = np.array(x)/255.
        images.append(np.array(x))

        if idx%100 == 99:
            print(f"Processed {idx+1} images.")

    value = np.array(images) if not mask else one_hot_encoder(np.array(images))
    return value


def one_hot_encoder(y):
    unique_values = np.unique(y)
    one_hot_encoding = np.zeros(y.shape + (len(unique_values),), dtype=np.int8)
    for i, value in enumerate(unique_values):
        one_hot_encoding[(y == value), i] = 1
    return one_hot_encoding

Having just 600 images, I decided to create a brand-new dataset starting from the previous one. Since I had some troubles fitting all the dataset into my RAM, I got 256X 256 patches from the original images. Each patch has been chosen getting the part of the image having the largest variance in the corresponding mask.
The variance has been computed using a 5X5 window. The patches have been chosen in order to have a minimum overlapping area between them.

In [3]:
def overlapping_area(l1, r1, l2, r2):
    x = 0
    y = 1
    ''' Length of intersecting part i.e
        start from max(l1[x], l2[x]) of
        x-coordinate and end at min(r1[x],
        r2[x]) x-coordinate by subtracting
        start from end we get required
        lengths '''
    x_dist = abs(min(r1[x], r2[x]) - max(l1[x], l2[x]))

    y_dist = abs(min(r1[y], r2[y]) - max(l1[y], l2[y]))

    area_i = 0
    if x_dist > 0 and y_dist > 0:
        area_i = x_dist * y_dist

    return area_i


def create_training_set(target_max=True):

    rows, cols = 128, 128
    win_rows, win_cols = 5, 5
    threshold = .5 * (2 * rows * 2 * cols) if target_max else .2 * (2 * rows * 2 * cols)

    target_path_annotation_train = './ign_new/annotations/training/'
    target_path_images_train = './ign_new/images/training/'

    image_names = os.listdir('./ign/images/training')
    mask_names = os.listdir('./ign/annotations/training')

    image_names.sort()
    mask_names.sort()

    cnt = 0

    x = load_data_from_names('./ign/images/training', image_names, shape=(256, 256), resize_image=False)
    y = load_data_from_names('./ign/annotations/training', mask_names, shape=(256, 256), mask=True, resize_image=False)
    for ((img, img_name), (mask, mask_name)) in zip(zip(x, image_names), zip(y, mask_names)):
        win_mean = ndimage.uniform_filter(np.argmax(mask, axis=-1), (win_rows, win_cols))
        win_sqr_mean = ndimage.uniform_filter(np.argmax(mask, axis=-1)**2, (win_rows, win_cols))
        win_var = win_sqr_mean - win_mean**2

        mx_place = np.where(win_var == win_var.max()) if target_max else np.where(win_var == win_var.min())

        old_patches = []

        zip_var = zip(mx_place[0], mx_place[1]) if target_max else zip(mx_place[0][:250], mx_place[1][:250])

        for k, (i, j) in enumerate(zip_var):
            x_min = i-rows
            x_max = i+rows
            y_min = j-cols
            y_max = j+cols
            if x_min < 0:
                x_min = 0
                x_max = 2*rows
            if x_max > 1000:
                x_max = 1000
                x_min = 1000 - 2*rows
            if y_min < 0:
                y_min = 0
                y_max = 2*cols
            if y_max > 1000:
                y_max = 1000
                y_min = 1000 - 2*cols

            win_var_1 = np.argmax(mask, axis=-1)[x_min:x_max, y_min:y_max]
            img_1_var = img[x_min:x_max, y_min:y_max]*255

            if old_patches == []:
                cv2.imwrite(f'{target_path_images_train}{img_name[:img_name.find(".")]}_{k}.png', img_1_var)
                cv2.imwrite(f'{target_path_annotation_train}{mask_name[:mask_name.find(".")]}_{k}.png', win_var_1)
                old_patches.append((k, (x_min, x_max), (y_min, y_max)))
            else:
                a = [overlapping_area((x_min, y_min), (x_max, y_max), (old_x_min, old_y_min), (old_x_max, old_y_max)) < threshold
                     for (kk, (old_x_min, old_x_max), (old_y_min, old_y_max)) in old_patches]
                if all(a):
                    cv2.imwrite(f'{target_path_images_train}{img_name[:img_name.find(".")]}_{k}.png', img_1_var)
                    cv2.imwrite(f'{target_path_annotation_train}{mask_name[:mask_name.find(".")]}_{k}.png', win_var_1)
                    old_patches.append((k, (x_min, x_max), (y_min, y_max)))

        if cnt % 10 == 9:
            print(cnt, 'out of', len(image_names))
        cnt += 1


def create_test_set():

    target_path_annotation_val = './ign_new/annotations/validation/'
    target_path_images_val = './ign_new/images/validation/'

    image_names = os.listdir('./ign/images/validation')
    mask_names = os.listdir('./ign/annotations/validation')

    image_names.sort()
    mask_names.sort()

    x = load_data_from_names('./ign/images/validation', image_names, shape=(256, 256))
    y = load_data_from_names('./ign/annotations/validation', mask_names, shape=(256, 256), mask=True)

    for ((img, img_name), (mask, mask_name)) in zip(zip(x, image_names), zip(y, mask_names)):
        cv2.imwrite(f'{target_path_images_val}{img_name}', img*255)
        cv2.imwrite(f'{target_path_annotation_val}{mask_name}', np.argmax(mask, axis=-1))


def create_dataset():
    create_training_set()
    create_test_set()

The following snippet of code define the metrics used to evaluate the model.

In [10]:
def dice_coef(y_true, y_pred):
    y_true_f = y_true.flatten()
    y_pred_f = y_pred.flatten()
    intersection = np.sum(y_true_f * y_pred_f)
    smooth = 0.0001
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)

def dice_coef_multilabel(y_true, y_pred, numLabels=7):
    dice = 0
    y_true = y_true.numpy()
    y_pred = y_pred.numpy()
    for index in range(numLabels):
        dice += dice_coef(y_true[:, :, :, index], y_pred[:, :, :, index])
    return dice

def np_dice_coef_multilabel(y_true, y_pred, numLabels=7):
    dice = 0
    for index in range(numLabels):
        dice += dice_coef(y_true[:, :, :, index], y_pred[:, :, :, index])
    return dice/numLabels

The following model is a modified version of the MobileUNet model. The main differences are the use of separable convolutional layers instead of the standard ones and the use of batch normalization layers.

In [5]:
def mod_mobileunet(input_size=(256, 256, 3)):
    inputs = Input(input_size)

    conv1 = SeparableConv2D(64, 3, activation='elu', padding='same')(inputs)
    conv1 = BatchNormalization()(conv1)
    conv1 = SeparableConv2D(64, 3, activation='elu', padding='same')(conv1)
    conv1 = BatchNormalization()(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = SeparableConv2D(128, 3, activation='elu', padding='same')(pool1)
    conv2 = BatchNormalization()(conv2)
    conv2 = SeparableConv2D(128, 3, activation='elu', padding='same')(conv2)
    conv2 = BatchNormalization()(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = SeparableConv2D(256, 3, activation='elu', padding='same')(pool2)
    conv3 = BatchNormalization()(conv3)
    conv3 = SeparableConv2D(256, 3, activation='elu', padding='same')(conv3)
    conv3 = BatchNormalization()(conv3)

    conv8 = Conv2DTranspose(128, 3, strides=(2, 2), activation='elu', padding='same')(conv3)
    cat8 = concatenate([conv2, conv8], axis=3)
    conv8 = SeparableConv2D(128, 3, activation='elu', padding='same')(cat8)
    conv8 = BatchNormalization()(conv8)
    conv8 = SeparableConv2D(128, 3, activation='elu', padding='same')(conv8)
    conv8 = BatchNormalization()(conv8)

    conv9 = Conv2DTranspose(64, 3, strides=(2, 2), activation='elu', padding='same')(conv8)
    cat9 = concatenate([conv1, conv9], axis=3)
    conv9 = SeparableConv2D(64, 3, activation='elu', padding='same')(cat9)
    conv9 = BatchNormalization()(conv9)
    conv9 = SeparableConv2D(64, 3, activation='elu', padding='same')(conv9)

    conv10 = Conv2D(7, 1, activation='softmax')(conv9)

    model = Model(inputs, conv10)
    model.compile(optimizer=Adam(learning_rate=1e-3), loss='binary_crossentropy',
                  metrics=[dice_coef_multilabel], run_eagerly=True)


    return model

This callback is designed to display the results of the model during the training phase. It shows the original image, the ground truth mask and the predicted mask that is made on an image drawn from the validation set.

In [6]:
class DisplayCallback(ks.callbacks.Callback):
    def __init__(self, epoch_interval=None, test_images=None, test_masks=None, BATCH_SIZE=None):
        self.epoch_interval = epoch_interval
        self.test_images = test_images
        self.test_masks = test_masks
        self.BATCH_SIZE = BATCH_SIZE

    def on_epoch_end(self, epoch, logs=None):
        if self.epoch_interval and epoch % self.epoch_interval == 0:
            pred_masks = self.model.predict(self.test_images)
            pred_masks = tf.math.argmax(pred_masks, axis=-1)
            pred_masks = np.argmax(pred_masks[..., tf.newaxis], axis=-1)

            # Randomly select an image from the test batch
            random_index = random.randint(0, self.BATCH_SIZE - 1)
            random_image = self.test_images[random_index]
            random_pred_mask = pred_masks[random_index]
            random_true_mask = np.argmax(self.test_masks[random_index], axis=-1)

            fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(10, 5))
            ax[0].imshow(random_image)
            ax[0].set_title(f"Image: {epoch:03d}")

            ax[1].imshow(random_true_mask)
            ax[1].set_title(f"Ground Truth Mask: {epoch:03d}")

            ax[2].imshow(random_pred_mask)
            ax[2].set_title(
                f"Predicted Mask: {epoch:03d}",
            )

            plt.show()
            plt.close()

The training phase is performed by the following cell. The model is trained for 300 epochs. The training is stopped if the validation dice coefficient does not improve for 15 epochs. The model is saved every time the validation dice coefficient improves.
The training is performed on a subset of the dataset composed of 1000 images split in training set and validation in a 90%/10% ratio.
A panic button-like feature has been implemented in order to save the model if the training is interrupted using ctrl+C.

In [11]:
CNN_model = None

def signal_handler(sig, frame):
    if CNN_model is not None:
        CNN_model.save('./saved_models/semantic_segmentation_panic_new_data.h5')
        print('Model saved', flush=True)
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def train_model(pretrained_model=None):  # pretrained_model is the path to the pretrained model to be used
    image_names = os.listdir('./ign_new/images/training')[:1000]
    mask_names = os.listdir('./ign_new/annotations/training')[:1000]

    image_names.sort()
    mask_names.sort()

    x = load_data_from_names('./ign_new/images/training', image_names, shape=(256, 256))
    y = load_data_from_names('./ign_new/annotations/training', mask_names, shape=(256, 256), mask=True)

    x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.1, random_state=42)

    global CNN_model
    if pretrained_model:
        CNN_model = ks.models.load_model(pretrained_model, custom_objects={'dice_coef_multilabel': dice_coef_multilabel})
        CNN_model.compile(optimizer=Adam(learning_rate=1e-3), loss='binary_crossentropy', metrics=[dice_coef_multilabel], run_eagerly=True)
    else:
        CNN_model = mod_mobileunet(input_size=(256, 256, 3))

    print(CNN_model.summary())

    # Set hyperparameters
    BATCH_SIZE = 8
    N_EPOCHS = 300

    logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)
    cBack = tf.keras.callbacks.EarlyStopping(monitor='val_dice_coef_multilabel', patience=15, mode='max', restore_best_weights=True)
    checkPointCb = tf.keras.callbacks.ModelCheckpoint(filepath="./saved_models/semantic_segmentation_new_data.h5",
                                                      monitor='val_dice_coef_multilabel', mode='max', save_best_only=True)

    display_callback = DisplayCallback(epoch_interval=1, test_images=x_val, test_masks=y_val, BATCH_SIZE=BATCH_SIZE)
    # dice_metric_callback = Metrics(7, CNN_model)

    # Training
    hist = CNN_model.fit(x_train, y_train,
                         batch_size=BATCH_SIZE,
                         epochs=N_EPOCHS,
                         validation_data=(x_val, y_val),
                         callbacks=[tensorboard_callback, cBack, checkPointCb])  # , display_callback])

    CNN_model.save('./saved_models/semantic_segmentation_new_data.h5')

    return CNN_model, hist

The test set has 200 images which are the down-sampled version of the original ones in order to suit the requirements in terms of . The following cell computes the mean dice coefficient on the test set. MOreover, it displays the original image, the ground truth mask and the predicted mask for each image in the test set for each of the test set images.
In order to see further insights on the model, the logs have been saved in a folder called 'logs' that can be visualized by tensorboard.

In [12]:
def test_model():
    image_names = os.listdir('./ign_new/images/validation')
    mask_names = os.listdir('./ign_new/annotations/validation')

    image_names.sort()
    mask_names.sort()

    x = load_data_from_names('./ign_new/images/validation', image_names, shape=(256, 256))
    y = load_data_from_names('./ign_new/annotations/validation', mask_names, shape=(256, 256), mask=True)
    model = ks.models.load_model('./saved_models/semantic_segmentation_analyze_me.h5', custom_objects={'dice_coef_multilabel': dice_coef_multilabel})
    model.compile(optimizer=Adam(learning_rate=1e-3), loss='binary_crossentropy', metrics=[dice_coef_multilabel], run_eagerly=True)

    predictions = model.predict(x)

    print('Mean dice coefficient multi-label', np_dice_coef_multilabel(y, predictions))

    for image, pred, y_true in zip(x, predictions, y):

        fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(10, 5))
        ax[0].imshow(image)

        ax[1].imshow(np.argmax(y_true, axis=-1))

        ax[2].imshow(np.argmax(pred, axis=-1))

        plt.show()
    plt.close()

Perform the training on the model and plot the results in terms of loss and dice_coef_multilabel. [[ TODO: comment on the results ]]

In [None]:
CNN_model, hist = train_model()
loss_history = hist.history['loss']
val_loss_history = hist.history['val_loss']

acc_history = hist.history['dice_coef_multilabel']
val_acc_history = hist.history['val_dice_coef_multilabel']

plt.plot(loss_history)
plt.plot(val_loss_history)
plt.grid()
plt.xlabel('Epoch')
plt.legend(['Loss', 'Val Loss'])
plt.title('Loss')
plt.show()

plt.plot(acc_history)
plt.plot(val_acc_history)
plt.grid()
plt.xlabel('Epoch')
plt.legend(['dice_coef_multilabel', 'Val dice_coef_multilabel'])
plt.title('dice_coef_multilabel')
plt.show()

Processed 100 images.
Processed 200 images.
Processed 300 images.
Processed 400 images.
Processed 500 images.
Processed 600 images.
Processed 700 images.
Processed 800 images.
Processed 900 images.
Processed 1000 images.
Processed 100 images.
Processed 200 images.
Processed 300 images.
Processed 400 images.
Processed 500 images.
Processed 600 images.
Processed 700 images.
Processed 800 images.
Processed 900 images.
Processed 1000 images.
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 separable_conv2d_10 (Separable  (None, 256, 2

2023-09-09 17:03:30.107344: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 707788800 exceeds 10% of free system memory.


Perform the test on the model [[ TODO: comment on the results ]]

In [None]:
test_model()