# Setup notebook

In [None]:
import os
from datetime import datetime

import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
%matplotlib inline

# Notebook-wide configuration: the seed shared by every stochastic component
# and the mini-batch size used when the tf.data pipelines are built.
seed = 550
BATCH_SIZE = 64

# Seed both NumPy's and TensorFlow's global generators with the same value.
tf.random.set_seed(seed)
np.random.seed(seed)

# Report the runtime environment.
print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

# Load data and augmentation

In [None]:
import numpy as np

# Open the training and test archives (.npz files).
training_data = np.load("/kaggle/input/datasetlomi/training_set_no_outliers.npz")
test_data = np.load("/kaggle/input/datasetlomi/test_set.npz")

# Pull out the labels as stored, and the images divided by 255.
# assumes the stored pixel values span 0-255, so the result lies in [0, 1] — TODO confirm
labels = training_data["labels"]
images = training_data["images"] / 255

# Show the shape of the image array.
print(images.shape)


In [None]:
# Hold out 20% of the samples for validation. `random_state` pins the split to
# the notebook seed, so re-running this cell gives the same partition instead
# of depending on how far the global NumPy generator has already advanced.
X_train, X_val, y_train, y_val = train_test_split(
    images, labels, test_size=0.2, random_state=seed
)

In [None]:
def add_channel(image, label):
    """Append a trailing singleton axis to an image and to its label.

    Args:
        image: Tensor holding one image, e.g. (height, width).
        label: Tensor holding the matching mask, e.g. (height, width).

    Returns:
        Tuple ``(image, label)`` in which each tensor has gained a new last
        axis of size 1, e.g. (height, width, 1).
    """
    return tf.expand_dims(image, axis=-1), tf.expand_dims(label, axis=-1)


In [None]:
@tf.function
def random_flip(image, label, seed=None):
    """Mirror an image/label pair left-to-right when a random draw exceeds 0.5.

    One uniform sample decides the outcome for both tensors, so the image and
    its label are either flipped together or both returned untouched.

    Args:
        image: Image tensor accepted by ``tf.image.flip_left_right``.
        label: Label tensor accepted by ``tf.image.flip_left_right``.
        seed: Optional seed forwarded to ``tf.random.uniform``.

    Returns:
        Tuple ``(image, label)``, possibly flipped horizontally.
    """
    should_flip = tf.random.uniform([], seed=seed) > 0.5

    def _mirrored():
        return tf.image.flip_left_right(image), tf.image.flip_left_right(label)

    def _unchanged():
        return image, label

    # A single conditional handles both tensors, keeping them aligned.
    flipped_image, flipped_label = tf.cond(should_flip, _mirrored, _unchanged)
    return flipped_image, flipped_label

In [None]:
def to_datasett(X_train, y_train, augmentation=False, seed=seed, shuffle=True, batch_size=BATCH_SIZE):
    """Build a batched, prefetched ``tf.data`` pipeline from in-memory arrays.

    Args:
        X_train: Images, sliced along their first axis.
        y_train: Labels aligned with ``X_train`` along the first axis.
        augmentation: When True, every pair goes through ``random_flip`` and
            the channel axis is squeezed away again afterwards.
        seed: Seed forwarded to the shuffle step and to ``random_flip``.
        shuffle: When True, samples are shuffled before any mapping.
        batch_size: Samples per batch; the final batch may be smaller.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))

    if shuffle:
        # A buffer of only `batch_size * 2` elements mixes samples just
        # locally, so batches stay correlated with the storage order. The
        # data already lives in memory, so let the buffer span the whole
        # dataset (never smaller than the previous `batch_size * 2`).
        buffer_size = max(len(X_train), batch_size * 2)
        dataset = dataset.shuffle(buffer_size=buffer_size, seed=seed)

    # Give images and labels a trailing channel axis.
    dataset = dataset.map(add_channel, num_parallel_calls=tf.data.AUTOTUNE)

    if augmentation:
        # Random horizontal flip applied jointly to image and label.
        dataset = dataset.map(
            lambda x, y: random_flip(x, y, seed=seed),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        # The augmented branch drops the channel axis again, so augmented
        # datasets yield tensors with one axis fewer than non-augmented ones.
        # NOTE(review): this rank difference between the two modes looks
        # unintentional — confirm before relying on either shape.
        dataset = dataset.map(
            lambda x, y: (tf.squeeze(x, axis=-1), tf.squeeze(y, axis=-1)),
            num_parallel_calls=tf.data.AUTOTUNE,
        )

    # Batch (keeping the final partial batch) and prefetch.
    dataset = dataset.batch(batch_size, drop_remainder=False)
    return dataset.prefetch(tf.data.AUTOTUNE)


In [None]:
# Training pipeline: random flips enabled.
train_dataset = to_datasett(X_train, y_train, augmentation=True)

# Validation pipeline: augmentation stays at its default (disabled).
val_dataset = to_datasett(X_val, y_val)

In [None]:
# Preview the first sample of one training batch next to its mask.
for i, (image, label) in enumerate(train_dataset.take(1)):
    fig, (image_ax, mask_ax) = plt.subplots(1, 2, figsize=(12, 6))

    # Input image
    image_ax.imshow(image[0].numpy())
    image_ax.set_title(f"Image {i+1}")
    image_ax.axis('off')

    # Mask, drawn with a categorical colormap so classes are easy to tell apart
    mask_ax.imshow(label[0].numpy(), cmap='tab20')
    mask_ax.set_title(f"Mask {i+1}")
    mask_ax.axis('off')

    plt.show()

# Model

In [None]:
# Training hyperparameters.
num_classes = 5  # default number of output channels of the model
epoch = 1_000    # upper bound on the number of training epochs
patience = 20    # epochs without improvement tolerated by early stopping

In [None]:
def unet_block(input_tensor, filters, kernel_size=3, activation='relu', stack=2, name=''):
    """Apply ``stack`` rounds of Conv2D -> BatchNormalization -> Activation.

    Args:
        input_tensor: Tensor fed into the first convolution.
        filters: Number of filters of every convolution in the block.
        kernel_size: Convolution kernel size.
        activation: Activation applied after each batch normalisation.
        stack: How many Conv/BN/Activation rounds to chain.
        name: Prefix for layer names; each layer is called
            ``<name>conv<i>``, ``<name>bn<i>`` or ``<name>activation<i>``
            with ``i`` counting from 1.

    Returns:
        The output tensor of the last activation (or ``input_tensor`` itself
        when ``stack`` is 0).
    """
    out = input_tensor
    for idx in range(1, stack + 1):
        out = tfkl.Conv2D(filters, kernel_size=kernel_size, padding='same', name=f"{name}conv{idx}")(out)
        out = tfkl.BatchNormalization(name=f"{name}bn{idx}")(out)
        out = tfkl.Activation(activation, name=f"{name}activation{idx}")(out)
    return out

In [None]:
def getunet_model(input_shape=(64, 128, 1), num_classes=num_classes, seed=seed):
    """Assemble a three-level U-Net ending in a per-pixel softmax.

    The encoder runs three ``unet_block`` stages (32, 64, 128 filters), each
    followed by max pooling; a 512-filter bottleneck follows; the decoder
    mirrors the encoder, upsampling and concatenating the matching encoder
    output before each ``unet_block`` (128, 64, 32 filters).

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_classes: Number of channels of the softmax output layer.
        seed: Value passed to ``tf.random.set_seed`` before layers are built.

    Returns:
        A ``tf.keras.Model`` named ``'UNet'``.
    """
    tf.random.set_seed(seed)
    input_layer = tfkl.Input(shape=input_shape, name='input_layer')

    # Encoder: remember each stage's output for the skip connections.
    skips = []
    x = input_layer
    for filters, block_name in ((32, 'down_block1'), (64, 'down_block2'), (128, 'down_block3')):
        features = unet_block(x, filters, name=block_name)
        skips.append(features)
        x = tfkl.MaxPooling2D()(features)

    # Bottleneck
    x = unet_block(x, 512, name='bottleneck')

    # Decoder: upsample, then concatenate with the deepest unused skip.
    for filters, block_name in ((128, 'up_block2'), (64, 'up_block3'), (32, 'up_block4')):
        x = tfkl.UpSampling2D()(x)
        x = tfkl.Concatenate()([x, skips.pop()])
        x = unet_block(x, filters, name=block_name)

    # 1x1 convolution producing class probabilities for every pixel.
    output_layer = tfkl.Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", name='output_layer')(x)

    return tf.keras.Model(inputs=input_layer, outputs=output_layer, name='UNet')

In [None]:
# Instantiate the U-Net with its default input shape and class count.
model = getunet_model()

# Summarise every layer, expanding nested ones and showing the trainable flag.
model.summary(show_trainable=True, expand_nested=True)

# Optional: render the architecture as a diagram.
#tf.keras.utils.plot_model(model, show_trainable=True, expand_nested=True, dpi=70)

In [None]:
# NOTE(review): `to_datasett` is also defined earlier in this notebook (in the
# data-loading section); keep the two definitions in sync or delete one.
def to_datasett(X_train, y_train, augmentation=False, seed=seed, shuffle=True, batch_size=BATCH_SIZE):
    """Build a batched, prefetched ``tf.data`` pipeline from in-memory arrays.

    Args:
        X_train: Images, sliced along their first axis.
        y_train: Labels aligned with ``X_train`` along the first axis.
        augmentation: When True, every pair goes through ``random_flip`` and
            the channel axis is squeezed away again afterwards.
        seed: Seed forwarded to the shuffle step and to ``random_flip``.
        shuffle: When True, samples are shuffled before any mapping.
        batch_size: Samples per batch; the final batch may be smaller.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))

    if shuffle:
        # A buffer of only `batch_size * 2` elements mixes samples just
        # locally, so batches stay correlated with the storage order. The
        # data already lives in memory, so let the buffer span the whole
        # dataset (never smaller than the previous `batch_size * 2`).
        buffer_size = max(len(X_train), batch_size * 2)
        dataset = dataset.shuffle(buffer_size=buffer_size, seed=seed)

    # Give images and labels a trailing channel axis.
    dataset = dataset.map(add_channel, num_parallel_calls=tf.data.AUTOTUNE)

    if augmentation:
        # Random horizontal flip applied jointly to image and label.
        dataset = dataset.map(
            lambda x, y: random_flip(x, y, seed=seed),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        # The augmented branch drops the channel axis again, so augmented
        # datasets yield tensors with one axis fewer than non-augmented ones.
        # NOTE(review): this rank difference between the two modes looks
        # unintentional — confirm before relying on either shape.
        dataset = dataset.map(
            lambda x, y: (tf.squeeze(x, axis=-1), tf.squeeze(y, axis=-1)),
            num_parallel_calls=tf.data.AUTOTUNE,
        )

    # Batch (keeping the final partial batch) and prefetch.
    dataset = dataset.batch(batch_size, drop_remainder=False)
    return dataset.prefetch(tf.data.AUTOTUNE)


# Visualization

In [None]:
def create_segmentation_colormap(num_classes):
    """Sample ``num_classes`` evenly spaced colours from matplotlib's viridis map.

    Args:
        num_classes: Number of colours to generate.

    Returns:
        The result of ``plt.cm.viridis`` evaluated at ``num_classes`` points
        spread evenly over [0, 1]: one colour row per class.
    """
    sample_points = np.linspace(0, 1, num_classes)
    return plt.cm.viridis(sample_points)

def apply_colormap(label, colormap=None):
    """Turn a mask of class indices into an array of colours.

    Args:
        label: Array of class indices; singleton axes are squeezed away first
            and the values are cast to ``int``.
        colormap: Optional array indexed by class id along its first axis.
            When omitted, a palette is generated with
            ``create_segmentation_colormap``.

    Returns:
        ``colormap[label]``: the colour row of every element of ``label``.
    """
    # Drop singleton axes (e.g. a trailing channel) and make the ids integers.
    label = np.squeeze(label).astype(int)

    if colormap is None:
        # Size the palette by the largest class id, not by the number of
        # distinct ids: a mask holding only classes {0, 4} still needs rows
        # 0..4, otherwise the lookup below runs out of bounds.
        num_classes = int(label.max()) + 1 if label.size else 0
        colormap = create_segmentation_colormap(num_classes)

    return np.asarray(colormap)[label]



In [None]:
class VizCallback(tf.keras.callbacks.Callback):
    """Keras callback that plots input, ground truth and prediction while training.

    Whenever the epoch index is a multiple of ``frequency``, one batch is
    pulled from ``dataset`` and its first sample is displayed next to the
    model's current prediction for it.
    """

    def __init__(self, dataset, frequency=5, num_classes=2):
        """Store the settings and open an iterator over ``dataset``.

        Args:
            dataset: Iterable yielding ``(images, labels)`` batches.
            frequency: Visualise on epochs whose index is a multiple of this.
            num_classes: Number of colours in the segmentation palette.
        """
        super().__init__()
        self.dataset = dataset
        self.frequency = frequency
        self.num_classes = num_classes
        # Persistent iterator, so successive visualisations advance through the data.
        self.dataset_iter = iter(dataset)

    def _next_batch(self):
        """Return the next batch, reopening the iterator once it is exhausted."""
        try:
            return next(self.dataset_iter)
        except StopIteration:
            self.dataset_iter = iter(self.dataset)
            return next(self.dataset_iter)

    def on_epoch_end(self, epoch, logs=None):
        """Plot one sample, its mask and its predicted mask on selected epochs."""
        if epoch % self.frequency != 0:
            return

        batch_images, batch_labels = self._next_batch()

        # Keep only the first sample; the model still expects a batch axis.
        image = tf.expand_dims(batch_images[0], 0)
        label = batch_labels[0]
        probabilities = self.model.predict(image, verbose=0)
        y_pred = tf.math.argmax(probabilities, axis=-1).numpy()

        colormap = create_segmentation_colormap(self.num_classes)
        panels = (
            ("Input Image", image[0]),
            ("Ground Truth Mask", apply_colormap(label.numpy(), colormap)),
            ("Predicted Mask", apply_colormap(y_pred[0], colormap)),
        )

        plt.figure(figsize=(16, 4))
        for position, (title, picture) in enumerate(panels, start=1):
            plt.subplot(1, 3, position)
            plt.imshow(picture)
            plt.title(title)
            plt.axis('off')

        plt.tight_layout()
        plt.show()
        plt.close()



# Compilation and training

In [None]:
# NOTE(review): `MeanIntersectionOverUnion` is neither defined nor imported in
# the visible part of this notebook; it must exist in the kernel before this
# cell runs.
model.compile(
    # Cross-entropy over integer (non one-hot) class labels.
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    # AdamW optimizer with a learning rate of 0.001.
    optimizer=tf.keras.optimizers.AdamW(learning_rate=0.001),
    metrics=[
        # Standard accuracy.
        "accuracy",
        # Custom mean-IoU metric, configured for 5 classes with label 0 listed for exclusion.
        MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0]),
    ],
)


In [None]:
# Callback to stop training early if validation accuracy does not improve
# Stop once validation accuracy has gone `patience` epochs without improving,
# then roll the model back to the weights of its best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=patience,
    mode='max',
    restore_best_weights=True,
)

# Plot a validation sample with its prediction every 5 epochs, using a
# 5-colour palette.
viz_callback = VizCallback(dataset=val_dataset, frequency=5, num_classes=5)


In [None]:
# Train for at most `epoch` epochs, with early stopping and periodic
# visualisation, and keep only the per-epoch metric history.
history = model.fit(
    train_dataset,
    epochs=epoch,
    validation_data=val_dataset,
    callbacks=[early_stopping, viz_callback],
    verbose=1
).history

# Highest validation mean IoU reached over all epochs, as a percentage.
# assumes the custom metric is reported under the name `mean_iou` — TODO confirm
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

# Save the model in the working directory. The path is held in a variable so
# that the message below reports the file actually written (`model_filename`
# was previously referenced without ever being defined).
model_filename = "/kaggle/working/UNet_.keras"
model.save(model_filename)
print(f"Model saved to: {model_filename}")

# Delete the model to free up resources
del model

# Load the saved model and predict on the test set

In [None]:
# Load the test images.
data = np.load("/kaggle/input/mars-for-students/mars_for_students.npz")
X_test = data["test_set"]
print(X_test.shape)

# NOTE(review): the training images were divided by 255 before fitting, but no
# such scaling is applied here — confirm whether `test_set` is already in [0, 1].

# Add the trailing (grayscale) channel axis.
X_test = np.expand_dims(X_test, axis=-1)
print("Test set shape after adding channel:", X_test.shape)

# Reload the saved model. Only inference is needed, so the compile state
# (optimizer, loss, custom metric) is not restored.
model = tf.keras.models.load_model("/kaggle/working/UNet_.keras", compile=False)

# Predict once on the whole test set: one probability vector per pixel.
predictions = model.predict(X_test, verbose=1)
print("Predictions shape:", predictions.shape)

# Collapse the class axis to the most likely class per pixel.
y_pred = np.argmax(predictions, axis=-1)
print("Predictions after argmax shape:", y_pred.shape)


# Visualisation helpers
def create_segmentation_colormap(num_classes):
    """Build a segmentation palette of ``num_classes`` viridis colours."""
    sample_points = np.linspace(0, 1, num_classes)
    return plt.cm.viridis(sample_points)

def apply_colormap(mask, colormap):
    """Look up the colour of every class index in ``mask``.

    Args:
        mask: Array of class indices; cast to ``int`` before the lookup.
        colormap: Array indexed by class id along its first axis.

    Returns:
        ``colormap`` indexed by the integer mask.
    """
    class_ids = mask.astype(int)
    return colormap[class_ids]

# Show the first test images next to the segmentation predicted for them.
colormap = create_segmentation_colormap(num_classes=5)  # one colour per model class

# Up to 150 examples, but never more than the test set actually holds.
num_examples = min(150, len(X_test))
for i in range(num_examples):
    fig, (image_ax, seg_ax) = plt.subplots(1, 2, figsize=(12, 6))

    # Input image, drawn in grayscale
    image_ax.imshow(X_test[i, :, :, 0], cmap="gray")
    image_ax.set_title(f"Test Image {i+1}")
    image_ax.axis("off")

    # Predicted segmentation
    segmented = apply_colormap(y_pred[i], colormap)
    seg_ax.imshow(segmented)
    seg_ax.set_title(f"Segmentation {i+1}")
    seg_ax.axis("off")

    plt.show()
    # Release the figure so that looping over many examples does not pile up open figures.
    plt.close(fig)