# Environment setup and library inclusion

In [1]:
# Mount Google Drive so the project folder is reachable from the Colab VM
from google.colab import drive
drive.mount('/gdrive')
# Work from the homework folder: later cells read and write files by relative path
%cd /gdrive/MyDrive/Colab/AN2DL/Homework2

# Installing Keras-CV and Albumentations (upgraded to the latest release; versions are not pinned)
!pip install -q --upgrade keras-cv albumentations

Mounted at /gdrive
/gdrive/MyDrive/Colab/AN2DL/Homework2
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.0/66.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m650.7/650.7 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m258.1/258.1 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m950.8/950.8 kB[0m [31m22.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m632.7/632.7 kB[0m [31m20.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# Including libraries
from google.colab                           import runtime
from keras_cv.layers                        import AugMix, RandAugment, \
                                                    RandomHue, RandomSaturation
from matplotlib.colors                      import ListedColormap
from tensorflow                             import keras as tfk
from tensorflow.keras                       import layers as tfkl
from tensorflow.keras.applications          import ConvNeXtBase
from tensorflow.keras.models                import Model, load_model
from tensorflow.keras.preprocessing.image   import ImageDataGenerator
from tensorflow.keras.utils                 import to_categorical, Sequence
from sklearn.metrics                        import accuracy_score, precision_score,\
                                                    recall_score, f1_score,\
                                                    confusion_matrix
from sklearn.model_selection                import train_test_split
from sklearn.utils                          import class_weight

import albumentations                       as A
import matplotlib.pyplot                    as plt
import numpy                                as np
import os
import pandas                               as pd
import seaborn                              as sns
import tensorflow                           as tf

Setting seed for our execution environment.

In [3]:
# Single seed shared by every stochastic step of the notebook (also reused
# later as the `random_state` of the train/validation split).
seed: int = 42

# Seed Python's `random`, NumPy and TensorFlow in one call.
# Seeding only NumPy and TensorFlow leaves the standard-library `random`
# module unseeded, so any code drawing from it would not be reproducible
# across runs.
tfk.utils.set_random_seed(seed)

We define a series of useful functions which will be called throughout the notebook.

In [4]:
def displayImage(image, title="") -> None:
    """
    Function in charge of displaying a single image in greyscale

    -------
    image: image to be displayed
    title: title of the image
    """

    # "gray" is Matplotlib's canonical colormap name and the one used by the
    # other plotting helper in this notebook; the "grey" spelling is only
    # accepted as an alias by recent Matplotlib releases.
    plt.imshow(image, cmap="gray")
    plt.title(title)
    plt.show()


def displaySegmentedImage(image_with_segmentation, title="Original Image") -> None:
    """
    Function in charge of displaying an image next to its segmentation overlay

    -------
    image_with_segmentation: array whose axis 0 has exactly two entries,
                             the image and its segmentation map of integer
                             class labels (NOT one-hot-encoded)
    title: title shown above the left-hand (original) image

    Raises ValueError if axis 0 does not have exactly two entries.
    """

    if image_with_segmentation.shape[0] != 2:
        raise ValueError("Input array must have two components along axis 0 (image and segmentation).")

    # Extract image and segmentation
    image   = image_with_segmentation[0]  # Original image
    seg     = image_with_segmentation[1]  # Segmentation classes

    # One fixed colour per class label (index in the list == class label)
    colours     = ['black', 'brown', 'blue', 'yellow', 'orange']
    class_names = ["background", "soil", "bedrock", "sand", "big rock"]

    # Always use the full colormap and pin the value range to [0, 4].
    # Building the colormap only from the classes present and letting imshow
    # normalise by the data's own min/max assigns wrong colours whenever the
    # present labels are not evenly spaced (e.g. {0, 1, 4}), which makes the
    # overlay disagree with the legend.
    cmap = ListedColormap(colours)
    vmax = len(colours) - 1

    # Plot the results
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    # Display the original image
    axes[0].imshow(image, cmap='gray')
    axes[0].set_title(title)
    axes[0].axis("off")

    # Overlay segmentation on top of the original image
    axes[1].imshow(image, cmap='gray')
    axes[1].imshow(
        seg,
        cmap=cmap,
        vmin=0,
        vmax=vmax,
        interpolation="nearest",  # labels are categorical: never blend neighbouring classes
        alpha=0.5,                # Alpha for transparency
    )
    axes[1].set_title("Segmented Image")
    axes[1].axis("off")

    # Add colour legend (one marker per class, in label order)
    legend_handles = [
        plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=colour, markersize=10)
        for colour in colours
    ]
    axes[0].legend(legend_handles, class_names, title="Classes", loc="upper right")

    plt.tight_layout()
    plt.show()


def toCategorical(mask, img_shape, num_classes, dtype=np.float64):
    """
    Function in charge of one-hot encoding a batch of segmentation masks

    -------
    mask: array of class labels with shape (num_samples, *img_shape);
          float-valued labels are truncated to integers before encoding
    img_shape: expected shape of a single mask, e.g. (height, width)
    num_classes: number of classes, i.e. size of the one-hot axis
    dtype: dtype of the returned array (default float64, matching the
           previous behaviour; pass a smaller dtype to save memory)

    Returns an array of shape (num_samples, *img_shape, num_classes).
    Raises ValueError if the masks do not have the expected shape and
    IndexError if a label is >= num_classes.
    """
    # Labels are used as indices below, so they must be integers
    labels = np.asarray(mask).astype(np.int64)

    expected_shape = (labels.shape[0], *img_shape)
    if labels.shape != expected_shape:
        raise ValueError(
            f"Expected masks of shape {expected_shape}, got {labels.shape}."
        )

    # Row k of the identity matrix is the one-hot vector of class k, so
    # indexing it with the label array encodes the whole batch at once
    # (no per-sample Python loop, no intermediate copies).
    return np.eye(num_classes, dtype=dtype)[labels]


def goodnight() -> None:
    """
    Release the Colab runtime backing this notebook.

    Thin wrapper around `runtime.unassign()` from `google.colab`, meant to be
    called as the very last step so an idle session stops consuming resources.
    """
    runtime.unassign()

# Dataset loading and cleaning

Our dataset contains several images with an alien superimposed on them. We therefore need to perform some cleaning before applying general preprocessing. Also, since class-0 (*background*) does not count during evaluation, images which ONLY contain background serve no use in training, hence we can remove them as well.

In [5]:
# `np.load` on an .npz archive returns a lazy NpzFile that keeps the file
# open; read the array we need inside a context manager so the handle is
# closed as soon as the data is in memory.
with np.load('mars.npz') as data:
    images = data['training_set']

# Flag to signal that dataset hasn't been cleaned so far
cleaned: bool = False

Now that the dataset has been loaded, we actually clean it from irrelevant and troublesome samples.

In [6]:
# Sample known to be an outlier; its segmentation map is the outlier "signature"
track_index: int = 669

# Positions (within `images`) of the samples to drop at each cleaning stage
outlier_indices: list[int] = []
only_bg_indices: list[int] = []

# Stage 1: every sample sharing the segmentation map of the tracked outlier.
# Skipped when the dataset was already cleaned, since index 669 would then
# point to an unrelated sample.
if not cleaned:
    reference_mask = images[track_index][1]
    outlier_indices = [
        idx
        for idx, sample in enumerate(images)
        if np.array_equal(sample[1], reference_mask)
    ]
print(f"{len(images)} ==[OUTLIERS]==> ", end="")
images = np.delete(images, outlier_indices, axis=0)
print(f"{len(images)} ==[ONLY-BG]==> ", end="")

# Stage 2: samples whose segmentation map is entirely class 0 (background)
only_bg_indices = [
    idx
    for idx, sample in enumerate(images)
    if not np.any(sample[1])
]
images = np.delete(images, only_bg_indices, axis=0)
print(f"{len(images)}")
print(f"Removed {len(outlier_indices)} outliers and {len(only_bg_indices)} only-background images")

# Remember that cleaning has run, so re-executing this cell skips stage 1
cleaned: bool = True

2615 ==[OUTLIERS]==> 2505 ==[ONLY-BG]==> 2498
Removed 110 outliers and 7 only-background images


We define custom weights for our segmentation classes here:

In [7]:
# Relative importance of each segmentation class, keyed by class label:
# 0 = background, 1 = soil, 2 = bedrock, 3 = sand, 4 = big rock
weights: dict = {0: 0, 1: 1, 2: 1, 3: 1, 4: 2}

# Rescale so that the weights add up to one
norm = sum(weights.values())
weights = {label: value / norm for label, value in weights.items()}
print(weights, sum(weights.values()))

# Downstream code expects a plain list ordered by class label
weights = list(weights.values())

{0: 0.0, 1: 0.2, 2: 0.2, 3: 0.2, 4: 0.4} 1.0


# Dataset preprocessing

After having performed initial cleaning of the training set, we perform additional pre-processing to it. This will mainly consist in augmenting images, which will allow us to increase the number of training samples as well as improve generalisation of our model. Our first operation will normalise the whole set into the [0, 1] range and then divide it into training and validation sets.

In [8]:
# Normalise dataset
# Cast to float32 *before* scaling. Writing the scaled values back into a
# copy that keeps the original dtype would silently truncate them (to 0 for
# almost every pixel) if the archive stores an integer dtype. `astype`
# returns a new array, so `images` itself is left untouched. Only the image
# plane (index 0) is rescaled; the mask plane (index 1) keeps its labels.
normalised_images = images.astype(np.float32)
normalised_images[:, 0, :, :] /= 255.0

# Divide dataset into training and validation splits
train, validation = train_test_split(
    normalised_images, test_size=0.3, random_state=seed
)

# Setting up some useful parameters for later
base_shape  = train[0].shape        # Shape of sample (2, H, W): image plane + mask plane
img_shape   = train[0][0].shape     # Shape of image (H, W)
height      = img_shape[0]          # Height of image
width       = img_shape[1]          # Width of image
NUM_CLASSES = 5                     # Number of classes
batch_size  = 64                    # Tensor flow dataset batch size
aug_factor  = 10                    # Number of augmentations for each sample

# Logging
print(f"Training set shape:   {train.shape}")
print(f"Validation set shape: {validation.shape}")

Training set shape:   (1748, 2, 64, 128)
Validation set shape: (750, 2, 64, 128)


We define our custom augmentation pipeline which will allow us to distort our dataset without losing mask information for segmentation.

In [9]:
def getAugmentationPipeline(height=height, width=width):
    """
    Function in charge of building the Albumentations pipeline applied
    jointly to an image and its segmentation mask

    -------
    height: height of the images returned by the pipeline
    width: width of the images returned by the pipeline

    Returns an `A.Compose` object, to be called as
    `pipeline(image=..., mask=...)`.

    Integer codes used below are OpenCV constants:
    interpolation 0 = INTER_NEAREST, 1 = INTER_LINEAR;
    border_mode 4 = BORDER_REFLECT_101.
    """
    return A.Compose(
        [
            # Geometric transformations
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            # Small rotations (±30°) and scaling/translation/shear within a reasonable range
            A.Affine(
                scale=(0.9, 1.1),
                rotate=(-30, 30),
                shear=(-10, 10),
                # Symmetric range: shift by up to 10% in either direction.
                # The previous (0.1, 0.1) is a degenerate range that always
                # shifted every augmented sample by exactly +10%.
                translate_percent=(-0.1, 0.1),
                interpolation=1,      # Bilinear for the image
                mask_interpolation=0, # Nearest-neighbour for the mask, so no new label values are created
                p=0.5
            ),
            # Random crop and then resize back to original dimension
            A.RandomCrop(height=int(0.8*height), width=int(0.8*width), p=0.5),
            A.Resize(height=height, width=width, interpolation=1, p=1.0),  # always resize back (bilinear for the image)

            # Intensity-based augmentations (grayscale-friendly)
            A.RandomBrightnessContrast(
                brightness_limit=0.2,
                contrast_limit=0.2,
                p=0.5
            ),
            # NOTE(review): CLAHE has historically been a uint8-only transform;
            # confirm the installed Albumentations release accepts the float32
            # [0, 1] images this notebook feeds in.
            A.CLAHE(clip_limit=2.0, tile_grid_size=(8,8), p=0.5),
            # NOTE(review): var_limit=(10, 50) looks like a 0-255 intensity scale
            # while the images here are normalised to [0, 1]; verify the noise
            # magnitude actually applied by the installed release.
            A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),

            # Mild blur/sharpen
            A.GaussianBlur(blur_limit=(3,3), p=0.3),

            # Mild elastic deformation
            A.ElasticTransform(
                alpha=1,
                sigma=50,
                interpolation=1,  # Bilinear, applies to the image
                border_mode=4,    # reflect border
                p=0.2
            ),
        ],
        additional_targets={'mask': 'mask'}
    )

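The following cells perform the augmentations by building the custom pipeline and running it on each sample. The number of augmentations is not the same for every sample: a per-class factor is first derived from the inverse class frequencies in the training set, and each sample is then augmented as many times as the largest factor among the classes present in its mask. The original sample and all of its augmentations are stored in the dataset.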

In [10]:
# Define a custom augmentation factor based on class frequencies
class_count: list[int] = [0 for _ in range(NUM_CLASSES)]
for image in train:
    for i in range(NUM_CLASSES):
        class_count[i] += np.sum(image[1] == i)

# Convert to np.array, obtain frequencies and invert
class_count         = np.array(class_count)
class_frequencies   = class_count / (width * height * len(train))
inverse_frequencies = np.ceil(1 / class_frequencies)

# Limit augmentation factors to sum of other inverse_frequencies
aug_factors: list[int] = []
for i in range(NUM_CLASSES):
    sum_others = 0
    for j in range(NUM_CLASSES):
        if i != j:
            sum_others += inverse_frequencies[j]
    aug_factors.append(int(min(inverse_frequencies[i], sum_others)))
aug_factors = np.array(aug_factors)
print(aug_factors)

[ 5  3  5  6 19]


In [11]:
# Perform augmentations and export
def augmentImagesWithMask(images, aug_factors=aug_factors):
    """
    Augment and export the dataset to a single .npz file.
    Args:
        images (numpy array): Original images of shape (num_samples, 2, height, width).
        aug_factor (int): Number of augmentations per sample.
    """

    # Obtain an augmentation pipeline and create augmentation placeholders
    augmentation_pipeline = getAugmentationPipeline()
    augmented_images = []
    augmented_masks  = []

    # Augment each sample
    for image_with_mask in images:
        image = image_with_mask[0].astype(np.float32)   # Ensure correct data type
        mask  = image_with_mask[1].astype(np.uint8)     # Ensure correct data type

        # Add original samples as well
        augmented_images.append(image)
        augmented_masks.append(mask)

        # Obtain all pixel classes of the image
        unique_classes = np.unique(mask)

        # Augment as many times as the maximum ceil(aug_factors) of the present classes
        aug_factor = np.max(aug_factors[unique_classes])

        # Generate augmentations
        for _ in range(aug_factor):
            augmented = augmentation_pipeline(image=image, mask=mask)

            augmented_image = augmented["image"].astype(np.float32)   # Explicit conversion
            augmented_mask  = augmented["mask"].astype(np.uint8)      # Explicit conversion

            augmented_images.append(augmented_image)
            augmented_masks.append(augmented_mask)

    # Convert to numpy arrays and stack together
    augmented_images = np.array(augmented_images, dtype=np.float32)
    augmented_masks  = np.array(augmented_masks, dtype=np.uint8)
    augmented        = np.stack((augmented_images, augmented_masks), axis=1)

    # Return stacked dataset
    return augmented

We now augment and complete preprocessing by converting all masks into one-hot encoding, which is necessary for training.

In [12]:
# Perform augmentations and save as npz
print(f"TRAINING: {train.shape} ==[AUGMENTATION]=>", end=" ")
augmented_train = augmentImagesWithMask(train)
print(f"{augmented_train.shape} ==[CATEGORICAL]=>", end=" ")

# Convert to categorical
augmented_train_images  = augmented_train[:, 0]
augmented_train_masks   = toCategorical(augmented_train[:, 1], img_shape, NUM_CLASSES)
print(f"{augmented_train_images.shape} + {augmented_train_masks.shape}")

TRAINING: (1748, 2, 64, 128) ==[AUGMENTATION]=> (11345, 2, 64, 128) ==[CATEGORICAL]=> (11345, 64, 128) + (11345, 64, 128, 5)


We also convert the validation masks to categorical (one-hot) encoding.

In [13]:
# Validation is not augmented: only split into images and one-hot masks
print(f"VALIDATION: {validation.shape} ==[CATEGORICAL]=> ", end="")
validation_images       = validation[:, 0].astype(np.float32)
validation_segs         = toCategorical(
    mask=validation[:, 1],
    img_shape=img_shape,
    num_classes=NUM_CLASSES,
).astype(np.int8)  # one-hot values are only 0/1, so int8 is enough
print(f"{validation_images.shape} + {validation_segs.shape}")

VALIDATION: (750, 2, 64, 128) ==[CATEGORICAL]=> (750, 64, 128) + (750, 64, 128, 5)


Finally, we export the augmented and preprocessed training and validation sets.

In [None]:
# Arrays to export, keyed by the name each one gets inside the archive
export_arrays = {
    "train_images": augmented_train_images,
    "train_segs": augmented_train_masks,
    "val_images": validation_images,
    "val_segs": validation_segs,
    "weights": weights,
}

# Write everything into a single uncompressed .npz file
np.savez("mars_augmented.npz", **export_arrays)
print("== EXPORTED ==")

== EXPORTED ==


# Disconnecting

Disconnect runtime to avoid consuming resources if the notebook has completed its execution.

In [None]:
# Final cell: release the Colab runtime (goodnight wraps runtime.unassign())
goodnight()