# Artificial Neural Networks and Deep Learning

---

## Homework 2



## 🌐 Connect Colab to Google Drive

In [None]:
from google.colab import drive

# Mount Google Drive under /gdrive and switch the working directory to the
# homework folder, so that relative paths used later in the notebook
# (for example the dataset file name) resolve against it.
# NOTE: `%cd` is an IPython magic; the remainder of the line is taken verbatim as the path.
drive.mount("/gdrive")
%cd /gdrive/My Drive/[2024-2025] AN2DL/Homework 2

Mounted at /gdrive
/gdrive/My Drive/[2024-2025] AN2DL/Homework 2


## ⚙️ Import Libraries

In [None]:
import os
from datetime import datetime

import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from sklearn.model_selection import train_test_split


import matplotlib.pyplot as plt
%matplotlib inline

import hashlib
# Single source of truth for every random seed used in the notebook
# (also passed as `random_state` to train_test_split).
seed = 42
# `set_random_seed` seeds Python's built-in `random`, NumPy and TensorFlow in one
# call; previously only NumPy and TensorFlow were seeded, leaving `random` unseeded.
tfk.utils.set_random_seed(seed)
from tensorflow.keras import layers, models


# Report the runtime environment so results can be traced back to library versions.
print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

TensorFlow version: 2.17.1
Keras version: 3.5.0
GPU devices: 0


## Data Exploration Functions


In [None]:
def plot_image_mask_pairs(X, y, num_pairs=5):
    """
    Show randomly chosen images next to their segmentation masks.

    Args:
        X: Indexable collection of images.
        y: Indexable collection of masks aligned with `X`.
        num_pairs (int): Maximum number of image/mask rows to draw; capped at len(X).
    """
    row_count = min(num_pairs, len(X))
    fig = plt.figure(figsize=(10, 4 * row_count))

    # Sample without replacement so that no pair is drawn twice.
    chosen = np.random.choice(len(X), row_count, replace=False)
    for row, sample_index in enumerate(chosen):
        image_ax = fig.add_subplot(row_count, 2, 2 * row + 1)
        image_ax.imshow(X[sample_index])
        image_ax.set_title(f"Image {sample_index+1}")
        image_ax.axis('off')

        mask_ax = fig.add_subplot(row_count, 2, 2 * row + 2)
        # Fixed vmin/vmax keep the class-to-colour mapping identical across masks.
        mask_ax.imshow(y[sample_index], cmap='nipy_spectral', vmin=0, vmax=4)
        mask_ax.set_title(f"Mask {sample_index+1}")
        mask_ax.axis('off')

    fig.tight_layout()
    plt.show()

In [None]:
import numpy as np

def compute_class_distribution(y_train, num_classes=5):
    """
    Compute the percentage of pixels belonging to each class in the dataset.

    Parameters:
    - y_train (array-like): Label masks, e.g. of shape (num_images, height, width).
      Any array-like accepted by ``np.asarray`` works.
    - num_classes (int): The number of classes in the dataset (default is 5).
      Classes are assumed to be encoded as 0 .. num_classes - 1; pixels with any
      other value count towards the total but towards no class.

    Returns:
    - percentages (list): One percentage per class id, in class order.
      For an empty input every percentage is 0.0 (instead of NaN from 0/0).
    """
    # ravel() avoids the copy that flatten() always makes
    flattened_labels = np.asarray(y_train).ravel()

    total_pixels = flattened_labels.size
    if total_pixels == 0:
        # Nothing to count: avoid dividing by zero
        return [0.0] * num_classes

    return [
        (flattened_labels == class_id).sum() / total_pixels * 100
        for class_id in range(num_classes)
    ]

In [None]:
import numpy as np

def flip_dataset(X_train, y_train):
    """
    Quadruple a dataset by appending mirrored copies of every image and mask.

    The result is ordered as: originals, horizontal flips, vertical flips of the
    originals, vertical flips of the horizontal flips.

    Args:
        X_train (numpy.ndarray): Training images of shape (n_samples, 64, 128).
        y_train (numpy.ndarray): Corresponding masks of shape (n_samples, 64, 128).

    Returns:
        augmented_X (numpy.ndarray): Augmented training images.
        augmented_y (numpy.ndarray): Augmented training masks.
    """
    def _append_flips(samples):
        # Axis 2 is the width: mirror left/right and stack under the originals.
        with_horizontal = np.concatenate((samples, np.flip(samples, axis=2)), axis=0)
        # Axis 1 is the height: mirror the already doubled set top/bottom.
        return np.concatenate((with_horizontal, np.flip(with_horizontal, axis=1)), axis=0)

    # Images and masks go through the exact same transform so they stay aligned.
    return _append_flips(X_train), _append_flips(y_train)


## Data Cleanup Functions


In [None]:
def find_duplicate_masks(labels):
    """
    Group masks whose raw bytes are identical.

    Args:
        labels: Iterable of arrays exposing ``tobytes()``.

    Returns:
        dict: Maps the index of the first occurrence of a mask to the list of
        later indices holding the same bytes. Masks that occur once are absent.
    """
    first_seen = {}
    duplicates = {}

    for position, mask in enumerate(labels):
        # An MD5 digest of the raw buffer serves as a compact identity key.
        digest = hashlib.md5(mask.tobytes()).hexdigest()

        # setdefault returns the stored index, or records this one if the digest is new.
        original = first_seen.setdefault(digest, position)
        if original != position:
            duplicates.setdefault(original, []).append(position)

    return duplicates

In [None]:
def remove_alien_elements(input_dict, keys_to_remove):
    """
    Return a new dict without the entries whose key is in `keys_to_remove`.

    Args:
        input_dict (dict): Source mapping; it is not modified.
        keys_to_remove: Container supporting ``in`` with the keys to drop.

    Returns:
        dict: The surviving items, in their original insertion order.
    """
    return {
        key: value
        for key, value in input_dict.items()
        if key not in keys_to_remove
    }

In [None]:
pip install albumentations opencv-python



In [None]:
import albumentations as A
from albumentations.core.composition import OneOf
from albumentations.core.transforms_interface import ImageOnlyTransform
from albumentations.augmentations.transforms import *
import cv2
from albumentations import Compose


import numpy as np
def augment_image_and_mask(image, mask, augmentations):
    """
    Run one augmentation pipeline on an image/mask pair.

    The pipeline is called once with both arrays, so it is the pipeline that
    decides how the two are transformed together.

    Args:
        image (np.ndarray): The input image.
        mask (np.ndarray): The corresponding mask.
        augmentations: Callable accepting ``image=`` and ``mask=`` keyword
            arguments and returning a mapping with "image" and "mask" entries
            (e.g. an Albumentations pipeline).

    Returns:
        Tuple[np.ndarray, np.ndarray]: Augmented image and mask.
    """
    augmented = augmentations(image=image, mask=mask)
    new_image = augmented["image"]
    new_mask = augmented["mask"]
    return new_image, new_mask

  check_for_updates()


In [None]:
def augment_dataset(X_train, y_train, augmentations):
    """
    Expand the dataset with horizontally, vertically and doubly flipped copies.

    For every input pair four pairs are emitted, in this order: original,
    horizontal flip, vertical flip, horizontal + vertical flip.

    NOTE(review): `augmentations` is accepted for interface compatibility but is
    not applied by this function - only the fixed flips below are used. Confirm
    whether the pipeline was meant to be applied here.

    Args:
        X_train (np.ndarray): Array of input images of shape (N, 64, 128).
        y_train (np.ndarray): Array of masks of shape (N, 64, 128).
        augmentations: Albumentations augmentation pipeline (currently unused).

    Returns:
        Tuple[np.ndarray, np.ndarray]: Image and mask arrays holding four
        entries per input sample.
    """
    # The flip pipelines are deterministic (p=1), so build them once instead of
    # re-creating three Compose objects for every sample.
    horizontal_flip = A.Compose([A.HorizontalFlip(p=1)])
    vertical_flip = A.Compose([A.VerticalFlip(p=1)])

    augmented_images = []
    augmented_masks = []

    for img, mask in zip(X_train, y_train):
        img_hf, mask_hf = augment_image_and_mask(img, mask, horizontal_flip)
        img_vf, mask_vf = augment_image_and_mask(img, mask, vertical_flip)
        # The combined flip is a vertical flip of the horizontally flipped pair.
        img_hvf, mask_hvf = augment_image_and_mask(img_hf, mask_hf, vertical_flip)

        augmented_images.extend([img, img_hf, img_vf, img_hvf])
        augmented_masks.extend([mask, mask_hf, mask_vf, mask_hvf])

    # Convert lists to arrays
    return np.array(augmented_images), np.array(augmented_masks)


## Training Functions

In [None]:
import keras
import tensorflow as tf
from keras.metrics import MeanIoU
from keras.saving import register_keras_serializable

@keras.saving.register_keras_serializable()
class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    """
    Mean IoU that removes pixels of selected ground-truth classes before scoring.

    Predictions are expected as per-class scores on the last axis; they are
    converted to class ids with argmax before being handed to the parent metric.

    Args:
        num_classes (int): Number of classes, forwarded to ``MeanIoU``.
        labels_to_exclude (list | None): Ground-truth labels whose pixels are
            dropped before the update. Defaults to ``[0]`` (background).
        ignore_class: Stored on the instance and written to the config.
        name (str): Metric name; "mean_iou" yields the "val_mean_iou" history key.
        dtype: Forwarded to ``MeanIoU``.
        **kwargs: Any further ``MeanIoU`` arguments (needed for ``from_config``).
    """

    def __init__(self, num_classes, labels_to_exclude=None, ignore_class=None, name="mean_iou", dtype=None, **kwargs):
        # Pass only recognized arguments to the parent class
        super().__init__(num_classes=num_classes, name=name, dtype=dtype, **kwargs)

        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0 (background)
        # Copy into a list so the config stays serializable and the caller's
        # container is never shared with the metric.
        self.labels_to_exclude = list(labels_to_exclude)
        self.ignore_class = ignore_class  # Explicitly handle the ignore_class

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate confusion-matrix statistics for the non-excluded pixels."""
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Build one keep-mask covering every excluded label
        keep = tf.ones_like(y_true, dtype=tf.bool)
        for label in self.labels_to_exclude:
            keep = tf.logical_and(keep, tf.not_equal(y_true, label))

        y_true = tf.boolean_mask(y_true, keep)
        y_pred = tf.boolean_mask(y_pred, keep)

        if sample_weight is not None:
            sample_weight = tf.convert_to_tensor(sample_weight)
            # Per-pixel weights must be filtered with the same mask, otherwise
            # their length no longer matches y_true / y_pred. Scalar weights
            # broadcast and are passed through untouched.
            # Assumes non-scalar weights have one entry per pixel - TODO confirm.
            if sample_weight.shape.rank != 0:
                sample_weight = tf.boolean_mask(tf.reshape(sample_weight, [-1]), keep)

        # Update the state
        return super().update_state(y_true, y_pred, sample_weight)

    def get_config(self):
        """Return the parent config extended with the custom arguments."""
        config = super().get_config()
        config.update({
            "labels_to_exclude": self.labels_to_exclude,
            "ignore_class": self.ignore_class,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the metric from the dict produced by ``get_config``."""
        return cls(**config)


In [None]:
def load_single_image(image_path, label_path, input_size=(64, 128)):
    """
    Load a single image-label pair with the correct shape.

    Args:
        image_path: Path of the PNG image, decoded as one greyscale channel.
        label_path: Path of the PNG label mask, decoded as one channel.
        input_size (tuple): Target (height, width) for both tensors.

    Returns:
        Tuple of (image, label): the image as float32 scaled by 1/255 and the
        label as int32, both resized to `input_size` with a trailing channel axis.
    """
    print(f"Loading image from {image_path}")
    print(f"Loading label from {label_path}")
    # Read and preprocess the image
    image = tf.io.read_file(image_path)
    image = tf.io.decode_png(image, channels=1)  # Ensure 1 channel, greyscale
    image = tf.image.resize(image, input_size)   # Resize to fixed size
    image = tf.cast(image, tf.float32) / 255.0

    # Read and preprocess the label
    label = tf.io.read_file(label_path)
    label = tf.io.decode_png(label, channels=1)  # Ensure single channel
    # Labels are class ids: nearest-neighbour keeps them intact, whereas bilinear
    # interpolation would blend neighbouring ids into values that are not classes.
    label = tf.image.resize(label, input_size, method='nearest')
    label = tf.cast(label, tf.int32)

    return image, label

In [None]:
# Visualization callback
class VizCallback(tf.keras.callbacks.Callback):
    """
    Plot input, ground truth and prediction for one fixed sample during training.

    Args:
        image: Single input sample without the batch axis.
        label: Ground-truth mask for `image`.
        frequency (int): Plot on epochs whose index is a multiple of this value.
        num_classes (int): Number of classes used to size the colormap
            (defaults to 5, the value that used to be hard-coded).

    Raises:
        ValueError: If `frequency` is smaller than 1.
    """

    def __init__(self, image, label, frequency=5, num_classes=5):
        super().__init__()
        if frequency < 1:
            # Fail at construction instead of with ZeroDivisionError mid-training
            raise ValueError("frequency must be a positive integer")
        self.image = image
        self.label = label
        self.frequency = frequency
        self.num_classes = num_classes

    def on_epoch_end(self, epoch, logs=None):
        """Render the three-panel figure every `frequency` epochs."""
        if epoch % self.frequency != 0:  # Visualize only every "frequency" epochs
            return

        # The model expects a batch, so add a leading axis of size 1
        batch = tf.expand_dims(self.image, 0)
        pred = self.model.predict(batch, verbose=0)
        y_pred = tf.math.argmax(pred, axis=-1).numpy()

        colormap = create_segmentation_colormap(self.num_classes)

        fig, (input_ax, truth_ax, pred_ax) = plt.subplots(1, 3, figsize=(16, 4))

        # Input image
        input_ax.imshow(batch[0], cmap='gray')
        input_ax.set_title("Input Image")
        input_ax.axis('off')

        # Ground truth
        truth_ax.imshow(apply_colormap(self.label, colormap))
        truth_ax.set_title("Ground Truth Mask")
        truth_ax.axis('off')

        # Prediction
        pred_ax.imshow(apply_colormap(y_pred, colormap))
        pred_ax.set_title("Predicted Mask")
        pred_ax.axis('off')

        fig.tight_layout()
        plt.show()
        # Release the figure so repeated epochs do not accumulate open figures
        plt.close(fig)

In [None]:
#Creating a colour map
def create_segmentation_colormap(num_classes):
    """
    Build one colour per class by sampling the 'viridis' palette.

    The palette is sampled at `num_classes` evenly spaced positions between
    0 and 1; the result of calling the colormap on those positions is returned.
    'viridis' is used because it is perceptually uniform and works well for
    colorblindness.
    """
    sample_positions = np.linspace(0, 1, num_classes)
    palette = plt.cm.viridis
    return palette(sample_positions)

def apply_colormap(label, colormap=None):
    """
    Map every class id in a label mask to its colour.

    Args:
        label: Mask of class ids; singleton axes are squeezed away and the
            values are cast to int before indexing.
        colormap: Array-like with one colour per class id, indexed by id. When
            omitted, a colormap with ``max(label) + 1`` entries is generated.

    Returns:
        np.ndarray: Array shaped like the squeezed label with a trailing colour axis.
    """
    # Ensure label is 2D and usable as an index
    label = np.squeeze(label).astype(int)

    if colormap is None:
        # Size by the largest id, not by the number of distinct ids: a mask that
        # only contains e.g. classes 0 and 4 still needs an entry at index 4.
        num_classes = int(label.max()) + 1 if label.size else 0
        colormap = create_segmentation_colormap(num_classes)

    # Fancy indexing looks up the colour of every pixel in one step
    return np.asarray(colormap)[label]

## ⏳ Load the Data

In [None]:
# Load the dataset archive; the path is relative to the current working directory.
data = np.load("mars_for_students.npz")

# "training_set" stacks two arrays along axis 1: index 0 is used as the
# images and index 1 as the corresponding label masks.
training_set = data["training_set"]
X_train = training_set[:, 0]
y_train = training_set[:, 1]

# "test_set" holds images only; no masks are read for it.
X_test = data["test_set"]

# Sanity check of the loaded shapes
print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}")

Training X shape: (2615, 64, 128)
Training y shape: (2615, 64, 128)
Test X shape: (10022, 64, 128)


In [None]:
#Finding and removing duplicates
duplicates = find_duplicate_masks(y_train)

# After manual inspection, the first alien image appears at index 62; every
# sample sharing its mask is treated as an alien image too.
ALIEN_REFERENCE_INDEX = 62
# Build a fresh sorted list instead of appending to duplicates[62], so that the
# `duplicates` mapping itself is left unmodified.
alien_image_indices = sorted({ALIEN_REFERENCE_INDEX, *duplicates[ALIEN_REFERENCE_INDEX]})
# Set for O(1) membership tests in the filters below
alien_lookup = set(alien_image_indices)

# Dictionary of all masks, kept only to report the before/after counts
input_dict = {index: y_train[index] for index in range(len(y_train))}

no_aliens_map = remove_alien_elements(input_dict, alien_lookup)

remaining_indices = [i for i in range(len(y_train)) if i not in alien_lookup]

# NOTE: this overwrites X_train / y_train, so the cell must not be run twice
# without reloading the data first.
X_train = X_train[remaining_indices]
y_train = y_train[remaining_indices]

print(f"Total masks before removal: {len(input_dict)}")
print(f"Total masks after removal: {len(no_aliens_map)}")

Total masks before removal: 2615
Total masks after removal: 2505


In [None]:
# Adding flipped versions of images
X_train, y_train = flip_dataset(X_train, y_train)

In [None]:
#Split a validation set
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=seed)

In [None]:
# Random geometric + blur pipeline, followed by a resize back to the input size.
augmentations = A.Compose([
    # Shifted/rotated borders are filled with 0 in both the image and the mask
    A.ShiftScaleRotate(shift_limit=0.5625, scale_limit=0.1, rotate_limit=15, border_mode=cv2.BORDER_CONSTANT, value=0, mask_value=0, p=0.5),
    # At most one kind of blur is applied per sample
    A.OneOf([
        A.MotionBlur(blur_limit=7, p=0.4),
        A.MedianBlur(blur_limit=7, p=0.4),
        A.Blur(blur_limit=7, p=0.4)
    ], p=0.5),
    # p=1.0 replaces the deprecated `always_apply=True`
    A.Resize(64, 128, p=1.0),  # Ensures all outputs are the correct shape
])

# Apply the augmentations
# NOTE(review): augment_dataset, as defined in this notebook, only adds flipped
# copies and does not apply the pipeline above; X_train has also already been
# through flip_dataset, so these flips repeat existing samples. Confirm intent.
X_train, y_train = augment_dataset(X_train, y_train, augmentations)

## 🛠️ Train and Save the Model

In [None]:
# Add color channel and rescale pixels between 0 and 1
# (assumes the raw pixel values lie in [0, 255] - TODO confirm).
# NOTE: each array is overwritten in place, so re-running this cell would add a
# second channel axis and divide by 255 again.
X_train = X_train[..., np.newaxis] / 255.0
X_test = X_test[..., np.newaxis] / 255.0
X_val = X_val[..., np.newaxis] / 255.0

# Per-sample shape, i.e. everything but the leading sample axis
input_shape = X_train.shape[1:]
# Number of distinct label values found in the training masks
# (assumes every class occurs at least once in y_train - TODO confirm).
num_classes = len(np.unique(y_train))

# Report the derived model hyper-parameters

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")

Input shape: (64, 128, 1)
Number of classes: 5


In [None]:
# Defining constants

batch_size = 64  # Mini-batch size
num_epochs = 150  # Upper bound on training epochs (early stopping may end training sooner)
learning_rate = 0.001  # Initial learning rate given to the AdamW optimizer
patience = 10  # Epochs without improvement tolerated by the EarlyStopping callback
IMAGE_SIZE = 512  # NOTE(review): not referenced anywhere else in the visible notebook

### Define Model

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Reshape
from tensorflow.keras.models import Model

# Define Position Attention Module (PAM)
@tf.keras.saving.register_keras_serializable()
class PositionAttentionModule(tf.keras.layers.Layer):
    """
    Spatial self-attention over a feature map.

    Query, key and value are produced by three 1x1 convolutions. Every spatial
    position attends to all positions through softmax(query @ key^T), and the
    output is the attention-weighted sum of the values, reshaped back to a
    feature map with `in_channels` channels.

    Args:
        in_channels (int): Number of filters of the query/key/value convolutions
            and therefore the channel count of the output.
        **kwargs: Standard ``Layer`` arguments (name, dtype, ...).
    """

    def __init__(self, in_channels, **kwargs):
        super().__init__(**kwargs)
        self.in_channels = in_channels
        # Sub-layers are created here so they are tracked from construction on
        self.query_conv = Conv2D(self.in_channels, kernel_size=1, padding='same')
        self.key_conv = Conv2D(self.in_channels, kernel_size=1, padding='same')
        self.value_conv = Conv2D(self.in_channels, kernel_size=1, padding='same')

    def build(self, input_shape):
        # Build the sub-layers explicitly so their weights exist as soon as this
        # layer is built, not only after the first forward pass.
        for conv in (self.query_conv, self.key_conv, self.value_conv):
            conv.build(input_shape)
        super().build(input_shape)

    def call(self, inputs):
        # Height and width must be statically known; the batch size may be None
        _, height, width, _ = inputs.shape
        positions = height * width
        # The projections have `in_channels` channels, which may differ from the input's
        flat_shape = (-1, positions, self.in_channels)

        # Step 1-2: project, then flatten the spatial axes -> (B, HW, C)
        query = tf.reshape(self.query_conv(inputs), flat_shape)
        key = tf.reshape(self.key_conv(inputs), flat_shape)
        value = tf.reshape(self.value_conv(inputs), flat_shape)

        # Step 3: position-to-position attention -> (B, HW, HW)
        attention_map = tf.nn.softmax(tf.matmul(query, key, transpose_b=True))

        # Step 4: attention-weighted sum of the values -> (B, HW, C)
        output = tf.matmul(attention_map, value)

        # Step 5: restore the feature-map layout -> (B, H, W, C)
        return tf.reshape(output, (-1, height, width, self.in_channels))

    def get_config(self):
        """Include `in_channels` so the layer can be rebuilt from its config."""
        config = super().get_config()
        config.update({"in_channels": self.in_channels})
        return config


In [None]:
# Define Channel Attention Module (CAM)
@tf.keras.saving.register_keras_serializable()
class ChannelAttentionModule(tf.keras.layers.Layer):
    """
    Channel gating: rescales every channel by a learned weight in (0, 1).

    The feature map is averaged over its spatial axes, passed through a
    two-layer bottleneck (ReLU, then sigmoid) and the resulting per-channel
    weights multiply the input.

    Args:
        in_channels (int): Channel count of the input feature map; the gate has
            this many outputs, so it must match the input's last axis.
        **kwargs: Standard ``Layer`` arguments (name, dtype, ...).
    """

    def __init__(self, in_channels, **kwargs):
        super().__init__(**kwargs)
        self.in_channels = in_channels
        # Keep at least one bottleneck unit when in_channels < 8
        self.fc1 = Dense(max(1, self.in_channels // 8), activation='relu')
        self.fc2 = Dense(self.in_channels, activation='sigmoid')

    def build(self, input_shape):
        # Build the sub-layers explicitly so their weights exist as soon as this
        # layer is built, not only after the first forward pass.
        self.fc1.build((None, input_shape[-1]))
        self.fc2.build((None, self.fc1.units))
        super().build(input_shape)

    def call(self, inputs):
        # Step 1: Global average pooling to squeeze spatial dimensions
        gap = tf.reduce_mean(inputs, axis=[1, 2])  # (B, C)

        # Step 2: Fully connected layers for channel attention
        attention = self.fc1(gap)  # (B, max(1, C//8))
        attention = self.fc2(attention)  # (B, C)

        # Step 3: Broadcast the gate over the spatial axes and apply it
        attention = tf.reshape(attention, (-1, 1, 1, self.in_channels))  # (B, 1, 1, C)
        return inputs * attention  # (B, H, W, C)

    def get_config(self):
        """Include `in_channels` so the layer can be rebuilt from its config."""
        config = super().get_config()
        config.update({"in_channels": self.in_channels})
        return config

In [None]:
class ResNetBlock(layers.Layer):
    """
    Two-convolution residual block with a 1x1 projection shortcut.

    Args:
        filters (int): Number of filters of every convolution in the block.
        kernel_size (int): Kernel size of the two main-path convolutions.
        stride (int): Stride of the first convolution and of the shortcut, so
            both paths end up with the same spatial size.
        **kwargs: Standard ``Layer`` arguments (name, dtype, ...).
    """

    def __init__(self, filters, kernel_size=3, stride=1, **kwargs):
        super().__init__(**kwargs)
        self.conv1 = Conv2D(filters, kernel_size=kernel_size, strides=stride, padding='same', activation='relu')
        self.conv2 = Conv2D(filters, kernel_size=kernel_size, strides=1, padding='same', activation=None)
        self.shortcut = Conv2D(filters, kernel_size=1, strides=stride, padding='same', activation=None)
        # `call` merges the two paths through self.add, which was never defined
        # before and made every forward pass fail with AttributeError.
        self.add = layers.Add()

    def call(self, inputs):
        # Main path: conv (ReLU) -> conv (linear)
        x = self.conv1(inputs)
        x = self.conv2(x)
        # Shortcut path: 1x1 projection to the same shape as the main path
        shortcut = self.shortcut(inputs)
        return self.add([x, shortcut])

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Lambda, UpSampling2D


@tf.keras.saving.register_keras_serializable()
class DualAttentionNetwork(Model):
    """
    Segmentation model: ResNet50 features -> position attention -> channel
    attention -> per-pixel softmax, upsampled back to the input resolution.

    Args:
        num_classes (int): Number of output classes per pixel.
        input_shape (tuple): (height, width, channels) of one input sample. Height
            and width should be multiples of 32, because the backbone reduces the
            resolution by a factor of 32 and the head upsamples by the same factor.
        **kwargs: Standard ``Model`` arguments (name, ...), required when the
            model is re-created from its config.
    """

    def __init__(self, num_classes=5, input_shape=(64, 128, 1), **kwargs):
        super().__init__(**kwargs)
        self.num_classes = num_classes
        # Stored under a private name: `input_shape` is a read-only Keras property
        self._image_shape = tuple(input_shape)
        height, width = self._image_shape[:2]

        # Replicate the 1 channel into 3 channels for ResNet50 compatibility
        self.grayscale_to_rgb = Lambda(lambda x: tf.repeat(x, 3, axis=-1))

        # Backbone - ResNet50 without the top layers, as a feature extractor.
        # Its spatial size follows `input_shape` instead of a hard-coded (64, 128).
        self.backbone = ResNet50(include_top=False, weights=None, input_shape=(height, width, 3), pooling=None)

        # Reduce the backbone output to 256 channels
        self.conv = Conv2D(256, kernel_size=3, padding='same', activation='relu')

        # Position Attention Module (PAM) on the 256-channel feature map
        self.pam = PositionAttentionModule(in_channels=256)

        # Channel Attention Module (CAM) on the 256-channel feature map
        self.cam = ChannelAttentionModule(in_channels=256)

        # Per-pixel class probabilities
        self.output_conv = Conv2D(num_classes, kernel_size=1, activation='softmax')

        # Undo the 32x downsampling of the backbone (e.g. 2x4 -> 64x128)
        self.upsample = UpSampling2D(size=(32, 32), interpolation='bilinear')

    def call(self, inputs):
        # Convert grayscale to RGB
        x = self.grayscale_to_rgb(inputs)

        # Pass through ResNet50 backbone
        x = self.backbone(x)

        # Add Conv2D to adjust the channel size
        x = self.conv(x)

        # Apply PAM (Position Attention)
        x = self.pam(x)

        # Apply CAM (Channel Attention)
        x = self.cam(x)

        # Final output convolution for classification
        x = self.output_conv(x)

        # Upsample to match the target shape
        x = self.upsample(x)

        return x

    def get_config(self):
        """Expose the constructor arguments so the model can be rebuilt on load."""
        config = super().get_config()
        config.update({
            "num_classes": self.num_classes,
            "input_shape": self._image_shape,
        })
        return config


In [None]:
# Instantiate the model (compilation happens in a later cell)
input_shape = (64, 128, 1)  # Grayscale input: (height, width, channels)
batch_size = 64  # Same value as in the constants cell
model = DualAttentionNetwork(num_classes=5, input_shape=input_shape)

### Building model

In [None]:
# Print a detailed summary of the model with expanded nested layers and trainable parameters.
# NOTE(review): the model has not been called on data at this point, so the
# summary may report layers as unbuilt - confirm the output is as expected.
model.summary(expand_nested=True, show_trainable=True)

## Compile Model

In [None]:
# Compile the model
print("Compiling model...")


model.compile(
    # Integer class ids as targets; the default (from_logits=False) matches the
    # softmax activation of the model's output layer.
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.AdamW(learning_rate),
    # Mean IoU ignores pixels whose ground-truth label is 0; it is logged as "mean_iou".
    metrics=["accuracy", MeanIntersectionOverUnion(num_classes=num_classes, labels_to_exclude=[0])]
)

print("Model compiled!")

Compiling model...
Model compiled!


### Setup Callbacks

In [None]:
# Stop training once validation accuracy has not improved for `patience` epochs
# and roll the model back to the best weights seen.
# NOTE(review): the result reported after training is val_mean_iou, while both
# callbacks below monitor val_accuracy - confirm this is the intended criterion.
earlystop = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=patience,
    restore_best_weights=True
)

# Multiply the learning rate by 0.1 after 5 epochs without improvement,
# never going below 1e-4.
plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    mode='max',
    factor=0.1,
    patience=5,
    min_lr=1e-4
)

# Plot the prediction for one fixed validation sample during training
viz_callback = VizCallback(X_val[10], y_val[10])

### Train the Model

In [None]:
# Train the model
history = model.fit(
    X_train,
    y_train,
    # Previously omitted, so Keras fell back to its default batch size and the
    # `batch_size` constant had no effect.
    batch_size=batch_size,
    epochs=num_epochs,
    # `plateau` was defined with the other callbacks but never passed to fit.
    callbacks=[earlystop, plateau, viz_callback],
    verbose=1,
    validation_data=(X_val, y_val)
).history

# Best validation mean IoU reached during training, as a percentage
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

# Save the trained model under a timestamped name. The earlier IoU-based name
# was overwritten before use, so that dead assignment has been dropped.
timestep_str = datetime.now().strftime("%y%m%d_%H%M%S")
model_filename = f"baseline_model_{timestep_str}.keras"
model.save(model_filename)
# Free the in-memory model; it is reloaded from disk for the submission
del model

Epoch 1/150


ValueError: Arguments `target` and `output` must have the same shape up until the last dimension: target.shape=(None, 64, 128), output.shape=(None, 2, 4, 5)

## 📊 Prepare Your Submission

In our Kaggle competition, submissions are made as `csv` files. To create a proper `csv` file, you need to flatten your predictions and include an `id` column as the first column of your dataframe. To maintain consistency between your results and our solution, please avoid shuffling the test set. The code below demonstrates how to prepare the `csv` file from your model predictions.




In [None]:
# If model_filename is not defined, fall back to the most recently modified
# model file in the current working directory.
if "model_filename" not in globals() or model_filename is None:
    # Match on "model_" anywhere in the name: the training cell saves files as
    # "baseline_model_<timestamp>.keras", which a "model_" prefix test never finds.
    model_files = [
        f for f in os.listdir('.')
        if os.path.isfile(f) and f.endswith('.keras') and 'model_' in f
    ]
    if not model_files:
        raise FileNotFoundError("No model files found in the current directory.")
    # Newest file by modification time
    model_filename = max(model_files, key=os.path.getmtime)

In [None]:
# The saved model is a custom subclass, so Keras must be told how to resolve it.
# compile=False skips restoring the loss/optimizer/metrics, which are not needed
# for inference.
model = tfk.models.load_model(
    model_filename,
    custom_objects={"DualAttentionNetwork": DualAttentionNetwork},
    compile=False,
)
print(f"Model loaded from {model_filename}")

In [None]:
# Predict on the test images (X_test was given a channel axis and divided by
# 255 in the preprocessing cell).
preds = model.predict(X_test)
# Collapse the last axis to the index of the highest score, i.e. the class id
preds = np.argmax(preds, axis=-1)
print(f"Predictions shape: {preds.shape}")

In [None]:
def y_to_df(y) -> pd.DataFrame:
    """
    Converts segmentation predictions into a DataFrame format for Kaggle.

    Each sample becomes one row: its prediction is flattened into the columns
    0..K-1 and a leading "id" column carries the sample position (0..N-1).
    """
    sample_count = len(y)
    frame = pd.DataFrame(y.reshape(sample_count, -1))
    # Put the identifier in front of the flattened pixel columns
    frame.insert(0, "id", np.arange(sample_count))
    return frame

In [None]:
# Create and download the csv submission file
# The submission name reuses the model file name with "model_" and ".keras"
# stripped, e.g. "baseline_model_<ts>.keras" -> "submission_baseline_<ts>.csv".
timestep_str = model_filename.replace("model_", "").replace(".keras", "")
submission_filename = f"submission_{timestep_str}.csv"
submission_df = y_to_df(preds)
# index=False: the "id" column already identifies the rows
submission_df.to_csv(submission_filename, index=False)

# Colab-only helper that triggers a browser download of the file
from google.colab import files
files.download(submission_filename)

#  
<img src="https://airlab.deib.polimi.it/wp-content/uploads/2019/07/airlab-logo-new_cropped.png" width="350">

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/95/Instagram_logo_2022.svg/800px-Instagram_logo_2022.svg.png" width="15"> **Instagram:** https://www.instagram.com/airlab_polimi/

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/LinkedIn_icon.svg/2048px-LinkedIn_icon.svg.png" width="15"> **LinkedIn:** https://www.linkedin.com/company/airlab-polimi/
___
Credits: Alberto Archetti 📧 alberto.archetti@polito.it





```
   Copyright 2024 Alberto Archetti

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
```