# Artificial Neural Networks and Deep Learning

---

## Homework 2: Minimal Working Example

To make your first submission, follow these steps:
1. Create a folder named `[2024-2025] AN2DL/Homework 2` in your Google Drive.
2. Upload the `mars_for_students.npz` file to this folder.
3. Upload the Jupyter notebook `Homework 2 - Minimal Working Example.ipynb`.
4. Load and process the data.
5. Implement and train your model.
6. Submit the generated `.csv` file to Kaggle.


## 🌐 Connect Colab to Google Drive

## ⚙️ Import Libraries

In [3]:
import os
from datetime import datetime

import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
import random
import matplotlib.pyplot as plt
%matplotlib inline

np.random.seed(42)
tf.random.set_seed(42)

print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

TensorFlow version: 2.16.2
Keras version: 3.6.0
GPU devices: 1


## ⏳ Load the Data

In [2]:
os.chdir('/Users/ernestonatuzzi/università/dpl')
data = np.load("new_mars_clean_data.npz")

training_set = data["training_set"]
X_train = training_set[:, 0]
y_train = training_set[:, 1]

X_test = data["test_set"]

print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}")

Training X shape: (2505, 64, 128)
Training y shape: (2505, 64, 128)
Test X shape: (10022, 64, 128)


## 🛠️ Train and Save the Model

In [3]:
# Add color channel and rescale pixels between 0 and 1
X_train = X_train[..., np.newaxis] / 255.0
X_test = X_test[..., np.newaxis] / 255.0

input_shape = X_train.shape[1:]
num_classes = len(np.unique(y_train))

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")

Input shape: (64, 128, 1)
Number of classes: 5


In [4]:
# prompt: split x train in train test val

from sklearn.model_selection import train_test_split

print("Splitting data...")

train_img, test_img, train_lbl, test_lbl = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42
)

train_img, val_img, train_lbl, val_lbl = train_test_split(
     train_img, train_lbl, test_size=0.2, random_state=42
)

train_img = np.concatenate([train_img,test_img])
train_lbl = np.concatenate([train_lbl,test_lbl])



print("Data splitted! 0.7 - 0.2 - 0.1")

print(f"\nNumber of images:")
print(f"Train: {len(train_img)}")
print(f"Validation: {len(val_img)}")

print(f"Train shape: {train_img.shape}")
print(f"Validation shape: {val_img.shape}")


Splitting data...
Data splitted! 0.7 - 0.2 - 0.1

Number of images:
Train: 2054
Validation: 451
Train shape: (2054, 64, 128, 1)
Validation shape: (451, 64, 128, 1)


In [5]:
# Definition of hyperparameters

data_params = {
    'batch_size': 32,
    'input_shape': (64, 128, 3),
    'num_classes': 5,
    'seed': 42
}

HYPERPARAMETERS = {
    "BATCH_SIZE": 32,
    "EPOCHS": 200,
    "LEARNING_RATE": 0.001,
    "LEARNING_DESCENT_PATIENCE": 20,
    "LEARNING_DESCENT_FACTOR": 0.5,
    "EARLY_STOPPING_PATIENCE": 30,
    "DROPOUT": 0.4,
    "LAYERS_FINE_TUNE": 0,
    "MODEL_NAME": "boh",
}

In [6]:
import tensorflow as tf
import numpy as np
@tf.function
def preprocess(image, label):
    """
    Assicura che immagine e maschera abbiano la dimensione corretta.
    """
    if len(image.shape) == 2:
        image = tf.expand_dims(image, axis=-1)  # Aggiungi dimensione del canale
    if len(label.shape) == 2:
        label = tf.expand_dims(label, axis=-1)  # Aggiungi dimensione del canale
    return image, label
@tf.function
def random_flip(image, label):
    """Consistent random horizontal flip per immagini a 1 canale."""
    flip_prob = tf.random.uniform([])

    # Applica il flip orizzontale in modo consistente sia a immagine che a maschera
    image = tf.cond(
        flip_prob > 0.5,
        lambda: tf.image.flip_left_right(image),
        lambda: image
    )
    label = tf.cond(
        flip_prob > 0.5,
        lambda: tf.image.flip_left_right(label),
        lambda: label
    )
    return image, label

@tf.function
def random_translation(image, label, max_translation=(20, 20)):
    """
    Applica una traslazione casuale a immagine e maschera usando TensorFlow.
    Args:
        image: Tensor dell'immagine.
        label: Tensor della maschera.
        max_translation: Massima traslazione (altezza, larghezza).
    Returns:
        Immagine e maschera traslate.
    """
    # Genera un valore casuale per decidere quali assi traslare
    apply_x = tf.random.uniform([], 0, 1) > 0.5  # Probabilità del 50% di traslare su X
    apply_y = tf.random.uniform([], 0, 1) > 0.5  # Probabilità del 50% di traslare su Y

    # Genera traslazioni casuali solo per gli assi attivi
    tx = tf.random.uniform([], -max_translation[0], max_translation[0], dtype=tf.int32) if apply_x else 0
    ty = tf.random.uniform([], -max_translation[1], max_translation[1], dtype=tf.int32) if apply_y else 0

    # Applica la traslazione
    image = tf.roll(image, shift=[tx, ty], axis=[0, 1])
    label = tf.roll(label, shift=[tx, ty], axis=[0, 1])
    return image, label


@tf.function
def random_zoom(image, label, zoom_range=(0.8, 1.2)):
    """
    Applica uno zoom casuale a immagine e maschera, assicurando dimensioni valide.
    Args:
        image: Tensor dell'immagine.
        label: Tensor della maschera.
        zoom_range: Range per il fattore di zoom.
    Returns:
        Immagine e maschera zoomate.
    """
    # Aggiungi una dimensione del canale se mancante
    if len(image.shape) == 2:
        image = tf.expand_dims(image, axis=-1)
    if len(label.shape) == 2:
        label = tf.expand_dims(label, axis=-1)


    zoom_factor = tf.random.uniform([], zoom_range[0], zoom_range[1])
    original_size = tf.cast(tf.shape(image)[1:3], tf.float32)  # Ottieni altezza e larghezza
    new_size = tf.cast(original_size * zoom_factor, tf.int32)

    # Assicura che le dimensioni siano valide
    new_size = tf.maximum(new_size, 1)

    # Ridimensiona l'immagine
    image = tf.image.resize(image, new_size, method='bilinear')
    label = tf.image.resize(label, new_size, method='nearest')

    # Ripristina alla dimensione originale
    image = tf.image.resize_with_crop_or_pad(image, tf.shape(image)[1], tf.shape(image)[2])
    label = tf.image.resize_with_crop_or_pad(label, tf.shape(label)[1], tf.shape(label)[2])


    image = tf.squeeze(image, axis=-1)
    label = tf.squeeze(label, axis=-1)

    return image, label

@tf.function
def augment_pipeline(image, label):
    image, label = preprocess(image, label)
    image, label = random_flip(image, label)
    #image, label = random_zoom(image, label)
    image, label = random_translation(image, label)



    return image, label



In [7]:
def make_dataset(train_images, train_labels, batch_size, shuffle=True, augment=False, seed=None):
    """
    Create a memory-efficient TensorFlow dataset.
    """
    # Create dataset from file paths
    dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))

    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 2, seed=seed)

    if augment:
        dataset = dataset.map(
            lambda x, y: augment_pipeline(x, y),
            num_parallel_calls=tf.data.AUTOTUNE
        )



    # Batch the data
    dataset = dataset.batch(batch_size, drop_remainder=False)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    return dataset

In [8]:
# Create the datasets
print("Creating datasets...")

train_dataset = make_dataset(
    train_img, train_lbl,
    batch_size= HYPERPARAMETERS['BATCH_SIZE'],
    shuffle=True,
    augment=True
).map(
        lambda x, y:(
            x,
            {'output_activation_layer': y,'output_layer': y},
        ),
        num_parallel_calls=tf.data.AUTOTUNE,
  )

val_dataset = make_dataset(
    val_img, val_lbl,
    batch_size= HYPERPARAMETERS['BATCH_SIZE'],
    shuffle=False,
).map(
        lambda x, y:(
            x,
            {'output_activation_layer': y,'output_layer': y},
        ),
        num_parallel_calls=tf.data.AUTOTUNE,
  )

print(val_dataset)


Creating datasets...


2024-12-14 16:58:58.592308: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M4 Pro
2024-12-14 16:58:58.592343: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 24.00 GB
2024-12-14 16:58:58.592349: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 8.00 GB
2024-12-14 16:58:58.592373: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-12-14 16:58:58.592386: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


<_ParallelMapDataset element_spec=(TensorSpec(shape=(None, 64, 128, 1), dtype=tf.float64, name=None), {'output_activation_layer': TensorSpec(shape=(None, 64, 128), dtype=tf.float64, name=None), 'output_layer': TensorSpec(shape=(None, 64, 128), dtype=tf.float64, name=None)})>


In [9]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Input
from tensorflow.keras.models import Model

# Definizione del custom layer
@tf.keras.utils.register_keras_serializable()
class ArgmaxLayer(Layer):
    def __init__(self, **kwargs):
        super(ArgmaxLayer, self).__init__(**kwargs)

    def call(self, inputs):
        # Usa la funzione tf.argmax e espandi le dimensioni
        argmax_output = tf.argmax(inputs, axis=-1)
        argmax_output = tf.expand_dims(argmax_output, axis=-1)
        argmax_output = tf.cast(argmax_output, tf.float32)
        return argmax_output



In [10]:
import tensorflow as tf
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches

# Define the custom StopGradientLayer class
@tf.keras.utils.register_keras_serializable()
class StopGradientLayer(tfkl.Layer):
    def call(self, inputs):
        return tf.stop_gradient(inputs)

class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    def __init__(self, num_classes, labels_to_exclude=None, name="mean_iou", dtype=None):
        super(MeanIntersectionOverUnion, self).__init__(num_classes=num_classes, name=name, dtype=dtype)
        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0
        self.labels_to_exclude = labels_to_exclude

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Apply mask to exclude specified labels
        for label in self.labels_to_exclude:
            mask = tf.not_equal(y_true, label)
            y_true = tf.boolean_mask(y_true, mask)
            y_pred = tf.boolean_mask(y_pred, mask)

        # Update the state
        return super().update_state(y_true, y_pred, sample_weight)

class VizCallback(tf.keras.callbacks.Callback):
    def __init__(self, image_paths, label_paths, output_dir, frequency=5):
        super().__init__()
        self.image_paths = image_paths
        self.label_paths = label_paths
        self.output_dir = output_dir
        self.frequency = frequency

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

    def on_epoch_end(self, epoch, logs=None):
        if epoch % self.frequency == 0:
            colors = ['red', 'green', 'blue', 'orange', 'purple']
            class_labels = ['background', 'soil', 'bedrock', 'sand', 'big_rock']
            cmap = ListedColormap(colors)

            #ATTENZIONE tolgo l'immagine 261 -> l'immagine fa schifo secodno me va tolta dal dataset e vanno trovate le altre sporcaccione come a lei
            indices = [122, 28, random.randint(0, len(self.image_paths) - 1)]
            fig, axes = plt.subplots(3, 4, figsize=(12, 20))

            for i, idx in enumerate(indices):
                image = self.image_paths[idx]
                label = self.label_paths[idx]
                image1 = tf.expand_dims(image, 0)

                # Predict and debug the format
                predictions = self.model.predict(image1, verbose=0)
                print("Type of predictions:", type(predictions))

                # Handle different output formats
                if isinstance(predictions, dict):
                    output_1 = predictions.get('output_activation_layer')
                    output_2 = predictions.get('output_layer')
                elif isinstance(predictions, (list, tuple)):
                    output_1 = predictions[0]
                    output_2 = predictions[1] if len(predictions) > 1 else None
                else:
                    output_1 = predictions
                    output_2 = None

                # Compute predictions
                y_pred1 = tf.math.argmax(output_1, axis=-1).numpy()
                y_pred2 = tf.math.argmax(output_2, axis=-1).numpy() if output_2 is not None else None

                # Display original image
                axes[i, 0].imshow(image, cmap='gray')
                axes[i, 0].set_title(f"Image {idx}")
                axes[i, 0].axis('off')

                # Display ground truth
                axes[i, 1].imshow(label, cmap=cmap, vmin=0, vmax=len(colors) - 1)
                axes[i, 1].set_title(f"Label {idx}")
                axes[i, 1].axis('off')

                # Display first output prediction
                axes[i, 2].imshow(y_pred1[0], cmap=cmap, vmin=0, vmax=len(colors) - 1)
                axes[i, 2].set_title(f"Prediction PEPPE's model{idx}")
                axes[i, 2].axis('off')

                # Display second output prediction if available
                if y_pred2 is not None:
                    axes[i, 3].imshow(y_pred2[0], cmap=cmap, vmin=0, vmax=len(colors) - 1)
                    axes[i, 3].set_title(f"Prediction SECONDO Model{idx}")
                    axes[i, 3].axis('off')
                else:
                    axes[i, 3].axis('off')

            plt.tight_layout()
            save_path = os.path.join(self.output_dir, f"epoch_{epoch}.png")
            plt.savefig(save_path)
            plt.close(fig)


## If models need to be imported


## Otherwise

In [11]:
import tensorflow as tf
from tensorflow.keras import layers as tfkl

# Residual Block
def residual_block(x, filters, kernel_size=3, activation='relu', name=''):
    shortcut = tfkl.Conv2D(filters, kernel_size=1, padding='same', kernel_initializer='he_normal', name=name + '_shortcut')(x)
    x = tfkl.Conv2D(filters, kernel_size, padding='same', kernel_initializer='he_normal', name=name + '_conv1')(x)
    x = tfkl.BatchNormalization(name=name + '_bn1')(x)
    x = tfkl.Activation(activation, name=name + '_activation1')(x)

    x = tfkl.Conv2D(filters, kernel_size, padding='same', kernel_initializer='he_normal', name=name + '_conv2')(x)
    x = tfkl.BatchNormalization(name=name + '_bn2')(x)
    x = tfkl.Add(name=name + '_add')([x, shortcut])
    x = tfkl.Activation(activation, name=name + '_activation2')(x)
    return x

# Attention Block
def attention_block(g, x, filters, name=''):
    theta_x = tfkl.Conv2D(filters, (1, 1), padding='same', kernel_initializer='he_normal', name=name + '_theta')(x)
    phi_g = tfkl.Conv2D(filters, (1, 1), padding='same', kernel_initializer='he_normal', name=name + '_phi')(g)
    f = tfkl.Activation('relu', name=name + '_activation')(tfkl.Add(name=name + '_add')([theta_x, phi_g]))
    psi_f = tfkl.Conv2D(1, (1, 1), padding='same', activation='sigmoid', kernel_initializer='he_normal', name=name + '_psi')(f)
    return tfkl.Multiply(name=name + '_multiply')([x, psi_f])

# U-Net with Attention and Residual Blocks
def get_segmentation_model(input_shape=input_shape, num_classes=num_classes, dropout_rate=0.0):
    inputs = tfkl.Input(shape=input_shape, name='input_layer')

    # Downsampling Path
    d1 = residual_block(inputs, 64, name='down_block1')
    p1 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool1')(d1)

    d2 = residual_block(p1, 128, name='down_block2')
    p2 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool2')(d2)

    d3 = residual_block(p2, 256, name='down_block3')
    p3 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool3')(d3)

    d4 = residual_block(p3, 512, name='down_block4')
    p4 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool4')(d4)

    # Bottleneck with Dropout and Dilated Convolutions
    b = residual_block(p4, 1024, name='bottleneck')  # Aumenta i filtri per il bottleneck
    b = tfkl.Dropout(dropout_rate, name='bottleneck_dropout')(b)

    # Upsampling Path with Attention
    u4 = tfkl.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same', name='up_block4_upsample')(b)
    a4 = attention_block(u4, d4, 512, name='up_block4_attention')
    u4 = tfkl.Concatenate(name='up_block4_concat')([u4, a4])
    u4 = residual_block(u4, 512, name='up_block4')

    u3 = tfkl.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same', name='up_block3_upsample')(u4)
    a3 = attention_block(u3, d3, 256, name='up_block3_attention')
    u3 = tfkl.Concatenate(name='up_block3_concat')([u3, a3])
    u3 = residual_block(u3, 256, name='up_block3')

    u2 = tfkl.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same', name='up_block2_upsample')(u3)
    a2 = attention_block(u2, d2, 128, name='up_block2_attention')
    u2 = tfkl.Concatenate(name='up_block2_concat')([u2, a2])
    u2 = residual_block(u2, 128, name='up_block2')

    u1 = tfkl.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same', name='up_block1_upsample')(u2)
    a1 = attention_block(u1, d1, 64, name='up_block1_attention')
    u1 = tfkl.Concatenate(name='up_block1_concat')([u1, a1])
    u1 = residual_block(u1, 64, name='up_block1')

    conv_output = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', name='output_conv_layer')(u1)
    outputs = tfkl.Activation('softmax', name='output_activation_layer')(conv_output)

    # Model
    model = tf.keras.Model(inputs=inputs, outputs=outputs, name='Advanced_UNet')

    return inputs, conv_output, outputs, b, d1, d2, d3, d4

def get_segmentation_mode_import(input_shape=input_shape, num_classes=num_classes, dropout_rate=0.0):
    inputs = tfkl.Input(shape=input_shape, name='input_layer')

    # Downsampling Path
    d1 = residual_block(inputs, 64, name='down_block1')
    p1 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool1')(d1)

    d2 = residual_block(p1, 128, name='down_block2')
    p2 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool2')(d2)

    d3 = residual_block(p2, 256, name='down_block3')
    p3 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool3')(d3)

    d4 = residual_block(p3, 512, name='down_block4')
    p4 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool4')(d4)

    # Bottleneck with Dropout and Dilated Convolutions
    b = residual_block(p4, 1024, name='bottleneck')  # Aumenta i filtri per il bottleneck
    b = tfkl.Dropout(dropout_rate, name='bottleneck_dropout')(b)

    # Upsampling Path with Attention
    u4 = tfkl.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same', name='up_block4_upsample')(b)
    a4 = attention_block(u4, d4, 512, name='up_block4_attention')
    u4 = tfkl.Concatenate(name='up_block4_concat')([u4, a4])
    u4 = residual_block(u4, 512, name='up_block4')

    u3 = tfkl.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same', name='up_block3_upsample')(u4)
    a3 = attention_block(u3, d3, 256, name='up_block3_attention')
    u3 = tfkl.Concatenate(name='up_block3_concat')([u3, a3])
    u3 = residual_block(u3, 256, name='up_block3')

    u2 = tfkl.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same', name='up_block2_upsample')(u3)
    a2 = attention_block(u2, d2, 128, name='up_block2_attention')
    u2 = tfkl.Concatenate(name='up_block2_concat')([u2, a2])
    u2 = residual_block(u2, 128, name='up_block2')

    u1 = tfkl.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same', name='up_block1_upsample')(u2)
    a1 = attention_block(u1, d1, 64, name='up_block1_attention')
    u1 = tfkl.Concatenate(name='up_block1_concat')([u1, a1])
    u1 = residual_block(u1, 64, name='up_block1')

    conv_output = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', name='output_conv_layer')(u1)
    outputs = tfkl.Activation('softmax', name='output_activation_layer')(conv_output)

    # Model
    model = tf.keras.Model(inputs=inputs, outputs=outputs, name='Advanced_UNet')

    return model



In [12]:
def multi_attention_concatenation(conv_layer, d_layer, skip_layer, name):

    a_d = attention_block(conv_layer, d_layer, 512, name=name+'interna')
    a_skip = attention_block(conv_layer, skip_layer, 512, name=name+'esterna')
    concat = tfkl.Concatenate(name=name+'concat')([conv_layer, a_d, a_skip])

    return concat

def get_segmentation_LITTLEmodel(input_shape=(64,128,5), num_classes=num_classes, input=None, dropout_rate=0.0, precedent_bottle=None, skip1=None, skip2=None, skip3=None, skip4=None):

    inputs = StopGradientLayer()(input)

    inputs = ArgmaxLayer()(inputs)

    # Downsampling Path
    d1 = residual_block(inputs, 64, name='down_block1LITTLE')
    p1 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool1LITTLE')(d1)

    d2 = residual_block(p1, 128, name='down_block2LITTLE')
    p2 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool2LITTLE')(d2)

    d3 = residual_block(p2, 256, name='down_block3LITTLE')
    p3 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool3LITTLE')(d3)

    d4 = residual_block(p3, 512, name='down_block4LITTLE')
    p4 = tfkl.MaxPooling2D(pool_size=(2, 2), name='pool4LITTLE')(d4)

    # Bottleneck with Dropout and Dilated Convolutions
    b = residual_block(p4, 1024, name='bottleneckLITTLE')  # Aumenta i filtri per il bottleneck
    b = tfkl.Dropout(dropout_rate, name='bottleneck_dropoutLITTLE')(b)

    b = tfkl.Concatenate()([b, precedent_bottle])

     # Upsampling Path with Attention
    u4 = tfkl.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same', name='up_block4_upsampleLITTLE')(b)
    concat_att_4 = multi_attention_concatenation(u4, d4, skip4, 'up_block4_attentionLITTLE')
    u4 = residual_block(concat_att_4, 512, name='up_block4LITTLE')

    u3 = tfkl.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same', name='up_block3_upsampleLITTLE')(u4)
    concat_att_3 = multi_attention_concatenation(u3, d3, skip3, 'up_block3_attentionLITTLE')
    u3 = residual_block(concat_att_3, 256, name='up_block3LITTLE')

    u2 = tfkl.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same', name='up_block2_upsampleLITTLE')(u3)
    concat_att_2 = multi_attention_concatenation(u2, d2, skip2, 'up_block2_attentionLITTLE')
    u2 = residual_block(concat_att_2, 128, name='up_block2LITTLE')

    u1 = tfkl.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same', name='up_block1_upsampleLITTLE')(u2)
    concat_att_1 = multi_attention_concatenation(u1, d1, skip1, 'up_block1_attentionLITTLE')
    u1 = residual_block(concat_att_1, 64, name='up_block1LITTLE')

    # Output Layer

    outputs = tfkl.Conv2D(num_classes, kernel_size=1, activation='softmax', padding='same', name='output_layer')(u1)

    return inputs, outputs


In [13]:

inputs, output_conv, output_soft, b, d1, d2, d3, d4 = get_segmentation_model(input_shape=(64,128,1))
inputs_intermediate, outputs_final = get_segmentation_LITTLEmodel(input_shape=(64,128,5),input=output_conv,precedent_bottle=b, skip1=d1, skip2=d2, skip3=d3, skip4=d4)


final_model = tf.keras.Model(inputs=inputs, outputs=[output_soft,outputs_final])


In [17]:

model_import = tf.keras.models.load_model('keras/PEPPE601.keras', compile=False)
for l1,l2 in zip(model_import.layers, final_model.layers):

  if l1.name != l2.name:
    print(l2.name)  
    break

  l2.set_weights(l1.get_weights())
  l2.trainable = False

stop_gradient_layer


## Run again from here


In [18]:
final_model.summary(expand_nested=True, show_trainable=True)

# Generate and display a graphical representation of the model architecture.
tf.keras.utils.plot_model(
    final_model,
    to_file='model.png',         # Salva il grafico come file
    show_shapes=True,            # Mostra le forme degli input e output
    show_layer_names=True,       # Mostra i nomi dei layer
    expand_nested=False,         # Non espandere i layer annidati
    dpi=70                       # Riduci la risoluzione del grafico
)

You must install pydot (`pip install pydot`) for `plot_model` to work.


In [19]:
def hybrid_loss(y_true, y_pred):
    """
    Combina Categorical Cross-Entropy e Dice Loss.
    Args:
        y_true: Tensor delle etichette vere (sparse o one-hot encoded).
        y_pred: Tensor delle predizioni (probabilità per classe).
    Returns:
        La perdita combinata (CCE + Dice).
    """

    # Categorical Cross-Entropy
    cce = tf.keras.losses.SparseCategoricalCrossentropy()
    cce_loss = cce(y_true, y_pred)


    # Converti y_true in one-hot encoding se è in formato sparso
    if len(y_true.shape) == len(y_pred.shape) - 1:  # Caso di Sparse Labels
        y_true = tf.one_hot(tf.cast(y_true, tf.int32), depth=y_pred.shape[-1])

    # Dice Loss
    smooth = 1e-5
    intersection = tf.reduce_sum(y_true * y_pred, axis=[1, 2, 3])
    dice_loss = 1 - (2. * intersection + smooth) / (
        tf.reduce_sum(y_true, axis=[1, 2, 3]) + tf.reduce_sum(y_pred, axis=[1, 2, 3]) + smooth
    )

    # Combina le perdite
    total_loss = cce_loss + dice_loss
    return total_loss


In [4]:
class FocalLoss(tfk.losses.Loss):
    def __init__(self, alpha=0.25, gamma=2.0, class_weights=[0.0, 1.0, 1.0, 1.0, 1.5]):
        super(FocalLoss, self).__init__()
        self.alpha = tf.constant(alpha, dtype=tf.float32)
        self.gamma = tf.constant(gamma, dtype=tf.float32)
        self.class_weights = tf.constant(class_weights, dtype=tf.float32)

    def call(self, y_true, y_pred):
        y_true = tf.cast(y_true, tf.int32)
        y_true_one_hot = tf.one_hot(tf.squeeze(y_true), depth=5)

        # Focal loss con pesi per classe
        epsilon = 1e-7
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)

        ce = -y_true_one_hot * tf.math.log(y_pred)
        weight = self.alpha * tf.pow(1. - y_pred, self.gamma)
        fl = weight * ce * self.class_weights

        return tf.reduce_mean(tf.reduce_sum(fl, axis=-1))

In [22]:
focal_loss = FocalLoss()
print("Compiling model...")
final_model.compile(
    loss={'output_activation_layer': hybrid_loss, 'output_layer': focal_loss},
    loss_weights= {'output_activation_layer': 1.0, 'output_layer': 1.0},
    optimizer=tf.keras.optimizers.AdamW(0.001),
    metrics={
        'output_activation_layer': [MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0])],
        'output_layer': [MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0])]  
    }
)
print("Model compiled!")

Compiling model...
Model compiled!


In [23]:
from tensorflow.keras.callbacks import ReduceLROnPlateau

# Esempio di utilizzo
reduce_on_plateau = ReduceLROnPlateau(
        monitor='val_output_layer_mean_iou',
        factor=0.6,
        patience=7,
        min_lr=0.000001,
        verbose=1
    )

# Setup callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_output_layer_mean_iou',
    mode='max',
    patience=15,
    restore_best_weights=True
)

viz_callback = VizCallback(
    image_paths=val_img,
    label_paths=val_lbl,
    output_dir='visualizations',  # Directory to save the visualizations
    frequency=1  # Visualize every 5 epochs
)

In [25]:
# Train the model
history = final_model.fit(
    train_dataset,
    epochs=70,
    validation_data=val_dataset,
    callbacks=[early_stopping, reduce_on_plateau, viz_callback],
    verbose=1
).history



Epoch 1/70


2024-12-14 17:05:58.796927: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 861ms/step - loss: 0.6228 - output_activation_layer_loss: 0.5839 - output_activation_layer_mean_iou: 0.6585 - output_layer_loss: 0.0388 - output_layer_mean_iou: 0.4948Type of predictions: <class 'list'>
Type of predictions: <class 'list'>
Type of predictions: <class 'list'>
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 1s/step - loss: 0.6233 - output_activation_layer_loss: 0.5846 - output_activation_layer_mean_iou: 0.6588 - output_layer_loss: 0.0386 - output_layer_mean_iou: 0.4959 - val_loss: 3.3974 - val_output_activation_layer_loss: 1.2495 - val_output_activation_layer_mean_iou: 0.6016 - val_output_layer_loss: 2.0104 - val_output_layer_mean_iou: 0.0055 - learning_rate: 0.0010
Epoch 2/70
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 848ms/step - loss: 0.5969 - output_activation_layer_loss: 0.5815 - output_activation_layer_mean_iou: 0.6598 - output_layer_loss: 0.0154 - output_layer_mean

In [26]:
# Calculate and print the final validation accuracy
final_peppe_iou = round(max(history['val_output_activation_layer_mean_iou'])* 100, 2)
final_new_loss_iou = round(max(history['val_output_layer_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union peppe: {final_peppe_iou}%')
print(f'Final validation Mean Intersection Over Union: {final_new_loss_iou}%')


Final validation Mean Intersection Over Union peppe: 60.16%
Final validation Mean Intersection Over Union: 76.14%


## Sii prudente salva il modello

In [28]:
# save the model

# Define the directory where you want to save the model
timestep_str = datetime.now().strftime("%y%m%d_%H%M%S")
model_filename = f"model_604BOTTLE+SKIP_+argmax_epoche{round(final_new_loss_iou, 4)}.keras"


# Save the model
final_model.save(model_filename)


## 📊 Prepare Your Submission

In our Kaggle competition, submissions are made as `csv` files. To create a proper `csv` file, you need to flatten your predictions and include an `id` column as the first column of your dataframe. To maintain consistency between your results and our solution, please avoid shuffling the test set. The code below demonstrates how to prepare the `csv` file from your model predictions.




In [6]:
from tensorflow.keras.models import load_model

# Se non hai trainato vai su a runnare la custom block graduient layer IMPORTTANTEEEEEEEEEEEEE

# Load the model, providing custom_objects
model2 = load_model(
    'model_BOTTLE+SKIP_+epoche98.keras',
    custom_objects={'StopGradientLayer': StopGradientLayer},
    compile=False
)

In [29]:
preds = final_model.predict(X_test)

output_1_peppe = preds[0]
output_2_io = preds[1]

output_1_peppe = np.argmax(output_1_peppe, axis=-1)
output_2_io = np.argmax(output_2_io, axis=-1)
print(f"Predictions shape: {output_1_peppe.shape}")
print(f"Predictions shape: {output_2_io.shape}")

[1m314/314[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m96s[0m 299ms/step
Predictions shape: (10022, 64, 128)
Predictions shape: (10022, 64, 128)


In [30]:
def y_to_df(y) -> pd.DataFrame:
    """Converts segmentation predictions into a DataFrame format for Kaggle."""
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

In [31]:
# Create and download the csv submission file
submission_filename = f"submission_604BOTTLE+SKIP_+argmax_epoche{round(final_new_loss_iou, 4)}.csv"
submission_df = y_to_df(output_2_io)
submission_df.to_csv(submission_filename, index=False)