# Imports and directory mounting

In [None]:
# Mount Google Drive at /content/drive/ so the Shareddrives paths used further down resolve.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
from tensorflow import keras
import tensorflow as tf
import tensorflow.keras.backend as K
import tensorflow as tf
import glob, os
import argparse
import matplotlib.pyplot as plt

# Model class creation

In [None]:
class InstanceNormalization(tf.keras.models.Model):
    """Parameter-free normalization of each sample/channel over its spatial axes.

    The input must be rank 4; axes 1 and 2 are used as the pooling window and
    the last axis as the channel count (i.e. a channels-last layout).

    For every (sample, channel) plane the spatial mean is subtracted and the
    result is divided by ``sqrt(H * W * mean((x - mean)^2) + epsilon)``.
    Note the ``H * W`` factor: the denominator is built from the *sum* of
    squared deviations rather than their mean.
    NOTE(review): this scaling is kept exactly as it was; pretrained weights
    presumably depend on it - confirm before "correcting" it.

    Args:
        epsilon: Small constant added before the square root to avoid a
            division by zero on constant planes.
        **kwargs: Forwarded to the Keras base class.
    """

    def __init__(self,epsilon=1e-5,
                 **kwargs):
        super(InstanceNormalization, self).__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        # Only rank-4 inputs are supported; there are no weights to create.
        assert len(input_shape)==4
        self.built = True

    def call(self,inputs):
        """Normalize `inputs`; return them untouched if a spatial size is unknown."""
        height, width, channels = inputs.shape[1], inputs.shape[2], inputs.shape[-1]

        # The pooling window below needs concrete spatial sizes. When either one
        # is unknown the input is passed through unchanged. (Previously only the
        # height was checked, so an unknown width reached AveragePooling2D.)
        if height is None or width is None:
            return inputs

        window = (height, width)
        stat_shape = (1, 1, channels)

        # Global average pooling over the full plane gives the per-channel mean.
        mean = tf.keras.layers.AveragePooling2D(pool_size=window)(inputs)
        mean = tf.keras.layers.Reshape(target_shape=stat_shape)(mean)

        centered = inputs - mean
        variance = tf.keras.layers.AveragePooling2D(pool_size=window)(centered*centered)*height*width
        variance = tf.keras.layers.Reshape(target_shape=stat_shape)(variance)

        # exp(0.5 * log(v)) is sqrt(v); the exp/log spelling is kept as written.
        # NOTE(review): possibly chosen for op compatibility on some target - confirm.
        outputs = centered / (tf.exp(0.5*tf.math.log(variance + self.epsilon)))
        return outputs

def res_block(input, filters, kernel_size=(3, 3), strides=(1, 1), normlization='InstanceNormalization'):
    """Residual block: conv -> norm -> ReLU -> dropout -> conv -> norm, added to `input`, then ReLU.

    Args:
        input: Tensor fed to the block; it is also the skip connection, so the
            convolution output must keep its shape for the final Add.
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Strides of both convolutions.
        normlization: Which normalization follows each convolution:
            'InstanceNormalization' (default) or batch normalization, selected
            with 'batchnorm' (the historical misspelling 'batcnorm' is still
            accepted). Any other value applies no normalization.

    Returns:
        The output tensor of the block.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    def _conv(tensor):
        # Bias-free 'same' convolution shared by both halves of the block.
        return keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=strides,
                                   padding='same', kernel_initializer=initializer,
                                   use_bias=False)(tensor)

    def _normalize(tensor):
        # Previously only the misspelled 'batcnorm' selected batch norm, so the
        # correct spelling silently disabled normalization.
        if normlization in ('batchnorm', 'batcnorm'):
            return keras.layers.BatchNormalization(momentum=0.0)(tensor, training=True)
        if normlization == 'InstanceNormalization':
            return InstanceNormalization()(tensor)
        return tensor

    ## step size 1 , keep the feature size
    x = _normalize(_conv(input))
    x = keras.layers.Activation('relu')(x)
    x = keras.layers.Dropout(0.1)(x)
    x = _normalize(_conv(x))
    out = keras.layers.Add()([input, x])  # residual connection
    out = keras.layers.Activation('relu')(out)
    return out

def Conv_block(input, filters, kernel_size=(3, 3), strides=(2, 2),iftanh=False):
    """Bias-free 'same' convolution followed by either tanh or ReLU.

    Args:
        input: Tensor to convolve.
        filters: Number of output filters.
        kernel_size: Convolution kernel size.
        strides: Convolution strides (defaults to (2, 2)).
        iftanh: When True the convolution itself uses a tanh activation;
            otherwise a separate ReLU activation layer follows it.

    Returns:
        The activated feature tensor.
    """
    conv_kwargs = dict(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding='same',
        kernel_initializer=tf.random_normal_initializer(0., 0.02),
        use_bias=False,
    )
    if iftanh:
        return keras.layers.Conv2D(activation='tanh', **conv_kwargs)(input)

    features = keras.layers.Conv2D(**conv_kwargs)(input)
    return keras.layers.Activation('relu')(features)

def deconv_Relu(input,filters):
    """Transposed convolution (kernel 4, stride 2, 'same' padding) followed by ReLU."""
    upsample = keras.layers.Conv2DTranspose(
        filters=filters, kernel_size=4, strides=2, padding='same')
    relu = keras.layers.Activation('relu')
    return relu(upsample(input))

def resnet(x,filters,num_block):
    """Chain `num_block` residual blocks, each with `filters` filters.

    Args:
        x: Input tensor.
        filters: Filter count passed to every `res_block`.
        num_block: Number of residual blocks to stack (a Python int).

    Returns:
        The output tensor of the last block (or `x` itself when `num_block` is 0).
    """
    # This is model-construction code, so iterate with a plain Python range
    # instead of materializing a tf.range tensor just to count.
    for _ in range(num_block):
        x = res_block(x,filters)
    return x

def model1(x,c,num_block):
    """First stage: encoder, residual stack, and three-step decoder ending in a 3-channel tanh output.

    Args:
        x: Input image tensor.
        c: Base filter count; the encoder uses c, 2c, 4c and 8c filters.
        num_block: Number of residual blocks applied to the deepest features.

    Returns:
        The 3-channel tanh-activated output tensor.
    """
    # Encoder: one stride-1 block, then three blocks with Conv_block's default stride.
    stem = Conv_block(x, filters=c, strides=(1, 1))
    down1 = Conv_block(stem, 2 * c)
    down2 = Conv_block(down1, 4 * c)
    bottleneck = Conv_block(down2, 8 * c)
    refined = resnet(bottleneck, 8 * c, num_block)

    # Decoder: merge the residual output with its own input, then upsample three times.
    merged = keras.layers.concatenate([refined, bottleneck], axis=3)
    up = deconv_Relu(merged, c * 4)
    up = deconv_Relu(up, c * 2)
    up = deconv_Relu(up, c)

    return Conv_block(up, 3, strides=(1, 1), iftanh=True)

def model2(x,x2,c,num_block):
    """Second stage: refines `x2` using `x`, with skip connections between encoder and decoder.

    Args:
        x: Original input image tensor.
        x2: Output of the first stage, same shape as `x` (their difference is taken).
        c: Base filter count; the encoder uses c, 2c, 4c and 8c filters.
        num_block: Number of residual blocks applied to the deepest features.

    Returns:
        The 3-channel tanh-activated output tensor.
    """
    # The network sees the first-stage result together with its residual to the input.
    stacked = keras.layers.concatenate([x2, x - x2], axis=3)

    # Encoder.
    enc0 = Conv_block(stacked, c, strides=(1, 1))
    enc1 = Conv_block(enc0, c * 2)
    enc2 = Conv_block(enc1, c * 4)
    enc3 = Conv_block(enc2, c * 8)

    # Bottleneck: one more stride-1 block followed by the residual stack.
    bottleneck = Conv_block(enc3, c * 8, strides=(1, 1))
    refined = resnet(bottleneck, c * 8, num_block=num_block)

    # Decoder: each upsampling step consumes the matching encoder features.
    skip = keras.layers.concatenate([enc3, refined], axis=3)
    dec0 = deconv_Relu(skip, c * 4)
    skip = keras.layers.concatenate([enc2, dec0], axis=3)
    dec1 = deconv_Relu(skip, c * 2)
    skip = keras.layers.concatenate([enc1, dec1], axis=3)
    dec2 = deconv_Relu(skip, c * 1)

    return Conv_block(dec2, 3, strides=(1, 1), iftanh=True)

def xnet(c1=16,c2=24,res_num_block=9):
    """Assemble the two-stage network as a single Keras model.

    Args:
        c1: Base filter count of the first stage (`model1`).
        c2: Base filter count of the second stage (`model2`).
        res_num_block: Number of residual blocks used in each stage.

    Returns:
        A Keras model taking an image of arbitrary spatial size with 3 channels
        and returning ``[second_stage_output, first_stage_output]``. The model
        is named 'Discriminator'.
    """
    blur_input = keras.layers.Input(name='blur_image', shape=(None, None, 3))
    coarse = model1(blur_input, c1, res_num_block)
    refined = model2(blur_input, coarse, c2, res_num_block)
    return keras.models.Model(
        inputs=blur_input,
        outputs=[refined, coarse],
        name='Discriminator',
    )

In [None]:
# inputs = keras.layers.Input(shape=(1024,1408,3))
# Build the network with its default widths and print the layer-by-layer summary.
model = xnet()
model.summary()

Model: "Discriminator"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 blur_image (InputLayer)        [(None, None, None,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv2d_47 (Conv2D)             (None, None, None,   432         ['blur_image[0][0]']             
                                16)                                                               
                                                                                                  
 activation_51 (Activation)     (None, None, None,   0           ['conv2d_47[0][0]']              
                                16)                                                   

# Load the BGGAN model

In [None]:
# Folder of ground-truth bokeh images; read by the PSNR/SSIM evaluation cells as "<i>.jpg".
bokeh_GT = "/content/bokeh/"
# Folder of input images fed to the generator, also named "<i>.jpg".
bokeh_Original = "/content/original/"
# Folder searched for the latest TensorFlow checkpoint of the generator.
model_path = "/content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/model/"
# Folder the generated images are written to and later read back from for evaluation.
outputdir = "/content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/"

In [None]:
# Rebuild the network and load the most recent checkpoint found in `model_path`.
generator = xnet()

checkpoint = tf.train.Checkpoint(generator=generator)
latest_checkpoint = tf.train.latest_checkpoint(model_path)
# latest_checkpoint() returns None when the folder holds no checkpoint; restoring
# None would silently leave the generator with its random initial weights.
if latest_checkpoint is None:
    raise FileNotFoundError(f"No TensorFlow checkpoint found in {model_path!r}")
checkpoint.restore(latest_checkpoint)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f26ebec5070>

In [None]:
# Extract the test archive into the current working directory; unzip's listing goes to logs.txt.
!unzip /content/drive/Shareddrives/DeepLearning_VisionAnalytics/Dataset/TestBokehFree.zip > logs.txt

In [None]:
# Extract the training archive into the current working directory.
# NOTE: '>' truncates logs.txt, so this replaces the log written by the previous unzip cell.
!unzip /content/drive/Shareddrives/DeepLearning_VisionAnalytics/Dataset/Training.zip > logs.txt

# Generate 100 bokeh images with the BGGAN model

In [None]:
# Run the restored generator over the input images and write the results to `outputdir`.
generator.trainable = False
# makedirs also creates missing parent folders and tolerates an existing folder.
os.makedirs(outputdir, exist_ok=True)
# The network halves the resolution three times (three stride-2 convolutions)
# and doubles it back three times, so both sides must be multiples of 8.
factor = 8
num_images = 100
for i in range(num_images):
    img = bokeh_Original + str(i) + ".jpg"
    # read img
    out_name = os.path.basename(img)
    inputs = tf.io.read_file(img)
    blur_image = tf.io.decode_jpeg(inputs, channels=3)
    h,w,_ = blur_image.shape

    # Round BOTH sides up to the next multiple of `factor`. The height used to
    # be hard-coded to 1024 and was never checked for divisibility.
    padded_h = -(-h // factor) * factor
    padded_w = -(-w // factor) * factor
    if (padded_h, padded_w) != (h, w):
        blur_image = tf.image.resize(blur_image,[padded_h,padded_w],method=tf.image.ResizeMethod.BICUBIC)
    blur_image = tf.cast(blur_image, tf.float32)
    blur_image = (blur_image / 127.5) - 1.0  # scale [0, 255] -> [-1, 1]
    blur_image = tf.expand_dims(blur_image,0)

    # inference: the model returns [second-stage output, first-stage output]
    output = generator(blur_image, training=False)[0]

    # save image
    output = tf.squeeze(output, axis=0)  # drop only the batch axis
    output = (output+1.0)*127.5          # scale [-1, 1] -> [0, 255]
    output = tf.image.resize(output,[h,w],method=tf.image.ResizeMethod.BICUBIC)
    # Bicubic interpolation can overshoot [0, 255]; round and clip before the
    # uint8 cast, which is not well defined for out-of-range floats.
    output = tf.clip_by_value(tf.round(output), 0.0, 255.0)
    output = tf.cast(output, tf.uint8)
    output = tf.io.encode_jpeg(output)

    tf.io.write_file(os.path.join(outputdir, out_name), output)
    print('processing:',i+1, f'/{num_images}, save img:',os.path.join(outputdir, out_name))
print('processed')

processing: 1 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/0.jpg
processing: 2 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/1.jpg
processing: 3 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/2.jpg
processing: 4 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/3.jpg
processing: 5 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/4.jpg
processing: 6 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/5.jpg
processing: 7 /100, save img: /content/drive/Shareddrives/DeepLearning_VisionAnalytics/BG_GAN_Model/OurTestset/OriginalModelOutputs/6.jpg
processing: 8 /100, save img: /con

# Evaluation

## PSNR

In [None]:
from math import log10, sqrt
import cv2
import numpy as np
  
def PSNR(original, compressed):
    """Peak signal-to-noise ratio in dB between two images on a 0-255 scale.

    Args:
        original: Reference image as an array-like of pixel values.
        compressed: Image to compare, broadcast-compatible with `original`.

    Returns:
        ``20 * log10(255 / sqrt(MSE))`` as a float, or 100 when the images are
        identical (MSE of zero, for which the ratio is undefined).
    """
    # Do the arithmetic in float64: integer image arrays such as uint8 wrap
    # around on subtraction and squaring, which yields a wrong MSE.
    reference = np.asarray(original, dtype=np.float64)
    candidate = np.asarray(compressed, dtype=np.float64)

    mse = np.mean((reference - candidate) ** 2)
    if mse == 0:
        # No noise at all: report the conventional cap instead of dividing by zero.
        return 100
    max_pixel = 255.0
    return 20 * log10(max_pixel / sqrt(mse))

In [None]:
# Average PSNR of the generated images against the ground-truth bokeh images.
values = []
for i in range(100):
    img1 = bokeh_GT + str(i) + ".jpg"
    img2 = outputdir + str(i) + ".jpg"

    ground_truth_bokeh = cv2.imread(img1)
    generated_bokeh = cv2.imread(img2)
    # cv2.imread returns None for a missing/unreadable file instead of raising;
    # fail here with the offending path rather than deep inside PSNR.
    if ground_truth_bokeh is None:
        raise FileNotFoundError(f"Could not read ground-truth image: {img1}")
    if generated_bokeh is None:
        raise FileNotFoundError(f"Could not read generated image: {img2}")

    values.append(PSNR(ground_truth_bokeh, generated_bokeh))
values = np.array(values)
print(f"Avg PSNR value is {np.average(values)} dB")

Avg PSNR value is 30.96744956123231 dB


## SSIM

In [None]:
import math
import numpy as np
import cv2

def ssim(img1, img2):
    """Mean SSIM of two equally shaped images on a 0-255 scale.

    Local statistics are taken with an 11x11 Gaussian window (sigma 1.5); a
    5-pixel border is cropped from every filtered map so that only positions
    fully covered by the window contribute.
    """
    c1 = (0.01 * 255)**2
    c2 = (0.03 * 255)**2

    first = img1.astype(np.float64)
    second = img2.astype(np.float64)

    gauss_1d = cv2.getGaussianKernel(11, 1.5)
    gauss_2d = np.outer(gauss_1d, gauss_1d.transpose())

    def _windowed(image):
        # Gaussian-weighted local average, keeping the "valid" region only.
        return cv2.filter2D(image, -1, gauss_2d)[5:-5, 5:-5]

    mean_first = _windowed(first)
    mean_second = _windowed(second)
    mean_first_sq = mean_first**2
    mean_second_sq = mean_second**2
    mean_product = mean_first * mean_second

    var_first = _windowed(first**2) - mean_first_sq
    var_second = _windowed(second**2) - mean_second_sq
    covariance = _windowed(first * second) - mean_product

    numerator = (2 * mean_product + c1) * (2 * covariance + c2)
    denominator = (mean_first_sq + mean_second_sq + c1) * (var_first + var_second + c2)
    return (numerator / denominator).mean()


def calculate_ssim(img1, img2):
    '''Calculate SSIM between two images with values in [0, 255].

    Per the original note this is intended to give the same outputs as
    MATLAB's SSIM (not verified here).

    Args:
        img1, img2: Arrays of identical shape, either 2-D (H, W) or 3-D
            (H, W, C) with C equal to 1 or 3.

    Returns:
        The SSIM value; for 3-channel images, the mean of the per-channel SSIMs.

    Raises:
        ValueError: If the shapes differ, or the input is neither 2-D nor
            3-D with 1 or 3 channels.
    '''
    if not img1.shape == img2.shape:
        raise ValueError('Input images must have the same dimensions.')
    if img1.ndim == 2:
        return ssim(img1, img2)
    if img1.ndim == 3:
        channels = img1.shape[2]
        if channels == 3:
            # Score each channel on its own; the loop previously passed the
            # full 3-channel image on every iteration.
            ssims = [ssim(img1[..., ch], img2[..., ch]) for ch in range(3)]
            return np.array(ssims).mean()
        if channels == 1:
            return ssim(img1[..., 0], img2[..., 0])
    # Also reached for 3-D inputs with an unsupported channel count, which
    # used to fall through and return None.
    raise ValueError('Wrong input image dimensions.')

In [None]:
# Average SSIM of the generated images against the ground-truth bokeh images.
values = []
for i in range(100):
    img1 = bokeh_GT + str(i) + ".jpg"
    img2 = outputdir + str(i) + ".jpg"

    ground_truth_bokeh = cv2.imread(img1)
    generated_bokeh = cv2.imread(img2)
    # cv2.imread returns None for a missing/unreadable file instead of raising;
    # fail here with the offending path rather than inside calculate_ssim.
    if ground_truth_bokeh is None:
        raise FileNotFoundError(f"Could not read ground-truth image: {img1}")
    if generated_bokeh is None:
        raise FileNotFoundError(f"Could not read generated image: {img2}")

    values.append(calculate_ssim(ground_truth_bokeh, generated_bokeh))
values = np.array(values)
# SSIM is a unitless index, so no "dB" suffix.
print(f"Avg SSIM value is {np.average(values)}")

Avg SSIM value is 0.8972150874928632 dB
