# Artificial Neural Networks and Deep Learning

---

## Homework 2: Exploring Augmentations

❗This notebook is using a presumably over-cleaned dataset. However, despite this inconvenience, it should still be useful enough to experiment with different augmentation approaches.  


## 🌐 Connect Colab to Google Drive

In [1]:
from google.colab import drive

drive.mount("/gdrive")
%cd /gdrive/My Drive/Homework_2/Data

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
/gdrive/My Drive/Homework_2/Data


## ⚙️ Install Albumations and Import Libraries

In [2]:
!pip install -U git+https://github.com/albu/albumentations > /dev/null && echo "All libraries are successfully installed!"


  Running command git clone --filter=blob:none --quiet https://github.com/albu/albumentations /tmp/pip-req-build-0lxje9f5
All libraries are successfully installed!


In [3]:
import os
from datetime import datetime

import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

import matplotlib.pyplot as plt


import albumentations as A
from albumentations import (
    HorizontalFlip, VerticalFlip, Rotate, RandomBrightnessContrast,
    ShiftScaleRotate, ElasticTransform, GridDistortion, Blur, CLAHE,
    RandomGamma, Perspective, OneOf, Compose, Normalize
)

np.random.seed(42)
tf.random.set_seed(42)

print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {tfk.__version__}")
print(f"GPU devices: {len(tf.config.list_physical_devices('GPU'))}")

TensorFlow version: 2.17.1
Keras version: 3.5.0
GPU devices: 0


## ⏳ Load the Data

In [4]:
data = np.load("mars_for_students.npz")

training_set = data["training_set"]
X_train = training_set[:, 0]
y_train = training_set[:, 1]

X_test = data["test_set"]

print(f"Training X shape: {X_train.shape}")
print(f"Training y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}")

Training X shape: (2615, 64, 128)
Training y shape: (2615, 64, 128)
Test X shape: (10022, 64, 128)


## 🛠️ Train and Save the Model

In [5]:
# Add color channel and rescale pixels between 0 and 1
X_train = X_train[..., np.newaxis] / 255.0
X_test = X_test[..., np.newaxis] / 255.0

input_shape = X_train.shape[1:]
num_classes = len(np.unique(y_train))

print(f"Input shape: {input_shape}")
print(f"Number of classes: {num_classes}")

Input shape: (64, 128, 1)
Number of classes: 5


## Load Data

In [9]:

x_train_cleaned = np.load("x_train_cleaned.npy")
y_train_cleaned = np.load("y_train_cleaned.npy")

X_train_augmented = np.load("X_train_augmented.npy")
y_train_augmented = np.load("y_train_augmented.npy")

x_val = np.load("x_val.npy")
y_val = np.load("y_val.npy")

## Definition of some helper functions to extract subsets from the training data
- they are basically the same functions used in the process of deconstructing the original train set to build up a cleaned version, just used the other way around in this context

In [14]:
import numpy as np

def get_indices_with_label_range(y_train, min_labels, max_labels):
    indices = []
    for idx, mask in enumerate(y_train):
        unique_labels = np.unique(mask)
        num_labels = len(unique_labels)
        if min_labels <= num_labels <= max_labels:
            indices.append(idx)
    return indices

# How to use
# y_train: List or array of masks
# min_labels: Minimum number of labels
# max_labels: Maximum number of labels
# indices = get_indices_with_label_range(y_train, min_labels=2, max_labels=5)

# print(f"Number of images with label counts in range [2, 5]: len({indices})")

import numpy as np

def get_indices_with_specific_label(y_train, target_label):
    """
    Returns indices of masks in y_train where the specified label is present.

    Parameters:
    - y_train: Array-like, list or numpy array of masks.
    - target_label: The label to search for in the masks.

    Returns:
    - List of indices where the target label is present.
    """
    indices = []
    for idx, mask in enumerate(y_train):
        if target_label in np.unique(mask):
            indices.append(idx)
    return indices

# How to use
# y_train: List or array of masks
# target_label: Label to search for
# indices = get_indices_with_specific_label(y_train, target_label=3)

# print(f"Number of images where label 3 is present: len({indices})")


from matplotlib.colors import ListedColormap

# Define colors for each label
colors = [
    (0, 0, 0),        # 0: Background - Black
    (0.5, 0.25, 0),   # 1: Soil - Brown
    (0.5, 0.5, 0.5),  # 2: Bedrock - Gray
    (1, 1, 0),        # 3: Sand - Yellow
    (0, 0, 1)         # 4: Big Rock - Blue
]

custom_cmap = ListedColormap(colors)


def save_image_mask_pairs(train_images, train_masks, folder_name="train_images_with_masks"):
    """
    Saves image-mask pairs into a specified folder, with filenames based on their index.

    Args:
        train_images (numpy.ndarray): Array of training images (shape: [N, H, W, C] or [N, H, W]).
        train_masks (numpy.ndarray): Array of corresponding masks (shape: [N, H, W]).
        folder_name (str): Name of the folder where images will be saved.

    Returns:
        None
    """
    # Ensure the folder exists
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

    # Loop through the dataset
    for idx, (image, mask) in enumerate(zip(train_images, train_masks)):
        plt.figure(figsize=(10, 5))

        # Plot the image
        plt.subplot(1, 2, 1)
        if image.ndim == 3 and image.shape[-1] == 1:  # Grayscale with channel
            plt.imshow(image.squeeze(), cmap="gray")
        else:  # RGB or 2D grayscale
            plt.imshow(image, cmap="gray")
        plt.title("Image")
        plt.axis("off")

        # Plot the mask
        plt.subplot(1, 2, 2)
        plt.imshow(mask, cmap=custom_cmap, vmin=0, vmax=4)
        plt.title("Mask")
        plt.axis("off")

        # Save the figure
        file_path = os.path.join(folder_name, f"{idx}.png")
        plt.savefig(file_path, bbox_inches="tight")
        plt.close()

    print(f"Saved {len(train_images)} image-mask pairs to the folder: '{folder_name}'")


### Extract meaningful subsets with the purpose of tailored augmentation


In [17]:
print(f"Total number of images, so far: {len(y_train_cleaned)}")

print("And from those we have:")

indices_containing_min_4_labels = get_indices_with_label_range(y_train_cleaned, min_labels=4, max_labels=5)
print(f"Number of images with label counts in range [4, 5]: {len(indices_containing_min_4_labels)}")

y_train_cleaned_min_4_labels, x_train_cleaned_min_4_labels = y_train_cleaned[indices_containing_min_4_labels], x_train_cleaned[indices_containing_min_4_labels]

indices_containing_3_labels = get_indices_with_label_range(y_train_cleaned, min_labels=3, max_labels=3)
print(f"Number of images with label counts in range [3, 3]: {len(indices_containing_3_labels)}")

y_train_cleaned_3_labels, x_train_cleaned_3_labels = y_train_cleaned[indices_containing_3_labels], x_train_cleaned[indices_containing_3_labels]

indices_containing_max_2_labels = get_indices_with_label_range(y_train_cleaned, min_labels=1, max_labels=2)
print(f"Number of images with label counts in range [1, 2]: {len(indices_containing_max_2_labels)}")

y_train_cleaned_max_2_labels, x_train_cleaned_max_2_labels = y_train_cleaned[indices_containing_max_2_labels], x_train_cleaned[indices_containing_max_2_labels]






Total number of images, so far: 1074
And from those we have:
Number of images with label counts in range [4, 5]: 150
Number of images with label counts in range [3, 3]: 527
Number of images with label counts in range [1, 2]: 397
Number of images with big rock in the original training set:  63
Number of images with big rock in the cleaned training set:  63
Number of images with big rock in the subset with min 4 lables:  44
Number of images with big rock in the subset with min 4 lables:  19


### ⁉ Now where are all the images of big rock ⁉

In [None]:
print("Number of images with big rock in the original training set: ", len(get_indices_with_specific_label(y_train, target_label=4)))
print("Number of images with big rock in the cleaned training set: ", len(get_indices_with_specific_label(y_train_cleaned, target_label=4)))
print("Number of images with big rock in the subset with min 4 labels: ",len(get_indices_with_specific_label(y_train_cleaned_min_4_labels, target_label=4)))
print("Number of images with big rock in the subset with 3 labels: ",len(get_indices_with_specific_label(y_train_cleaned_3_labels, target_label=4)))

split bed rock images and masks

In [18]:
bed_rock_indices_within_min_4_labels = get_indices_with_specific_label(y_train_cleaned_min_4_labels, target_label=4)
bed_rock_indices_within_3_labels = get_indices_with_specific_label(y_train_cleaned_3_labels, target_label=4)

x_bed_rock, y_bed_rock = np.concatenate((x_train_cleaned_min_4_labels[bed_rock_indices_within_min_4_labels], x_train_cleaned_3_labels[bed_rock_indices_within_3_labels])), np.concatenate((y_train_cleaned_min_4_labels[bed_rock_indices_within_min_4_labels], y_train_cleaned_3_labels[bed_rock_indices_within_3_labels]))

x_train_cleaned_min_4_labels_without_bed_rock, y_train_cleaned_min_4_labels_without_bed_rock = np.delete(x_train_cleaned_min_4_labels, bed_rock_indices_within_min_4_labels, axis=0), np.delete(y_train_cleaned_min_4_labels, bed_rock_indices_within_min_4_labels, axis=0)
x_train_cleaned_3_labels_without_bed_rock, y_train_cleaned_3_labels_without_bed_rock = np.delete(x_train_cleaned_3_labels, bed_rock_indices_within_3_labels, axis=0), np.delete(y_train_cleaned_3_labels, bed_rock_indices_within_3_labels, axis=0)

# verify
print(len(x_train_cleaned_min_4_labels_without_bed_rock), len(y_train_cleaned_min_4_labels_without_bed_rock)) # should be 106
print(len(x_train_cleaned_3_labels_without_bed_rock), len(y_train_cleaned_3_labels_without_bed_rock)) # should be 508

106 106
508 508


### ❇ Optionally plot subsets to a folder for inspection

- as you may have noticed, i included a function `save_image_mask_pairs(train_images, train_masks, folder_name="train_images_with_masks")`
- we can use it to select more valuable images for augmentation, alternatively we could determine bed_rock images where the bedrock mask is above a certain threshhold, but in that case manual inspection might be more efficient

In [19]:
save_image_mask_pairs(x_bed_rock, y_bed_rock, folder_name="bed_rock_images")

Saved 63 image-mask pairs to the folder: 'bed_rock_images'


In [20]:
# just a small temporary selection, we could try on a simple unet first what works best for feature extraction

some_ok_bedrock_imgs= [0, 3, 5, 6, 8, 9,57,59,60]
x_bed_rock_selection = x_bed_rock[some_ok_bedrock_imgs]
y_bed_rock_selection = y_bed_rock[some_ok_bedrock_imgs]

## Augmentation

 1. Define Augmentation Pipelines for Upsampling
 2. Define `DataGenerator`
 3. Visualize Augmentation Techniques
 4. Upsample using Augmentations

In [None]:
### Pipelines

In [45]:
class AugmentationLogger:
    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.applied_augmentations = []

    def __call__(self, **kwargs):
        # Reset log for the new call
        self.applied_augmentations = []
        augmented = self.pipeline(**kwargs)

        # Check which augmentations were applied
        for transform in self.pipeline.transforms:
            if isinstance(transform, A.NoOp):  # Skip logging NoOp
                continue
            if isinstance(transform, A.OneOf):
                # For OneOf, check which transformation was applied
                chosen_transform = self._log_oneof(transform)
                if chosen_transform:
                    self.applied_augmentations.append(f"OneOf({chosen_transform})")
            else:
                if np.random.rand() < transform.p:
                    self.applied_augmentations.append(transform.__class__.__name__)

        return augmented

    def _log_oneof(self, oneof_transform):
        # Simulate which transformation was chosen in OneOf
        probs = [t.p for t in oneof_transform.transforms]
        total_prob = sum(probs)
        chosen = np.random.choice(
            oneof_transform.transforms,
            p=[p / total_prob for p in probs]
        )
        return chosen.__class__.__name__

    def get_log(self):
        return self.applied_augmentations


In [29]:
import cv2

def get_3_classes_augmentation_pipeline():
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.3),  # for non class 4 probably not a bad idea

        # Light geometric Variationen
        A.OneOf([
            A.Rotate(limit=10, border_mode=cv2.BORDER_REFLECT_101, p=0.5),
            A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10,
                               border_mode=cv2.BORDER_REFLECT_101, p=0.5), # this can be useful if we aim smooth out the effects of geometric artifacts
        ], p=0.3),

        # Light Simulation of different perspectives
        A.OneOf([
            A.GridDistortion(num_steps=5, distort_limit=0.2, p=0.4),
            A.Perspective(scale=(0.02, 0.05), keep_size=True, p=0.2),
            A.NoOp()
        ], p=0.5),


        A.RandomBrightnessContrast(brightness_limit=0.15, contrast_limit=0.18, p=0.5),


        A.RandomGamma(gamma_limit=(80,120), p=0.3),

        A.OneOf([
            A.Blur(blur_limit=3, p=0.3),
            A.Sharpen(alpha=(0.2, 0.4), lightness=(0.8,1.0), p=0.3),
            A.NoOp()
        ], p=0.4),

    ], additional_targets={'mask': 'mask'})


def get_bedrock_augmentation_pipeline():
    return A.Compose([
        # A.VerticalFlip is probably confusing on big rock
        A.HorizontalFlip(p=0.5),


       # this can be used with caution if we observe the model lacks in robustness to handle orientation by implementing a bounding-box-based conditional strategy, meaning using it iff big rock isnt on the border, s.t. we dont cut it off,

        #A.OneOf([
            #A.Rotate(limit=10, border_mode=0, p=0.7),
            #A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10, border_mode=0, p=0.5), A.NoOp()], p=0.5),

        A.OneOf([
            #A.GridDistortion(num_steps=5, distort_limit=0.1, p=0.2), # potentially hiding big rock, so not a good idea
            A.Perspective(scale=(0.02, 0.05), keep_size=True, p=0.2),
            A.NoOp()
        ], p=0.5),


        A.RandomBrightnessContrast(brightness_limit=0.05, contrast_limit=0.1, p=0.5),

        A.RandomGamma(gamma_limit=(90,110), p=0.3), # Weniger Variation als bei non class4

        # less blur more sharpen
        A.OneOf([
            A.Sharpen(alpha=(0.2, 0.5), lightness=(0.8,1.0), p=0.5),
            A.Blur(blur_limit=2, p=0.2),
            A.NoOp()
        ], p=0.5),


    ], additional_targets={'mask': 'mask'})

def get_standard_augmentation_pipeline(): # used for up to 3 classes so far
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.3),

        # Leichte Rotation/Skalierung, aber nicht zu komplex
        A.OneOf([
            A.Rotate(limit=10, border_mode=cv2.BORDER_REFLECT_101, p=0.5),
            A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=10, border_mode=cv2.BORDER_REFLECT_101, p=0.4),
            A.NoOp()
        ], p=0.6),

        # Weniger komplexe Verzerrungen, vielleicht ab und zu GridDistortion, aber mild
        A.OneOf([
            A.GridDistortion(num_steps=5, distort_limit=0.15, p=0.3),
            A.NoOp()
        ], p=0.4),

        # Beleuchtungsvariationen moderat
        A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),

        A.RandomGamma(gamma_limit=(80,120), p=0.3),

        # Ggf. leichte Schärfe oder Blur
        A.OneOf([
            A.Blur(blur_limit=3, p=0.3),
            A.Sharpen(alpha=(0.2, 0.4), lightness=(0.8,1.0), p=0.3),
            A.NoOp()
        ], p=0.3),

    ], additional_targets={'mask': 'mask'})


### 2. DataGenerator[Linktext](https://)

In [30]:
import tensorflow as tf
import numpy as np

class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, images, masks, batch_size, augmentations=None, shuffle=True):
        self.images = images
        self.masks = masks
        self.batch_size = batch_size
        self.augment = augmentations
        self.shuffle = shuffle
        self.indices = np.arange(len(images))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.images) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
        batch_images = self.images[batch_indices]
        batch_masks = self.masks[batch_indices]

        augmented_images = []
        augmented_masks = []

        for img, mask in zip(batch_images, batch_masks):
            augmented = self.augment(image=img.squeeze(), mask=mask)
            aug_img = augmented['image'].astype(np.float32)
            aug_mask = augmented['mask'].astype(np.int32)
            aug_img = np.expand_dims(aug_img, axis=-1)
            augmented_images.append(aug_img)
            augmented_masks.append(aug_mask)

        return np.array(augmented_images), np.array(augmented_masks)

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)


In [47]:
batch_size = 32
# Instantiate augmentation pipelines
bedrock_augment = AugmentationLogger(get_bedrock_augmentation_pipeline()) # wrap around the Logger if you want to see which augmentations where actually performed
min_4_classes_augment = get_3_classes_augmentation_pipeline()
standard_augment = get_standard_augmentation_pipeline()

# Create training generator with standard augmentations for images with min 4 labels
train_generator_min_4 = DataGenerator(
    images=x_train_cleaned_min_4_labels_without_bed_rock,
    masks=y_train_cleaned_min_4_labels_without_bed_rock,
    batch_size=batch_size,
    augmentations=min_4_classes_augment,
    shuffle=True
)

# Create training generator with standard augmentations for images with 3 labels
train_generator_3 = DataGenerator(
    images=x_train_cleaned_3_labels_without_bed_rock,
    masks=y_train_cleaned_3_labels_without_bed_rock,
    batch_size=batch_size,
    augmentations=standard_augment,
    shuffle=True
)

# Create training generator with standard augmentations for images with max 2 labels
train_generator_max_2 = DataGenerator(
    images=x_train_cleaned_max_2_labels,
    masks=y_train_cleaned_max_2_labels,
    batch_size=batch_size,
    augmentations=standard_augment,
    shuffle=True
)

# Create bedrock generator
train_generator_bedrock = DataGenerator(
    images=x_bed_rock_selection,
    masks=y_bed_rock_selection,
    batch_size=batch_size,
    augmentations=bedrock_augment,
    shuffle=True
)


  result = _ensure_min_value(result, min_value, info.field_name)


### visualize some generated samples (sorry for the duplicate code)

In [53]:
def visualize_augmented_samples_with_logs(generator, num_samples=5, num_augmented=3):
    """
    Visualize original images/masks, their augmented versions, and the applied augmentations.

    Parameters:
    - generator: Instance of DataGenerator
    - num_samples: Number of original samples to display
    - num_augmented: Number of augmented versions per original
    """
    total_rows = num_samples * (1 + num_augmented)

    fig, axes = plt.subplots(
        nrows=total_rows,
        ncols=2,
        figsize=(10, total_rows * 2)
    )

    sample_indices = np.random.choice(len(generator.images), num_samples, replace=False)
    sample_images = generator.images[sample_indices]
    sample_masks = generator.masks[sample_indices]

    row = 0
    for i in range(num_samples):
        image = sample_images[i].squeeze()
        mask = sample_masks[i]

        axes[row, 0].imshow(image, cmap='gray')
        axes[row, 0].set_title('Original Image')
        axes[row, 0].axis('off')

        axes[row, 1].imshow(mask, cmap='viridis', vmin=0, vmax=np.max(mask))
        axes[row, 1].set_title('Original Mask')
        axes[row, 1].axis('off')

        row += 1

        for j in range(num_augmented):
            augmented = generator.augment(image=image, mask=mask) if generator.augment else {'image': image, 'mask': mask}
            aug_image = augmented['image']
            aug_mask = augmented['mask']

            # Log applied augmentations
            if isinstance(generator.augment, AugmentationLogger):
                aug_logs = generator.augment.get_log()
            else:
                aug_logs = []

            axes[row, 0].imshow(aug_image.squeeze(), cmap='gray')
            axes[row, 0].set_title(f'Augmented Image {j + 1}')
            axes[row, 0].axis('off')

            axes[row, 1].imshow(aug_mask, cmap='viridis', vmin=0, vmax=4)
            axes[row, 1].set_title(f'Augmented Mask {j + 1}')
            axes[row, 1].axis('off')

            # Add the log as text below the augmented image
            text = '\n'.join(aug_logs)
            fig.text(0.5, (total_rows - row - 1) / total_rows, text, ha='center', fontsize=8)

            row += 1

    plt.tight_layout()
    plt.show()


def visualize_augmented_samples(generator, num_samples=5, num_augmented=3):
    """
    Visualize original images/masks and their augmented versions.

    Parameters:
    - generator: Instance of DataGenerator
    - num_samples: Number of original samples to display
    - num_augmented: Number of augmented versions per original
    """
    # Calculate the total number of rows needed
    total_rows = num_samples * (1 + num_augmented)

    # Set the figure size dynamically based on the number of rows and columns
    fig, axes = plt.subplots(
        nrows=total_rows,
        ncols=2,
        figsize=(8, total_rows * 2)
    )

    # Select random samples from the generator
    sample_indices = np.random.choice(len(generator.images), num_samples, replace=False)
    sample_images = generator.images[sample_indices]
    sample_masks = generator.masks[sample_indices]

    row = 0
    for i in range(num_samples):
        image = sample_images[i].squeeze()
        mask = sample_masks[i]

        # Display Original Image
        axes[row, 0].imshow(image, cmap='gray')
        axes[row, 0].set_title('Original Image')
        axes[row, 0].axis('off')

        # Display Original Mask
        axes[row, 1].imshow(mask, cmap=custom_cmap, vmin=0, vmax=4)  # Replace 'viridis' if custom cmap
        axes[row, 1].set_title('Original Mask')
        axes[row, 1].axis('off')

        row += 1

        # Generate and display augmented samples
        for j in range(num_augmented):
            augmented = generator.augment(image=image, mask=mask) if generator.augment else {'image': image, 'mask': mask}
            aug_image = augmented['image']
            aug_mask = augmented['mask']

            # Augmented Image
            axes[row, 0].imshow(aug_image.squeeze(), cmap='gray')
            axes[row, 0].set_title(f'Augmented Image {j + 1}')
            axes[row, 0].axis('off')

            # Augmented Mask
            axes[row, 1].imshow(aug_mask, cmap=custom_cmap, vmin=0, vmax=4)  # Replace 'viridis' if custom cmap
            axes[row, 1].set_title(f'Augmented Mask {j + 1}')
            axes[row, 1].axis('off')

            row += 1

    # Adjust layout for better visualization
    plt.tight_layout()
    plt.show()



### choose visualization with or without logs

In [52]:
# Visualize augmentations for images with min 4 labels
visualize_augmented_samples(train_generator_min_4, num_samples=5, num_augmented=2)

KeyboardInterrupt: 

In [None]:
# Visualize augmentations for images with 3 classes
visualize_augmented_samples(train_generator_3, num_samples=3, num_augmented=3)

In [None]:
# Visualize augmentations for images with max 2 classes
visualize_augmented_samples(train_generator_max_2, num_samples=3, num_augmented=3)

In [None]:
# # Visualize augmentations for bedrock images
visualize_augmented_samples_with_logs(train_generator_bedrock, num_samples=3, num_augmented=3)

### Upsample through Augmentation, if we're happy with the pipeline

In [54]:
from tqdm import tqdm
import numpy as np

def augment_images(images, masks, augmentation_pipeline, augment_times):
    """
    Apply augmentation_pipeline to each image-mask pair augment_times times.
    Displays progress using tqdm.

    Parameters:
    - images: NumPy array of images (N, H, W, C) where C=1 for grayscale
    - masks: NumPy array of masks (N, H, W)
    - augmentation_pipeline: Albumentations augmentation pipeline
    - augment_times: Number of augmented copies per image

    Returns:
    - augmented_images: Augmented images as NumPy array (N', H, W, C)
    - augmented_masks: Augmented masks as NumPy array (N', H, W)
    """
    augmented_images = []
    augmented_masks = []

    # Initialize tqdm progress bar
    total_augmentations = len(images) * augment_times
    with tqdm(total=total_augmentations, desc="Augmenting Images and Masks") as pbar:
        for img, mask in zip(images, masks):
            for _ in range(augment_times):
                # Albumentations expects images as (H, W, C) and masks as (H, W)
                augmented = augmentation_pipeline(image=img.squeeze(), mask=mask)
                aug_img = augmented['image'].astype(np.float32)  # Ensure image dtype
                aug_mask = augmented['mask'].astype(np.int32)    # Ensure mask dtype

                # Add channel dimension back to image (grayscale)
                aug_img = np.expand_dims(aug_img, axis=-1)

                augmented_images.append(aug_img)
                augmented_masks.append(aug_mask)

                # Update progress bar
                pbar.update(1)

    # Convert lists to NumPy arrays
    return np.array(augmented_images), np.array(augmented_masks)

In [55]:
# How to use
x_bed_rock_selection_augmented, y_bed_rock_selection_augmented = augment_images(
    x_bed_rock_selection,
    y_bed_rock_selection,
    bedrock_augment,
    augment_times=3
)

Augmenting Images and Masks: 100%|██████████| 27/27 [00:00<00:00, 837.88it/s]


## Model stuff

In [None]:
inputs = tfkl.Input(shape=input_shape)
x = tfkl.Conv2D(filters=num_classes, kernel_size=(1, 1), activation="softmax")(inputs)
model = tfk.Model(inputs=inputs, outputs=x, name="minimal_working_net")

# Define the MeanIoU ignoring the background class
mean_iou = tfk.metrics.MeanIoU(num_classes=num_classes, ignore_class=0, sparse_y_pred=False)

model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=[mean_iou])

model.summary()

## Use loaded data in your model

In [None]:
# Fit the model
history = model.fit(
    # on augmented data
    train_data=(X_train_augmented, y_train_augmented),
    # or on cleaned data:
    # train_data=(x_train_cleaned, y_train_cleaned),
    validation_data=(x_val, y_val),
    epochs=10,
    batch_size=64,
)

In [None]:
inputs = tfkl.Input(shape=input_shape)
x = tfkl.Conv2D(filters=num_classes, kernel_size=(1, 1), activation="softmax")(inputs)
model = tfk.Model(inputs=inputs, outputs=x, name="minimal_working_net")

# Define the MeanIoU ignoring the background class
mean_iou = tfk.metrics.MeanIoU(num_classes=num_classes, ignore_class=0, sparse_y_pred=False)

model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=[mean_iou])

model.summary()

history = model.fit(X_train, y_train, epochs=2)

Epoch 1/2
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 83ms/step - loss: 1.6226 - mean_io_u_2: 0.0918
Epoch 2/2
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 89ms/step - loss: 1.5858 - mean_io_u_2: 0.1144


## Function to evaluate the model and visualize predictions

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
import tensorflow as tf

def evaluate_model_performance(model, X_test, y_test, class_names):
    """
    Evaluates the performance of a segmentation model on the test set.

    Args:
        model: Trained segmentation model.
        X_test: Test images, shape (num_samples, height, width, channels).
        y_test: Ground truth masks, shape (num_samples, height, width).
        class_names: List of class names (e.g., ["background", "rock", "sand", "crater", "other"]).

    Returns:
        Prints and plots detailed metrics, including IoU per class, accuracy, and a combined confusion matrix.
    """
    # Predict masks for the test set
    y_pred = model.predict(X_test, batch_size=16)
    y_pred_classes = np.argmax(y_pred, axis=-1)  # Convert to class indices

    # Flatten the arrays for confusion matrix computation
    y_test_flat = y_test.flatten()
    y_pred_flat = y_pred_classes.flatten()

    # Compute the confusion matrix
    combined_cm = confusion_matrix(y_test_flat, y_pred_flat, labels=np.arange(len(class_names)))

    # Normalize the confusion matrix
    combined_cm_normalized = combined_cm.astype('float') / combined_cm.sum(axis=1)[:, np.newaxis]

    # Calculate IoU for each class
    num_classes = len(class_names)
    iou_per_class = []
    for c in range(num_classes):
        intersection = combined_cm[c, c]
        union = combined_cm[c, :].sum() + combined_cm[:, c].sum() - intersection
        iou = intersection / union if union > 0 else 0
        iou_per_class.append(iou)

    # Overall metrics
    mean_iou = np.mean(iou_per_class)
    pixel_accuracy = np.sum(y_pred_flat == y_test_flat) / len(y_test_flat)

    # Print metrics
    print("Evaluation Results:")
    print("-------------------")
    print(f"Pixel-wise Accuracy: {pixel_accuracy:.4f}")
    print(f"Mean IoU: {mean_iou:.4f}")
    print("\nClass-wise IoU:")
    for i, class_name in enumerate(class_names):
        print(f"  {class_name}: {iou_per_class[i]:.4f}")

    # Plot the combined confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(combined_cm_normalized, annot=True, fmt=".2f", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.title("Normalized Confusion Matrix")
    plt.xlabel("Predicted Class")
    plt.ylabel("True Class")
    plt.show()

    # Visualize some predictions
    def visualize_predictions(X, y_true, y_pred, num_samples=5):
        indices = np.random.choice(len(X), num_samples, replace=False)
        for idx in indices:
            plt.figure(figsize=(12, 6))
            plt.subplot(1, 3, 1)
            plt.title("Input Image")
            plt.imshow(X[idx].squeeze(), cmap="gray")
            plt.axis("off")

            plt.subplot(1, 3, 2)
            plt.title("Ground Truth")
            plt.imshow(y_true[idx], cmap="viridis", vmin=0, vmax=num_classes - 1)
            plt.axis("off")

            plt.subplot(1, 3, 3)
            plt.title("Predicted Mask")
            plt.imshow(y_pred[idx], cmap="viridis", vmin=0, vmax=num_classes - 1)
            plt.axis("off")

            plt.show()

    print("\nSample Predictions:")
    visualize_predictions(X_test, y_test, y_pred_classes)


In [None]:

# How to use:
class_names = ["background", "soil", "bedrock", "sand", "bigrock"]
evaluate_model_performance(model, x_val, y_val, class_names)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

def visualize_test_predictions(model, test_set, num_imgs, custom_cmap=None):
    """
    Visualizes random predictions from the test set with predicted masks overlayed.

    Args:
        model: Trained segmentation model.
        test_set: Test images, shape (num_samples, height, width, channels).
        num_imgs: Number of random test images to visualize.
        custom_cmap: (Optional) A colormap for visualizing masks.
                     Default uses `plt.cm.viridis`.

    Returns:
        Displays the input images with predicted masks overlayed.
    """
    # Generate predictions
    predictions = model.predict(test_set, batch_size=16)
    predicted_masks = np.argmax(predictions, axis=-1)  # Convert logits to class indices

    # Randomly select images to visualize
    indices = np.random.choice(len(test_set), num_imgs, replace=False)

    for idx in indices:
        plt.figure(figsize=(12, 6))

        # Input image
        plt.subplot(1, 2, 1)
        plt.title("Input Image")
        plt.imshow(test_set[idx].squeeze(), cmap="gray")  # Grayscale display
        plt.axis("off")

        # Predicted mask overlayed
        plt.subplot(1, 2, 2)
        plt.title("Predicted Mask Overlayed")
        plt.imshow(test_set[idx].squeeze(), cmap="gray",  alpha=0.8)  # Original image
        plt.imshow(predicted_masks[idx], cmap=custom_cmap or "viridis",vmin=0, vmax=4, alpha=0.5)  # Overlay mask
        plt.axis("off")

        plt.show()


In [None]:
timestep_str = datetime.now().strftime("%y%m%d_%H%M%S")
model_filename = f"model_{timestep_str}.keras"
model.save(model_filename)
del model

print(f"Model saved to {model_filename}")

## 📊 Prepare Your Submission

In our Kaggle competition, submissions are made as `csv` files. To create a proper `csv` file, you need to flatten your predictions and include an `id` column as the first column of your dataframe. To maintain consistency between your results and our solution, please avoid shuffling the test set. The code below demonstrates how to prepare the `csv` file from your model predictions.




In [None]:
# If model_filename is not defined, load the most recent model from Google Drive
if "model_filename" not in globals() or model_filename is None:
    files = [f for f in os.listdir('.') if os.path.isfile(f) and f.startswith('model_') and f.endswith('.keras')]
    files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
    if files:
        model_filename = files[0]
    else:
        raise FileNotFoundError("No model files found in the current directory.")

In [None]:
model = tfk.models.load_model(model_filename)
print(f"Model loaded from {model_filename}")

In [None]:
preds = model.predict(X_test)
preds = np.argmax(preds, axis=-1)
print(f"Predictions shape: {preds.shape}")

In [None]:
def y_to_df(y) -> pd.DataFrame:
    """Converts segmentation predictions into a DataFrame format for Kaggle."""
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

In [None]:
# Create and download the csv submission file
timestep_str = model_filename.replace("model_", "").replace(".keras", "")
submission_filename = f"submission_{timestep_str}.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)

from google.colab import files
files.download(submission_filename)

#  
<img src="https://airlab.deib.polimi.it/wp-content/uploads/2019/07/airlab-logo-new_cropped.png" width="350">

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/95/Instagram_logo_2022.svg/800px-Instagram_logo_2022.svg.png" width="15"> **Instagram:** https://www.instagram.com/airlab_polimi/

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/LinkedIn_icon.svg/2048px-LinkedIn_icon.svg.png" width="15"> **LinkedIn:** https://www.linkedin.com/company/airlab-polimi/
___
Credits: Alberto Archetti 📧 alberto.archetti@polito.it





```
   Copyright 2024 Alberto Archetti

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
```