## 🌐 Connect Colab to Google Drive

In [1]:
from google.colab import drive

drive.mount("/gdrive")
%cd /gdrive/My Drive
%cd [2024-2025] AN2DL Homework 2

Mounted at /gdrive
/gdrive/My Drive
/gdrive/My Drive/[2024-2025] AN2DL Homework 2


## ⚙️ Import Libraries

In [2]:
from datetime import datetime

import numpy as np
import pandas as pd
import logging
import random

import tensorflow as tf
from tensorflow import keras as tfk
tfk.config.enable_unsafe_deserialization()
from tensorflow.keras import layers as tfkl
from tensorflow.keras.layers import Layer
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from scipy.stats import mode

import matplotlib.pyplot as plt
%matplotlib inline

seed = 29
np.random.seed(seed)
tf.random.set_seed(seed)

# Set seeds for random number generators in NumPy and Python
np.random.seed(seed)
random.seed(seed)

# Set seed for TensorFlow
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Reduce TensorFlow verbosity
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Suppress warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

## ⏳ Load the Datasets

In [3]:
data = np.load("mars_for_students.npz")

training_set = data["training_set"]
X_train = training_set[:, 0]
y_train = training_set[:, 1]

X_test = data["test_set"]

print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}")

# Add color channel and rescale pixels between 0 and 1
X_train = X_train[..., np.newaxis] / 255.0
X_test = X_test[..., np.newaxis] / 255.0

input_shape = X_train.shape[1:]
num_classes = len(np.unique(y_train))

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")

Training X shape: (2615, 64, 128)
Training y shape: (2615, 64, 128)
Test X shape: (10022, 64, 128)
Input shape: (64, 128, 1)
Number of classes: 5


## 🔍 Inspect the training dataset

In [None]:
# Calculate prevalent labels
y_train_labels = mode(y_train, axis=(1, 2))[0].flatten()

print(f"Shape X_train: {X_train.shape}")
print(f"Shape y_train_labels: {y_train_labels.shape}")

# List all unique labels to check correctness
unique_labels = np.unique(y_train)
print(f"Unique classes: {unique_labels}")

# Plot images in batches
def plot_images(X, y, start_index=0, images_per_row=10, images_per_col=10):
    fig, axes = plt.subplots(images_per_col, images_per_row, figsize=(15, 15))
    for i in range(images_per_row * images_per_col):
        idx = start_index + i
        if idx >= len(X):
            break
        ax = axes[i // images_per_row, i % images_per_row]
        ax.imshow(X[idx], cmap="gray")
        ax.set_title(f"Class: {y[idx]}")
        ax.axis("off")
    plt.tight_layout()
    plt.show()

# Plot a sample image from each class
def plot_one_sample_per_class(X, y, y_mask, classes):
    for label in classes:
        for i in range(len(y_mask)):
            if label in np.unique(y_mask[i]):
                plt.figure()
                plt.imshow(X[i], cmap="gray")
                plt.title(f"Class: {label}")
                plt.axis("off")
                plt.show()
                break

plot_one_sample_per_class(X_train, y_train_labels, y_train, unique_labels)

# Plot all images
images_per_row = 10
images_per_col = 10
images_per_page = images_per_row * images_per_col
num_images = X_train.shape[0]

for start_idx in range(0, num_images, images_per_page):
    plot_images(X_train, y_train_labels, start_index=start_idx, images_per_row=images_per_row, images_per_col=images_per_col)

## ❌ Remove outliers from dataset

In [None]:
# Lists to contain filtered elements
X_train_filtered = []
y_train_filtered = []

for i in range(len(y_train)):
    label = y_train[i].argmax() if y_train.ndim > 1 else y_train[i]
    if label != 415:
        # Add to filtered dataset the non-alien images
        X_train_filtered.append(X_train[i])
        y_train_filtered.append(y_train[i])

# Convert lists to numpy arrays
X_train_filtered = np.array(X_train_filtered)
y_train_filtered = np.array(y_train_filtered)

print(f"Shape X_train_filtered: {X_train_filtered.shape}")
print(f"Shape y_train_filtered: {y_train_filtered.shape}")
print(f"Unique classes: {np.unique(y_train_filtered)}")

Shape X_train_filtered: (2505, 64, 128, 1)
Shape y_train_filtered: (2505, 64, 128)
Unique classes: [0. 1. 2. 3. 4.]


## 🔍 Inspect the filtered training dataset

In [None]:
num_images_filtered = X_train_filtered.shape[0]
y_train_filtered_labels = mode(y_train_filtered, axis=(1, 2))[0].flatten()

# Plot the filtered dataset
for start_idx in range(0, num_images_filtered, images_per_page):
    plot_images(X_train_filtered, y_train_filtered_labels, start_index=start_idx, images_per_row=images_per_row, images_per_col=images_per_col)

## 🧮 Define network parameters

In [None]:
# Set batch size for training
batch_size = 64

# Set learning rate for the optimizer
learning_rate = 1e-4

# Set early stopping patience threshold
patience = 15

# Set maximum number of training epochs
epochs = 300

In [None]:
# Create an EarlyStopping callback
early_stopping = tfk.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=patience,
    restore_best_weights=True
)

# Create a LearningRate Scheduler, which reduces learning rate if val_loss doesn't improve
lr_scheduler = tfk.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5
)

# Store the callback in a list
callbacks = [early_stopping, lr_scheduler]

## ✂ Split into Training and Validation Sets

In [None]:
# Split the training dataset to get a validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train_filtered,
    y_train_filtered,
    test_size=0.1,
    random_state=seed)

class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.flatten())
class_weights = {i: weight for i, weight in enumerate(class_weights)}
class_weights = [class_weights[key] for key in sorted(class_weights.keys())]

# Print the shapes of the resulting sets
print('Training set shape:\t', X_train.shape, y_train.shape)
print('Validation set shape:\t', X_val.shape, y_val.shape)
print('Class weights:\t', class_weights)

# Convert in un tensore statico
class_weights = tf.constant(class_weights, dtype=tf.float32)

Training set shape:	 (2254, 64, 128, 1) (2254, 64, 128)
Validation set shape:	 (251, 64, 128, 1) (251, 64, 128)
Class weights:	 [0.8176698618113929, 0.5910939761597104, 0.8421959857784994, 1.1219193842338118, 153.08214226496435]


## 🔄 Preprocess Dataset

In [None]:
def augment_data(image, label):
    # Geometric Transformations
    image = tf.image.random_flip_left_right(image)
    label = tf.image.random_flip_left_right(label)

    image = tf.image.random_flip_up_down(image)
    label = tf.image.random_flip_up_down(label)

    # Chromatic Transformations
    image = tf.image.random_brightness(image, max_delta=0.4)
    image = tf.image.random_contrast(image, lower=0.7, upper=1.3)

    return image, label

def preprocess_image(image):
    image = tf.expand_dims(image, axis=-1) if len(image.shape) == 2 else image
    image = tf.cast(image, tf.float32)
    return image

def preprocess_label(label):
    label = tf.expand_dims(label, axis=-1) if len(label.shape) == 2 else label
    label = tf.cast(label, tf.int32)
    return label

def preprocess_data(image, label):
    image = preprocess_image(image)
    label = preprocess_label(label)
    return image, label

In [None]:
# Original dataset
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.map(preprocess_data, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Augmented dataset
augmented_dataset = train_dataset.map(lambda x, y: augment_data(x, y), num_parallel_calls=tf.data.AUTOTUNE)

# Combined dataset, having both augmented and original dataset
combined_dataset = train_dataset.concatenate(augmented_dataset)
combined_dataset = combined_dataset.shuffle(buffer_size=len(X_train))
combined_dataset = combined_dataset.prefetch(tf.data.AUTOTUNE)

# Validation dataset
val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_dataset = val_dataset.map(preprocess_data, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

## 🔍 Plot Datasets

In [None]:
def plot_from_dataset(dataset, title, num_images=10):
    fig, axes = plt.subplots(1, num_images, figsize=(num_images * 2, 3))
    fig.suptitle(title, fontsize=16, y=1.05)
    count = 0

    for batch in dataset:
        images, label_maps = batch
        for i in range(len(images)):
            if count >= num_images:
                break
            image = images[i].numpy()
            if image.shape[-1] == 1:
                image = tf.squeeze(image, axis=-1).numpy()
            axes[count].imshow(image, cmap='gray' if image.ndim == 2 else None, aspect='auto')
            axes[count].axis('off')

            count += 1

        if count >= num_images:
            break

    plt.tight_layout()
    plt.show()

plot_from_dataset(train_dataset, "Train Dataset without Augmentation", num_images=10)
plot_from_dataset(augmented_dataset, "Train Dataset with Augmentation", num_images=10)
plot_from_dataset(combined_dataset, "Combined Dataset", num_images=10)

## 🔨 Build the model

In [None]:
def unet_block(input_tensor, filters, kernel_size=3, activation='relu', stack=2, name=''):
    # Residual connection
    residual = input_tensor
    residual = tfkl.Conv2D(filters, kernel_size=1, padding='same', kernel_initializer='he_normal', kernel_regularizer=tf.keras.regularizers.l2(1e-4))(residual)

    # Convolutional path
    x = input_tensor
    for i in range(stack):
        x = tfkl.Conv2D(filters, kernel_size=kernel_size, padding='same', kernel_initializer='he_normal', kernel_regularizer=tf.keras.regularizers.l2(1e-4))(x)
        x = tfkl.BatchNormalization()(x)
        x = tfkl.Activation(activation)(x)
        x = tfkl.Conv2D(filters, kernel_size=kernel_size, padding='same', kernel_initializer='he_normal', kernel_regularizer=tf.keras.regularizers.l2(1e-4))(x)
        x = tfkl.BatchNormalization()(x)
        x = tfkl.Activation(activation)(x)
        x = tfkl.SpatialDropout2D(0.2)(x)

    # Add residual connection
    x = tfkl.Add()([x, residual])
    return x

def dense_block(input_tensor, filters, kernel_size=3, growth_rate=32, num_layers=4):
    x = input_tensor
    for i in range(num_layers):
        conv = tfkl.Conv2D(growth_rate, kernel_size, padding='same', kernel_initializer='he_normal', kernel_regularizer=tf.keras.regularizers.l2(1e-4))(x)
        conv = tfkl.BatchNormalization()(conv)
        conv = tfkl.Activation('relu')(conv)
        x = tfkl.Concatenate()([x, conv])
    return x

def par_dil_conv(input_tensor, filters, kernel_size=3, dilation_rates=(1, 2, 4), activation='relu'):
    branches = []
    for rate in dilation_rates:
        branch = tfkl.Conv2D(filters, kernel_size=kernel_size, dilation_rate=rate, padding='same',
                             kernel_initializer='he_normal', kernel_regularizer=tf.keras.regularizers.l2(1e-4))(input_tensor)
        branch = tfkl.BatchNormalization()(branch)
        branch = tfkl.Activation(activation)(branch)
        branches.append(branch)
    output = tfkl.Concatenate()(branches)
    output = tfkl.Conv2D(filters, kernel_size=1, padding='same', kernel_initializer='he_normal')(output)
    return output

def bottleneck_layer(input_tensor, filters, reduction_ratio=4, dilation_rates=(1, 2, 4)):
    # Compression
    reduced_filters = filters // reduction_ratio
    bottleneck = tfkl.Conv2D(reduced_filters, kernel_size=1, padding='same', activation='relu')(input_tensor)

    # Parallel Dilated Convolutions
    bottleneck = par_dil_conv(bottleneck, filters=reduced_filters, dilation_rates=dilation_rates)

    # Expansion
    bottleneck = tfkl.Conv2D(filters, kernel_size=3, padding='same', activation='relu')(bottleneck)
    return bottleneck

def se_block(input_tensor, reduction_ratio=16):
    filters = input_tensor.shape[-1]
    se = tfkl.GlobalAveragePooling2D()(input_tensor)
    se = tfkl.Dense(filters // reduction_ratio, activation='relu')(se)
    se = tfkl.Dense(filters, activation='sigmoid')(se)
    se = tfkl.Reshape((1, 1, filters))(se)
    return tfkl.Multiply()([input_tensor, se])

# Class to downsize a tensor
class DownsizeLayer(Layer):
    def call(self, inputs):
        return tf.image.resize(inputs, (32, 64))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 32, 64, input_shape[-1])

# Class to upsize a tensor
class UpsizeLayer(Layer):
    def call(self, inputs):
        return tf.image.resize(inputs, (64, 128))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 64, 128, input_shape[-1])

# Function to create a UNet
def create_unet(input_shape, num_classes):
  input_layer = tfkl.Input(shape=input_shape)

  # Downsampling path
  down_block_1 = dense_block(input_layer, filters=32, growth_rate=16, num_layers=3)
  d1 = tfkl.Conv2D(32, (3, 3), strides=2, padding="same", kernel_regularizer=tf.keras.regularizers.l2(1e-4))(down_block_1)

  down_block_2 = dense_block(d1, filters=64, growth_rate=16, num_layers=3)
  d2 = tfkl.Conv2D(64, (3, 3), strides=2, padding="same", kernel_regularizer=tf.keras.regularizers.l2(1e-4))(down_block_2)

  # Bottleneck con Parallel Dilated Convolutions
  bottleneck = bottleneck_layer(d2, filters=128, dilation_rates=(1, 2, 4))
  bottleneck = se_block(bottleneck)

  # Upsampling path
  u1 = tfkl.Conv2DTranspose(64, kernel_size=2, strides=2, padding='same')(bottleneck)
  u1 = tfkl.Concatenate()([u1, se_block(down_block_2)])
  u1 = unet_block(u1, 64, name='up_block1_')

  u2 = tfkl.Conv2DTranspose(32, kernel_size=2, strides=2, padding='same')(u1)
  u2 = tfkl.Concatenate()([u2, se_block(down_block_1)])
  u2 = unet_block(u2, 32, name='up_block2_')

  # Output Layer
  output_layer = se_block(u2)
  output_layer = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax")(output_layer)

  return tfk.Model(inputs=input_layer, outputs=output_layer)

In [None]:
# Dice Loss
def dice_loss(y_true, y_pred, smooth=1e-6):
    if len(y_true.shape) < len(y_pred.shape):
        y_true = tf.one_hot(tf.cast(y_true, tf.int32), depth=y_pred.shape[-1])

    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    tf.debugging.assert_equal(
    tf.shape(y_true),
    tf.shape(y_pred),
    message="Shape mismatch: y_true and y_pred have different shapes."
)
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return 1 - ((2. * intersection + smooth) /
                (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth))

# Focal Loss
def focal_loss(gamma=2., alpha=0.25):
    gamma = tf.constant(gamma, dtype=tf.float32)
    alpha = tf.constant(alpha, dtype=tf.float32)

    def focal_loss_fixed(y_true, y_pred):
        epsilon = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=y_pred.shape[-1])
        y_true_one_hot = tf.squeeze(y_true_one_hot, axis=-2) if len(y_true_one_hot.shape) > len(y_pred.shape) else y_true_one_hot
        cross_entropy = -y_true_one_hot * tf.keras.backend.log(y_pred)
        weight = alpha * tf.math.pow((1 - y_pred), gamma)
        loss = weight * cross_entropy
        return tf.reduce_mean(tf.reduce_sum(loss, axis=-1))

    return focal_loss_fixed

def weighted_loss(y_true, y_pred):
    global class_weights
    weights_per_pixel = tf.gather(class_weights, tf.cast(y_true, tf.int32))
    weights_per_pixel = tf.expand_dims(weights_per_pixel, axis=-1)
    weights_per_pixel = tf.squeeze(weights_per_pixel, axis=-1)

    # Calculate SparseCategoricalCrossentropy
    scce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False, reduction='none')
    unweighted_loss = scce(y_true, y_pred)

    # To match dimensions
    unweighted_loss = tf.expand_dims(unweighted_loss, axis=-1)

    weighted_loss = unweighted_loss * weights_per_pixel

    # Media finale
    return tf.reduce_mean(weighted_loss)

focal = focal_loss(gamma=2.0, alpha=0.25)

# Combined Loss
def combined_loss_wrapper():
    def combined_loss(y_true, y_pred):
        # Weighted Loss
        w_loss = weighted_loss(y_true, y_pred)

        # Dice Loss
        d_loss = dice_loss(y_true, y_pred)

        # Focal Loss
        f_loss = focal(y_true, y_pred)

        # Somma delle perdite
        return w_loss + d_loss + f_loss

    return combined_loss

In [None]:
downsize_layer = DownsizeLayer()
upsize_layer = UpsizeLayer()

# Global UNet
input_shape_global = (32, 64, 1)
unet_global = create_unet(input_shape_global, num_classes)

# Local UNet
unet_local = create_unet(input_shape, num_classes)

# Input
inputs = tfkl.Input(shape=input_shape)

global_input = downsize_layer(inputs)
global_features = unet_global(global_input)

local_features = unet_local(inputs)

# Features fusion from both nets
global_upsampled = upsize_layer(global_features) # Upsize to match dimensions
fused_features = tfkl.Concatenate()([global_upsampled, local_features])

output = tfkl.Conv2D(num_classes, kernel_size=3, padding='same', activation='softmax')(fused_features)

model = tfk.Model(inputs, output)

# Define the MeanIoU ignoring the background class
mean_iou = tfk.metrics.MeanIoU(num_classes=num_classes, ignore_class=0, sparse_y_pred=False, name='mean_iou')
optimizer = tfk.optimizers.AdamW(learning_rate=learning_rate, weight_decay=1e-5)
loss = weighted_loss

# Compile the model
model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[mean_iou]
)

## 🛠️ Train and Save the Model

In [None]:
history = model.fit(
    combined_dataset,
    epochs=epochs,
    validation_data=val_dataset,
    batch_size=batch_size,
    callbacks=callbacks
).history

# Calculate and print the final validation accuracy
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

Epoch 1/300
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m194s[0m 1s/step - loss: 1.7909 - mean_iou: 0.1287 - val_loss: 1.9444 - val_mean_iou: 0.1115 - learning_rate: 1.0000e-04
Epoch 2/300
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 242ms/step - loss: 1.7848 - mean_iou: 0.1623 - val_loss: 1.9532 - val_mean_iou: 0.1079 - learning_rate: 1.0000e-04
Epoch 3/300
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 242ms/step - loss: 1.7407 - mean_iou: 0.1614 - val_loss: 1.9729 - val_mean_iou: 0.0866 - learning_rate: 1.0000e-04
Epoch 4/300
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 246ms/step - loss: 1.7627 - mean_iou: 0.1794 - val_loss: 1.9622 - val_mean_iou: 0.0789 - learning_rate: 1.0000e-04
Epoch 5/300
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 247ms/step - loss: 1.7187 - mean_iou: 0.1928 - val_loss: 1.9490 - val_mean_iou: 0.1130 - learning_rate: 1.0000e-04
Epoch 6/300
[1m72/72[0m [32m━━━━━━━━━━━

In [None]:
timestep_str = datetime.now().strftime("%y%m%d_%H%M%S")
model_filename = f"model_{timestep_str}.keras"
model.save(model_filename)
del model

## 📊 Test the model

In [None]:
model = tfk.models.load_model(model_filename, custom_objects={
        "DownsizeLayer": DownsizeLayer,
        "UpsizeLayer": UpsizeLayer,
        'dice_loss': dice_loss,
        'focal_loss': focal_loss,
        'weighted_loss': weighted_loss,
        'combined_loss': combined_loss_wrapper(),
        'unet_block': unet_block,
        'dense_block': dense_block,
        'par_dil_conv': par_dil_conv,
        'bottleneck_layer': bottleneck_layer,
        'se_block': se_block,
        'MeanIoU': tfk.metrics.MeanIoU
    }
)

preds = model.predict(X_test)
preds = np.argmax(preds, axis=-1)
print(f"Predictions shape: {preds.shape}")

[1m314/314[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 49ms/step
Predictions shape: (10022, 64, 128)


## 💾 Save the predictions

In [None]:
def y_to_df(y) -> pd.DataFrame:
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

In [None]:
# Create the csv submission file
timestep_str = model_filename.replace("model_", "").replace(".keras", "")
submission_filename = f"submission_{timestep_str}.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)