In [None]:
!rm -rf Dataset Trained_Model                   # Remove existing directory
!pip install -U gdown --pre >/dev/null          # Install gdown to download file from GDrive
!gdown 1u4WJLjYrbZHwdvFOHQXJqDTtco6F5hJ-        # Download dataset from GDrive by file ID
!unzip -q Dataset.zip; rm Dataset.zip           # Extract the dataset zip file

In [None]:
import os
import cv2
import numpy as np
from random import sample
from skimage.io import imread
from sklearn.model_selection import train_test_split


IMAGES_PATH = 'Dataset/Images/'
MASKS_PATH  = 'Dataset/Masks/'
TEST_PATH   = 'Dataset/Test_Images/'

# Number of images to use (Larger the number, more RAM required)
N_IMAGES = 1500

# Imread each image and save to an array

sat_imgs = os.listdir(IMAGES_PATH)
msk_imgs = os.listdir(MASKS_PATH)
sat_imgs.sort(), msk_imgs.sort()

images = []
for image in sat_imgs[:N_IMAGES]:
    data = imread(IMAGES_PATH + image)
    images.append(data)

masks = []
for mask in msk_imgs[:N_IMAGES]:
    data = imread(MASKS_PATH + mask)
    data = cv2.cvtColor(data, cv2.COLOR_BGR2GRAY)
    data = np.expand_dims(data, axis=-1)
    masks.append(data)

images = np.stack(images)
masks = np.stack(masks) / 255

train_images, test_images, train_masks, test_masks = train_test_split(images, masks, test_size=0.2, random_state=2)

pred_images = sample(os.listdir(IMAGES_PATH), 4)
pred_masks  = []
for mask_name in pred_images:
    mask_name = mask_name.replace('_sat.jpg', '_mask.png')
    pred_masks.append(mask_name)

del images, masks

print("Training Set")
print(train_images.shape)
print(train_masks.shape)
print("Testing Set")
print(test_images.shape)
print(test_masks.shape)

!rm -rf epochs Trained_Model; mkdir epochs

In [None]:
import os
# Reduce TensorFlow's native log output; set before importing TensorFlow so it
# is already in place when the library loads
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import tensorflow as tf

# TPUClusterResolver.connect() connects to the TPU cluster and initialises the
# TPU system itself, so no separate experimental_connect_to_cluster /
# initialize_tpu_system calls are needed (re-initialising resets the TPU).
resolver = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
# tf.distribute.TPUStrategy replaces the deprecated tf.distribute.experimental.TPUStrategy
strategy = tf.distribute.TPUStrategy(resolver)

In [None]:
from tensorflow.keras import backend as K
def iou_coef(y_true, y_pred, smooth=1):
    """Smoothed intersection-over-union, averaged over axis 0.

    The intersection and both totals are summed over axes 1, 2 and 3 of the
    inputs; `smooth` is added to numerator and denominator so that the ratio
    stays defined when both tensors are all zero.
    """
    reduce_axes = [1, 2, 3]
    overlap = K.sum(K.abs(y_true * y_pred), axis=reduce_axes)
    combined = K.sum(y_true, reduce_axes) + K.sum(y_pred, reduce_axes) - overlap
    per_sample_iou = (overlap + smooth) / (combined + smooth)
    return K.mean(per_sample_iou, axis=0)

def dice_coef(y_true, y_pred, smooth = 1):
    """Smoothed Dice coefficient computed over all elements of both tensors.

    Both inputs are flattened first, so the score is a single value for the
    whole batch rather than one value per sample.
    """
    truth_flat, pred_flat = K.flatten(y_true), K.flatten(y_pred)
    overlap = K.sum(truth_flat * pred_flat)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth_flat) + K.sum(pred_flat) + smooth
    return numerator / denominator

def soft_dice_loss(y_true, y_pred):
    """Loss defined as one minus the Dice coefficient of the two tensors."""
    dice = dice_coef(y_true, y_pred)
    return 1 - dice

In [None]:
def double_conv_block(x, n_filters):
    """Apply two successive 3x3, same-padded, ReLU-activated convolutions.

    Args:
        x: Input tensor.
        n_filters: Number of filters used by each of the two convolutions.

    Returns:
        The output tensor of the second convolution.
    """
    # Resolve the layers module through `tf`: the visible notebook never
    # imports a bare `layers` name, which made this function raise NameError.
    layers = tf.keras.layers
    for _ in range(2):
        x = layers.Conv2D(n_filters, 3, padding = "same", activation = "relu", kernel_initializer = "he_normal")(x)
    return x

def downsample_block(x, n_filters):
    """Encoder step: double convolution, then 2x2 max-pooling and dropout.

    Args:
        x: Input tensor.
        n_filters: Number of filters for the double convolution.

    Returns:
        A tuple `(f, p)` where `f` is the convolution output (kept for the
        skip connection) and `p` is the pooled, dropout-regularised tensor
        passed on to the next step.
    """
    # Resolve the layers module through `tf`: the visible notebook never
    # imports a bare `layers` name, which made this function raise NameError.
    layers = tf.keras.layers
    f = double_conv_block(x, n_filters)
    p = layers.MaxPool2D(2)(f)
    p = layers.Dropout(0.3)(p)
    return f, p

def upsample_block(x, conv_features, n_filters):
    """Decoder step: stride-2 transposed convolution, skip concat, dropout, double convolution.

    Args:
        x: Tensor coming from the previous (coarser) step.
        conv_features: Encoder tensor concatenated with the upsampled `x`.
        n_filters: Number of filters for the transposed convolution and the
            double convolution.

    Returns:
        The output tensor of the double convolution.
    """
    # Resolve the layers module through `tf`: the visible notebook never
    # imports a bare `layers` name, which made this function raise NameError.
    layers = tf.keras.layers
    x = layers.Conv2DTranspose(n_filters, 3, 2, padding="same")(x)
    x = layers.concatenate([x, conv_features])
    x = layers.Dropout(0.3)(x)
    x = double_conv_block(x, n_filters)
    return x

def build_unet_model(input_shape=(512, 512, 3)):
    """Build the U-Net: four encoder steps, a bottleneck and four decoder steps.

    Args:
        input_shape: Shape of a single input sample. Defaults to
            `(512, 512, 3)`, the value that was previously hard-coded.

    Returns:
        An uncompiled `tf.keras.Model` named "U-Net" whose output is a
        single-channel sigmoid map.
    """
    # Resolve the layers module through `tf`: the visible notebook never
    # imports a bare `layers` name, which made this function raise NameError.
    layers = tf.keras.layers

    inputs = layers.Input(shape=input_shape)

    # Encoder: keep each pre-pooling feature map for the skip connections
    f1, p1 = downsample_block(inputs, 64)
    f2, p2 = downsample_block(p1, 128)
    f3, p3 = downsample_block(p2, 256)
    f4, p4 = downsample_block(p3, 512)

    bottleneck = double_conv_block(p4, 1024)

    # Decoder: upsample and merge with the matching encoder features
    u6 = upsample_block(bottleneck, f4, 512)
    u7 = upsample_block(u6, f3, 256)
    u8 = upsample_block(u7, f2, 128)
    u9 = upsample_block(u8, f1, 64)

    outputs = layers.Conv2D(1, 1, padding="same", activation = "sigmoid")(u9)
    unet_model = tf.keras.Model(inputs, outputs, name="U-Net")
    return unet_model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import backend as K
from IPython.display import clear_output
from skimage.io import imread, imshow
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import tensorflow as tf

model_path = "./Trained_Model/Road_Model.h5"
# Per-epoch evaluation results, appended to by model_status()
evals = []

# The data-loading cell removes Trained_Model/ without recreating it, so make
# sure the checkpoint directory exists before the first save.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Save the model whenever val_loss reaches a new minimum
checkpointer = ModelCheckpoint(model_path, monitor="val_loss", mode="min", save_best_only = True, verbose=1)
# Stop after 10 epochs without val_loss improvement and restore the best weights
earlystopper = EarlyStopping(monitor = 'val_loss', min_delta = 0, patience = 10, verbose = 1, restore_best_weights = True)
# Multiply the learning rate by 0.1 after 4 epochs without val_loss improvement
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4, verbose=1, min_delta=1e-4)

def _show_image(fig, cell, image, title=None):
    """Draw `image` in grid cell `cell` of `fig` with a gray colormap and hidden axes."""
    fig.add_subplot(cell)
    plt.imshow(image, cmap='gray')
    if title is not None:
        plt.title(title)
    plt.axis('off')


def _plot_history(fig, cell, train_values, val_values, quantity, legend_labels, legend_loc):
    """Plot the training and validation curves of one quantity in grid cell `cell`."""
    fig.add_subplot(cell)
    plt.plot(train_values)
    plt.plot(val_values)
    plt.ylabel(quantity)
    plt.xlabel('Epoch')
    plt.title(f"Model {quantity}")
    plt.legend(legend_labels, loc=legend_loc)


def model_status(epoch, accu, val_accu, loss, val_loss):
    """Evaluate the model and render, save and show a progress figure for one epoch.

    Appends the second value returned by `model.evaluate` on the test split to
    the global `evals` list, then draws the sample images, their ground-truth
    masks and the current predictions next to the accuracy and loss curves.
    The figure is written to `epochs/<epoch>.png`.

    Args:
        epoch: Epoch index, used in the figure title and the file name.
        accu: Training accuracy values recorded so far.
        val_accu: Validation accuracy values recorded so far.
        loss: Training loss values recorded so far.
        val_loss: Validation loss values recorded so far.
    """
    evals.append(model.evaluate(test_images, test_masks)[1])

    f = plt.figure(figsize = (24, 16))
    gs = f.add_gridspec(5, 6)
    f.suptitle(f'Epoch: {epoch}', x=0.5, y=0.02)

    # Only the first row carries column titles
    column_titles = ("Input Image", "Ground Truth", "Predicted Image")
    # One row per sample: input, ground truth, prediction (first four samples)
    samples = list(zip(pred_images, pred_masks))[:4]
    for row, (image_name, mask_name) in enumerate(samples):
        # Each file is read once and reused for both display and prediction
        image = imread(IMAGES_PATH + image_name)
        truth = imread(MASKS_PATH + mask_name)
        prediction = model.predict(np.asarray([image]), verbose=1)[0][:,:,0]
        for col, picture in enumerate((image, truth, prediction)):
            title = column_titles[col] if row == 0 else None
            _show_image(f, gs[row, col], picture, title)

    _plot_history(f, gs[0:2, 3:6], accu, val_accu, 'Accuracy',
                  ['Training Accuracy', 'Validation accuracy'], 'lower right')
    _plot_history(f, gs[3:5, 3:6], loss, val_loss, 'Loss',
                  ['Training loss', 'Validation loss'], 'upper right')

    plt.savefig(f'epochs/{epoch}.png')
    plt.show()
    # Release the figure explicitly so figures cannot accumulate across epochs
    plt.close(f)



class DisplayCallback(tf.keras.callbacks.Callback):
    """Keras callback that records per-epoch metrics and redraws the progress figure."""

    def on_train_begin(self, logs=None):
        """Reset the recorded accuracy and loss histories."""
        self.accu = []
        self.val_accu = []
        self.loss = []
        self.val_loss = []

    def on_epoch_end(self, epoch, logs=None):
        """Record this epoch's metrics, clear the cell output and redraw the status figure."""
        # `logs` defaults to None; fall back to an empty dict so .get() is safe
        logs = logs or {}
        self.accu.append(logs.get("accuracy"))
        self.val_accu.append(logs.get("val_accuracy"))
        self.loss.append(logs.get("loss"))
        self.val_loss.append(logs.get("val_loss"))
        # wait=True delays clearing until new output is ready
        clear_output(wait=True)
        model_status(epoch, self.accu, self.val_accu, self.loss, self.val_loss)

In [None]:
# Create the model and the metric objects inside the distribution strategy's scope
with strategy.scope():
    model = build_unet_model()

    metrics = ['accuracy']
    # Confusion-matrix counts, each under a short name
    metrics += [
        metric_cls(name=short_name)
        for metric_cls, short_name in (
            (tf.keras.metrics.TruePositives, 'tp'),
            (tf.keras.metrics.FalsePositives, 'fp'),
            (tf.keras.metrics.TrueNegatives, 'tn'),
            (tf.keras.metrics.FalseNegatives, 'fn'),
        )
    ]
    metrics += [tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]

In [None]:
# Training hyper-parameters
EPOCHS = 150
LEARNING_RATE = 0.0001
BATCH_SIZE = 56

# Create the optimizer and compile inside the same strategy scope the model was
# built in, so that optimizer state belongs to that strategy.
with strategy.scope():
    adam = tf.keras.optimizers.Adam(LEARNING_RATE)
    model.compile(optimizer=adam, loss=soft_dice_loss, metrics=metrics)

In [None]:
# Train with 10% of the training split held out for validation. The early
# stopper and learning-rate reducer were configured alongside the checkpointer
# but never passed to fit(), so training always ran for the full EPOCHS;
# they are attached here.
history = model.fit(train_images,
                    train_masks,
                    validation_split = 0.1,
                    epochs = EPOCHS,
                    batch_size = BATCH_SIZE,
                    callbacks = [checkpointer, earlystopper, lr_reducer, DisplayCallback()])

In [None]:
# Final evaluation on the held-out test split; the returned values are shown as the cell output
model.evaluate(x=test_images, y=test_masks)

___

# Road Detection using Trained Model

## Load trained model and required objects

In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K

def iou_coef(y_true, y_pred, smooth=1):
    """Smoothed intersection-over-union, averaged over axis 0.

    Redefined here so the saved model can be loaded without running the
    training cells. Sums are taken over axes 1, 2 and 3; `smooth` is added to
    both numerator and denominator.
    """
    axes = [1, 2, 3]
    inter = K.sum(K.abs(y_true * y_pred), axis=axes)
    total = K.sum(y_true, axes) + K.sum(y_pred, axes)
    ratio = (inter + smooth) / (total - inter + smooth)
    return K.mean(ratio, axis=0)

def dice_coef(y_true, y_pred, smooth = 1):
    """Smoothed Dice coefficient over all elements of the flattened inputs.

    Redefined here so the saved model can be loaded without running the
    training cells.
    """
    flat_true = K.flatten(y_true)
    flat_pred = K.flatten(y_pred)
    shared = K.sum(flat_true * flat_pred)
    top = 2. * shared + smooth
    bottom = K.sum(flat_true) + K.sum(flat_pred) + smooth
    return top / bottom

def soft_dice_loss(y_true, y_pred):
    """Loss equal to one minus the Dice coefficient; needed to deserialize the saved model."""
    score = dice_coef(y_true, y_pred)
    return 1 - score

# Load the saved model, passing the custom loss/metric functions under their own names
model = load_model(
    "./Trained_Model/Road_Model.h5",
    custom_objects=dict(soft_dice_loss=soft_dice_loss, iou_coef=iou_coef),
)

## Select a random input image and generate prediction

In [None]:
import os
import numpy as np
from random import sample
from skimage.io import imread
from matplotlib import pyplot as plt

# Pick one random image from the dataset and run it through the model as a batch of one
input_img = imread(f'Dataset/Images/{sample(os.listdir("Dataset/Images"),1)[0]}')
test_data = np.asarray([input_img])
output = model.predict(test_data, verbose=0)[0][:,:,0]

# Paint every pixel whose predicted score exceeds 0.1 yellow. A single
# boolean-mask assignment replaces the per-pixel Python loops.
input_img[output > 0.1] = [255, 255, 0]

plt.rcParams["figure.figsize"] = (10,10)
plt.imshow(input_img)
plt.axis('off')
plt.show()