#Exam of Anthea Silvia Sasdelli
September session 2023

## Satellite images segmentation

The task consists of creating a neural model able to perform semantic segmentation on satellite images into six (seven with the no information) different classes.

In this project, the input-target pair is composed by a $1000 \times 1000$ RGB image as visualized above, together with a $1000 \times 1000$ mask, that classifies each pixel by assigning to it a real number.

##Import the libraries

In [None]:
# Utilities
import os
from glob import glob
from IPython.utils import io

# Algebra
import numpy as np

# Visualization
import matplotlib.pyplot as plt
from PIL import Image
import cv2

# Neural Networks
import tensorflow as tf
from tensorflow import keras as ks
from keras.callbacks import EarlyStopping
from keras import backend as K

The data is downloaded by a local file due to problems with the dataset link.

In [None]:
with io.capture_output() as captured:
  !gdown 1jOljqtxkSqeI4s04RanUlIATDKBY0wUR
  !unzip ign_dataset.zip
  !rm ign_dataset.zip

##Preprocessing of the data
The dataset contained two folders:


*   Images: 1000×1000  RGB image
*   Annotation: 1000x1000 masks

Each one was divided in training and validation, which was used as test set.

In [None]:
X_path = './ign/images/'
Y_path = './ign/annotations/'

#Lists of the names of the files

image_train = sorted(os.listdir(os.path.join(X_path, 'training')))
image_test = sorted(os.listdir(os.path.join(X_path, 'validation')))
mask_train = sorted(os.listdir(os.path.join(Y_path, 'training')))
mask_test = sorted(os.listdir(os.path.join(Y_path, 'validation')))

In [None]:
def load_dataset(image_root_path, mask_root_path, image_file_names, mask_file_names):
    num_samples = len(image_file_names)
    image_size = (256, 256)
    num_channels = 3

    # Initialize arrays to store images and masks
    images = np.zeros((num_samples, *image_size, num_channels), dtype=np.uint8)
    masks = np.zeros((num_samples, *image_size), dtype=np.float32)

    for i, (image_name, mask_name) in enumerate(zip(image_file_names, mask_file_names)):
        # Load and resize the RGB image
        image_path = os.path.join(image_root_path, image_name)
        img = Image.open(image_path).convert('RGB')
        img = img.resize(image_size, Image.NEAREST)
        images[i] = np.array(img)

        # Load and resize the mask
        mask_path = os.path.join(mask_root_path, mask_name)
        mask = Image.open(mask_path).convert('L')
        mask = mask.resize(image_size, Image.NEAREST)
        masks[i] = np.array(mask)

    return images, masks

In [None]:
x_train, y_train = load_dataset('./ign/images/training/', './ign/annotations/training/', image_train, mask_train)
x_test, y_test = load_dataset('./ign/images/validation/', './ign/annotations/validation/', image_test, mask_test)

Since the masks dataset had an unusual format, i decided to encode both train and validation with a one hot encoding in order to have the shape  $batch \times height \times width \times num. class$

In [None]:
def batch_to_one_hot(batch, num_classes):
    """
    Convert a batch of integer class label masks to one-hot encoding.

    Args:
        batch (numpy.ndarray): Batch of integer class label masks with shape (batch_size, size, size).
        num_classes (int): The number of classes in the dataset.

    Returns:
        numpy.ndarray: One-hot encoding of the input batch with shape (batch_size, size, size, num_classes).
    """
    # Create an empty one-hot encoding array
    one_hot_encoding = np.zeros((batch.shape[0], batch.shape[1], batch.shape[2], num_classes), dtype=np.float32)

    # Iterate through each class label and set the corresponding channel to 1
    for class_index in range(num_classes):
        one_hot_encoding[:, :, :, class_index] = (batch == class_index).astype(np.float32)

    return one_hot_encoding

y_train = batch_to_one_hot(y_train, 7)


In [None]:
y_test = batch_to_one_hot(y_test, 7)

So now they have this shape:

In [None]:
y_train.shape

(600, 256, 256, 7)

##Visualization
Here there is an example of the same image with all the different 7 channels of the mask, showing the pixels that are in that class.

In [None]:
def show(x, y, title):
    plt.figure(figsize=(10, 7))

    plt.subplot(1, 2, 1)
    plt.imshow(x)
    if title:
        plt.title(title[0])

    plt.subplot(1, 2, 2)
    plt.imshow(y)
    if title:
        plt.title(title[1])

    plt.show()

for i in range(y_train.shape[3]):
  show(x_train[150], y_train[150][...,i], ('This is the image', 'This is its relative mask of the channel '+str(i)))

Output hidden; open in https://colab.research.google.com to view.

## U-Net
The U-Net architecture is chosen for multiclass semantic segmentation due to its specialized design, incorporating encoder and decoder networks with skip connections for precise localization. It effectively captures semantic information, distinguishing between classes.

In the specific it is composed by:

*   **Input layer**
*   **Contractive path**, where the spatial information is reduced while the feature information is increased
*   **The bottom layer**
*   **Expansive path**, where the data is rexpanded and the layers are concatenated with the corresponded layer in the contractive path
*   **The output layer**





In [None]:
FILTER = 4

IMAGE_WIDTH = 256
IMAGE_HEIGHT = 256
IMAGE_CHANNELS = 3

In [None]:
'''Input layer'''

inputs = tf.keras.layers.Input((IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS))

In [None]:
''' Contractive path '''

### Layer 1
c1 = tf.keras.layers.Conv2D(FILTER, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(inputs)
c1 = tf.keras.layers.Dropout(0.1)(c1)
c1 = tf.keras.layers.Conv2D(FILTER, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c1)
p1 = tf.keras.layers.MaxPool2D((2, 2))(c1)

### Layer 2
c2 = tf.keras.layers.Conv2D(FILTER*2, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(p1)
c2 = tf.keras.layers.Dropout(0.1)(c2)
c2 = tf.keras.layers.Conv2D(FILTER*2, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c2)
p2 = tf.keras.layers.MaxPool2D((2, 2))(c2)

### Layer 3
c3 = tf.keras.layers.Conv2D(FILTER*4, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(p2)
c3 = tf.keras.layers.Dropout(0.2)(c3)
c3 = tf.keras.layers.Conv2D(FILTER*4, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c3)
p3 = tf.keras.layers.MaxPool2D((2, 2))(c3)

### Layer 4
c4 = tf.keras.layers.Conv2D(FILTER*8, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(p3)
c4 = tf.keras.layers.Dropout(0.2)(c4)
c4 = tf.keras.layers.Conv2D(FILTER*8, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c4)
p4 = tf.keras.layers.MaxPool2D((2, 2))(c4)

### Layer 5, bottom layer
c5 = tf.keras.layers.Conv2D(FILTER*16, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(p4)
c5 = tf.keras.layers.Dropout(0.3)(c5)
c5 = tf.keras.layers.Conv2D(FILTER*16, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c5)



In [None]:
'''Expansive path'''

### Layer 6
u6 = tf.keras.layers.Conv2DTranspose(FILTER*8, (2, 2),
                                     strides = (2, 2),
                                     padding = 'same')(c5)

u6 = tf.keras.layers.concatenate([u6, c4])

c6 = tf.keras.layers.Conv2D(FILTER*8, (3, 3),
                            kernel_initializer = 'he_normal',
                            padding = 'same')(u6)

c6 = tf.keras.layers.Dropout(0.2)(c6)

c6 = tf.keras.layers.Conv2D(FILTER*8, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c6)

### Layer 7
u7 = tf.keras.layers.Conv2DTranspose(FILTER*4, (2, 2),
                                     strides = (2, 2),
                                     padding = 'same')(c6)

c7 = tf.keras.layers.concatenate([u7, c3])

c7 = tf.keras.layers.Conv2D(FILTER*4, (3, 3),
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c7)

c7 = tf.keras.layers.Dropout(0.2)(c7)

c7 = tf.keras.layers.Conv2D(FILTER*4, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c7)

### Layer 8
u8 = tf.keras.layers.Conv2DTranspose(FILTER*2, (2, 2),
                                     strides = (2, 2),
                                     padding = 'same')(c7)

u8 = tf.keras.layers.concatenate([u8, c2])

c8 = tf.keras.layers.Conv2D(FILTER*2, (3, 3),
                            kernel_initializer = 'he_normal',
                            padding = 'same')(u8)

c8 = tf.keras.layers.Dropout(0.1)(c8)

c8 = tf.keras.layers.Conv2D(FILTER*2, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c8)

### Layer 9
u9 = tf.keras.layers.Conv2DTranspose(FILTER, (2, 2),
                                     strides = (2, 2),
                                     padding = 'same')(c8)

u9 = tf.keras.layers.concatenate([u9, c1])

c9 = tf.keras.layers.Conv2D(FILTER, (3, 3),
                            kernel_initializer = 'he_normal',
                            padding = 'same')(u9)

c9 = tf.keras.layers.Dropout(0.1)(c9)

c9 = tf.keras.layers.Conv2D(FILTER, (3, 3),
                            activation = 'selu',
                            kernel_initializer = 'he_normal',
                            padding = 'same')(c9)

In [None]:
'''Output layer'''
outputs = tf.keras.layers.Conv2D(7, (1, 1), activation = 'softmax')(c9)

In [None]:
''' Model building '''
model_1 = tf.keras.Model(inputs = [inputs], outputs = [outputs])
model_1.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d_38 (Conv2D)          (None, 256, 256, 4)          112       ['input_3[0][0]']             
                                                                                                  
 dropout_18 (Dropout)        (None, 256, 256, 4)          0         ['conv2d_38[0][0]']           
                                                                                                  
 conv2d_39 (Conv2D)          (None, 256, 256, 4)          148       ['dropout_18[0][0]']          
                                                                                            

In order to perform better the metrics of the Dice Coefficient later in the model evaluation, it has been chosen to use a customized Loss `dice_coef_loss` and Metrics `dice_coef_multilabel` for the training of the U-Net.

In [None]:
'''Customized loss for the model'''

def dice_coef_loss(y_true, y_pred):
    smooth = 0.001

    # Flatten the one-hot encoded tensors
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)

    intersection = K.sum(y_true_f * y_pred_f)
    dice_loss = 1 - (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

    return dice_loss

In [None]:
'''Customized metrics'''

def dice_coefficient(y_true, y_pred):
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred)
    return (2.0 * intersection) / (union + intersection)

def dice_coef_multilabel(y_true, y_pred):
    dice=0
    for index in range(7):
        dice += dice_coefficient(y_true[:,:,:,index], y_pred[:,:,:,index])
    return dice/7



Early Stopping was chosen in order to prevent overfitting by monitoring a specific metric, in this case the Accuracy and stopping training if the metric stops improving.

In [None]:
def setup_early_stopping(patience, monitor, mode):
    """
    Set up early stopping to prevent overfitting during training.

    Parameters:
    - patience: Number of epochs with no improvement after which training will be stopped.
    - monitor: The quantity to be monitored (e.g., 'val_loss' for validation loss).
    - mode: One of {'auto', 'min', 'max'}. In 'min' mode, training will stop when the monitored quantity has stopped decreasing. In 'max' mode, it will stop when the monitored quantity has stopped increasing.

    Returns:
    - EarlyStopping callback object to be used during model training.
    """
    early_stopping = EarlyStopping(monitor=monitor, patience=patience, mode=mode)
    return early_stopping


In [None]:
early_stopping = setup_early_stopping(patience=5, monitor='accuracy', mode='auto')

Now the model is compiled and trained.

In [None]:
opt = tf.keras.optimizers.Lion()

# Compile the model
model_1.compile(optimizer= opt,
              loss= dice_coef_loss,
              metrics=['accuracy', dice_coef_multilabel] )

In [None]:
''' Model training '''
EPOCHS = 40
BATCH_SIZE = 32

records = model_1.fit(x_train, y_train,
                    batch_size = BATCH_SIZE,
                    epochs = EPOCHS,
                    verbose = 1,
                    callbacks=[early_stopping]
                    )

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


##Metric
The comparison metric for this project is the Dice Cofficient for multi-class segmentation.

The Dice coefficient is defined by twice the overlapping area of the two masks, over the sum of the area of the two masks.

![](https://miro.medium.com/max/429/1*yUd5ckecHjWZf6hGrdlwzA.png)

The implementation of the dice coefficient is similar to the implementation of the IoU, since both of them explicitely uses that a mask is between 0 and 1.

In [None]:
# The Dice coefficient functions

def dice_coef(y_true, y_pred):

    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = np.sum(y_true_f * y_pred_f)
    smooth = 0.0001
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)

def dice_coef_multilabel(y_true, y_pred):
    dice = 0
    for index in range(7):
        dice += dice_coef(y_true[:, :, :, index], y_pred[:, :, :, index])
    return dice / 7

In [None]:
#Dice coefficient function that shows the Dice coefficients for each class

def dice_coef_for_each_class(y, y_pred):

    # Ensure both y_true and y_pred have the same data type
    y_pred = y_pred.astype('float32')
    y = y.astype('float32')

    # Calculate the Dice coefficient
    num_classes = y.shape[-1]
    dice_scores = []

    for class_idx in range(num_classes):
        dice = dice_coef(y[..., class_idx], y_pred[..., class_idx])
        dice_scores.append(dice)

    return dice_scores

In [None]:
#Predict the masks of the test set

y_pred = model_1.predict(x_test)

dice = dice_coef_multilabel(y_test, y_pred)



In [None]:
print(f"The Dice coefficient for the test set is {dice}.")

The Dice coefficient for the test set is 0.6201927948012534.


In [None]:
dice_scores = dice_coef_for_each_class(y_test, y_pred)

for class_idx, dice_score in enumerate(dice_scores):
    print(f"Dice coefficient for class {class_idx}: {dice_score}")

Dice coefficient for class 0: 0.05918501719395228
Dice coefficient for class 1: 0.8105888124631282
Dice coefficient for class 2: 0.0006290135519489467
Dice coefficient for class 3: 0.21253295096886082
Dice coefficient for class 4: 0.12392971032437809
Dice coefficient for class 5: 0.00034925769269899106
Dice coefficient for class 6: 0.001356923391973737


Here a visualization of the test dataset with the predicted masks is presented.

In [None]:
for i in range(y_pred.shape[3]):
  show(x_test[100], y_test[110][...,i], ('This is the image', 'This is its relative mask of the channel '+str(i)))
  show(x_test[100], y_pred[110][...,i], ('This is the image', 'This is its relative predicted mask of the channel '+str(i)))

Output hidden; open in https://colab.research.google.com to view.

##Conclusions
The U-Net proved to be a good choice for implementing the multiclass semantic segmentation task.

According to the metrics used, namely the Dice coefficient function, the system worked well but there is definitely room for improvement.

A possible update could be to add **weights of the classes** to the network based on their prevalence in the dataset as they are very unbalanced and the metrics are certainly affected and lowered.

This can also be seen in the Dice coefficient plot for each class, where some classes have a significantly lower value than others.
For example the class 1 has a way better results than the class 5.
