<a href="https://colab.research.google.com/github/SHAHBAZSIDDIQI/DataScience/blob/master/Generating%20dynamic%20image%20translation%20using%20Generative%20Adversarial%20Network%20Cyclic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Loading important packages 
from random import random
from numpy import load
from numpy import zeros
from numpy import ones
from numpy import asarray
from numpy.random import randint
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from matplotlib import pyplot

In [None]:
!pip install git+https://www.github.com/keras-team/keras-contrib.git
#import tensorflow_addons as tfa     #for instanceNormalization
#from tfa.layers import instancenormalization

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization

defining the discriminator, however the input will be 128x128 image and hence the motive is to implement the 70x70 patch gan that will narrow down the convolutions to 8*8 .

In [None]:
def define_discriminator(image_shape):
  init = RandomNormal(stddev = 0.02)
  in_image = Input(shape = image_shape)
  d = Conv2D(64,(4,4),strides = (2,2),padding='same',kernel_initializer=init)(in_image)    #C64 converts 128x128 to 64x64
  d = LeakyReLU(alpha = 0.2)(d)
  
  d= Conv2D(128,(4,4),strides = (2,2),padding='same',kernel_initializer=init)(d)   #c128 converts 64x64 to 32x32
  d= InstanceNormalization(axis=-1)(d)
  d= LeakyReLU(alpha=0.2)(d)

  d= Conv2D(256,(4,4),strides = (2,2),padding='same',kernel_initializer=init)(d)   #C256 converts 32x32 to 16x16
  d= InstanceNormalization(axis=-1)(d)
  d= LeakyReLU(alpha=0.2)(d)

  d= Conv2D(512,(4,4),strides = (2,2),padding='same',kernel_initializer=init)(d)   #C512 converts 16x16 to 8x8
  d= InstanceNormalization(axis=-1)(d)
  d= LeakyReLU(alpha=0.2)(d)

  d= Conv2D(512,(4,4),padding='same',kernel_initializer=init)(d)   #second last layer
  d= InstanceNormalization(axis=-1)(d)
  d= LeakyReLU(alpha=0.2)(d)

  patch_out = Conv2D(1,(4,4),padding='same',kernel_initializer=init)(d)

  model = Model(in_image,patch_out)
  model.compile(loss='mse', optimizer=Adam(lr=0.0002, beta_1=0.5), loss_weights=[0.5])
  return model

In [None]:
image_shape = (128,128,3)
model = define_discriminator(image_shape)
model.summary()

In [None]:
def resnet_block(n_filters,input_layer):
  init = RandomNormal(stddev = 0.02)
  g = Conv2D(n_filters,(3,3),padding='same',kernel_initializer=init)(input_layer)
  g = InstanceNormalization(axis = -1)(g)
  g = Activation('relu')(g)

  g = Conv2D(n_filters,(3,3),padding='same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis = -1)(g)

  g = Concatenate()([g, input_layer])   #the trick here merging happen, this is whr magic happens
  return g

The generator will be 6 resnet since we are using 128x128 pixel input data. It will be Cnolution layer of single strides followed by strides 2 then resnets  followed by upscaling to same size.

In [None]:
def define_generator(image_shape, n_resnet = 6):
  init = RandomNormal(stddev = 0.02)
  in_image = Input(shape=image_shape)

  g = Conv2D(64,(7,7),padding = 'same',kernel_initializer=init)(in_image)
  g = InstanceNormalization(axis = -1)(g)
  g = Activation('relu')(g)

  g = Conv2D(128,(3,3),strides = (2,2),padding = 'same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis=-1)(g)
  g = Activation('relu')(g)

  g = Conv2D(256,(3,3),strides = (2,2),padding = 'same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis=-1)(g)
  g = Activation('relu')(g)

  #Applying ResNet
  for _ in range(n_resnet):
    g = resnet_block(128,g)
  
  #upscaling

  g = Conv2DTranspose(128,(3,3),strides=(2,2),padding = 'same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis = -1)(g)
  g = Activation('relu')(g)

  g = Conv2DTranspose(64,(3,3),strides = (2,2),padding='same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis=-1)(g)
  g = Activation('relu')(g)

  g = Conv2D(3,(7,7),padding = 'same',kernel_initializer=init)(g)
  g = InstanceNormalization(axis=-1)(g)
  
  out_image  = Activation('tanh')(g)
  model = Model(in_image,out_image)
  return model

In [None]:
#testing model architecture 
image_shape = (128,128,3)
model = define_generator(image_shape)
model.summary()     #reduced to 6 millions from 35 millions in 9 resnet of 256 filters each

In [None]:
def define_composite_model(g_model_1,d_model,g_model_2,image_shape):
  g_model_1.trainable = True
  d_model.trainable = False
  g_model_2.trainable = False

  input_gen= Input(shape = image_shape)
  gen1_out = g_model_1(input_gen)
  output_d = d_model(gen1_out)

  input_id = Input(shape=image_shape)
  output_id = g_model_1(input_id)

  output_f = g_model_2(gen1_out)

  gen2_out = g_model_2(input_id)
  output_b = g_model_1(gen2_out)
  
  model = Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
  # define optimization algorithm configuration
  opt = Adam(lr=0.0002, beta_1=0.5)
  # compile model with weighting of least squares loss and L1 loss
  model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
  return model

 # model = Model([input_gen,input_id],[output_d,output_id,output_f,output_b])
 # opt = Adam(lr = 0.0002,beta_1 = 0.5)
 # model.compile(loss=['mse','mae','mae','mae'],loss_weight = [0.1,0.5,1,1],optimizer = opt)
  #return model 


In [None]:
# load and prepare training images
def load_real_samples(filename):
  # load the dataset
  data = load(filename)
  # unpack arrays
  X1, X2 = data['arr_0'], data['arr_1']
  # scale from [0,255] to [-1,1]
  X1 = (X1 - 127.5) / 127.5
  X2 = (X2 - 127.5) / 127.5
  return [X1, X2]
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# retrieve selected images
	X = dataset[ix]
	# generate 'real' class labels (1)
	y = ones((n_samples, patch_shape, patch_shape, 1))
	return X, y
 
# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, dataset, patch_shape):
	# generate fake instance
	X = g_model.predict(dataset)
	# create 'fake' class labels (0)
	y = zeros((len(X), patch_shape, patch_shape, 1))
	return X, y
 
# save the generator models to file
def save_models(step, g_model_AtoB, g_model_BtoA):
	# save the first generator model
	filename1 = 'g_model_AtoB_%06d.h5' % (step+1)
	g_model_AtoB.save(filename1)
	# save the second generator model
	filename2 = 'g_model_BtoA_%06d.h5' % (step+1)
	g_model_BtoA.save(filename2)
	print('>Saved: %s and %s' % (filename1, filename2))
 
# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, trainX, name, n_samples=5):
	# select a sample of input images
	X_in, _ = generate_real_samples(trainX, n_samples, 0)
	# generate translated images
	X_out, _ = generate_fake_samples(g_model, X_in, 0)
	# scale all pixels from [-1,1] to [0,1]
	X_in = (X_in + 1) / 2.0
	X_out = (X_out + 1) / 2.0
	# plot real images
	for i in range(n_samples):
		pyplot.subplot(2, n_samples, 1 + i)
		pyplot.axis('off')
		pyplot.imshow(X_in[i])
	# plot translated image
	for i in range(n_samples):
		pyplot.subplot(2, n_samples, 1 + n_samples + i)
		pyplot.axis('off')
		pyplot.imshow(X_out[i])
	# save plot to file
	filename1 = '%s_generated_plot_%06d.png' % (name, (step+1))
	pyplot.savefig(filename1)
	pyplot.close()
 
# update image pool for fake images
def update_image_pool(pool, images, max_size=50):
	selected = list()
	for image in images:
		if len(pool) < max_size:
			# stock the pool
			pool.append(image)
			selected.append(image)
		elif random() < 0.5:
			# use image, but don't add it to the pool
			selected.append(image)
		else:
			# replace an existing image and use replaced image
			ix = randint(0, len(pool))
			selected.append(pool[ix])
			pool[ix] = image
	return asarray(selected)

In [None]:
def train(d_model_A,d_model_B,g_model_AtoB,g_modelBtoA,c_model_AtoB,c_model_BtoA, dataset):
  n_epochs, n_batch = 5,1
  n_patch = d_model_A.output_shape[1]
  trainA, trainB = dataset['arr_0'],dataset['arr_1']
  poolA,poolB = list(),list()
  bat_per_epo = int(len(trainA) / n_batch)
  n_steps = bat_per_epo * n_epochs
  for i in range(n_steps):
    X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
    X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
    
    X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
    X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
    
    X_fakeA = update_image_pool(poolA, X_fakeA)
    X_fakeB = update_image_pool(poolB, X_fakeB)
    #updating generator B to A
    g_loss2,_,_,_,_ = c_model_BtoA.train_on_batch([X_realB,X_realA],[y_realA,X_realA,X_realB,X_realA])

    dA_loss1 = d_model_A.train_on_batch(X_realA,y_realA)
    dA_loss2 = d_model_A.train_on_batch(X_fakeA,y_fakeA)

    #updating generator A to B
    g_loss1,_,_,_,_ = c_model_BtoA.train_on_batch([X_realA,X_realB],[y_realB,X_realB,X_realA,X_realB])

    dB_loss1 = d_model_B.train_on_batch(X_realB,y_realB)
    dB_loss2 = d_model_B.train_on_batch(X_fakeB,y_fakeB)

    print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
    if (i+1) % (bat_per_epo * 1) == 0:
      # plot A->B translation
      summarize_performance(i, g_model_AtoB, trainA, 'AtoB')
      # plot B->A translation
      summarize_performance(i, g_model_BtoA, trainB, 'BtoA')
    if (i+1) % (bat_per_epo * 5) == 0:
      # save the models
      save_models(i, g_model_AtoB, g_model_BtoA)
    

In [None]:
from numpy import load
from matplotlib import pyplot
data = load('/content/drive/My Drive/Colab Notebooks/horse2zebra_128.npz')

In [None]:
print(data['arr_0'].shape[1:])

In [None]:
image_shape = data['arr_0'].shape[1:]
g_model_AtoB = define_generator(image_shape)
g_model_BtoA = define_generator(image_shape)
d_model_A = define_discriminator(image_shape)
d_model_B = define_discriminator(image_shape)
c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)
train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, data)


In [None]:
from google.colab import files
files.download("g_model_AtoB_005935.h5")

In [None]:
files.download("g_model_BtoA_005935.h5")

In [None]:
from keras.models import load_model
from numpy import load
from numpy import vstack
from matplotlib import pyplot
from numpy.random import randint
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization
 
# load and prepare training images
def load_real_samples(filename):
	# load the dataset
	data = load(filename)
	# unpack arrays
	X1, X2 = data['arr_0'], data['arr_1']
	# scale from [0,255] to [-1,1]
	X1 = (X1 - 127.5) / 127.5
	X2 = (X2 - 127.5) / 127.5
	return [X1, X2]
 
# select a random sample of images from the dataset
def select_sample(dataset, n_samples):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# retrieve selected images
	X = dataset[ix]
	return X
 
# plot the image, the translation, and the reconstruction
def show_plot(imagesX, imagesY1, imagesY2):
	images = vstack((imagesX, imagesY1, imagesY2))
	titles = ['Real', 'Generated', 'Reconstructed']
	# scale from [-1,1] to [0,1]
	images = (images + 1) / 2.0
	# plot images row by row
	for i in range(len(images)):
		# define subplot
		pyplot.subplot(1, len(images), 1 + i)
		# turn off axis
		pyplot.axis('off')
		# plot raw pixel data
		pyplot.imshow(images[i])
		# title
		pyplot.title(titles[i])
	pyplot.show()
 
A_data, B_data = load_real_samples('/content/drive/My Drive/Colab Notebooks/horse2zebra_128.npz')
print('Loaded', A_data.shape, B_data.shape)
cust = {'InstanceNormalization': InstanceNormalization}
model_AtoB = load_model('g_model_AtoB_005935.h5', cust,compile=False)
model_BtoA = load_model('g_model_BtoA_005935.h5', cust,compile=False)
A_real = select_sample(A_data, 1)
B_generated  = model_AtoB.predict(A_real)
A_reconstructed = model_BtoA.predict(B_generated)
show_plot(A_real, B_generated, A_reconstructed)
# plot B->A->B
B_real = select_sample(B_data, 1)
A_generated  = model_BtoA.predict(B_real)
B_reconstructed = model_AtoB.predict(A_generated)
show_plot(B_real, A_generated, B_reconstructed)

In [None]:
!ls

AtoB_generated_plot_001187.png	BtoA_generated_plot_003561.png
AtoB_generated_plot_002374.png	BtoA_generated_plot_004748.png
AtoB_generated_plot_003561.png	BtoA_generated_plot_005935.png
AtoB_generated_plot_004748.png	drive
AtoB_generated_plot_005935.png	g_model_AtoB_005935.h5
BtoA_generated_plot_001187.png	g_model_BtoA_005935.h5
BtoA_generated_plot_002374.png	sample_data


In [None]:
from keras.preprocessing.image import load_img
im=load_img('AtoB_generated_plot_005935.png')
im

In [None]:
im=load_img('BtoA_generated_plot_005935.png')
im

In [None]:
files.download('AtoB_generated_plot_005935.png')