In [None]:
# Mount Google Drive at /content/drive; every dataset path used later in this
# notebook lives under /content/drive/MyDrive.
# NOTE: the google.colab package is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import numpy as np
import cv2
from glob import glob
import random
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from keras.models import Model
from sklearn.model_selection import train_test_split
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda, Activation, Concatenate
#from keras.metrics import MeanIoU
from tensorflow.keras.applications.resnet50 import ResNet50

from tqdm import tqdm
from skimage.io import imread, imshow
from skimage.transform import resize
import matplotlib.pyplot as plt


In [None]:
def merge_data(input1, input2):
  """Pair up the items of two iterables position by position.

  No pixel data is touched here. The callers in this notebook pass two sorted
  lists of file paths and receive one ``[in1, in2]`` list per sample; the
  two-channel stacking of the images themselves is done by ``read_input``.

  Pairing follows ``zip`` semantics, so it stops at the shorter iterable.

  Args:
    input1: iterable supplying the first element of every pair.
    input2: iterable supplying the second element of every pair.

  Returns:
    A list of two-element lists ``[in1, in2]``.
  """
  return [[first, second] for first, second in zip(input1, input2)]


In [None]:
# Load the paths for data
def load_data():
    """Collect the dataset file paths and split them 80/20 without shuffling.

    The three folders (input images, masks, target outputs) are globbed and
    sorted independently, so samples are matched purely by their position in
    the sorted listings. The folders must therefore contain the same number
    of files.

    Returns:
        ``(input_train, train_output), (input_test, test_output)`` where each
        ``input_*`` is a list of ``[image_path, mask_path]`` pairs and each
        ``*_output`` is a list of target image paths. With ``shuffle=False``
        the held-out part is the last 20% of the sorted listing.

    Raises:
        ValueError: if no files are found, or if the three folders do not
            contain the same number of files.
    """
    input1 = sorted(glob("/content/drive/MyDrive/Dataset/input/*"))
    input2 = sorted(glob("/content/drive/MyDrive/Dataset/Masks/*"))
    output = sorted(glob("/content/drive/MyDrive/Dataset/output/*"))

    if not input1:
        raise ValueError(
            "No files found under /content/drive/MyDrive/Dataset/input - is Drive mounted?"
        )
    if not (len(input1) == len(input2) == len(output)):
        # Position-based pairing would silently misalign samples otherwise.
        raise ValueError(
            "Dataset folders are out of sync: "
            f"{len(input1)} inputs, {len(input2)} masks, {len(output)} outputs"
        )

    # Dataset Splitting
    # One call splits all three lists at the same index, so the image, mask and
    # target of a sample always land in the same partition.
    (input1_train, input1_test,
     input2_train, input2_test,
     train_output, test_output) = train_test_split(
        input1, input2, output, test_size=0.2, shuffle=False
    )

    input_train = merge_data(input1_train, input2_train)
    input_test = merge_data(input1_test, input2_test)
    return (input_train, train_output), (input_test, test_output)

In [None]:
def read_image(path, size=(512, 512)):
  """Load one image file as a normalised single-channel float32 array.

  Used by ``read_input`` / ``read_output`` (and therefore by ``preprocess``),
  and directly for single-image prediction.

  Args:
    path: filesystem path of the image, as ``str``.
    size: ``(width, height)`` handed to ``cv2.resize``. The default 512x512
      matches the static shapes that ``preprocess`` sets on its tensors.

  Returns:
    ``float32`` array of shape ``(height, width, 1)``; pixel values are the
    grayscale intensities divided by 255.

  Raises:
    IOError: if OpenCV could not read the file.
  """
  x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
  if x is None:
    # cv2.imread reports a missing/unreadable file by returning None instead
    # of raising; fail here with the offending path rather than inside resize.
    raise IOError(f"cv2.imread could not read image: {path!r}")
  x = cv2.resize(x, size)
  x = x/255.0
  x = x.astype(np.float32)
  x = np.expand_dims(x, axis=-1)  # append the channel axis
  return x

In [None]:
def read_input(paths):
  """Build the multi-channel network input from byte-string image paths.

  Every entry of ``paths`` is decoded and loaded with ``read_image``; the
  first two loaded images are then joined along the last (channel) axis.

  Args:
    paths: sequence of ``bytes`` paths. In this notebook it is an
      ``[image_path, mask_path]`` pair coming from ``tf.numpy_function``.

  Returns:
    The first two images concatenated on ``axis=-1``.
  """
  channels = [read_image(raw_path.decode()) for raw_path in paths]
  first, second = channels[0], channels[1]
  return np.concatenate((first, second), axis=-1)

In [None]:
def read_output(path):
  """Load the target image for one sample.

  Args:
    path: ``bytes`` path of the target image; it is decoded to ``str``
      before being passed to ``read_image``.

  Returns:
    Whatever ``read_image`` returns for the decoded path.
  """
  decoded_path = path.decode()
  return read_image(decoded_path)

In [None]:
def preprocess(input_path, output_path):
    """Turn one (input paths, target path) element into image tensors.

    Intended as the ``Dataset.map`` function used by ``tf_dataset``.

    Args:
        input_path: tensor holding the image/mask path pair of one sample.
        output_path: tensor holding the target image path of the same sample.

    Returns:
        ``(x, y)`` float32 tensors with static shapes ``(512, 512, 2)`` and
        ``(512, 512, 1)``.
    """

    def _load_pair(pair_paths, target_path):
        # Executed as ordinary Python via tf.numpy_function. The helpers call
        # .decode() on what they receive, i.e. they expect byte strings.
        return read_input(pair_paths), read_output(target_path)

    x, y = tf.numpy_function(
        _load_pair, [input_path, output_path], [tf.float32, tf.float32]
    )
    # Pin the static shapes explicitly after the numpy_function call so that
    # downstream layers see fully defined dimensions.
    x.set_shape([512, 512, 2])
    y.set_shape([512, 512, 1])
    return x, y

In [None]:
# Dataset Compilation
def tf_dataset(input, output, batch=8, shuffle=True):
    """Build a batched ``tf.data`` pipeline from path lists.

    Args:
        input: list of ``[image_path, mask_path]`` pairs.
        output: list of target image paths, aligned with ``input``.
        batch: number of samples per batch.
        shuffle: shuffle the samples (buffer of 5000) before loading. Defaults
            to ``True``, the previous unconditional behaviour; pass ``False``
            for evaluation pipelines that need a stable order.

    Returns:
        A ``tf.data.Dataset`` yielding ``(x, y)`` batches produced by
        ``preprocess``.
    """
    dataset = tf.data.Dataset.from_tensor_slices((input, output))
    if shuffle:
        # Shuffling happens on the cheap path strings, before any image is read.
        dataset = dataset.shuffle(buffer_size=5000)
    # Decode several images concurrently; element order is still deterministic
    # because Dataset.map keeps ordering by default.
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(2)
    return dataset

In [None]:
def conv_block(input, num_filters):
    """Apply two identical Conv2D(3x3, same) -> BatchNorm -> ReLU stages.

    Args:
        input: tensor fed into the first convolution.
        num_filters: number of filters used by both convolutions.

    Returns:
        The output tensor of the second ReLU.
    """
    x = input
    for _ in range(2):
        x = Conv2D(num_filters, 3, padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
    return x

def decoder_block(input, skip_features, num_filters):
    """One decoder stage: upsample, merge with the skip tensor, convolve.

    Args:
        input: tensor coming from the previous (coarser) stage.
        skip_features: encoder tensor concatenated with the upsampled input.
        num_filters: filters for the transposed convolution and ``conv_block``.

    Returns:
        The output tensor of ``conv_block`` applied to the merged features.
    """
    upsampled = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

def build_resnet50_unet(input_shape):
    """Assemble a U-Net style model on top of a ResNet50 encoder.

    Four encoder tensors are tapped as skip connections, ``conv4_block6_out``
    acts as the bridge, and four ``decoder_block`` stages (512, 256, 128, 64
    filters) lead to a single-channel sigmoid output.

    Args:
        input_shape: shape of one input sample, e.g. ``(512, 512, 2)``.

    Returns:
        A Keras ``Model`` named ``"ResNet50_U-Net"``.
    """
    inputs = Input(input_shape)

    # weights=None: no pretrained weights are loaded into the backbone.
    backbone = ResNet50(include_top=False, weights=None, input_tensor=inputs)

    # Skip connections, shallowest first. The input layer is taken by position
    # (layers[0]) rather than by its auto-generated name such as "input_1".
    skip_tensors = [
        backbone.layers[0].output,
        backbone.get_layer("conv1_relu").output,
        backbone.get_layer("conv2_block3_out").output,
        backbone.get_layer("conv3_block4_out").output,
    ]

    # Bridge between encoder and decoder.
    x = backbone.get_layer("conv4_block6_out").output

    # Decoder: walk the skips from deepest to shallowest.
    for skip, filters in zip(reversed(skip_tensors), (512, 256, 128, 64)):
        x = decoder_block(x, skip, filters)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(x)

    return Model(inputs, outputs, name="ResNet50_U-Net")

# Build the network for 512x512 two-channel inputs and print its layer summary.
# `model` becomes a notebook-level variable that the compile/fit/evaluate
# cells below operate on.
if __name__ == "__main__":
    input_shape = (512, 512, 2)
    model = build_resnet50_unet(input_shape)
    model.summary()

Model: "ResNet50_U-Net"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 512, 512, 2  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 518, 518, 2)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 256, 256, 64  6336        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                     

In [None]:
# Optimise mean absolute error with Adam. `learning_rate` replaces the deprecated
# `lr` alias. The metric identifiers are kept unchanged because the plotting
# cells read the training history by the names derived from them.
model.compile(optimizer=Adam(learning_rate=1e-3), loss=tf.keras.losses.MeanAbsoluteError(), metrics=['MSE','MAE', 'RootMeanSquaredError'])

  super(Adam, self).__init__(name, **kwargs)


In [None]:
# TensorBoard callback writing event files to ./logs; it only takes effect when
# this list is handed to model.fit through its `callbacks` argument.
my_callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
]
batch_size = 8
epochs = 25
# load_data() splits the sorted file listing 80/20 without shuffling; the
# held-out 20% is used as validation data here.
(train_x, train_y), (valid_x, valid_y) = load_data()
# Both pipelines are built by tf_dataset, which also performs the batching.
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

In [None]:
# The datasets are already batched by tf_dataset, so no batch_size is passed
# (Keras asks for it to be omitted with dataset inputs). my_callbacks is
# passed so the TensorBoard callback defined above actually runs.
history = model.fit(train_dataset, validation_data=valid_dataset, epochs=epochs, callbacks=my_callbacks)

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25

In [None]:
def load_test_data(data_dir="/content/drive/MyDrive/Test_data"):
    """Collect the file paths of a held-out test set.

    Args:
        data_dir: folder containing the ``input``, ``Masks`` and ``output``
            sub-folders. Defaults to the Drive location used by this notebook.

    Returns:
        ``(test_image, output)``: a list of ``[image_path, mask_path]`` pairs
        and the list of target image paths, both in sorted filename order.

    Raises:
        ValueError: if the three sub-folders hold different numbers of files,
            since samples are matched purely by position.
    """
    input1 = sorted(glob(os.path.join(data_dir, "input", "*")))
    input2 = sorted(glob(os.path.join(data_dir, "Masks", "*")))
    output = sorted(glob(os.path.join(data_dir, "output", "*")))
    if not (len(input1) == len(input2) == len(output)):
        raise ValueError(
            f"Test folders under {data_dir!r} are out of sync: "
            f"{len(input1)} inputs, {len(input2)} masks, {len(output)} outputs"
        )
    test_image = merge_data(input1, input2)
    return test_image, output

In [None]:
# Build the evaluation pipeline for the separate Test_data folder, reusing the
# batch_size defined in the training-setup cell.
test_x, test_y = load_test_data()
test_dataset = tf_dataset(test_x, test_y, batch=batch_size)

In [None]:
# Evaluate on the test pipeline; the values in `score` are paired with
# model.metrics_names in the next cell.
score = model.evaluate(test_dataset, verbose = 0) 

In [None]:
# Show the evaluation values keyed by metric name (displayed as the cell output).
dict(zip(model.metrics_names, score))

In [None]:
# Training vs. validation loss per epoch, read from the History object
# returned by model.fit.
plt.plot(history.history['loss'], label="Loss")
plt.plot(history.history['val_loss'], label="val_loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Losses over epochs")
plt.legend()
plt.show()

In [None]:
# Training vs. validation mean absolute error per epoch. The 'MAE' key matches
# the metric string given to model.compile.
plt.plot(history.history['MAE'], label="MAE")
plt.plot(history.history['val_MAE'], label="val_MAE")
plt.xlabel("Epochs")
plt.ylabel("MAE")
plt.title("Mean Absolute Error over epochs")
plt.legend()
plt.show()

In [None]:
# Training vs. validation mean squared error per epoch. The 'MSE' key matches
# the metric string given to model.compile.
plt.plot(history.history['MSE'], label="MSE")
plt.plot(history.history['val_MSE'], label="val_MSE")
plt.xlabel("Epochs")
plt.ylabel("MSE")
plt.title("MSE over Epochs")
plt.legend()
plt.show()

In [None]:
# Training vs. validation root mean squared error per epoch.
# NOTE(review): the metric was requested as 'RootMeanSquaredError' in
# model.compile but is read here as 'root_mean_squared_error' - presumably the
# name Keras assigns to that metric object; confirm against history.history.keys().
plt.plot(history.history['root_mean_squared_error'], label="root_mean_squared_error")
plt.plot(history.history['val_root_mean_squared_error'], label="val_root_mean_squared_error")
plt.xlabel("Epochs")
plt.ylabel("RMSE")
plt.title("RMSE over epochs")
# plt.ylim([0, 0.5])
plt.legend()
plt.show()

In [None]:
# Assemble one network input by hand from external sample 20: image and mask
# are loaded with read_image (one channel each), stacked on the channel axis,
# and given a leading batch axis of size 1.
inp = read_image('/content/drive/MyDrive/Final_test_data_external/input/20.png')
mask =  read_image('/content/drive/MyDrive/Final_test_data_external/Masks/20.png')
test_data =  np.concatenate((inp, mask), axis=-1)
test_data = np.expand_dims(test_data, axis=0)

In [None]:
# Predict on the single-sample batch; [0] drops the batch axis that was added
# with np.expand_dims.
result  = model.predict(test_data)[0]

In [None]:
# Load the reference image for visual comparison. It is read directly with
# cv2.imread, so unlike read_image it is neither resized nor scaled.
# NOTE(review): the prediction was made from input/20.png and Masks/20.png,
# whereas this loads output/21.png - confirm the index offset is intentional.
exp_img_path = '/content/drive/MyDrive/Final_test_data_external/output/21.png'
expeted_result =cv2.imread(exp_img_path, cv2.IMREAD_GRAYSCALE)

In [None]:
# Side-by-side comparison in the top row of a 2x2 grid: reference image on the
# left, model prediction on the right, both in grayscale without axis ticks.
plt.figure(figsize=(8, 8))

plt.subplot(221)
plt.title('Expected Result')
plt.imshow(expeted_result, cmap='gray')
plt.xticks([])
plt.yticks([])

plt.subplot(222)
plt.title('Prediction Result')
# Drop the size-1 channel axis so imshow receives a 2-D array.
result = np.squeeze(result)
plt.imshow(result, cmap='gray')
plt.xticks([])
plt.yticks([])
plt.show()


In [None]:
def load_external_test_data(data_dir="/content/drive/MyDrive/Final_test_data_external"):
    """Collect the file paths of the external test set.

    Args:
        data_dir: folder containing the ``input``, ``Masks`` and ``output``
            sub-folders. Defaults to the Drive location used by this notebook.

    Returns:
        ``(test_image, output)``: a list of ``[image_path, mask_path]`` pairs
        and the list of target image paths, both in sorted filename order.

    Raises:
        ValueError: if the three sub-folders hold different numbers of files,
            since samples are matched purely by position.
    """
    input1 = sorted(glob(os.path.join(data_dir, "input", "*")))
    input2 = sorted(glob(os.path.join(data_dir, "Masks", "*")))
    output = sorted(glob(os.path.join(data_dir, "output", "*")))
    if not (len(input1) == len(input2) == len(output)):
        raise ValueError(
            f"Test folders under {data_dir!r} are out of sync: "
            f"{len(input1)} inputs, {len(input2)} masks, {len(output)} outputs"
        )
    test_image = merge_data(input1, input2)
    return test_image, output

In [None]:
# Build the evaluation pipeline for the external test folder, reusing the
# batch_size defined in the training-setup cell.
test_e_x, test_e_y = load_external_test_data()
external_test_dataset = tf_dataset(test_e_x, test_e_y, batch=batch_size)

In [None]:
# Evaluate on the external test pipeline; the values in `score_ex` are paired
# with model.metrics_names in the next cell.
score_ex = model.evaluate(external_test_dataset, verbose = 0)

In [None]:
# Show the external-set evaluation values keyed by metric name.
dict(zip(model.metrics_names, score_ex))