# U-Net - forcing meaningful features inside the decoder through multi-output learning





**Kaggle mean intersection over union metric** : 0.46

**Local test mean intersection over union metric** : 0.45

**Description**: Using multiple heads on the previously trained U-Net architecture with attention gating mechanisms for skip connections.

**Motivation**: Learning more meaningful features even in the middle of the network, simplifying convergence and allowing better gradient flow. Trying to make the network learn what skip connections are useful for the problem at hand.



## Google Drive - Connecting to the project

In [None]:
from google.colab import drive
import sys
drive.mount('/content/drive')
project_path = "/content/drive/MyDrive/Colab Notebooks/ANN/le acque del friuli - Homework 2"
%cd {project_path}
sys.path.append(project_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Colab Notebooks/ANN/le acque del friuli - Homework 2


## Imports

In [None]:
# Set seed for reproducibility
seed = 42

# Import necessary libraries
import os

# Import necessary modules
import logging
import random
import numpy as np

# Set seeds for random number generators in NumPy and Python
np.random.seed(seed)
random.seed(seed)

# Import TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

# Set seed for TensorFlow
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Import other libraries
import os
import math
from PIL import Image
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

import cv2

import utils

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline

In [None]:
# Set batch size for training
BATCH_SIZE = 64

# Number of classes
NUM_CLASSES = 5

INPUT_SHAPE = (64,128,1)

## Load Data

In [None]:
def color_img(image):
    image = (image * 255).astype(np.uint8)
    return cv2.applyColorMap(image, cv2.COLORMAP_RAINBOW) # https://docs.opencv.org/3.4/d3/d50/group__imgproc__colormap.html

In [None]:
# Load the dataset
data = np.load("dataset/cleaned_dataset.npz")
test_img = data['test_set']

# Split the dataset in images and labels array
X = data['images']
y = data['labels']
y = tfk.utils.to_categorical(y, num_classes=NUM_CLASSES)

# Split the dataset into training and validation (test is given)
train_img, val_img, train_lbl, val_lbl = train_test_split(X, y, test_size=0.2, random_state=seed)

In [None]:
def make_dataset(images,labels,batch_size, shuffle=True, augment=False, seed=None):
    dataset = tf.data.Dataset.from_tensor_slices((images,{'out1':labels, 'out2':labels, 'out3':labels}))

    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 2, seed=seed)

    # Batch the data
    dataset = dataset.batch(batch_size, drop_remainder=False)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    return dataset

In [None]:
train_dataset = make_dataset(
    train_img, train_lbl,
    batch_size=BATCH_SIZE,
    shuffle=True,
    augment=True,
    seed=seed
)

val_dataset = make_dataset(
    val_img, val_lbl,
    batch_size=BATCH_SIZE,
    shuffle=False,
    augment=False,
    seed=seed
)

## Model implementation

In [None]:
def res_bottleneck(input, n_filters, kernel_size, strides, name='', dropout=0.0):
    e = tfkl.Conv2D(filters=n_filters, kernel_size=kernel_size, padding='same', use_bias=False)(input)
    e = tfkl.GroupNormalization(groups=32, axis=-1)(e)
    e = tfkl.Activation('relu')(e)
    e = tfkl.Conv2D(filters=n_filters, kernel_size=kernel_size, padding='same', use_bias=False)(e)
    e = tfkl.GroupNormalization(groups=32, axis=-1)(e)
    e = tfkl.Activation('relu')(e)
    e = tfkl.Add()([e, input]) #skip connection to make this a residual block
    e = tfkl.GroupNormalization(groups=32, axis=-1)(e)
    e = tfkl.Activation('relu')(e)
    e = tfkl.Dropout(rate=dropout)(e)
    return e

In [None]:
def expand_as(tensor, rep):

    # Anonymous lambda function to expand the specified axis by a factor of argument, rep.
    # If tensor has shape (512,512,N), lambda will return a tensor of shape (512,512,N*rep), if specified axis=2

    output_shape = (tfk.backend.int_shape(tensor)[1],
                    tfk.backend.int_shape(tensor)[2],
                    tfk.backend.int_shape(tensor)[3]*rep)
    my_repeat = tfkl.Lambda(lambda x, repnum: tfk.backend.repeat_elements(x, repnum, axis=3), arguments={'repnum': rep},
                            output_shape=output_shape)(tensor)
    return my_repeat


# Attention block
def attnGatingBlock(x, g, inter_shape, dropout=0.0):
    print(f" Building attention block")
    print(f"    Using g={g}")
    print(f"    Using x={x}")
    shape_x = tfk.backend.int_shape(x)
    shape_g = tfk.backend.int_shape(g)

    # Getting the gating signal to the same number of filters as the inter_shape
    phi_g = tfkl.Conv2D(filters=inter_shape, kernel_size=1, strides=1, padding='same')(g)

    # Getting the x signal to the same shape as the gating signal
    theta_x = tfkl.Conv2D(filters=inter_shape, kernel_size=3, strides=(shape_x[1] // shape_g[1], shape_x[2] // shape_g[2]), padding='same')(x)

    # Element-wise addition of the gating and x signals
    add_xg = tfkl.add([phi_g, theta_x])
    add_xg = tfkl.Activation('relu')(add_xg)

    # 1x1x1 convolution
    psi = tfkl.Conv2D(filters=1, kernel_size=1, padding='same')(add_xg)
    psi = tfkl.Activation('sigmoid')(psi)

    shape_sigmoid = tfk.backend.int_shape(psi)

    # Upsampling psi back to the original dimensions of x signal
    upsample_sigmoid_xg = tfkl.UpSampling2D(size=(shape_x[1] // shape_sigmoid[1], shape_x[2] // shape_sigmoid[2]))(psi)

    # Expanding the filter axis to the number of filters in the original x signal
    upsample_sigmoid_xg = expand_as(upsample_sigmoid_xg, shape_x[3])

    # Element-wise multiplication of attention coefficients back onto original x signal
    attn_coefficients = tfkl.multiply([upsample_sigmoid_xg, x])

    # Final 1x1x1 convolution to consolidate attention signal to original x dimensions
    output = tfkl.Conv2D(filters=shape_x[3], kernel_size=1, strides=1, padding='same')(attn_coefficients)
    output = tfkl.BatchNormalization()(output)

    output = tfkl.Dropout(rate=dropout)(output)
    print("    Attention block built. Output =",output)
    return output

In [None]:
# Definition of a U-net block
def unet_block(input_tensor, filters, kernel_size=3, activation='relu', stack=2, name=''):
    # Initialise the input tensor
    x = input_tensor

    # Apply a sequence of Conv2D, Batch Normalisation, and Activation layers for the specified number of stacks
    for i in range(stack):
        x = tfkl.Conv2D(filters, kernel_size=kernel_size, padding='same', dilation_rate=(2,2), name=name + 'conv' + str(i + 1))(x)
        x = tfkl.GroupNormalization(groups=32, axis=-1, name=name + 'bn' + str(i + 1))(x)
        x = tfkl.Activation(activation, name=name + 'activation' + str(i + 1))(x)

    # Return the transformed tensor
    return x

In [None]:
# Definition of the model
def get_unet_model(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES, seed=seed):
    tf.random.set_seed(seed)
    input_layer = tfkl.Input(shape=input_shape, name='input_layer')


    # Downsampling path
    down_block_1 = unet_block(input_layer, 32, name='down_block1_')
    d1 = tfkl.Conv2D(filters=32, kernel_size=3, strides=(2,2), padding='same',activation='relu')(down_block_1)

    down_block_2 = unet_block(d1, 64, name='down_block2_')
    d2 = tfkl.Conv2D(filters=64, kernel_size=3, strides=(2,2), padding='same',activation='relu')(down_block_2)

    down_block_3 = unet_block(d2, 128, name='down_block3_')
    d3 = tfkl.Conv2D(filters=128, kernel_size=3, strides=(2,2), padding='same',activation='relu')(down_block_3)

    # Bottleneck
    bottleneck = res_bottleneck(d3, 128, kernel_size=3, strides=1, name='bottleneck', dropout=0.2)

    # Upsampling path
    a1 = attnGatingBlock(down_block_3, bottleneck, 64, dropout=0.3)
    u1 = tfkl.UpSampling2D(interpolation='bilinear')(bottleneck)
    u1 = tfkl.Concatenate()([u1, a1])
    u1 = unet_block(u1, 128, name='up_block1_')

    a2 = attnGatingBlock(down_block_2, u1, 64, dropout=0.3)
    u2 = tfkl.UpSampling2D(interpolation='bilinear')(u1)
    u2 = tfkl.Concatenate()([u2, a2])
    u2 = unet_block(u2, 64, name='up_block2_')

    a3 = attnGatingBlock(down_block_1, u2, 64, dropout=0.5)
    u3 = tfkl.UpSampling2D(interpolation='bilinear')(u2)
    u3 = tfkl.Concatenate()([u3, a3])
    u3 = unet_block(u3, 32, name='up_block3_')


    # Output Layers
    up1 = tfkl.Conv2DTranspose(filters=64, kernel_size=3, strides=2**2, padding='same')(u1)
    out1 = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", name='out1')(up1)

    up2 = tfkl.Conv2DTranspose(filters=32, kernel_size=3, strides=2**1, padding='same')(u2)
    out2 = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", name='out2')(up2)

    out3 = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", name='out3')(u3)

    model = tf.keras.Model(inputs=input_layer, outputs=[out1,out2,out3], name='UNet')
    return model

In [None]:
model = get_unet_model()

# Print a detailed summary of the model with expanded nested layers and trainable parameters.
model.summary(expand_nested=False, show_trainable=True)

 Building attention block
    Using g=<KerasTensor shape=(None, 8, 16, 128), dtype=float32, sparse=False, name=keras_tensor_126>
    Using x=<KerasTensor shape=(None, 16, 32, 128), dtype=float32, sparse=False, name=keras_tensor_115>
    Attention block built. Output = <KerasTensor shape=(None, 16, 32, 128), dtype=float32, sparse=False, name=keras_tensor_138>
 Building attention block
    Using g=<KerasTensor shape=(None, 16, 32, 128), dtype=float32, sparse=False, name=keras_tensor_146>
    Using x=<KerasTensor shape=(None, 32, 64, 64), dtype=float32, sparse=False, name=keras_tensor_108>
    Attention block built. Output = <KerasTensor shape=(None, 32, 64, 64), dtype=float32, sparse=False, name=keras_tensor_158>
 Building attention block
    Using g=<KerasTensor shape=(None, 32, 64, 64), dtype=float32, sparse=False, name=keras_tensor_166>
    Using x=<KerasTensor shape=(None, 64, 128, 32), dtype=float32, sparse=False, name=keras_tensor_101>
    Attention block built. Output = <KerasTens

## Training

In [None]:
# Set learning rate for the optimiser
LEARNING_RATE = 1e-3

# Set early stopping patience threshold
PATIENCE = 10

# Set plateauing patience threshold
LR_PATIENCE = 5

# Set maximum number of training epochs
EPOCHS = 1000


In [None]:
# Model compilation with Adam as optimizer and CategoricalFocalCrossentropy as loss functions
model.compile(
    optimizer=tf.keras.optimizers.Adam(LEARNING_RATE),
    loss={
        'out1': tfk.losses.CategoricalCrossentropy(),
        'out2': tfk.losses.CategoricalCrossentropy(),
        'out3': tfk.losses.CategoricalCrossentropy()
    },
    loss_weights={
        'out1': 0.2,
        'out2': 0.5,
        'out3': 1
    },
    metrics=[
        [tfk.metrics.OneHotMeanIoU(NUM_CLASSES,ignore_class=0, name='meanIoU')],
        [tfk.metrics.OneHotMeanIoU(NUM_CLASSES,ignore_class=0, name='meanIoU')],
        [tfk.metrics.OneHotMeanIoU(NUM_CLASSES,ignore_class=0, name='meanIoU')]
        ]
    )

In [None]:
# Setup callbacks, implementing early stopping as a technique to avoid overfitting
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_out3_meanIoU',
    mode='max',
    patience=PATIENCE,
    restore_best_weights=True
)

# Reduce learning rate on plateau
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_out3_meanIoU',
    mode='max',
    factor=0.1,
    patience=LR_PATIENCE,
    min_lr=1e-6
)

# Combine the callbacks into a list
callbacks = [early_stopping, reduce_lr]

In [None]:
# Train the model
history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=callbacks,
    verbose=1,
).history

Epoch 1/1000
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m94s[0m 1s/step - loss: 2.9421 - out1_loss: 0.3012 - out1_meanIoU: 0.0974 - out2_loss: 0.7246 - out2_meanIoU: 0.1054 - out3_loss: 1.9160 - out3_meanIoU: 0.0665 - val_loss: 2.5328 - val_out1_loss: 0.2891 - val_out1_meanIoU: 0.1139 - val_out2_loss: 0.6731 - val_out2_meanIoU: 0.1268 - val_out3_loss: 1.5706 - val_out3_meanIoU: 0.1219 - learning_rate: 0.0010
Epoch 2/1000
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 164ms/step - loss: 2.3828 - out1_loss: 0.2781 - out1_meanIoU: 0.1380 - out2_loss: 0.6380 - out2_meanIoU: 0.1568 - out3_loss: 1.4665 - out3_meanIoU: 0.1678 - val_loss: 2.4541 - val_out1_loss: 0.2964 - val_out1_meanIoU: 0.1221 - val_out2_loss: 0.6999 - val_out2_meanIoU: 0.1378 - val_out3_loss: 1.4599 - val_out3_meanIoU: 0.1587 - learning_rate: 0.0010
Epoch 3/1000
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 164ms/step - loss: 2.1408 - out1_loss: 0.2533 - out1_meanIoU: 0.1857 

In [None]:
# Plot and display training and validation loss
plt.figure(figsize=(18, 3))
plt.plot(history['loss'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=2)
plt.plot(history['val_loss'], label='val_tot', alpha=0.9, color='#5a9aa5', linewidth=2)
plt.plot(history['val_out1_loss'], label='out1', alpha=0.9, color='#5f0a3a', linewidth=2)
plt.plot(history['val_out2_loss'], label='out2', alpha=0.9, color='#0a9a85', linewidth=2)
plt.plot(history['val_out3_loss'], label='out3', alpha=0.9, color='#5a9a0f', linewidth=2)
plt.xlim(5, len(history['val_out1_loss']))
plt.ylim(0, 3)
plt.title('Cross Entropy')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

# Plot and display training and validation accuracy
plt.figure(figsize=(18, 3))
plt.plot(history['out1_meanIoU'], label='out1_Training', alpha=0.8, color='#ff7f0e', linewidth=2)
plt.plot(history['out2_meanIoU'], label='out2_Training', alpha=0.8, color='#0f7f0e', linewidth=2)
plt.plot(history['out3_meanIoU'], label='out3_Training', alpha=0.8, color='#f00f0e', linewidth=2)
plt.plot(history['val_out1_meanIoU'], label='out1', alpha=0.9, color='#ff00a5', linewidth=2)
plt.plot(history['val_out2_meanIoU'], label='out2', alpha=0.9, color='#0a9a85', linewidth=2)
plt.plot(history['val_out3_meanIoU'], label='out3', alpha=0.9, color='#5a9a0f', linewidth=2)
plt.title('Mean IoU')
plt.legend()
plt.grid(alpha=0.3)
plt.show()

## Test Prediction and Final Evaluation

In [None]:
# Predict class probabilities and get predicted classes
test_predictions = model.predict(test_img_local, verbose=0)
test_predictions = np.argmax(test_predictions, axis=-1)
test_lbl_local = np.argmax(test_lbl_local, axis=-1)

In [None]:
# Calculate and display test set accuracy
test_accuracy = accuracy_score(test_lbl_local.flatten(), test_predictions.flatten())
print(f'Accuracy score over the test set: {round(test_accuracy, 4)}')

# Calculate and display test set precision
test_precision = precision_score(test_lbl_local.flatten(), test_predictions.flatten(), average='weighted', zero_division=0)
print(f'Precision score over the test set: {round(test_precision, 4)}')

# Calculate and display test set recall
test_recall = recall_score(test_lbl_local.flatten(), test_predictions.flatten(), average='weighted')
print(f'Recall score over the test set: {round(test_recall, 4)}')

# Calculate and display test set F1 score
test_f1 = f1_score(test_lbl_local.flatten(), test_predictions.flatten(), average='weighted')
print(f'F1 score over the test set: {round(test_f1, 4)}')

# Calculate and display test set mean Intersection Over Union score
# one_hot_pred = tfk.utils.to_categorical(test_predictions, num_classes=NUM_CLASSES)
test_miou = MeanIoU(NUM_CLASSES)
test_miou.update_state(test_lbl_local, test_predictions)
test_miou = test_miou.result().numpy()
print(f'Mean Intersection over Union over the test set: {round(test_miou, 4)}')

Accuracy score over the test set: 0.7352
Precision score over the test set: 0.7335
Recall score over the test set: 0.7352
F1 score over the test set: 0.728
Mean Intersection over Union over the test set: 0.459199994802475


## Save

In [None]:
# Save the model
from datetime import datetime
current_time = datetime.now().strftime("%d|%m-%H:%M")
model_filename = f'UNet_{current_time}.keras'
print("Saved as :",model_filename)
model.save(f'models/{model_filename}', include_optimizer=False)

Saved as : UNet_14|12-11:47.keras


## Submission

In [None]:
# Predict the masks for the submission
preds = model.predict(test_img)
preds = np.argmax(preds, axis=-1)

[1m314/314[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 27ms/step


In [None]:
# Converts segmentation predictions into a DataFrame format for Kaggle
def y_to_df(y) -> pd.DataFrame:
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

In [None]:
# Create and download the csv submission file
timestep_str = model_filename.replace("model_", "").replace(".keras", "")
submission_filename = f"submissions/submission_{timestep_str}.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)