# Consolidated Dataset

## Data Loading

In [None]:
!gdown https://drive.google.com/uc?id=1fjRQBzft-amgePhi_1Hetq2QTSj3MPH-

In [None]:
! unzip -qq /content/consolidated-dataset.zip -d data/

In [None]:
!mv /content/data/content/consolidated-dataset .

In [None]:
!rm -r consolidated-dataset.zip

In [None]:
!rm -r data

In [None]:
import os

def count_files_in_subfolders(directory):

    file_counts = {}
    for root, dirs, files in os.walk(directory):
        # Count only files in each subdirectory, excluding the root directory
        if root != directory:
            file_counts[root] = len([f for f in files if os.path.isfile(os.path.join(root, f))])
    return file_counts

# Replace 'your_directory_path' with the path to your directory
directory_path = 'consolidated-dataset'
file_counts = count_files_in_subfolders(directory_path)

# Printing the results
for path, count in file_counts.items():
    print(f"{path}: {count} files")


## U-Net

### Random split

In [None]:
import tensorflow as tf
import os
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [None]:
base_dir = 'consolidated-dataset'
all_towns = ['']  # Include all towns
image_size = (192, 256)  # Example size, adjust as needed
batch_size = 32  # Adjust based on your system's capability

In [None]:
def get_file_paths(base_dir, towns):
    image_paths, mask_paths = [], []
    for town in towns:
        images_dir = os.path.join(base_dir, town, 'Camera_RGB')
        masks_dir = os.path.join(base_dir, town, 'Camera_SemSeg')

        images = [os.path.join(images_dir, f) for f in os.listdir(images_dir) if f.endswith('.png')]
        masks = [os.path.join(masks_dir, f) for f in os.listdir(masks_dir) if f.endswith('.png')]

        image_paths.extend(images)
        mask_paths.extend(masks)

    return image_paths, mask_paths

# Getting file paths for each set
all_images, all_masks = get_file_paths(base_dir, all_towns)

# Split the dataset into train, validation, and test sets
train_ratio = 0.6
validation_ratio = 0.2
test_ratio = 0.2

# First split to separate out the training set
train_images, test_images, train_masks, test_masks = train_test_split(all_images, all_masks, test_size=1 - train_ratio, random_state = 9)

# Second split to divide the remaining data into validation and test sets
val_images, test_images, val_masks, test_masks = train_test_split(test_images, test_masks, test_size=test_ratio/(test_ratio + validation_ratio))

In [None]:
def preprocess_image(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, image_size)
    image = image / 255.0  # Normalize to [0, 1]
    return image

def preprocess_mask(mask_path):
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)  # Load as a 3-channel image
    mask = tf.image.resize(mask, image_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # Extract the first channel that contains class encoding
    mask = mask[:, :, 0]


    mask = (mask == 24)
    mask = tf.cast(mask, tf.float32)  # Convert to float32

    return mask


def load_and_preprocess(image_path, mask_path):
    return preprocess_image(image_path), preprocess_mask(mask_path)

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_masks))
train_dataset = train_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.batch(batch_size)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_masks))
val_dataset = val_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size)
val_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_masks))
test_dataset = test_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
def show_sample(dataset):
    for images, masks in dataset.take(1):
        plt.figure(figsize=(30, 10))
        for i in range(5):
            plt.subplot(2, 5, i + 1)
            plt.imshow(images[i])
            plt.axis('off')
            plt.subplot(2, 5, i + 6)
            plt.imshow(masks[i, :, :], cmap='gray')
            plt.axis('off')
        plt.show()

In [None]:
show_sample(train_dataset)

In [None]:
show_sample(val_dataset)

In [None]:
show_sample(test_dataset)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Lambda, RepeatVector, Reshape
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import concatenate
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import backend as K

In [None]:
height = image_size[0]
width = image_size[1]

In [None]:
def dice_coefficient(y_true, y_pred, smooth=1):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def iou(y_true, y_pred, smooth=1):
    intersection = K.sum(K.abs(y_true * y_pred), axis=-1)
    sum_ = K.sum(K.abs(y_true) + K.abs(y_pred), axis=-1)
    jac = (intersection + smooth) / (sum_ - intersection + smooth)
    return jac


In [None]:
from tensorflow.keras.layers import BatchNormalization, Dropout, Activation

def conv_block(input_tensor, num_filters):

    # first layer
    x = Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # second layer
    x = Conv2D(num_filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    return x

def encoder_block(input_tensor, num_filters):

    x = conv_block(input_tensor, num_filters)
    p = MaxPooling2D((2, 2))(x)
    return x, p

def decoder_block(input_tensor, concat_tensor, num_filters):

    x = Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
    x = concatenate([x, concat_tensor], axis=-1)
    x = conv_block(x, num_filters)
    return x

input_img = Input((height, width, 3), name='img')

# Encoder
x1, p1 = encoder_block(input_img, 32)
x2, p2 = encoder_block(p1, 64)
x3, p3 = encoder_block(p2, 128)
x4, p4 = encoder_block(p3, 256)

# Bridge
x5 = conv_block(p4, 512)

# Decoder
x6 = decoder_block(x5, x4, 256)
x7 = decoder_block(x6, x3, 128)
x8 = decoder_block(x7, x2, 64)
x9 = decoder_block(x8, x1, 32)

# Output
outputs = Conv2D(1, (1, 1), activation='sigmoid')(x9)

model = Model(inputs=[input_img], outputs=[outputs])
model.compile(optimizer='adam', loss='binary_crossentropy',metrics=['accuracy', dice_coefficient, iou])
model.summary()

In [None]:
callbacks = [
    EarlyStopping(patience=5, verbose=1),
    ReduceLROnPlateau(patience=5, verbose=1),
    ModelCheckpoint('model-192-unet.h5', verbose=1, save_best_only=True, monitor = 'val_iou', mode = 'max')
]

results = model.fit(train_dataset, epochs=20, callbacks=callbacks,
                    validation_data=val_dataset)

In [None]:
# Load the best model (assuming it's saved as 'model-sdc-seg-v2_192.h5')
model = tf.keras.models.load_model('model-192.h5', custom_objects={'dice_coefficient': dice_coefficient, 'iou': iou})

# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

In [None]:
def plot_predictions(dataset, model, num_examples=5):
    plt.figure(figsize=(15, 10))

    for images, masks in dataset.take(1):
        preds = model.predict(images)

        for i in range(num_examples):
            plt.subplot(3, num_examples, i + 1)
            plt.imshow(images[i])
            plt.title('Image')
            plt.axis('off')

            plt.subplot(3, num_examples, num_examples + i + 1)
            plt.imshow(masks[i, :, :], cmap='gray')
            plt.title('True Mask')
            plt.axis('off')

            plt.subplot(3, num_examples, 2 * num_examples + i + 1)
            plt.imshow(preds[i].squeeze(), cmap='gray')
            plt.title('Predicted Mask')
            plt.axis('off')

    plt.tight_layout()
    plt.show()

# Plot predictions
plot_predictions(test_dataset, model)

In [None]:
!gdown https://drive.google.com/uc?id=1ndiwxCG1XgfsN4RSsclApuXuQLihxKNZ

In [None]:
!unzip -qq surprise.zip

In [None]:
eval_images, eval_masks = get_file_paths('surprise', [''])

In [None]:
test_dataset = tf.data.Dataset.from_tensor_slices((eval_images, eval_masks))
test_dataset = test_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

## Seg-Net

### Random split

In [None]:
import tensorflow as tf
import os
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [None]:
base_dir = 'consolidated-dataset'
all_towns = ['']  # Include all towns
image_size = (192, 256)  # Example size, adjust as needed
batch_size = 16  # Adjust based on your system's capability

In [None]:
def get_file_paths(base_dir, towns):
    image_paths, mask_paths = [], []
    for town in towns:
        images_dir = os.path.join(base_dir, town, 'Camera_RGB')
        masks_dir = os.path.join(base_dir, town, 'Camera_SemSeg')

        images = [os.path.join(images_dir, f) for f in os.listdir(images_dir) if f.endswith('.png')]
        masks = [os.path.join(masks_dir, f) for f in os.listdir(masks_dir) if f.endswith('.png')]

        image_paths.extend(images)
        mask_paths.extend(masks)

    return image_paths, mask_paths

# Getting file paths for each set
all_images, all_masks = get_file_paths(base_dir, all_towns)

# Split the dataset into train, validation, and test sets
train_ratio = 0.6
validation_ratio = 0.2
test_ratio = 0.2

# First split to separate out the training set
train_images, test_images, train_masks, test_masks = train_test_split(all_images, all_masks, test_size=1 - train_ratio)

# Second split to divide the remaining data into validation and test sets
val_images, test_images, val_masks, test_masks = train_test_split(test_images, test_masks, test_size=test_ratio/(test_ratio + validation_ratio))

In [None]:
def preprocess_image(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, image_size)
    image = image / 255.0  # Normalize to [0, 1]
    return image

def preprocess_mask(mask_path):
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)  # Load as a 3-channel image
    mask = tf.image.resize(mask, image_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # Extract the first channel that contains class encoding
    mask = mask[:, :, 0]


    mask = (mask == 24)
    mask = tf.cast(mask, tf.float32)  # Convert to float32

    return mask


def load_and_preprocess(image_path, mask_path):
    return preprocess_image(image_path), preprocess_mask(mask_path)

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_masks))
train_dataset = train_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.batch(batch_size)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_masks))
val_dataset = val_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size)
val_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_masks))
test_dataset = test_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
def show_sample(dataset):
    for images, masks in dataset.take(1):
        plt.figure(figsize=(30, 10))
        for i in range(5):
            plt.subplot(2, 5, i + 1)
            plt.imshow(images[i])
            plt.axis('off')
            plt.subplot(2, 5, i + 6)
            plt.imshow(masks[i, :, :], cmap='gray')
            plt.axis('off')
        plt.show()

In [None]:
show_sample(train_dataset)

In [None]:
show_sample(val_dataset)

In [None]:
show_sample(test_dataset)

In [None]:
!gdown https://drive.google.com/uc?id=1B6Mf9HAVjGH90TCtBYEumB7OxiuMOjvU

In [None]:
!unzip -qq core.zip

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Lambda, RepeatVector, Reshape
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, Activation
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import concatenate
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import backend as K
from core.layers import MaxPoolingWithArgmax2D, MaxUnPooling2D
from core.segnet import SegNet

In [None]:
height = image_size[0]
width = image_size[1]

In [None]:
def dice_coefficient(y_true, y_pred, smooth=1):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def iou(y_true, y_pred, smooth=1):
    intersection = K.sum(K.abs(y_true * y_pred), axis=-1)
    sum_ = K.sum(K.abs(y_true) + K.abs(y_pred), axis=-1)
    jac = (intersection + smooth) / (sum_ - intersection + smooth)
    return jac


In [None]:
def encoder_block(input_tensor, num_filters):
    x = input_tensor

    for n_filter in num_filters:
        x = Conv2D(n_filter, (3, 3), padding='same', kernel_initializer='he_normal')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)

    x, mask = MaxPoolingWithArgmax2D(pool_size=(2, 2))(x)
    return x, mask

def decoder_block(input_tensor, mask, num_filters):
    x = MaxUnPooling2D(size=(2, 2))([input_tensor, mask])

    for n_filter in num_filters:
        x = Conv2D(n_filter, (3, 3), padding='same', kernel_initializer='he_normal')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)

    return x

def build_segnet(input_shape):
    input_img = Input(shape=input_shape, name='Image')

    # Encoder
    x1, m1 = encoder_block(input_img, [32, 32])
    x2, m2 = encoder_block(x1, [64, 64])
    x3, m3 = encoder_block(x2, [128, 128, 128])
    x4, m4 = encoder_block(x3, [256, 256, 256])
    x5, m5 = encoder_block(x4, [256, 256, 256])

    # Decoder
    x6 = decoder_block(x5, m5, [256, 256, 256])
    x7 = decoder_block(x6, m4, [256, 256, 128])
    x8 = decoder_block(x7, m3, [128, 128, 64])
    x9 = decoder_block(x8, m2, [64, 32])
    x10 = decoder_block(x9, m1, [32])

    # Output
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x10)

    return Model(inputs=[input_img], outputs=[outputs])

height = image_size[0]
width = image_size[1]

model = build_segnet((height, width, 3))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', dice_coefficient, iou])
model.summary()

In [None]:
callbacks = [
    EarlyStopping(patience=5, verbose=1),
    ReduceLROnPlateau(patience=5, verbose=1),
    ModelCheckpoint('model-192-segnet.h5', verbose=1, save_best_only=True, monitor = 'val_iou', mode = 'max')
]

results = model.fit(train_dataset, epochs=30, callbacks=callbacks,
                    validation_data=val_dataset)

In [None]:
# Load the best model (assuming it's saved as 'model-sdc-seg-v2_192.h5')
model = tf.keras.models.load_model('model-192-segnet.h5', custom_objects={'dice_coefficient': dice_coefficient, 'iou': iou})

# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

In [None]:
def plot_predictions(dataset, model, num_examples=5):
    plt.figure(figsize=(15, 10))

    for images, masks in dataset.take(1):
        preds = model.predict(images)

        for i in range(num_examples):
            plt.subplot(3, num_examples, i + 1)
            plt.imshow(images[i])
            plt.title('Image')
            plt.axis('off')

            plt.subplot(3, num_examples, num_examples + i + 1)
            plt.imshow(masks[i, :, :], cmap='gray')
            plt.title('True Mask')
            plt.axis('off')

            plt.subplot(3, num_examples, 2 * num_examples + i + 1)
            plt.imshow(preds[i].squeeze(), cmap='gray')
            plt.title('Predicted Mask')
            plt.axis('off')

    plt.tight_layout()
    plt.show()

# Plot predictions
plot_predictions(test_dataset, model)

In [None]:
!gdown https://drive.google.com/uc?id=1ndiwxCG1XgfsN4RSsclApuXuQLihxKNZ

In [None]:
!unzip -qq surprise.zip

In [None]:
eval_images, eval_masks = get_file_paths('surprise', [''])

In [None]:
test_dataset = tf.data.Dataset.from_tensor_slices((eval_images, eval_masks))
test_dataset = test_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

## Evaluation of Trained Models

In [None]:
!gdown https://drive.google.com/uc?id=1abQH_9ET6CRzzZ5YAysOv-NLH4tCv7Vf

In [None]:
!gdown https://drive.google.com/uc?id=1vbzH_BlrZ_dAlAeg0VZMasriMxI0RQCJ

In [None]:
# Load the best model (assuming it's saved as 'model-sdc-seg-v2_192.h5')
model_unet = tf.keras.models.load_model('model-192-unet.h5', custom_objects={'dice_coefficient': dice_coefficient, 'iou': iou})

# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model_unet.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

In [None]:
# Load the best model (assuming it's saved as 'model-sdc-seg-v2_192.h5')
model_segnet = tf.keras.models.load_model('model-192-segnet.h5', custom_objects={
    'MaxPoolingWithArgmax2D': MaxPoolingWithArgmax2D,
    'MaxUnPooling2D': MaxUnPooling2D,
    'dice_coefficient': dice_coefficient,
    'iou': iou
})

# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model_segnet.evaluate(test_dataset)

# Print the metrics
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Coefficient: {test_dice}")
print(f"Test IoU: {test_iou}")

In [None]:
eval_images, eval_masks = get_file_paths('surprise', [''])

In [None]:
eval_dataset = tf.data.Dataset.from_tensor_slices((eval_images, eval_masks))
eval_dataset = eval_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
eval_dataset = eval_dataset.batch(batch_size)
eval_dataset = eval_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model_unet.evaluate(eval_dataset)

# Print the metrics
print(f"Eval Loss: {test_loss}")
print(f"Eval Accuracy: {test_accuracy}")
print(f"Eval Dice Coefficient: {test_dice}")
print(f"Eval IoU: {test_iou}")

In [None]:
# Evaluate the model on the test dataset
test_loss, test_accuracy, test_dice, test_iou = model_segnet.evaluate(eval_dataset)

# Print the metrics
print(f"Eval Loss: {test_loss}")
print(f"Eval Accuracy: {test_accuracy}")
print(f"Eval Dice Coefficient: {test_dice}")
print(f"Eval IoU: {test_iou}")

In [None]:
def plot_predictions(dataset, model, num_examples=5):
    plt.figure(figsize=(15, 10))

    for images, masks in dataset.take(1):
        preds = model.predict(images)

        for i in range(num_examples):
            plt.subplot(3, num_examples, i + 1)
            plt.imshow(images[i])
            plt.title('Image')
            plt.axis('off')

            plt.subplot(3, num_examples, num_examples + i + 1)
            plt.imshow(masks[i, :, :], cmap='gray')
            plt.title('True Mask')
            plt.axis('off')

            plt.subplot(3, num_examples, 2 * num_examples + i + 1)
            plt.imshow(preds[i].squeeze(), cmap='gray')
            plt.title('Predicted Mask')
            plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Plot predictions
plot_predictions(eval_dataset, model_unet)

In [None]:
# Plot predictions
plot_predictions(eval_dataset, model_segnet)