In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
print(tf.__version__)

from tensorflow.keras import regularizers
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPool2D, UpSampling2D, Concatenate, Conv2DTranspose
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras import backend as K

In [None]:
# Dataset locations. The variable names do not match their contents, so read
# them via the path strings:
#   train_path      -> training input images
#   test_path       -> training edge maps
#   edge_train_path -> test input images
#   edge_test_path  -> test edge maps
# The names are kept as-is because later cells refer to them.
train_path = "./BIPED/edges/imgs/train/rgbr/real/"
test_path = "./BIPED/edges/edge_maps/train/rgbr/real/"
edge_train_path = "./BIPED/edges/imgs/test/rgbr/"
edge_test_path = "./BIPED/edges/edge_maps/test/rgbr/"

# os.walk returns file names in arbitrary, filesystem-dependent order. Sorting
# makes the listings deterministic so that entry i of the image list and entry
# i of the edge-map list can be paired by position.
# NOTE(review): this assumes an image and its edge map share the same base
# name -- confirm against the dataset layout.
train_ids = sorted(next(os.walk(train_path))[2])
test_ids = sorted(next(os.walk(test_path))[2])
edge_train_ids = sorted(next(os.walk(edge_train_path))[2])
edge_test_ids = sorted(next(os.walk(edge_test_path))[2])

print(len(train_ids))
print(len(test_ids))
print(len(edge_train_ids))
print(len(edge_test_ids))

In [None]:
# Turn each directory listing into full file paths. The four lists are built in
# the same order as before: training images, training edge maps, test images,
# test edge maps.
images, edges, test_images, test_edges = (
    [os.path.join(folder, file_name) for file_name in file_names]
    for folder, file_names in (
        (train_path, train_ids),
        (test_path, test_ids),
        (edge_train_path, edge_train_ids),
        (edge_test_path, edge_test_ids),
    )
)

In [None]:
# Sizes of the four path lists, printed on one line.
print(*(len(paths) for paths in (images, edges, test_images, test_edges)))

In [None]:
# Print the path pairs at positions 1..14 so the image / edge-map alignment can
# be checked by eye.
for i in range(1, 15):
    pair = (images[i], edges[i])
    print(*pair)

In [None]:
# Load one edge map and show its array shape.
# The index is spelled out here so this cell does not depend on a loop variable
# left behind by another cell.
sample_index = 14
img = cv2.imread(edges[sample_index])
if img is None:
    # cv2.imread signals an unreadable file by returning None, not by raising.
    raise FileNotFoundError(f"cv2.imread could not read: {edges[sample_index]!r}")
print(img.shape)

In [None]:
def read_image(x, H=720, W=1280):
    """Load an image file as a float32 array of shape (H, W, 3) scaled to [0, 1].

    Args:
        x: Path of the image. May be a ``str``, ``bytes``, or a single-element
            numpy array holding either.
        H: Output height in pixels.
        W: Output width in pixels.

    Returns:
        ``np.float32`` array of shape ``(H, W, 3)`` in OpenCV's channel order,
        with values divided by 255.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    # Unwrap numpy containers and decode bytes so OpenCV receives a plain path.
    # np.array2string is not used: for string arrays it returns the quoted
    # repr, which is not a valid path.
    if isinstance(x, np.ndarray):
        x = x.item()
    if isinstance(x, bytes):
        x = x.decode("utf-8")
    path = str(x)

    image = cv2.imread(path, cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imread returns None rather than raising for unreadable files.
        raise FileNotFoundError(f"cv2.imread could not read image: {path!r}")

    # cv2.resize takes the target size as (width, height).
    image = cv2.resize(image, (W, H))
    return (image / 255.0).astype(np.float32)

In [None]:
import base64
def preprocess(x, y, H=720, W=1280):
    """Map an (image path, edge-map path) pair to two float32 image tensors.

    The file loading is done by ``read_image`` and wrapped in
    ``tf.numpy_function``. The static shapes are then restored with
    ``set_shape``, because ``numpy_function`` outputs carry no shape
    information.

    Args:
        x: Scalar string tensor with the path of the input image.
        y: Scalar string tensor with the path of the edge map.
        H: Height passed to ``read_image`` and used for the static shape.
        W: Width passed to ``read_image`` and used for the static shape.

    Returns:
        Tuple ``(image, edge)`` of ``tf.float32`` tensors, each with static
        shape ``(H, W, 3)``.
    """

    def _as_path(value):
        # NOTE(review): string tensors appear to reach the wrapped function as
        # bytes or as numpy values depending on the TF version -- both are
        # handled here.
        if isinstance(value, np.ndarray):
            value = value.item()
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)

    def f(x, y):
        # H and W are forwarded so the loaded arrays always match the shapes
        # declared below, including when non-default sizes are requested.
        # read_image is given a 0-d numpy string, the kind of value it was
        # handed before.
        image = read_image(np.array(_as_path(x)), H, W)
        edge = read_image(np.array(_as_path(y)), H, W)
        return image, edge

    image, edge = tf.numpy_function(f, [x, y], [tf.float32, tf.float32])
    image.set_shape([H, W, 3])
    edge.set_shape([H, W, 3])
    return image, edge

In [None]:
def tfDataset(x, y, batch_size = 8):
    """Build a batched ``tf.data`` pipeline from parallel lists of file paths.

    Args:
        x: Sequence of input-image paths.
        y: Sequence of edge-map paths, position-aligned with ``x``.
        batch_size: Number of (image, edge) pairs per batch.

    Returns:
        A ``tf.data.Dataset`` whose elements are the batched outputs of
        ``preprocess``.

    Raises:
        ValueError: If ``x`` and ``y`` have different lengths.
    """
    if len(x) != len(y):
        raise ValueError(
            f"x and y must have the same length, got {len(x)} and {len(y)}"
        )

    # from_tensor_slices yields one (image_path, edge_path) pair per element.
    # from_tensors would wrap both complete lists into a single element, so
    # preprocess would receive whole path vectors instead of single paths.
    data = tf.data.Dataset.from_tensor_slices((x, y))
    data = data.map(preprocess)
    data = data.batch(batch_size)
    data = data.prefetch(2)

    return data

In [None]:
# Build the training and test pipelines from the path lists, then print the
# dataset objects so their element specs are visible.
train_data, test_data = tfDataset(images, edges), tfDataset(test_images, test_edges)
for pipeline in (train_data, test_data):
    print(pipeline)

In [None]:
def read2rgb(x):
    """Read an image file with OpenCV and return it in RGB channel order.

    Args:
        x: Path of the image file.

    Returns:
        The image array converted with ``cv2.COLOR_BGR2RGB``.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    image = cv2.imread(x)
    if image is None:
        # cv2.imread returns None for unreadable paths. Fail here with the
        # path in the message instead of letting cvtColor fail on None.
        raise FileNotFoundError(f"cv2.imread could not read image: {x!r}")
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [None]:
# Preview samples 1..4: one figure with four input images, then one figure with
# the four edge maps at the same positions.
# Upper colour limit applied to each panel (None = set_clim is not called).
# NOTE(review): set_clim on RGB image data appears to have no visible effect --
# confirm and drop if so.
clim_upper = (None, 0.7, 1.4, 2.1)

for paths in (images, edges):
    fig = plt.figure(figsize=(15,15))

    for slot, upper in enumerate(clim_upper, start=1):
        a = fig.add_subplot(1, 4, slot)
        imgplot = plt.imshow(read2rgb(paths[slot]))
        if upper is not None:
            imgplot.set_clim(0.0, upper)

In [None]:
# Shared settings read by the layer-building helpers below.

# L2 penalty factor; the helpers skip regularisation when this is None.
w_decay=1e-3

# Factory used by the helpers to build a kernel regulariser from w_decay.
l2 = regularizers.l2

# Kernel initialiser instance shared by the convolution helpers.
weight_init = tf.initializers.glorot_uniform()

In [None]:
def double_conv_block(inputs, f1, f2, stride=(1,1), act=True):
    """Apply two 3x3 Conv2D + BatchNormalization stages, optionally ending in ReLU.

    Args:
        inputs: Input tensor.
        f1: Number of filters of the first convolution.
        f2: Number of filters of the second convolution.
        stride: Strides of the first convolution; the second always uses (1, 1).
        act: When true, a ReLU activation is applied to the result.

    Returns:
        The output tensor of the block.
    """
    # One regulariser instance is shared by both convolutions.
    reg = l2(w_decay) if w_decay is not None else None

    x = inputs
    for n_filters, conv_stride in ((f1, stride), (f2, (1,1))):
        x = Conv2D(
            filters=n_filters,
            kernel_size=(3,3),
            strides=conv_stride,
            padding="same",
            kernel_initializer=weight_init,
            kernel_regularizer=reg,
        )(x)
        x = BatchNormalization()(x)

    return Activation("relu")(x) if act else x

In [None]:
def DenseBlock(inputs, filters):
    """Apply ReLU -> 3x3 Conv2D -> BatchNormalization twice in a row.

    Args:
        inputs: Input tensor.
        filters: Number of filters of both convolutions.

    Returns:
        The output of the second BatchNormalization (no trailing activation).
    """
    # One regulariser instance is shared by both convolutions.
    reg = l2(w_decay) if w_decay is not None else None

    x = inputs
    for _ in range(2):
        x = Activation("relu")(x)
        x = Conv2D(
            filters=filters,
            kernel_size=(3,3),
            strides=(1,1),
            padding='same',
            kernel_initializer=weight_init,
            kernel_regularizer=reg,
        )(x)
        x = BatchNormalization()(x)

    return x

In [None]:
def single_conv_block(inputs, filters, kernel=(1,1), strides=(1,1), bn=False, act=False, w_init=None):
    """Apply one Conv2D, optionally followed by BatchNormalization and ReLU.

    Args:
        inputs: Input tensor.
        filters: Number of convolution filters.
        kernel: Convolution kernel size.
        strides: Convolution strides.
        bn: When true, BatchNormalization follows the convolution.
        act: When true, a ReLU activation is applied last.
        w_init: Kernel initialiser handed to Conv2D unchanged.

    Returns:
        The output tensor of the block.
    """
    reg = l2(w_decay) if w_decay is not None else None

    conv = Conv2D(
        filters=filters,
        kernel_size=kernel,
        strides=strides,
        padding="same",
        kernel_initializer=w_init,
        kernel_regularizer=reg,
    )
    x = conv(inputs)

    # Optional post-processing, applied in the order BN -> ReLU.
    if bn:
        x = BatchNormalization()(x)
    if act:
        x = Activation("relu")(x)

    return x

In [None]:
def UpConvBlock(inputs, up_scale):
    """Upsample a feature map with ``up_scale`` conv + transposed-conv stages.

    Every stage is a 1x1 Conv2D with ReLU followed by a stride-2
    Conv2DTranspose. Intermediate stages use 16 channels; the final stage uses
    a single channel and a TruncatedNormal(stddev=0.1) initialiser for the
    transposed convolution.

    Args:
        inputs: Input tensor.
        up_scale: Number of stride-2 stages to apply. The transposed
            convolutions use a square kernel of side ``2 ** up_scale``.

    Returns:
        The upsampled tensor, or ``inputs`` itself when ``up_scale`` is 0.
    """
    hidden_channels = 16
    reg = l2(w_decay) if w_decay is not None else None
    deconv_kernel = 2 ** up_scale

    x = inputs
    for step in range(up_scale):
        is_last = step == up_scale - 1
        channels = 1 if is_last else hidden_channels

        x = Conv2D(
            filters=channels,
            kernel_size=(1,1),
            strides=(1,1),
            padding='same',
            activation='relu',
            kernel_initializer=tf.initializers.TruncatedNormal(mean=0.),
            kernel_regularizer=reg,
        )(x)

        # Only the final stage gets its own initialiser; earlier stages reuse
        # the shared weight_init instance.
        deconv_init = tf.initializers.TruncatedNormal(stddev=0.1) if is_last else weight_init
        x = Conv2DTranspose(
            channels,
            kernel_size=(deconv_kernel, deconv_kernel),
            strides=(2,2),
            padding='same',
            kernel_initializer=deconv_init,
            kernel_regularizer=reg,
        )(x)

    return x
            

In [None]:
def build_model(input_shape=(720, 1280, 3)):
    """Build the multi-scale edge-detection network as a Keras ``Model``.

    Six feature blocks are computed at 1/2, 1/4, 1/4, 1/8, 1/16 and 1/16 of the
    input resolution. Each block is upsampled back to the input size by
    ``UpConvBlock`` into a one-channel map. The six maps are concatenated and
    fused into a 3-channel sigmoid output.

    Args:
        input_shape: ``(height, width, channels)`` of the input image. Height
            and width must be multiples of 16 so that the upsampled side
            outputs match the input size. ``None`` entries are not checked.

    Returns:
        A ``Model`` mapping the input image to a tensor with 3 channels and the
        same height and width.

    Raises:
        ValueError: If a given height or width is not a multiple of 16.
    """
    for size in input_shape[:2]:
        if size is not None and size % 16 != 0:
            raise ValueError(
                f"input height and width must be multiples of 16, got {tuple(input_shape)}"
            )

    inputs = Input(input_shape)

    # Block 1: 1/2 resolution (stride-2 convolution).
    b1 = double_conv_block(inputs, 32, 64, stride=(2,2), act=False)
    side_b1 = single_conv_block(b1, 128, strides=(2,2), bn=True, w_init=weight_init)

    # Block 2: 1/4 resolution after pooling.
    b2 = double_conv_block(b1, 128, 128, act=False)
    b2 = MaxPool2D(pool_size=(3, 3), strides=2, padding='same')(b2)
    # The skip connection is an element-wise sum, like every other *_add below.
    # It is no longer wrapped in a one-element Concatenate, which some Keras
    # releases reject and which is an identity otherwise.
    b2_add = b2 + side_b1
    side_b2 = single_conv_block(b2_add, 256, strides=(2,2), bn=True, w_init=weight_init)

    # Block 3: 1/4 resolution; b3_down is pooled to 1/8.
    b3_pre = single_conv_block(b2, 128, strides=(1,1), bn=True, w_init=weight_init)
    b3 = DenseBlock(b2_add+b3_pre, 256)
    b3 = DenseBlock(b3, 256)
    b3_down = MaxPool2D(pool_size=(3,3), strides=2, padding="same")(b3)
    b3_add = b3_down + side_b2
    side_b3 = single_conv_block(b3_add, 512, strides=(2,2), bn=True, w_init=weight_init)

    # Block 4: 1/8 resolution; b4_down is pooled to 1/16.
    b4_pre_1 = single_conv_block(b2, 256, strides=(2,2), w_init=weight_init)
    b4_pre_2 = single_conv_block(b3_down+b4_pre_1, 256, strides=(1,1), bn=True, w_init=weight_init)
    b4 = DenseBlock(b3_add+b4_pre_2, 512)
    b4 = DenseBlock(b4, 512)
    b4 = DenseBlock(b4, 512)
    b4_down = MaxPool2D(pool_size=(3,3), strides=(2,2), padding="same")(b4)
    b4_add = b4_down+side_b3
    side_b4 = single_conv_block(b4_add, 512, strides=(1,1), bn=True, w_init=weight_init)

    # Block 5: 1/16 resolution.
    b5_pre_1 = single_conv_block(b4_pre_1, 512, strides=(2,2), w_init=weight_init)
    b5_pre_2 = single_conv_block(b4_down+b5_pre_1, 512, strides=(1,1), bn=True, w_init=weight_init)
    b5 = DenseBlock(b4_add+b5_pre_2, 512)
    b5 = DenseBlock(b5, 512)
    b5 = DenseBlock(b5, 512)
    b5_add = b5+side_b4

    # Block 6: 1/16 resolution.
    b6_pre = single_conv_block(b5, 512, strides=(1,1), bn=True, w_init=weight_init)
    b6 = DenseBlock(b5_add+b6_pre, 256)
    b6 = DenseBlock(b6, 256)
    b6 = DenseBlock(b6, 256)

    # Side outputs: UpConvBlock(x, n) upsamples by 2**n, bringing every block
    # back to the input resolution as a one-channel map.
    out1 = UpConvBlock(b1, 1)
    out2 = UpConvBlock(b2, 2)
    out3 = UpConvBlock(b3, 2)
    out4 = UpConvBlock(b4, 3)
    out5 = UpConvBlock(b5, 4)
    out6 = UpConvBlock(b6, 4)

    # Fusion head: the six side outputs plus a 1x1-conv summary of them go
    # through a final 3x3 convolution and a sigmoid.
    block_cat = Concatenate()([out1,out2,out3,out4,out5,out6])
    b1_cat = single_conv_block(block_cat, 3, strides=(1,1), bn=True, w_init=tf.constant_initializer(1/5))
    bl = Concatenate()([block_cat, b1_cat])
    bl = Conv2D(3, kernel_size=(3,3), padding="same")(bl)
    bl = Activation("sigmoid")(bl)

    return Model(inputs=inputs, outputs=bl)

In [None]:
# Instantiate the network with build_model's default settings and print the
# Keras layer-by-layer summary.
model = build_model()
model.summary()