## 🌐 Connect Colab to Google Drive

In [None]:
GOOGLE = True

try:
    from google.colab import drive
    drive.mount("/gdrive")
    %cd /gdrive/My Drive
    %cd ANDL_HOMEWORK2_LINK
    home_path = "./models/"
    GOOGLE = True
except:
    home_path = "./"
    GOOGLE = False

print(f"GOOGLE: {GOOGLE}")
print(f"HOME: {home_path}")

Mounted at /gdrive
/gdrive/My Drive
/gdrive/.shortcut-targets-by-id/1vsTygZGT_kAzNeU8X79sw_R-6RcOwzqm/Homework 2
GOOGLE: True
HOME: ./models/enrico


## DATASET PATHS

In [None]:
AUGN = 4

# Dataset Training
trainset_filename = f"{home_path}/dataset/training_{AUGN}.npz"

# Dataset Validation
validation_filename = f"{home_path}/dataset/validation_{AUGN}.npz"

# Dataset Test
testset_filename = f"{home_path}/dataset/test.npz"

ID = 9

## ⚙️ Import Libraries

In [None]:
import os
from datetime import datetime

import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

import matplotlib.pyplot as plt
%matplotlib inline

np.random.seed(42)
tf.random.set_seed(42)

print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

from tensorflow.keras import mixed_precision

# Enable mixed precision training
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

TensorFlow version: 2.17.1
Keras version: 3.5.0
GPU devices: 1


In [None]:
# Set seed for reproducibility
seed = 42

# Import necessary libraries
import os

# Set environment variables before importing modules
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd() + '/configs/'

# Suppress warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

# Import necessary modules
import logging
import random
import numpy as np

# Set seeds for random number generators in NumPy and Python
np.random.seed(seed)
random.seed(seed)

# Import TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

# Set seed for TensorFlow
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Reduce TensorFlow verbosity
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Print TensorFlow version
print(tf.__version__)

# Import other libraries
import os
import math
from PIL import Image
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils.class_weight import compute_class_weight

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline

2.17.1


## ⏳ Load the Data

In [None]:
# Set batch size for training
BATCH_SIZE = 64

# Set learning rate for the optimiser
LEARNING_RATE = 1e-3

# Set early stopping patience threshold
PATIENCE = 80

# Set maximum number of training epochs
EPOCHS = 1000

REDUCE_CL0_FACTOR = 2

REDUCE_CL4_FACTOR = 7

In [None]:
# Take datasets
# Add color channel and rescale pixels between 0 and 1
training_data = np.load(trainset_filename)
X_train = training_data["images"]
X_train = X_train[..., np.newaxis] / 255.0
y_train = training_data["labels"]
y_train = y_train[..., np.newaxis]
del training_data

validation_data = np.load(validation_filename)
X_val = validation_data["images"]
X_val = X_val[..., np.newaxis] / 255.0
y_val = validation_data["labels"]
y_val = y_val[..., np.newaxis]
del validation_data

print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")

Training X shape: (11165, 64, 128, 1)
Training y shape: (11165, 64, 128, 1)


In [None]:
INPUT_SHAPE = (64,128,1)
NUM_CLASSES = 5

print(f"Input shape: {INPUT_SHAPE}")
print(f"Number of classes: {NUM_CLASSES}")

Input shape: (64, 128, 1)
Number of classes: 5


In [None]:
def create_segmentation_colormap(num_classes):
    return plt.cm.viridis(np.linspace(0, 1, num_classes))

def apply_colormap(label, colormap=None):
    # Ensure label is 2D
    label = np.squeeze(label)

    if colormap is None:
        num_classes = len(np.unique(label))
        colormap = create_segmentation_colormap(num_classes)

    # Apply the colormap
    colored = colormap[label.astype(int)]

    return colored

In [None]:
import numpy as np
from collections import Counter

def compute_segmentation_class_weights(masks, num_classes):
    flat_labels = masks.flatten()

    # Unique classes
    classes = np.unique(range(num_classes))

    # Compute class weights
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=flat_labels)

    # Convert to a dictionary for easy access
    class_weight_dict = dict({(int(i)): float(weight) for i, weight in zip(classes, class_weights)})

    return class_weight_dict

In [None]:
#compute_segmentation_class_weights(y_train, num_classes=5)
cl_w_dict = {0: 0.0, 1: 0.8, 2: 0.9, 3: 0.9, 4: 5}

print(cl_w_dict)

{0: 0.0, 1: 0.8, 2: 0.9, 3: 0.9, 4: 5}


## 🛠️ Train and Save the Model

In [None]:
class InstanceNormalization(tfk.Layer):
    def __init__(self, epsilon=1e-5, **kwargs):
        super(InstanceNormalization, self).__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        # Create trainable parameters: gamma (scale) and beta (shift)
        self.gamma = self.add_weight(
            shape=(input_shape[-1],),
            initializer="ones",
            trainable=True,
            name="gamma"
        )
        self.beta = self.add_weight(
            shape=(input_shape[-1],),
            initializer="zeros",
            trainable=True,
            name="beta"
        )
        super(InstanceNormalization, self).build(input_shape)

    def call(self, inputs):
        # Compute mean and variance across spatial dimensions (axis 1, 2)
        mean, variance = tf.nn.moments(inputs, axes=[1, 2], keepdims=True)
        # Normalize the input
        normalized = (inputs - mean) / tf.sqrt(variance + self.epsilon)
        # Apply scale (gamma) and shift (beta)
        return self.gamma * normalized + self.beta

    def compute_output_shape(self, input_shape):
        return input_shape

    def get_config(self):
        config = super(InstanceNormalization, self).get_config()
        config.update({
            "epsilon": self.epsilon
        })
        return config

def squeeze_excite_block(input_tensor, name, ratio=16):
    """Squeeze-and-Excitation block."""
    channel_axis = -1  # Assuming channels-last format
    filters = input_tensor.shape[channel_axis]

    se = tfkl.GlobalAveragePooling2D()(input_tensor)
    se = tfkl.Reshape((1, 1, filters))(se)
    se = tfkl.Dense(filters // ratio, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = tfkl.Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)
    return tfkl.Multiply()([input_tensor, se])

def unet_block(input_tensor, filters, kernel_size=3, activation='relu', stack=2, name='', drop=0.0, ratio=16):
    # Initialise the input tensor
    x = input_tensor

    if ratio != 0:
        x = squeeze_excite_block(x, name, ratio)

    # Apply a sequence of Conv2D, Batch Normalisation, and Activation layers for the specified number of stacks
    for i in range(stack):
        x = tfkl.Conv2D(filters, kernel_size=kernel_size, padding='same', kernel_initializer='he_normal', kernel_regularizer=tfk.regularizers.l2(1e-4), name=name + 'conv' + str(i + 1))(x)
        x = InstanceNormalization(name=name + 'bn' + str(i + 1))(x)
        x = tfkl.Activation(activation, name=name + 'activation' + str(i + 1))(x)

    if drop != 0.0:
            x = tfkl.Dropout(drop)(x)

    # Return the transformed tensor
    return x

In [None]:
def concatConv(input_layer, name):
    return tfkl.Conv2D(64, kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=tfk.regularizers.l2(1e-4), name=name)(input_layer)


def get_unet_model(input_shape, num_classes, seed, filename=None):
    tf.random.set_seed(seed)
    input_layer = tfkl.Input(shape=input_shape, name='input_layer')

    x1En = unet_block(input_layer, 64, stack=2, name='db_1_', ratio=0) # 128

    sx1En_64 = tfkl.MaxPooling2D()(x1En)
    sx1En_32 = tfkl.MaxPooling2D(pool_size=(4,4))(x1En)
    sx1En_16 = tfkl.MaxPooling2D(pool_size=(8,8))(x1En)

    x2En = unet_block(sx1En_64, 128,stack=2,  name='db_2_', drop=0., ratio=0) # 64

    sx2En_32 = tfkl.MaxPooling2D()(x2En)
    sx2En_16 = tfkl.MaxPooling2D(pool_size=(4,4))(x2En)

    x3En = unet_block(sx2En_32, 256, stack=2, name='db_3_', drop=0., ratio=0) #32

    sx3En_16 = tfkl.MaxPooling2D()(x3En)

    x4En = unet_block(sx3En_16, 512, stack=2, name='db_4_', drop=0.2, ratio=0) # 16

    sx4En_8 = tfkl.MaxPooling2D()(x4En)

    x5En = unet_block(sx4En_8, 1024, stack=2, name='db_5_', drop = 0.3, ratio=0) # 8

    sx5De_16 = tfkl.UpSampling2D()(x5En)
    sx5De_32 = tfkl.UpSampling2D(size=(4,4))(x5En)
    sx5De_64 = tfkl.UpSampling2D(size=(8,8))(x5En)
    sx5De_128 = tfkl.UpSampling2D(size=(16,16))(x5En)

    cx4De = tfkl.Concatenate()([concatConv(sx1En_16, name="cx1_16"),
                                concatConv(sx2En_16, name="cx2_16"),
                                concatConv(sx3En_16, name="cx3_16"),
                                concatConv(x4En, name="cx4_16"),
                                concatConv(sx5De_16, name="cx5_16")])
    x4De = unet_block(cx4De, 320, stack=2, name='up_4_', drop = 0.2) # 16

    sx4De_32 = tfkl.UpSampling2D()(x4De)
    sx4De_64 = tfkl.UpSampling2D(size=(4,4))(x4De)
    sx4De_128 = tfkl.UpSampling2D(size=(8,8))(x4De)

    cx3De = tfkl.Concatenate()([concatConv(sx1En_32, name="cx1_32"),
                                concatConv(sx2En_32, name="cx2_32"),
                                concatConv(x3En, name="cx3_32"),
                                concatConv(sx4De_32, name="cx4_32"),
                                concatConv(sx5De_32, name="cx5_32")])
    x3De = unet_block(cx3De, 320, stack=2, name='up_3_', drop = 0.1) # 32

    sx3De_64 = tfkl.UpSampling2D()(x3De)
    sx3De_128 = tfkl.UpSampling2D(size=(4,4))(x3De)

    cx2De = tfkl.Concatenate()([concatConv(sx1En_64, name="cx1_64"),
                                concatConv(x2En, name="cx2_64"),
                                concatConv(sx3De_64, name="cx3_64"),
                                concatConv(sx4De_64, name="cx4_64"),
                                concatConv(sx5De_64, name="cx5_64")])
    x2De = unet_block(cx2De, 320, stack=2, name='up_2_', drop = 0.) # 64

    sx2De_128 = tfkl.UpSampling2D()(x2En)

    cx1De = tfkl.Concatenate()([concatConv(x1En, name="cx1_128"),
                                concatConv(sx2De_128, name="cx2_128"),
                                concatConv(sx3De_128, name="cx3_128"),
                                concatConv(sx4De_128, name="cx4_128"),
                                concatConv(sx5De_128, name="cx5_128")])
    x1De = unet_block(cx1De, 320, stack=2, name='up_1_', drop = 0.) # 128

    # Output Layer
    output_layer = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", name='output_layer')(x1De)

    model = tf.keras.Model(inputs=input_layer, outputs=output_layer, name='UNet')

    if filename is not None:
        model.load_weights(filename)

    return model

In [None]:
model = get_unet_model(INPUT_SHAPE, NUM_CLASSES, seed=seed, filename=f"{home_path}/weights/best_9.keras")

# Print a detailed summary of the model with expanded nested layers and trainable parameters.
model.summary(expand_nested=True, show_trainable=True)

In [None]:
# Define custom Mean Intersection Over Union metric
class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    def __init__(self, num_classes, labels_to_exclude=None, name="mean_iou", dtype=None):
        super(MeanIntersectionOverUnion, self).__init__(num_classes=num_classes, name=name, dtype=dtype)
        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0
        self.labels_to_exclude = labels_to_exclude

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Apply mask to exclude specified labels
        for label in self.labels_to_exclude:
            mask = tf.not_equal(y_true, label)
            y_true = tf.boolean_mask(y_true, mask)
            y_pred = tf.boolean_mask(y_pred, mask)

        # Update the statecshpc1024
        return super().update_state(y_true, y_pred, sample_weight)

# Visualization callback
class VizCallback(tf.keras.callbacks.Callback):
    def __init__(self, images, labels, frequency=5, imgs=5, recall=1):
        super().__init__()
        self.images = images
        self.labels = labels
        self.frequency = frequency
        self.imgs = imgs
        self.recall = recall

        self.lastR = []
        for i in range(imgs):
            self.lastR.append(random.randint(0, len(self.images)))

        self.colormap = create_segmentation_colormap(5)

    def on_epoch_end(self, epoch, logs=None):
        if epoch % self.frequency == 0:  # Visualize only every "frequency" epochs

            for i in range(self.recall):
                newR = random.randint(0, len(self.images))
                self.lastR.append(newR)
                self.lastR.pop(0)

            plt.figure(figsize=(10, 5))

            for i, idx in enumerate(self.lastR):
                image = self.images[idx]
                label = self.labels[idx]

                image = tf.expand_dims(image,0)
                label = tf.expand_dims(label,0)

                pred = self.model.predict(image, verbose=0)
                y_pred = tf.math.argmax(pred, axis=-1)
                y_pred = y_pred.numpy()

                plt.subplot(3, self.imgs, i + 1)
                plt.imshow(image[0], cmap="gray")
                plt.title("Image")
                plt.axis('off')

                plt.subplot(3, self.imgs, (1 * self.imgs) + i + 1)
                col_lbl = apply_colormap(label.numpy(), self.colormap)
                plt.imshow(col_lbl)
                plt.title("Truth")
                plt.axis('off')

                plt.subplot(3, self.imgs, (2 * self.imgs ) + i + 1)
                col_pred = apply_colormap(y_pred[0], self.colormap)
                plt.imshow(col_pred)
                plt.title("Pred")
                plt.axis('off')

            plt.tight_layout()
            plt.show()
            plt.close()


In [None]:
def weighted_sparse_categorical_crossentropy(weight_dict):
    """
    Args:
        weight_dict: A dictionary where keys are class IDs and values are the corresponding weights.
    Returns:
        A custom loss function that applies class weights to the sparse categorical cross-entropy loss.
    """
    # Convert the dictionary to a TensorFlow tensor
    num_classes = max(weight_dict.keys()) + 1  # Assuming class IDs start at 0
    weights = tf.constant([weight_dict.get(i, 1.0) for i in range(num_classes)], dtype=tf.float32)

    def loss(y_true, y_pred):
        # Remove the last dimension in y_true if it's (64, 128, 1) -> (64, 128)
        y_true = tf.squeeze(y_true, axis=-1)

        # Ensure y_true is integer
        y_true = tf.cast(y_true, tf.int32)

        # Gather weights for each pixel's true class
        class_weights = tf.gather(weights, y_true)

        # Compute sparse categorical cross-entropy loss for each pixel
        scce = tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred)

        # Scale the loss by the class weights
        weighted_loss = scce * class_weights

        # Return mean loss over all pixels
        return tf.reduce_mean(weighted_loss)

    return loss

# Compile the model
print("Compiling model...")
model.compile(
    loss=weighted_sparse_categorical_crossentropy(cl_w_dict),
    optimizer=tf.keras.optimizers.AdamW(),
    metrics=[MeanIntersectionOverUnion(num_classes=NUM_CLASSES, labels_to_exclude=[0])]
)
print("Model compiled!")

Compiling model...
Model compiled!


In [None]:
# Setup callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    mode='min',
    patience=PATIENCE,
    restore_best_weights=True
)

reduce_lr_on_plateau = tfk.callbacks.ReduceLROnPlateau(
    monitor='loss',
    factor=0.2,
    patience=15
)

save_checkpoint = tfk.callbacks.ModelCheckpoint(
    f'{home_path}/checkpoint/best_{ID}.keras',
    monitor='val_mean_iou',
    save_best_only=True,
    mode='max',
    verbose=1
)

viz_callback = VizCallback(X_val, y_val, frequency=3, imgs=10, recall=5)

In [None]:
# Train the model
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=EPOCHS,
    validation_data=(X_val, y_val),
    batch_size=BATCH_SIZE,
    callbacks=[early_stopping, viz_callback, reduce_lr_on_plateau, save_checkpoint],
    verbose=1
).history

# Calculate and print the final validation accuracy
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

timestep_str = datetime.now().strftime("%y%m%d_%H%M%S")
model_filename = f"{home_path}/weights/en_net_{ID}_{timestep_str}.keras"
model.save(model_filename)
del model

print(f"Model saved to {model_filename}")

Output hidden; open in https://colab.research.google.com to view.