In [1]:
import os
import cv2
import math
import datetime
import numpy as np
from PIL import Image
from tqdm import tqdm
from enum import Enum
import functions as f
import tensorflow as tf
from patchify import patchify
from keras import backend as K
import matplotlib.pyplot as plt
from keras.layers import Rescaling
from matplotlib.patches import Rectangle
from keras.models import Model, load_model
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.utils import to_categorical, image_dataset_from_directory, plot_model
from keras.callbacks import ModelCheckpoint, CSVLogger, EarlyStopping, ReduceLROnPlateau, TensorBoard
from keras.layers import Input, Conv2D, MaxPooling2D, concatenate, Conv2DTranspose, Dropout

In [None]:
# Training hyper-parameters.
EPOCHS = 100
BATCH_SIZE = 32

# Number of segmentation classes.
NUM_CLASSES = 10
# Original (misspelled) name, kept as an alias so existing references keep working.
NUM_ClASSES = NUM_CLASSES

# Images are resized to IMAGE_SIZE x IMAGE_SIZE with 3 colour channels.
IMAGE_SIZE = 256
INPUT_SIZE = (IMAGE_SIZE, IMAGE_SIZE, 3)

# Dataset root. Set the FLOODNET_DIR environment variable to run the notebook
# on another machine; the previous hard-coded location stays the default.
DIR_DATA = os.environ.get("FLOODNET_DIR", r"E:\Segmentation\datasets\FloodNet-Supervised_v1.0")

In [None]:
# Report how many files each split's image folder contains.
for split_title, split_dir in (("Training", "train"), ("Validation", "val"), ("Test", "test")):
    n_files = len(os.listdir(os.path.join(DIR_DATA, split_dir, f"{split_dir}-org-img")))
    print(f"{split_title} data: {n_files}")

In [None]:
def load_images_and_masks(data_dir, split="train"):
    """Load every ``.jpg`` image of a split together with its label mask.

    Images are read from ``<data_dir>/<split>/<split>-org-img`` and the mask of
    ``NAME.jpg`` is expected at ``<data_dir>/<split>/<split>-label-img/NAME_lab.png``.
    Both files are read with ``cv2.imread`` using its default flags.

    Args:
        data_dir: Root directory of the dataset.
        split: Name of the split sub-folder (e.g. ``"train"``, ``"val"``).

    Returns:
        Tuple ``(image_list, mask_list)`` of equally long lists of arrays, where
        entry ``i`` of both lists belongs to the same file.
    """
    import warnings

    image_dir = os.path.join(data_dir, f"{split}/{split}-org-img")
    mask_dir = os.path.join(data_dir, f"{split}/{split}-label-img")

    image_list = []
    mask_list = []

    # os.listdir returns entries in arbitrary order; sort so that repeated runs
    # produce the same sample order.
    for file in tqdm(sorted(os.listdir(image_dir))):
        if not file.endswith(".jpg"):
            continue

        image_path = os.path.join(image_dir, file)
        mask_path = os.path.join(mask_dir, os.path.splitext(file)[0] + "_lab.png")

        image = cv2.imread(image_path)
        mask = cv2.imread(mask_path)

        # cv2.imread returns None instead of raising when a file is missing or
        # cannot be decoded. Skip the whole pair so the two lists stay aligned.
        if image is None or mask is None:
            warnings.warn(f"Skipping {file}: could not read image or mask ({mask_path})")
            continue

        image_list.append(image)
        mask_list.append(mask)

    return image_list, mask_list


# Read the validation split into memory and report how many images were loaded.
X_val, Y_val = load_images_and_masks(data_dir=f'{DIR_DATA}/', split="val")
print(f"Validation data: {len(X_val)}")

The color map used below is taken from the following source:
Source: https://github.com/farshadsafavi/BiseNetV2/blob/main/colors.txt

In [None]:
# Class name -> integer class index.
class_map = {
    'Background': 0,
    'Building-flooded': 1,
    'Building-non-flooded': 2,
    'Road-flooded': 3,
    'Road-non-flooded': 4,
    'Water': 5,
    'Tree': 6,
    'Vehicle': 7,
    'Pool': 8,
    'Grass': 9,
}

# Class name -> colour triplet (0-255 per channel) used for plotting.
color_map = {
    "Background": [0, 0, 0],
    "Building-flooded": [255, 0, 0],
    "Building-non-flooded": [0, 255, 0],
    "Road-flooded": [0, 255, 120],
    "Road-non-flooded": [0, 0, 255],
    "Water": [255, 0, 255],
    "Tree": [70, 70, 70],
    "Vehicle": [102, 102, 156],
    "Pool": [190, 153, 153],
    "Grass": [180, 165, 180],
}

# Legend entries: one coloured patch per class, in color_map order.
handles = []
for triplet in color_map.values():
    handles.append(Rectangle((0, 0), 1, 1, color=np.array(triplet) / 255))
labels = list(color_map.keys())

In [None]:
def display_images_with_masks(images, masks, class_map, color_map, n_samples=5):
    """Plot images next to their colour-coded mask and their raw class-id mask.

    Args:
        images: Sequence of BGR images (as returned by ``cv2.imread``).
        masks: Sequence of masks holding class indices. A 3-D mask is reduced
            to its first channel. This assumes every channel carries the same
            class id, as when a single-channel label image is read with
            ``cv2.imread``'s default flags (TODO confirm for this dataset).
        class_map: Mapping of class name -> class index found in the masks.
        color_map: Mapping of class name -> ``[R, G, B]`` colour (0-255).
        n_samples: Maximum number of image/mask pairs to draw.
    """
    # Build the legend from the colour map that was passed in, so the legend
    # always matches the colours painted below.
    legend_handles = [
        Rectangle((0, 0), 1, 1, color=np.array(rgb) / 255) for rgb in color_map.values()
    ]
    legend_labels = list(color_map)

    # Never index past the end of the shorter sequence.
    for i in range(min(n_samples, len(images), len(masks))):
        image = images[i]
        mask = masks[i]
        class_ids = mask[..., 0] if mask.ndim == 3 else mask

        plt.figure(figsize=(12, 6))

        plt.subplot(1, 3, 1)
        plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        plt.title('Image')

        plt.subplot(1, 3, 2)
        mask_colored = np.zeros(class_ids.shape + (3,), dtype=np.uint8)
        for class_name, class_idx in class_map.items():
            mask_colored[class_ids == class_idx] = color_map[class_name]
        # The colours are used as RGB here, exactly as in the legend, so no
        # BGR->RGB swap is applied to the painted mask.
        plt.imshow(mask_colored)
        plt.title('Colored Mask')

        plt.subplot(1, 3, 3)
        # A 2-D array is required for the colormap to be applied.
        plt.imshow(class_ids, cmap='viridis')
        plt.title('Original Mask')

        plt.legend(legend_handles, legend_labels, bbox_to_anchor=(-0.8, -0.5), loc='lower center', ncol=5)
        plt.show()

# Preview validation samples, starting at index 10.
display_images_with_masks(
    images=X_val[10:],
    masks=Y_val[10:],
    class_map=class_map,
    color_map=color_map,
)

In [None]:
def load_combined_generator(image_path, mask_path, batch_size, image_size):
    """Create a batched ``tf.data.Dataset`` of ``(images, masks)`` pairs.

    Args:
        image_path: Directory containing the input images (loaded as RGB).
        mask_path: Directory containing the label masks (loaded as grayscale).
        batch_size: Number of samples per batch.
        image_size: Images and masks are resized to ``image_size x image_size``.

    Returns:
        The image dataset and the mask dataset zipped into one dataset.

    NOTE(review): images and masks are paired only by position. This relies on
    both directories listing their files in matching order and on the identical
    shuffle seed; confirm that every image has exactly one mask and that the
    file names of both folders sort the same way.
    """
    # Settings that must be identical for both datasets to keep them in step.
    shared_kwargs = dict(
        labels=None,
        label_mode=None,
        batch_size=batch_size,
        image_size=(image_size, image_size),
        shuffle=True,
        seed=42,
    )

    image_generator = image_dataset_from_directory(
        directory=image_path,
        color_mode="rgb",
        **shared_kwargs
    )

    mask_generator = image_dataset_from_directory(
        directory=mask_path,
        color_mode="grayscale",
        # Masks store class ids: the default bilinear resize would blend
        # neighbouring ids into values that are not in the label image.
        interpolation="nearest",
        **shared_kwargs
    )

    return tf.data.Dataset.zip((image_generator, mask_generator))

# Paired (image, mask) datasets for the three splits, all built with the same
# batch size and target image size.

# Training set
train_generator = load_combined_generator(
    image_path=f'{DIR_DATA}\\train\\train-org-img',
    mask_path=f'{DIR_DATA}\\train\\train-label-img',
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

# Validation set
val_generator = load_combined_generator(
    image_path=f'{DIR_DATA}\\val\\val-org-img',
    mask_path=f'{DIR_DATA}\\val\\val-label-img',
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

# Test set
test_generator = load_combined_generator(
    image_path=f'{DIR_DATA}\\test\\test-org-img',
    mask_path=f'{DIR_DATA}\\test\\test-label-img',
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)


In [None]:
def build_unet(img_shape, height=4000, width=3000, channel=3, n_classes=10):
    """Build an (uncompiled) U-Net for semantic segmentation.

    The network rescales the input from [0, 255] to [0, 1], applies four
    encoder blocks (16, 32, 64, 128 filters, each followed by 2x2 max pooling),
    a 160-filter bottleneck and four decoder blocks that upsample by 2 and
    concatenate the matching encoder output.

    Args:
        img_shape: Shape of one input image, ``(height, width, channels)``.
            Height and width must be divisible by 16 (four 2x2 poolings),
            otherwise the skip connections cannot be concatenated.
        height, width, channel: Unused. The input size is taken from
            ``img_shape``; the parameters are kept so existing calls still work.
        n_classes: Number of output channels.

    Returns:
        A ``Model`` mapping the input image to per-pixel class scores with
        ``n_classes`` channels: softmax over the channels, or sigmoid when
        ``n_classes == 1``.
    """
    encoder_filters = [16, 32, 64, 128]

    # input layer shape is equal to patch image size
    inputs = Input(shape=img_shape)

    # rescale images from (0, 255) to (0, 1); the shape is already fixed by Input
    x = Rescaling(scale=1. / 255)(inputs)

    # Contraction path: the four blocks are identical apart from the feature depth
    skips = {}
    for n_filters in encoder_filters:
        x = Conv2D(n_filters, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)
        x = Dropout(0.1)(x)
        x = Conv2D(n_filters, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)
        skips[n_filters] = x  # kept for the skip connection of the decoder
        x = MaxPooling2D((2, 2))(x)

    # Bottleneck
    x = Conv2D(160, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)
    x = Dropout(0.2)(x)
    x = Conv2D(160, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)

    # Expansive path: upsample and merge with the encoder output of equal depth
    for n_filters in reversed(encoder_filters):
        x = Conv2DTranspose(n_filters, (2, 2), strides=(2, 2), padding='same')(x)
        x = concatenate([x, skips[n_filters]])
        x = Conv2D(n_filters, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)
        x = Dropout(0.2)(x)
        x = Conv2D(n_filters, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(x)

    # Softmax over a single channel is constantly 1.0, so a one-channel head
    # uses sigmoid instead.
    activation = "sigmoid" if n_classes == 1 else "softmax"
    outputs = Conv2D(filters=n_classes, kernel_size=(1, 1), activation=activation)(x)

    return Model(inputs=inputs, outputs=outputs)

model = f.build_unet(img_shape=INPUT_SIZE, n_classes=1)
model.summary()

In [None]:
# Modelcheckpoint
checkpointer = ModelCheckpoint("/models/weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5", verbose=1, save_best_only=True)

callbacks = [
      EarlyStopping(patience=2, monitor='val_loss'),
        TensorBoard(log_dir='logs')]

def iou_coefficient(y_true, y_pred, smooth=1):
    intersection = K.sum(K.abs(y_true * y_pred), axis=[1, 2, 3])
    union = K.sum(y_true, [1, 2, 3]) + K.sum(y_pred, [1, 2, 3]) - intersection
    iou = K.mean((intersection + smooth) / (union + smooth), axis=0)
    return iou

# jaccard similarity: the size of the intersection divided by the size of the union of two sets
def jaccard_index(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (intersection + 1.0) / (K.sum(y_true_f) + K.sum(y_pred_f) - intersection + 1.0)

In [None]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=["accuracy", iou_coefficient, jaccard_index])
model.summary()

In [None]:
results = model.fit(train_generator, validation_data=val_generator, batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=callbacks)