# Train U-Net on the CAS Landslide Detection Dataset

*Authors: Abdelouahed Drissi*

## Dataset

In [2]:
import os
import tensorflow as tf
from tensorflow.keras import backend
import matplotlib.pyplot as plt

import os
import pandas as pd
import numpy as np

from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda

from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from losses import weighted_bce


2025-03-22 11:36:57.524648: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-03-22 11:36:57.592656: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


### Function to read the image file

In [1]:
def load_image_file(image_path, mask_path):
    image = tf.io.read_file(image_path)
    mask = tf.io.read_file(mask_path)

    image = tf.image.decode_png(image, channels=3)
    mask = tf.image.decode_png(mask, channels=1)

    return {"image": image, "segmentation_mask": mask}

### Costum Functions for data augmentation

In [None]:
# Define number of augmented copies per original image
AUGMENTATION_FACTOR = 2

# Define augmentation functions
def random_flip(image, mask):
    """Random horizontal and vertical flips"""
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)
    
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_up_down(image)
        mask = tf.image.flip_up_down(mask)
    
    return image, mask

def random_rotate(image, mask):
    """Random rotation (0°, 90°, 180°, 270°)"""
    k = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    image = tf.image.rot90(image, k=k)
    mask = tf.image.rot90(mask, k=k)
    return image, mask

def random_brightness(image, mask):
    """Random brightness adjustment"""
    image = tf.image.random_brightness(image, max_delta=2.5)
    return image, mask

def random_contrast(image, mask):
    """Random contrast adjustment"""
    image = tf.image.random_contrast(image, lower=0.2, upper=1.4)
    return image, mask

def augment_data(image, mask):
    """Apply all augmentations sequentially"""
    image, mask = random_flip(image, mask)
    image, mask = random_rotate(image, mask)
    image, mask = random_brightness(image, mask)
    image, mask = random_contrast(image, mask)
    return image, mask

### Loading the dataset

In [None]:
# Dataset paths
train_image_dir = "./dataset/train/images"
train_mask_dir = "./dataset/train/masks"
valid_image_dir = "./dataset/validation/images"
valid_mask_dir = "./dataset/validation/masks"
test_image_dir = "./dataset/test/images"
test_mask_dir = "./dataset/test/masks"

# Load datasets and match images with masks
def load_data(image_dir, mask_dir):
    image_names = sorted(os.listdir(image_dir))
    mask_names = sorted(os.listdir(mask_dir))
    pairs = []
    for img_name in image_names:
        mask_name = img_name.replace("image", "mask")
        if mask_name in mask_names:
            pairs.append((os.path.join(image_dir, img_name), os.path.join(mask_dir, mask_name)))
    data = [load_image_file(image_path, mask_path) for image_path, mask_path in pairs]
    return data
 
# Modified load_data function with augmentation
def load_data_with_augmentation(image_dir, mask_dir):    
    # Load original data
    original_data = load_data(image_dir, mask_dir)

    # Generate augmented data
    augmented_data = []
    for datapoint in original_data:
        for _ in range(AUGMENTATION_FACTOR):
            # Apply random augmentations
            image = datapoint["image"]
            mask = datapoint["segmentation_mask"]
            
            # Apply augmentation chain
            aug_image, aug_mask = augment_data(image, mask)
            augmented_data.append({"image": aug_image, "segmentation_mask": aug_mask})
    
    # Combine original + augmented data
    return original_data + augmented_data

# Create augmented datasets
data_train = load_data_with_augmentation(train_image_dir, train_mask_dir)
data_valid = load_data(valid_image_dir, valid_mask_dir)
data_test = load_data(test_image_dir, test_mask_dir)

len(data_train), len(data_valid), len(data_test)


### Mean an STD calculation
#### Run this cell if you want to recalculate Mean and STD

In [None]:
def compute_dataset_statistics(image_dir):
    """
    Compute mean and standard deviation for each band in the dataset.
    """
    image_paths = sorted([os.path.join(image_dir, f) for f in os.listdir(image_dir)])
    sum_ = np.zeros(3)
    sum_sq = np.zeros(3)
    count = 0

    for img_path in image_paths:
        
        image = tf.io.read_file(img_path)
        image = tf.image.decode_png(image, channels=3)
        
        # Resize to target size (if needed)
        image = tf.image.resize(image, (image_size, image_size), method="bilinear")
        
        sum_ += np.mean(image, axis=(0, 1))
        sum_sq += np.mean(image**2, axis=(0, 1))
        count += 1

    mean = sum_ / count
    std = np.sqrt((sum_sq / count) - (mean ** 2))
    return mean, std

# Compute mean/std for the training data
mean, std = compute_dataset_statistics(train_image_dir)

# Convert to tensors for TensorFlow operations
mean = tf.constant(mean, dtype=tf.float32)
std = tf.constant(std, dtype=tf.float32)

### Normalization and Image Resizing

In [None]:
# Normalize and preprocess images and masks
image_size = 256
# Original statistics from the CAS Dataset
mean = tf.constant([0.485, 0.456, 0.406])
std = tf.constant([0.229, 0.224, 0.225])

def normalize(input_image, input_mask):
    input_image = tf.image.convert_image_dtype(input_image, tf.float32)
    #input_image = (input_image - mean[None, None, :]) / tf.maximum(std[ None, None, :], tf.keras.backend.epsilon()) # Uncomment this line if you want to normalize to [-1 1]
    input_image = (input_image ) / 255.0    # Normalize to [0 1] range
    input_mask = input_mask / 255.0         # Normalize to [0 1] range
   
    return input_image, input_mask


def load_image(datapoint):
    input_image = tf.image.resize(datapoint["image"], (image_size, image_size))
    input_mask = tf.image.resize(
        datapoint["segmentation_mask"],
        (image_size, image_size),
        method="bilinear",
    )
    
    input_image, input_mask = normalize(input_image, input_mask)
    return ({"pixel_values": input_image}, input_mask)

In [None]:
train_data = [load_image(datapoint) for datapoint in data_train]
valid_data = [load_image(datapoint) for datapoint in data_valid]
test_data = [load_image(datapoint) for datapoint in data_test]

### Build input pipeline

In [None]:
batch_size = 16
auto = tf.data.AUTOTUNE
# Create dataset generators
def generator(data):
    for datapoint in data:
        yield datapoint


train_ds = tf.data.Dataset.from_generator(
    lambda: generator(train_data),
    output_types=({"pixel_values": tf.float32}, tf.float32),
    output_shapes=({"pixel_values": (image_size, image_size, 3)}, (image_size, image_size,1))
).cache().shuffle(batch_size * 10).batch(batch_size).repeat().prefetch(auto)

valid_ds = tf.data.Dataset.from_generator(
    lambda: generator(valid_data),
    output_types=({"pixel_values": tf.float32}, tf.float32),
    output_shapes=({"pixel_values": (image_size, image_size, 3)}, (image_size, image_size,1))
).batch(batch_size).repeat().prefetch(auto)

test_ds = tf.data.Dataset.from_generator(
    lambda: generator(test_data),
    output_types=({"pixel_values": tf.float32}, tf.float32),
    output_shapes=({"pixel_values": (image_size, image_size, 3)}, (image_size, image_size,1))
).batch(batch_size).prefetch(auto)


### Performance Metrics Definition

In [None]:
def recall(y_true, y_pred):
    y_pred = tf.round(y_pred)
    true_positives = tf.reduce_sum(tf.round(y_true * y_pred))
    possible_positives = tf.reduce_sum(tf.round(y_true))
    return true_positives / (tf.cast(possible_positives,tf.float32) + tf.keras.backend.epsilon())

def precision(y_true, y_pred):
    y_pred = tf.round(y_pred)
    true_positives = tf.reduce_sum(tf.round(y_true * y_pred))
    predicted_positives = tf.reduce_sum(tf.round(y_pred))
    return true_positives / (predicted_positives + tf.keras.backend.epsilon())

def f1_score(y_true, y_pred):
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)
    return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))

def f2_score(y_true, y_pred):
    """F2-score prioritizing recall over precision"""
    
    # Precision and recall with stability
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)
    
    # F-beta calculation (beta=2)
    beta_sq = 4
    return (1 + beta_sq) * (p * r) / (beta_sq * p + r + tf.keras.backend.epsilon())

def iou_score(y_true, y_pred): 
    y_pred = tf.round(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true + y_pred) - intersection
    return intersection / (union + tf.keras.backend.epsilon())

def miss_rate(y_true, y_pred):
    """Percentage of undetected landslides"""
    y_pred = tf.round(y_pred)
    false_negatives = tf.reduce_sum(y_true * (1 - y_pred))
    possible_positives = tf.reduce_sum(y_true)
    
    return tf.where(
        possible_positives > 0,
        false_negatives / (possible_positives + tf.keras.backend.epsilon()),
        0.0  # No landslides to miss
    )

def false_positive_rate(y_true, y_pred):
    """FP rate with edge case handling"""
    y_pred = tf.round(y_pred)
    
    false_positives = tf.reduce_sum((1 - y_true) * y_pred)
    true_negatives = tf.reduce_sum((1 - y_true) * (1 - y_pred))
    
    return tf.where(
        (false_positives + true_negatives) > 0,
        false_positives / (false_positives + true_negatives + tf.keras.backend.epsilon()),
        0.0  # No negative samples
    )

## Model

In [None]:
from tensorflow.keras.layers import Dropout, BatchNormalization

# Image bands
img_bands = 3
# Loss function
loss=weighted_bce

def unet(lr,filtersFirstLayer,input_size = (image_size,image_size,img_bands)):
    inputs = Input(input_size, name="pixel_values")
    conv1 = Conv2D(filtersFirstLayer, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(inputs)
    conv1 = BatchNormalization()(conv1)
    conv1 = Conv2D(filtersFirstLayer, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv1)
    conv1 = BatchNormalization()(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    #pool1 = Dropout(Dropout_Rate)(pool1)

    conv2 = Conv2D(filtersFirstLayer*2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(pool1)
    conv2 = BatchNormalization()(conv2)
    conv2 = Conv2D(filtersFirstLayer*2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv2)
    conv2 = BatchNormalization()(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    #pool2 = Dropout(Dropout_Rate)(pool2)

    conv3 = Conv2D(filtersFirstLayer*4, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(pool2)
    conv3 = BatchNormalization()(conv3)
    conv3 = Conv2D(filtersFirstLayer*4, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv3)
    conv3 = BatchNormalization()(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    #pool3 = Dropout(Dropout_Rate)(pool3)

    conv4 = Conv2D(filtersFirstLayer*8, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(pool3)
    conv4 = BatchNormalization()(conv4)
    conv4 = Conv2D(filtersFirstLayer*8, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv4)
    conv4 = BatchNormalization()(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)
    #pool4 = Dropout(Dropout_Rate)(pool4)

    conv5 = Conv2D(filtersFirstLayer*16, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(pool4)
    conv5 = BatchNormalization()(conv5)
    conv5 = Conv2D(filtersFirstLayer*16, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv5)
    conv5 = BatchNormalization()(conv5)
    
    up6 = Conv2D(filtersFirstLayer*8, 2, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(UpSampling2D(size = (2,2))(conv5))
    merge6 = concatenate([conv4,up6], axis = 3)
    conv6 = Conv2D(filtersFirstLayer*8, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(merge6)
    conv6 = BatchNormalization()(conv6)
    conv6 = Conv2D(filtersFirstLayer*8, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv6)
    conv6 = BatchNormalization()(conv6)
    
    up7 = Conv2D(filtersFirstLayer*4, 2, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(filtersFirstLayer*4, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(merge7)
    conv7 = BatchNormalization()(conv7)
    conv7 = Conv2D(filtersFirstLayer*4, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv7)
    conv7 = BatchNormalization()(conv7)
    
    up8 = Conv2D(filtersFirstLayer*2, 2, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(filtersFirstLayer*2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(merge8)
    conv8 = BatchNormalization()(conv8)
    conv8 = Conv2D(filtersFirstLayer*2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv8)
    conv8 = BatchNormalization()(conv8)
    
    up9 = Conv2D(filtersFirstLayer, 2, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Conv2D(filtersFirstLayer, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(merge9)
    conv9 = BatchNormalization()(conv9)
    conv9 = Conv2D(filtersFirstLayer, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv9)
    conv9 = BatchNormalization()(conv9)
    conv9 = Conv2D(2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'glorot_normal')(conv9)
    conv9 = BatchNormalization()(conv9)
    conv10 = Conv2D(1, 1, activation = 'sigmoid')(conv9)
    
    model = Model(inputs=inputs,  outputs=conv10)
    
    model.compile(
        optimizer = Adam(learning_rate = lr)
        , loss = loss
        , metrics=[
                    recall,
                    f2_score,
                    miss_rate,
                    false_positive_rate,
                    tf.keras.metrics.Precision(name='precision')  # Monitor but don't optimize
                ]
    )
    print("Model input shape:", model.input_shape)  # Expected: (None, image_size, image_size, 3)

    return model

In [None]:
import gc
# Training and evaluation loop parameters
filters = [4,8,16,32]
learning_rates = [10e-3, 5e-4, 10e-4, 5e-5, 10e-5]
batch_sizes = [4,8,16,32]
epochs = 100

steps_per_epoch = len(train_data) // batch_size  # Number of batches per epoch

In [None]:

for filter_count in filters:
    for lr in learning_rates:
        for batch in batch_sizes:
            # Clean up memory
            model = None
            tf.keras.backend.clear_session() 
            gc.collect() 
            print(f"Filters: {filter_count}, Learning Rate: {lr}, Batch Size: {batch}")
            model = unet(lr=lr, filtersFirstLayer=filter_count, input_size=(image_size, image_size, 3))
            early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True)

            # Save best model during training
            checkpoint_path = f'./dataset/unet/weights/unet_filters_{filter_count}_batch_{batch}_lr_{lr}.keras'
            model_checkpoint = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True)

            # Train the model
            history = model.fit(
                train_ds,
                validation_data=valid_ds,
                epochs=epochs,
                steps_per_epoch=steps_per_epoch,
                validation_steps=len(valid_data) // batch_size,
                callbacks=[early_stop, model_checkpoint],
                verbose=1
            )

            # Final evaluation
            results = model.evaluate(test_ds)
            print(f"Test Recall: {results[1]:.2%}, Miss Rate: {results[3]:.2%}")

### Testing and Metric Comparison Across Checkpoints

In [None]:
results_dict = {
    "model": [], "batch_size": [], "learning_rate": [], "filters": [],
    "precision": [], "recall": [], "f1_score": [], "iou_score": [],
    "f2_score": [],"miss_rate": [],"false_positive_rate": [],

}

checkpoint_path = f'./dataset/unet/weights/'
for filename in os.listdir(checkpoint_path):
    # Only load files with the proper model file extension
    if filename.endswith(".keras"): 
        # Clean up memory
        model = None
        tf.keras.backend.clear_session() 
        gc.collect()
             
        # Define the model structure
        filter_count = int(filename.split('_')[2])
        lr = float(filename.split('_')[6].split('.keras')[0])
        batch = int(filename.split('_')[4])
        model = unet(lr=lr, filtersFirstLayer=filter_count, input_size=(image_size, image_size, 3))
        
        # Load best model weights
        model.load_weights(os.path.join(checkpoint_path, filename))
        print(f"Evaluating model with checkpoint: {filename}")
        
        # Evaluate on test data
        y_true = []
        y_pred = []
        for sample in test_ds:
            pred = model.predict(sample[0]["pixel_values"], verbose=0)
            y_true.append(sample[1].numpy())
            y_pred.append(pred)
        
        y_true = np.concatenate(y_true, axis=0)
        y_pred = np.concatenate(y_pred, axis=0)
        
         # Ensure both y_true and y_pred have the same shape for metrics
        if y_pred.shape[-1] == 1:
            y_pred = np.squeeze(y_pred, axis=-1)  # Remove the last dimension if it's 1
        if y_true.shape[-1] == 1:
            y_true = np.squeeze(y_true, axis=-1)

        # Compute and collect metrics
        precision_val = precision(y_true, y_pred).numpy()
        recall_val = recall(y_true, y_pred).numpy()
        f1_val = f1_score(y_true, y_pred).numpy()
        iou_val = iou_score(y_true, y_pred).numpy()
        
        f2_val = f2_score(y_true, y_pred).numpy()
        miss_rate_val = miss_rate(y_true, y_pred).numpy()
        fpr_val = false_positive_rate(y_true, y_pred).numpy()

        results_dict["precision"].append(precision_val)
        results_dict["recall"].append(recall_val)
        results_dict["f1_score"].append(f1_val)
        results_dict["iou_score"].append(iou_val)
        results_dict["f2_score"].append(f2_val)
        results_dict["miss_rate"].append(miss_rate_val)
        results_dict["false_positive_rate"].append(fpr_val)

        results_dict["model"].append("U-Net")
        results_dict["batch_size"].append(batch)
        results_dict["learning_rate"].append(lr)
        results_dict["filters"].append(filter_count)

# Convert results_dict to DataFrame and save as CSV
results_df = pd.DataFrame(results_dict)
results_df.to_csv("dataset/unet/results/results_unet.csv", index=False)

print("Validation and metric calculation completed, results saved.")

### Sample Predictions (Ground Truth vs. Predicted Masks for model with the best F2_score)

In [None]:
def plot_sample_predictions(test_ds, model, num_samples=3):
    plt.figure(figsize=(15, num_samples * 5))
    
    for i, sample in enumerate(test_ds.take(num_samples)):
        input_image = sample[0]["pixel_values"]  # Taking first image in batch
        ground_truth_mask = sample[1]
        
        # Predict
        pred_mask = model.predict(input_image, verbose=0)
        
        image = input_image.numpy()[0]
        #print(input_image)
        #image = (image - image.min()) / (image.max() - image.min())  # Normalize to [0, 1]
        #print(image)
        # Plot input, ground truth, and prediction
        plt.subplot(num_samples, 3, i*3 + 1)
        plt.imshow(image)
        plt.title("Input Image")
        plt.axis("off")

        plt.subplot(num_samples, 3, i*3 + 2)
        plt.imshow(ground_truth_mask.numpy()[0], cmap='gray')
        plt.title("Ground Truth Mask")
        plt.axis("off")

        pred_mask[0] = pred_mask[0]
        plt.subplot(num_samples, 3, i*3 + 3)
        plt.imshow(pred_mask[0]>0.6, cmap='gray')
        plt.title("Predicted Mask")
        plt.axis("off")
    
    plt.tight_layout()
    plt.savefig("dataset/unet/finalresults/sample_predictions_dataAugmentedmore.png")
    plt.show()


# Find the model with the best f1_score
results_df = pd.read_csv("dataset/unet/results/results_unet.csv")
maxIndex = results_df["f2_score"].idxmax()
filter_count = results_df["filters"][maxIndex]
lr = results_df["learning_rate"][maxIndex]
batch_size = results_df["batch_size"][maxIndex]

filename = f'./dataset/unet/weights/unet_filters_{filter_count}_batch_{batch_size}_lr_{lr}.keras'

# Load best model weights
model = unet(lr=lr, filtersFirstLayer=filter_count, input_size=(image_size, image_size, 3))
model.load_weights(filename)

print(f"Plot sample from file: {filename}")
plot_sample_predictions(train_ds, model,10)
