In [None]:
import os
import random
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from skimage.transform import rotate

from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, BatchNormalization, Activation, Dense, Dropout
from tensorflow.keras.layers import MaxPooling2D, GlobalMaxPool2D
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.layers import concatenate, add
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from tensorflow.keras.metrics import Recall, Precision

In [None]:
# image attributes
img_size = 256  # side length used for the batch arrays and the model input

# training attributes
batch_size = 64
epochs = 50

# data paths (plain string prefixes: each one must end with "/")
project_path = "./"
orto_path = project_path + "data/geoportal_orto/"
masks_path = project_path + "data/geoportal_build_mask/"

# NOTE(review): later cells read `test_orto_path` and `test_masks_path`, but
# the notebook never assigned them, so a fresh-kernel run stopped with a
# NameError. The two folders below are PLACEHOLDERS that mirror the training
# layout - confirm them against the real location of the held-out test set.
test_orto_path = project_path + "data/test/geoportal_orto/"
test_masks_path = project_path + "data/test/geoportal_build_mask/"

In [None]:
# file names found in the training and test image folders, in sorted order
ids = sorted(os.listdir(orto_path))
test_ids = sorted(os.listdir(test_orto_path))

# hold out 20% of the training ids for validation; the seed is fixed
ids_train, ids_valid = train_test_split(ids, test_size=0.2, random_state=123)

# report the size of every subset
for _label, _subset in (
    ("Total images = ", ids),
    ("Train images = ", ids_train),
    ("Valid images = ", ids_valid),
    ("Test images = ", test_ids),
):
    print(_label, len(_subset))

In [None]:
def rotate_image(image, angle=180):
    """Rotate ``image`` by ``angle`` with ``skimage.transform.rotate``.

    The call always passes ``resize=True`` and ``preserve_range=True``;
    every other option of the underlying function keeps its default.

    Args:
        image: array accepted by ``skimage.transform.rotate``.
        angle: rotation angle forwarded unchanged (defaults to 180).

    Returns:
        The rotated array returned by ``skimage.transform.rotate``.
    """
    rotated = rotate(image, angle=angle, resize=True, preserve_range=True)
    return rotated


def data_gen(img_folder, mask_folder, img_ids, img_size, batch_size, random_rotate=True):
    """Yield an endless stream of ``(images, masks)`` batches read from disk.

    Args:
        img_folder: path prefix of the image files. It is concatenated
            directly with each id, so it must end with a path separator.
        mask_folder: path prefix of the mask files (same rule); a mask is
            looked up under the same file name as its image.
        img_ids: file names to draw from. The sequence is copied, so the
            caller's list is never reordered. It must contain at least
            ``batch_size`` entries, otherwise the first batch raises
            ``IndexError``.
        img_size: side length of the (square) images; the loaded files must
            already have this size.
        batch_size: number of samples per yielded batch.
        random_rotate: when true, every image/mask pair is rotated together
            by an angle drawn from (0, 90, 180, 270).

    Yields:
        Tuple ``(img, mask)`` of float arrays with shapes
        ``(batch_size, img_size, img_size, 3)`` and
        ``(batch_size, img_size, img_size, 1)``. Image values are the loaded
        pixel values divided by 255; mask values are 0 or 1, where 1 marks
        every pixel above the mask file's minimum value.
    """
    # Work on a private copy: shuffling must not reorder the list that the
    # caller keeps using (e.g. for len() or for plotting by index).
    img_ids = list(img_ids)
    random.shuffle(img_ids)
    c = 0  # offset of the current batch inside img_ids

    while True:

        img = np.zeros((batch_size, img_size, img_size, 3))
        mask = np.zeros((batch_size, img_size, img_size, 1))

        for i in range(c, c + batch_size):

            # load image and scale by 1/255
            img_data = img_to_array(load_img(img_folder + img_ids[i])) / 255.0

            # load mask and binarise it: anything above the minimum becomes 1
            img_mask = img_to_array(load_img(mask_folder + img_ids[i], color_mode="grayscale"))
            img_mask = (img_mask > img_mask.min()).astype(int)

            if random_rotate:
                rotate_angle = random.choice((0, 90, 180, 270))
                if rotate_angle:  # a 0 degree rotation is the identity
                    # The image and its mask must receive the SAME rotation,
                    # otherwise the labels no longer line up with the pixels.
                    img_data = rotate_image(img_data, rotate_angle)
                    img_mask = rotate_image(img_mask, rotate_angle)
                    # rotate_image interpolates; snap the mask back to 0/1
                    img_mask = (img_mask > 0.5).astype(int)

            # explicit channel axis, matching the (img_size, img_size, 1) slot
            img_mask = img_mask.reshape(img_size, img_size, 1)

            # add to array - img[0], img[1], and so on.
            img[i - c] = img_data
            mask[i - c] = img_mask

        c += batch_size
        # Start a new pass only when the NEXT batch would run past the end;
        # `>` (not `>=`) keeps the last full batch when len(img_ids) is an
        # exact multiple of batch_size.
        if c + batch_size > len(img_ids):
            c = 0
            random.shuffle(img_ids)

        yield img, mask

# One generator per subset; only the training generator applies random rotations.
train_gen = data_gen(
    img_folder=orto_path,
    mask_folder=masks_path,
    img_ids=ids_train,
    img_size=img_size,
    batch_size=batch_size,
    random_rotate=True,
)
valid_gen = data_gen(
    img_folder=orto_path,
    mask_folder=masks_path,
    img_ids=ids_valid,
    img_size=img_size,
    batch_size=batch_size,
    random_rotate=False,
)
test_gen = data_gen(
    img_folder=test_orto_path,
    mask_folder=test_masks_path,
    img_ids=test_ids,
    img_size=img_size,
    batch_size=batch_size,
    random_rotate=False,
)

In [None]:
def conv2d_block(input_tensor, n_filters, kernel_size = 3, batchnorm = True):
    """Apply two identical Conv2D -> (BatchNormalization) -> ReLU stages.

    Args:
        input_tensor: tensor the first convolution is applied to.
        n_filters: number of filters of both convolutions.
        kernel_size: side of the square convolution kernel.
        batchnorm: when true, a BatchNormalization layer is inserted between
            each convolution and its ReLU activation.

    Returns:
        The output tensor of the second ReLU activation.
    """
    x = input_tensor
    # the two stages share every hyper-parameter, so build them in a loop
    for _ in range(2):
        x = Conv2D(
            filters=n_filters,
            kernel_size=(kernel_size, kernel_size),
            kernel_initializer='he_normal',
            padding='same',
        )(x)
        if batchnorm:
            x = BatchNormalization()(x)
        x = Activation('relu')(x)

    return x

In [None]:
def get_unet(input_img, n_filters = 16, dropout = 0.1, batchnorm = True):
    """Build the U-Net model on top of ``input_img``.

    Args:
        input_img: Keras input tensor of the network.
        n_filters: filter count of the first level; deeper levels use 2x, 4x,
            8x and (at the bottleneck) 16x this value.
        dropout: rate of every Dropout layer.
        batchnorm: forwarded to ``conv2d_block``.

    Returns:
        An uncompiled ``Model`` whose output is a 1-channel sigmoid map.
    """
    # Contracting path: conv block -> 2x2 max-pool -> dropout, four times.
    # The conv-block outputs are kept for the skip connections.
    skips = []
    x = input_img
    for scale in (1, 2, 4, 8):
        features = conv2d_block(x, n_filters * scale, kernel_size=3, batchnorm=batchnorm)
        skips.append(features)
        x = MaxPooling2D((2, 2))(features)
        x = Dropout(dropout)(x)

    # Bottleneck
    x = conv2d_block(x, n_filters=n_filters * 16, kernel_size=3, batchnorm=batchnorm)

    # Expansive path: stride-2 transposed conv -> concatenate with the
    # matching skip -> dropout -> conv block, deepest level first.
    for scale, skip in zip((8, 4, 2, 1), reversed(skips)):
        x = Conv2DTranspose(n_filters * scale, (3, 3), strides=(2, 2), padding='same')(x)
        x = concatenate([x, skip])
        x = Dropout(dropout)(x)
        x = conv2d_block(x, n_filters * scale, kernel_size=3, batchnorm=batchnorm)

    # 1x1 convolution with sigmoid gives one value in (0, 1) per pixel
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)
    return Model(inputs=[input_img], outputs=[outputs])

In [None]:
# Build the U-Net on a fixed-size 3-channel input and attach loss and metrics.
input_img = Input(shape=(img_size, img_size, 3), name='img')
model = get_unet(input_img, n_filters=16, dropout=0.05, batchnorm=False)
model.compile(
    optimizer=Adam(),
    loss="binary_crossentropy",
    metrics=["accuracy", Recall(), Precision()],
)

In [None]:
# print the layer-by-layer summary of the compiled model
model.summary()

In [None]:
# the start time (to the second) is embedded in the checkpoint file name
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

checkpoints_path = project_path + 'checkpoints/'

# exist_ok=True creates the folder when it is missing and does nothing when it
# already exists, avoiding the separate exists-then-create check
os.makedirs(checkpoints_path, exist_ok=True)

model_name = f"unet_ep_{epochs}_bs_{batch_size}_{timestamp}"

In [None]:
callbacks = [
    # stop training after 10 epochs without improvement of the monitored value
    EarlyStopping(verbose=1, patience=10),
    # multiply the learning rate by 0.1 after 5 stagnant epochs, never below 1e-5
    ReduceLROnPlateau(
        factor=0.1,
        patience=5,
        min_lr=0.00001,
        verbose=1,
    ),
    # keep only the weights of the best epoch seen so far
    ModelCheckpoint(
        filepath=checkpoints_path + model_name + ".h5",
        save_best_only=True,
        save_weights_only=True,
        verbose=1,
    ),
]

In [None]:
# Train from the generators; the step counts cover only whole batches per epoch.
results = model.fit(
    x=train_gen,
    validation_data=valid_gen,
    steps_per_epoch=len(ids_train) // batch_size,
    validation_steps=len(ids_valid) // batch_size,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
# Training vs. validation loss per epoch; the red cross marks the epoch with
# the lowest validation loss.
fig, ax = plt.subplots(figsize=(8, 8))
ax.set_title("Learning curve")
ax.plot(results.history["loss"], label="loss")
ax.plot(results.history["val_loss"], label="val_loss")
ax.plot(
    np.argmin(results.history["val_loss"]),
    np.min(results.history["val_loss"]),
    marker="x",
    color="r",
    label="best model",
)
ax.set_xlabel("Epochs")
ax.set_ylabel("log_loss")
ax.legend();

In [None]:
# Restore the weights file written by the ModelCheckpoint callback; it was
# configured with save_best_only=True, so the file holds the best epoch.
model.load_weights(checkpoints_path + model_name + ".h5")

In [None]:
# Score the restored model on whole batches drawn from the test generator.
model.evaluate(
    x=test_gen,
    verbose=1,
    steps=len(test_ids) // batch_size,
)

In [None]:
def plot_sample(img_folder, mask_folder, img_ids, thresh=0.5, i=None):
    """Plot one image, its reference mask and the model's prediction.

    Reads the module-level ``model`` and ``img_size``.

    Args:
        img_folder: path prefix of the image files; concatenated directly
            with the id, so it must end with a path separator.
        mask_folder: path prefix of the mask files (same rule).
        img_ids: file names that can be plotted.
        thresh: predictions strictly above this value count as foreground in
            the binary panels.
        i: index into ``img_ids``; a random valid index is drawn when None.

    Draws five panels: the image, the reference mask, the raw prediction, the
    thresholded prediction, and the thresholded prediction restricted to
    pixels where the reference mask is 0.
    """
    if i is None:
        # randrange excludes the upper bound; randint(0, len(img_ids)) could
        # return len(img_ids) and index past the end of the list
        i = random.randrange(len(img_ids))

    img = np.zeros((1, img_size, img_size, 3))
    mask = np.zeros((1, img_size, img_size, 1))

    # load image and scale by 1/255
    img_data = img_to_array(load_img(img_folder + img_ids[i])) / 255.0

    # load mask and binarise it: anything above the minimum becomes 1
    img_mask = img_to_array(load_img(mask_folder + img_ids[i], color_mode="grayscale"))
    img_mask = (img_mask > img_mask.min()).astype(int)
    # explicit channel axis, matching the (img_size, img_size, 1) slot
    img_mask = img_mask.reshape(img_size, img_size, 1)

    # wrap both in a batch of one
    img[0] = img_data
    mask[0] = img_mask

    # print file id for reference
    print(img_ids[i])

    # mask prediction
    mask_pred = model.predict(img)[0]
    mask_pred_t = (mask_pred > thresh).astype(np.uint8)

    # predicted foreground that the reference mask labels as background
    mask_diff = mask_pred_t * (img_mask < 1)

    fig, ax = plt.subplots(1, 5, figsize=(30, 10))
    ax[0].imshow(img[0])
    ax[0].set_title('orto rgb')

    ax[1].imshow(mask.squeeze())
    ax[1].set_title('mask')

    ax[2].imshow(mask_pred.squeeze())
    ax[2].set_title('mask predicted')

    ax[3].imshow(mask_pred_t.squeeze())
    ax[3].set_title('mask predicted binary')

    ax[4].imshow(mask_diff.squeeze())
    ax[4].set_title('mask predicted binary - mask');

In [None]:
# show a test sample; no index is passed, so plot_sample picks one at random
plot_sample(test_orto_path, test_masks_path, img_ids=test_ids, thresh=0.5)

In [None]:
# show another test sample; the index is again drawn at random
plot_sample(test_orto_path, test_masks_path, img_ids=test_ids, thresh=0.5)