Importing libraries

In [None]:
from numpy import load
from numpy import zeros
from numpy import ones
from numpy.random import randint
from keras.optimizers import Adam, Adadelta,RMSprop
from keras.initializers import RandomNormal
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from keras.layers import ZeroPadding2D
from matplotlib import pyplot

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Loading the datasets

In [None]:
a=256
def load_images(path):
    src_list = list()
    print(path)
    x=''.join((path,'/'))
    print(x)
    for filename in  sorted(listdir(path)):
        pixels = cv2.imread(x+filename)
        pixels = tf.image.resize(pixels, [a, a],method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
        src_list.append(pixels)
    return (src_list)


clear='path to directory of clear images'
hazy='path to directory of hazy images'
clear= load_images(clear)
hazy=load_images(hazy)
hazy=np.array(hazy,dtype='float32')
clear=np.array(clear,dtype='float32')
#Pre-processing the images, we want the value of images to be [-1,1]
hazy=(hazy - 127.5)/127.5
clear=(clear - 127.5)/127.5
plt.imshow(0.5*hazy[0]+0.5)
plt.show()
plt.imshow(0.5*clear[0]+0.5)
plt.show()

#Splitting the dataset into training and testing phase
hazy1=hazy[0:400]
clear1=clear[0:400]
hazy2=hazy[400:492]
clear2=clear[400:492]

del hazy
del clear
gc.collect()


Defining the loss functions for the generator and discriminator

In [None]:
def discriminator_loss(y_true,y_pred):
    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred))
    return real_loss

def generator_loss(y_true,y_pred):
    return tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred))

Defining the discriminator, we will be using a Markovian discriminator, in which the output is a grid. This helps preserve spatial information

In [None]:
def define_discriminator(image_shape):
	init = RandomNormal(stddev=0.02)
	in_src_image = Input(shape=image_shape)
	in_target_image = Input(shape=image_shape)
	merged = Concatenate()([in_src_image, in_target_image])
	d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
	d=GroupNormalization()(d,training=True)
	d = LeakyReLU(alpha=0.2)(d)
	d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d=GroupNormalization()(d,training=True)
	d = LeakyReLU(alpha=0.2)(d)
	d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d=GroupNormalization()(d,training=True)
	d = LeakyReLU(alpha=0.2)(d)
	d=ZeroPadding2D()(d)
	d = Conv2D(512, (4,4), strides=(1,1), padding='valid', kernel_initializer=init)(d)
	d=GroupNormalization()(d,training=True)
	d = LeakyReLU(alpha=0.2)(d)
	d=ZeroPadding2D()(d)
	d = Conv2D(1, (4,4),strides=(1,1), padding='valid', kernel_initializer=init)(d)
	patch_out = Activation('linear')(d)
	model = Model([in_src_image, in_target_image], patch_out,name='discriminator')
	opt =Adam(learning_rate=0.0002,beta_1=0.5)
	model.compile(loss=discriminator_loss, optimizer=opt)
	return model

Defining the generator, we do so by first defining the encoder blocks for downsampling and then the decoder blocks for upsampling. The generator adopts a U-net architecture with skip connections

In [None]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):
 init = RandomNormal(stddev=0.02)
 g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
 if batchnorm:
  g=InstanceNormalization()(g,training=True)
 g = LeakyReLU(alpha=0.2)(g)
 return g


def decoder_block(layer_in, skip_in, n_filters, dropout=True,norm=True):
	init = RandomNormal(stddev=0.02)
	g=layer_in
	if norm:
		g=InstanceNormalization()(g,training=True)
	g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(g)
	if dropout:
		g = Dropout(0.5)(g, training=True)
	g = Concatenate()([g,skip_in])
	g = Activation('relu')(g)
	return g



def define_generator(image_shape=(a,a,3)):
	init = RandomNormal(stddev=0.02)
	in_image = Input(shape=image_shape)
	e1 = define_encoder_block(in_image,64,batchnorm=False)
	e2 = define_encoder_block(e1, 128)
	e3 = define_encoder_block(e2, 256)
	e4 = define_encoder_block(e3, 512)
	e5 = define_encoder_block(e4, 512,)
	e6 = define_encoder_block(e5, 512,)
	e7 = define_encoder_block(e6, 512,)
	b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e7) #Bottleneck layer
	b = Activation('relu')(b)
	d1 = decoder_block(b, e7, 512,norm=False)
	d2 = decoder_block(d1, e6, 512)
	d3 = decoder_block(d2, e5, 512,)
	d4 = decoder_block(d3, e4, 512,dropout=False)
	d5 = decoder_block(d4, e3, 256, dropout=False)
	d6 = decoder_block(d5, e2, 128, dropout=False)
	d7 = decoder_block(d6, e1, 64, dropout=False)
	g = Conv2DTranspose(3, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d7)
	out_image = Activation('tanh')(g)
	model = Model(in_image, out_image,name='generator')
	return model

Defining the GAN model and its methods

In [None]:
def define_gan(g_model, d_model, image_shape): #Defines a forward pass of the GAN model
	for layer in d_model.layers:
		if not isinstance(layer, BatchNormalization):
			layer.trainable = False
	in_src = Input(shape=image_shape)
	gen_out = g_model(in_src)
	dis_out = d_model([in_src, gen_out])
	model = Model(in_src, [dis_out,gen_out])
	opt = Adam(learning_rate=0.0002,beta_1=0.5)
	model.compile(loss=[generator_loss,'mae'], optimizer=opt, loss_weights=[1,100]) #Mean absolute error is added as well with a weightage of 100, as this has been shown to have best performance with GAN
	return model


def generate_real_samples(trainA,trainB, n_samples, patch_shape,tr=True): #Samples real hazy images and groun truth images from the dataset
	ix = randint(0, trainA.shape[0], n_samples)
	X1, X2 = trainA[ix], trainB[ix]
	if tr:
		for i in range(n_samples):
			X1[i],X2[i]=random_jitter(X1[i],X2[i])
	y = ones((n_samples, patch_shape, patch_shape, 1))
	return [X1, X2], y


def generate_fake_samples(g_model, samples, patch_shape,): #Samples real hazy images and generates fake clear images for the corresponding sample of hazy images
	X = g_model.predict(samples)
	y = zeros((len(X), patch_shape, patch_shape, 1))
	return X, y

psn_c=list()
def summarize_performance(step, g_model, trainA,trainB,testA,testB, n_samples=tr,x_samples=test_size): #Check performance of model through quantitative and qualitative (visual) inspection
	[X_realA, X_realB], _ = generate_real_samples(trainA,trainB, n_samples, 1,tr=False)
	X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)
	X_realB = (X_realB + 1) / 2.0
	X_fakeB = (X_fakeB + 1) / 2.0
	psn=list()
	for i in range(n_samples):
		psn.append(psnr(X_realB[i],X_fakeB[i]))
	print("Training: Average psnr[%f] Variance[%f] Max[%f]"%(np.average(psn),np.var(psn),np.max(psn)))
	psn=list()
	[X_realA, X_realB], _ = generate_real_samples(testA,testB, x_samples, 1,tr=False)
	X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)
	X_realB = (X_realB + 1) / 2.0
	X_fakeB = (X_fakeB + 1) / 2.0
	for i in range(x_samples):
		psn.append(psnr(X_realB[i],X_fakeB[i]))
	print("Testing: Average psnr[%f] Variance[%f] Max[%f]"%(np.average(psn),np.var(psn),np.max(psn)))
	psn_c.append(np.average(psn))
	del psn
	gc.collect()


def train(d_model, g_model, gan_model, trainA,trainB,testA,testB, n_epochs=100, n_batch=8): #Code for training the model. We first train the discriminator and then train the generator.
	n_patch = d_model.output_shape[1]
	bat_per_epo = int(len(trainA) / n_batch)
	half_batch=int(n_batch/2)+bool(n_batch==1)
	n_steps = bat_per_epo * n_epochs
	g_hist,g1,d1,d2=list(),list(),list(),list()
	for i in range(n_steps):
		n_disc=1
		ct1=0
		ct2=0
		for j in range(n_disc):
			[X_realA, X_realB], y_real = generate_real_samples(trainA,trainB, half_batch, n_patch)
			X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
			d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
			d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
			ct1=ct1+d_loss1
			ct2=ct2+d_loss2
			del X_realA
			del X_realB
			del X_fakeB
			del y_real
			del y_fake
			gc.collect()
		d1.append(ct1/n_disc)
		d2.append(ct2/n_disc)
		[X_realA, X_realB], y_real = generate_real_samples(trainA,trainB, n_batch, n_patch)
		X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
		x1,x2,x3 = gan_model.train_on_batch(X_realA, [y_real, X_realB])
		g_hist.append(x2)
		g1.append(x3)
		print('>%d, d1[%f] d2[%f] v_loss[%f] l1_loss[%f]' % (i+1, d_loss1,d_loss2,x2,x3))
		del X_realA
		del X_realB
		del X_fakeB
		del y_real
		del y_fake
		gc.collect()
		if (i+1) % (bat_per_epo*2) == 0:
				summarize_performance(step=i, g_model=g_model,trainA=trainA,trainB=trainB,testA=testA,testB=testB,)
				plot_history(g_hist,g1,d1,d2)


d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)
gan_model = define_gan(g_model, d_model, image_shape)

In [None]:
train(d_model=d_model, g_model=g_model, gan_model=gan_model,trainA=hazy1,trainB=clear1,testA=hazy2,testB=clear2)

After training is done, we can test our model by changing the value of 'i' in the code

In [None]:

i=0
i=i%hazy2.shape[0]
X_realB = (clear2[i] + 1) / 2.0
img=np.expand_dims(hazy2[i],axis=0)
X_fakeB = (g_model.predict(img) + 1) / 2.0
print(psnr(X_realB,X_fakeB))
plt.imshow((hazy2[i] + 1)/2.0)
plt.title('Hazy Image')
plt.show()
plt.imshow((X_fakeB[0]+1)/2.0)
plt.title('Generated Image')
plt.show()
plt.imshow((X_realB[0]+1)/2.0)
plt.title('Clear image')
plt.show()