<a href="https://colab.research.google.com/github/RoeBuckRoe/TraVelGAN/blob/main/TraVeLGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Setup**

In [12]:
from os import listdir
from numpy import asarray
from numpy import vstack
from numpy import load
from numpy import zeros
from numpy import ones
from numpy import savez_compressed
from numpy.random import randn
from numpy.random import randint
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Sequential
from tensorflow.keras import regularizers
from keras.layers import Lambda
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Activation
from keras.layers import Conv2DTranspose
from tensorflow.keras.layers import MaxPooling2D
from keras.layers import Concatenate
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import Input
from matplotlib import pyplot
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from keras import backend as K

import tensorflow as tf
# NOTE(review): `tf.compat.v1.global_variables_initializer()` is the TF1-compat
# call for initializing global variables; it is not a Keras weight initializer
# such as RandomNormal and should not be passed as `kernel_initializer`.
# Confirm whether this line is still needed.
init = tf.compat.v1.global_variables_initializer()

### **Uploading and processing data**

In [3]:
# Colab-only helper module; this cell will not run outside Google Colab.
from google.colab import files

# Ask the user to upload the dataset archive (the recorded output below shows
# p2h.zip being saved into the working directory).
uploaded = files.upload()

Saving p2h.zip to p2h.zip


In [6]:
# Extract the uploaded archive quietly, then list the working directory to confirm.
!unzip -q 'p2h.zip' #unzip the uploaded files
!ls #view the contents of the uploaded file

p2h.zip  potr2hedc  potr2hedc_256.npz  sample_data


In [7]:
# load all images in a directory into memory
def load_images(path, size=(256,256,3)):
    """Load every image file found in `path` into a single array.

    Files are visited in sorted filename order. `os.listdir` returns entries in
    arbitrary order, so without sorting, two directories loaded separately
    (source and target) would not be guaranteed to line up row-for-row.

    Args:
        path: directory that contains only image files.
        size: requested image shape; only the first two entries
            (height, width) are passed to `load_img` as `target_size`.

    Returns:
        A one-element list `[images]`, where `images` is the array built from
        the per-file results of `img_to_array`, stacked along a new first axis.
    """
    from os.path import join

    # load_img expects (height, width); drop the channel entry
    target_size = tuple(size[:2])
    img_list = [
        img_to_array(load_img(join(path, filename), target_size=target_size))
        for filename in sorted(listdir(path))
    ]
    return [asarray(img_list)]
 
# load the source images
path = '/content/potr2hedc/potr/'
[src_images] = load_images(path)
print('Loaded: ', src_images.shape)
# load the target images
path = '/content/potr2hedc/hedc/'
[tar_images] = load_images(path)
print('Loaded: ', tar_images.shape)
# generate_real_samples indexes both arrays with the same indices, so the two
# sets must contain the same number of images
assert src_images.shape[0] == tar_images.shape[0], 'source/target image counts differ'
# save both arrays into one compressed numpy archive (keys arr_0 and arr_1)
filename = 'potr2hedc_256.npz'
savez_compressed(filename, src_images, tar_images)
print('Saved dataset: ', filename)

Loaded:  (180, 256, 256, 3)
Loaded:  (180, 256, 256, 3)
Saved dataset:  potr2hedc_256.npz


### **Discriminator**

In [2]:
def discriminator(image_shape):
  """Build and compile the discriminator.

  The model takes two images of `image_shape`, concatenates them along the
  last axis and passes the result through a stack of 4x4 convolutions that
  ends in a single-channel sigmoid map.

  Args:
    image_shape: shape of one input image, e.g. (256, 256, 3).

  Returns:
    A Keras `Model` with inputs `[in_input_im, in_gen_im]`, compiled with
    binary cross-entropy (loss weight 0.5) and Adam(2e-4, beta_1=0.5).
  """
  # Use a Keras weight initializer, as the generator blocks do. The module-level
  # `init` comes from tf.compat.v1.global_variables_initializer() and is not a
  # weight initializer.
  init = RandomNormal(stddev=0.02)

  in_input_im = Input(shape=image_shape)  # input image
  in_gen_im = Input(shape=image_shape)  # second image of the pair (real or generated)
  merged = Concatenate()([in_input_im, in_gen_im])

  # Conv32: first stride-2 block has no batch normalization
  d = Conv2D(32, (4,4), strides=(2,2), padding='same',
             kernel_initializer=init)(merged)
  d = LeakyReLU(alpha=0.2)(d)
  # Conv64, Conv128, Conv256: stride-2 conv -> batch norm -> leaky ReLU
  for n_filters in (64, 128, 256):
    d = Conv2D(n_filters, (4,4), strides=(2,2), padding='same',
               kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
  # second Conv256 uses the default stride, so the spatial size is unchanged
  d = Conv2D(256, (4,4), padding='same', kernel_initializer=init)(d)
  d = BatchNormalization()(d)
  d = LeakyReLU(alpha=0.2)(d)
  # Output layer: one channel, squashed to (0, 1)
  d = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
  patch_out = Activation('sigmoid')(d)

  model = Model([in_input_im, in_gen_im], patch_out)

  opt = Adam(learning_rate=0.0002, beta_1=0.5)
  model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])

  return model


### **Generator** 

**Encoder and Decoder**

In [3]:
def enc_block(layer_in, n_filters, batchnorm=True):
  """Encoder block: stride-2 4x4 convolution, optional batch norm, leaky ReLU.

  Args:
    layer_in: tensor to downsample.
    n_filters: number of convolution filters.
    batchnorm: when True, apply batch normalization after the convolution.

  Returns:
    The activated output tensor.
  """
  downsample = Conv2D(
      n_filters,
      (4,4),
      strides=(2,2),
      padding='same',
      kernel_initializer=RandomNormal(stddev=0.02),
  )
  features = downsample(layer_in)
  if batchnorm:
    # the layer is always called in training mode
    features = BatchNormalization()(features, training=True)
  return LeakyReLU(alpha=0.2)(features)

def dec_block(layer_in, skip_in, n_filters, dropout=True):
  """Decoder block: stride-2 transposed conv, batch norm, optional dropout,
  concatenation with a skip tensor, then ReLU.

  Args:
    layer_in: tensor to upsample.
    skip_in: encoder tensor concatenated onto the upsampled features.
    n_filters: number of transposed-convolution filters.
    dropout: when True, apply Dropout(0.5) before the concatenation.

  Returns:
    The activated, concatenated tensor.
  """
  upsample = Conv2DTranspose(
      n_filters,
      (4,4),
      strides=(2,2),
      padding='same',
      kernel_initializer=RandomNormal(stddev=0.02),
  )
  features = upsample(layer_in)
  # both layers below are always called in training mode
  features = BatchNormalization()(features, training=True)
  if dropout:
    features = Dropout(0.5)(features, training=True)
  joined = Concatenate()([features, skip_in])
  return Activation('relu')(joined)

**Generator**

In [4]:
def generator(image_shape=(256,256,3)):
  """Build the generator: an encoder-decoder with skip connections.

  Seven `enc_block` stages feed a 512-filter bottleneck; seven `dec_block`
  stages then upsample, each concatenated with the matching encoder output.
  A final transposed convolution produces 3 channels with tanh activation.

  Args:
    image_shape: shape of the input image.

  Returns:
    An uncompiled Keras `Model` mapping the input image to the output image.
  """
  weight_init = RandomNormal(stddev=0.02)
  in_image = Input(shape=image_shape)

  # encoder: the first stage skips batch normalization
  encoder_outputs = [enc_block(in_image, 32, batchnorm=False)]
  for n_filters in (64, 128, 256, 256, 256, 256):
    encoder_outputs.append(enc_block(encoder_outputs[-1], n_filters))

  # bottleneck layer
  bottleneck = Conv2D(512, (4,4), strides=(2,2), padding='same',
                      kernel_initializer=weight_init)(encoder_outputs[-1])
  bottleneck = Activation('relu')(bottleneck)

  # decoder: (filters, use dropout) per stage, paired with encoder outputs
  # from deepest to shallowest
  decoder_plan = [
      (256, True), (256, True), (256, True),
      (256, False), (128, False), (64, False), (32, False),
  ]
  decoded = bottleneck
  for skip, (n_filters, use_dropout) in zip(reversed(encoder_outputs), decoder_plan):
    decoded = dec_block(decoded, skip, n_filters, dropout=use_dropout)

  # output
  upsampled = Conv2DTranspose(3, (4,4), strides=(2,2), padding='same',
                              kernel_initializer=weight_init)(decoded)
  out_image = Activation('tanh')(upsampled)

  return Model(in_image, out_image)

### **Siamese**

In [1]:
def siamese(image_shape=(256,256,3)):
  """Build and compile the siamese comparison network.

  Four images (two real, two generated) are embedded by ONE shared tower, so
  all embeddings live in the same space. The absolute difference between each
  real embedding and its generated counterpart is fed to a sigmoid unit.

  Previously each input had its own copy of the tower with independent
  weights, so the compared embeddings came from different networks and the
  large Dense(4096) layer was instantiated four times.

  Args:
    image_shape: shape of each input image.

  Returns:
    A Keras `Model` with inputs `[xi, xj, Geni, Genj]` and outputs
    `[predn1, predn2]`, compiled with binary cross-entropy and Adam(6e-5).
  """
  # real samples
  xi = Input(image_shape)
  xj = Input(image_shape)
  # generated samples
  Geni = Input(image_shape)
  Genj = Input(image_shape)

  # shared embedding tower: calling the same Sequential on every input reuses
  # its weights
  embed = Sequential([
      Conv2D(64, kernel_size=4, activation='relu'),
      MaxPooling2D(),
      Conv2D(128, kernel_size=4, activation='relu'),
      MaxPooling2D(),
      Conv2D(128, kernel_size=4, activation='relu'),
      MaxPooling2D(),
      Conv2D(256, kernel_size=4, activation='relu'),
      Flatten(),
      Dense(4096, activation='sigmoid'),
  ])
  si_out = embed(xi)
  sj_out = embed(xj)
  gi_out = embed(Geni)
  gj_out = embed(Genj)

  # element-wise absolute difference between two embeddings
  L_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))

  L1_distance = L_layer([si_out, gi_out])
  L2_distance = L_layer([sj_out, gj_out])

  predn1 = Dense(1,activation='sigmoid',bias_initializer='zeros')(L1_distance)
  predn2 = Dense(1,activation='sigmoid',bias_initializer='zeros')(L2_distance)

  model = Model(inputs=[xi, xj, Geni, Genj], outputs = [predn1, predn2])

  optimizer = Adam(learning_rate = 0.00006)
  model.compile(loss="binary_crossentropy",optimizer=optimizer)

  return model

### GAN Model

In [15]:
def Tvel_gan(g_model, d_model, s_model, image_shape):
  """Build and compile the composite model used to update the generator.

  Two source images are passed through the generator. The discriminator
  scores the first source/generated pair, and the siamese network compares
  both sources with their generated images.

  The composite exposes a flat list of outputs,
  `[dis_out, gen_outi, <siamese outputs...>]`, and is compiled with exactly
  one loss and one loss weight per output. Previously only two losses and
  weights were given for a model with more outputs than that.

  Args:
    g_model: generator model (one image in, one image out).
    d_model: discriminator taking `[source, candidate]`.
    s_model: siamese model taking `[xi, xj, G(xi), G(xj)]`.
    image_shape: shape of each source image.

  Returns:
    The compiled composite Keras `Model` with inputs `[in_xi, in_xj]`.
  """
  # Freeze the discriminator inside the composite so the generator update does
  # not also push the discriminator towards calling fakes real.
  # Assumes d_model was compiled before this call, so its own train_on_batch
  # keeps the trainable state captured at its compile time — TODO confirm.
  d_model.trainable = False

  # source images
  in_xi = Input(image_shape)
  in_xj = Input(image_shape)

  # generating outputs for two source images
  gen_outi = g_model(in_xi)
  gen_outj = g_model(in_xj)

  # discriminator output
  dis_out = d_model([in_xi, gen_outi])

  # siamese network output (one tensor, or a list of tensors)
  sia_out = s_model([in_xi, in_xj, gen_outi, gen_outj])
  sia_outs = list(sia_out) if isinstance(sia_out, (list, tuple)) else [sia_out]

  # flat output list so losses, weights and targets align one-to-one
  model = Model([in_xi, in_xj], [dis_out, gen_outi] + sia_outs)

  # NOTE(review): binary cross-entropy mirrors how s_model itself is compiled;
  # the weight of 1 per siamese head is an assumption — confirm.
  losses = ['binary_crossentropy', 'mae'] + ['binary_crossentropy'] * len(sia_outs)
  weights = [1, 100] + [1] * len(sia_outs)

  # compile model
  opt = Adam(learning_rate=0.0002, beta_1=0.5)
  model.compile(loss=losses, optimizer=opt, loss_weights=weights)
  return model

### Loading Data

In [13]:
def load_real_samples(filename):
  """Load the two image arrays from an .npz archive and rescale them.

  Args:
    filename: path to an archive holding arrays under the keys 'arr_0' and
      'arr_1'.

  Returns:
    `[X1, X2]`, each computed as `(array - 127.5) / 127.5`, which maps
    values in [0, 255] to [-1, 1].
  """
  # the archive object keeps the file open; the context manager closes it once
  # both arrays have been read into memory
  with load(filename) as data:
    X1, X2 = data['arr_0'], data['arr_1']
  # scale from [0,255] to [-1,1]
  X1 = (X1 - 127.5) / 127.5
  X2 = (X2 - 127.5) / 127.5
  return [X1, X2]

### Real Sample Generator

In [7]:
def generate_real_samples(dataset, n_samples, patch_shape):
  """Draw random real batches and an all-ones label array.

  Args:
    dataset: pair `(trainA, trainB)` of arrays indexed along the first axis.
    n_samples: number of samples to draw.
    patch_shape: side length of the square label map.

  Returns:
    `[A_i, B_i], A_j, y` where `A_i`/`B_i` are rows of trainA/trainB at the
    same random indices, `A_j` are rows of trainA at a second, independent
    draw of indices, and `y` is ones of shape
    `(n_samples, patch_shape, patch_shape, 1)`.
  """
  source_images, target_images = dataset
  n_available = source_images.shape[0]
  # two independent index draws, both over the source set
  first_idx = randint(0, n_available, n_samples)
  second_idx = randint(0, n_available, n_samples)
  paired_batch = [source_images[first_idx], target_images[first_idx]]
  second_sources = source_images[second_idx]
  real_labels = ones((n_samples, patch_shape, patch_shape, 1))
  return paired_batch, second_sources, real_labels

### Fake Sample Generator

In [8]:
def generate_fake_samples(g_model, samples, patch_shape):
  """Run the generator on `samples` and pair the result with all-zero labels.

  Args:
    g_model: object exposing `predict(samples)`.
    samples: batch passed to `g_model.predict`.
    patch_shape: side length of the square label map.

  Returns:
    `(generated, y)` where `y` is zeros of shape
    `(len(generated), patch_shape, patch_shape, 1)`.
  """
  generated = g_model.predict(samples)
  n_generated = len(generated)
  # 'fake' class labels (0), one map per generated sample
  fake_labels = zeros((n_generated, patch_shape, patch_shape, 1))
  return generated, fake_labels

### G_Model performance.

In [9]:
def summarize_performance(step, g_model, dataset, n_samples=3):
  """Save a comparison plot and a checkpoint of the generator.

  Draws `n_samples` real pairs, generates an image for each source, and
  writes a 3-row figure (source, generated, real target) to
  `plot_<step+1>.png`. The model is saved to `model_<step+1>.h5`.

  Args:
    step: zero-based training step, used in the output filenames.
    g_model: generator; must expose `predict` and `save`.
    dataset: `(trainA, trainB)` pair accepted by `generate_real_samples`.
    n_samples: number of columns in the plot.
  """
  # generate_real_samples returns three values: the paired batch, a second
  # source batch and the labels; only the paired batch is needed here
  [X_realA, X_realB], _, _ = generate_real_samples(dataset, n_samples, 1)
  # generate a batch of fake samples
  X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)
  # one row per image set, rescaled from [-1,1] to [0,1] for imshow
  rows = [(images + 1) / 2.0 for images in (X_realA, X_fakeB, X_realB)]
  for row, images in enumerate(rows):
    for i in range(n_samples):
      pyplot.subplot(3, n_samples, 1 + row * n_samples + i)
      pyplot.axis('off')
      pyplot.imshow(images[i])
  # save plot to file
  filename1 = 'plot_%06d.png' % (step+1)
  pyplot.savefig(filename1)
  pyplot.close()
  # save the generator model
  filename2 = 'model_%06d.h5' % (step+1)
  g_model.save(filename2)
  print('>Saved: %s and %s' % (filename1, filename2))

### Model Trainer

In [10]:
def train(d_model, g_model, s_model, gan_model, dataset, n_epochs=250, n_batch=1):
  """Alternate discriminator, siamese and generator updates.

  Each step draws a real batch, generates images for both source batches,
  updates the discriminator on real and generated pairs, updates the siamese
  network, then updates the generator through the composite model.
  Every 100 epochs the generator is plotted and saved.

  Args:
    d_model: compiled discriminator taking `[source, candidate]`.
    g_model: generator.
    s_model: compiled siamese model taking `[xi, xj, G(xi), G(xj)]`.
    gan_model: compiled composite taking `[xi, xj]`.
    dataset: `(trainA, trainB)` pair of arrays.
    n_epochs: number of passes over the data.
    n_batch: samples per step.
  """
  # side length of the discriminator's square output map
  n_patch = d_model.output_shape[1]
  trainA, _ = dataset
  # batches per epoch and total number of training steps
  bat_per_epo = int(len(trainA) / n_batch)
  n_steps = bat_per_epo * n_epochs
  for i in range(n_steps):
    # select a batch of real samples
    [X_realA, X_realB], X_j, y_real = generate_real_samples(dataset, n_batch, n_patch)
    # generate a batch of fake samples for both source batches
    X_geni, y_geni = generate_fake_samples(g_model, X_realA, n_patch)
    X_genj, _ = generate_fake_samples(g_model, X_j, n_patch)
    # update discriminator for real samples
    d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
    # update discriminator for generated samples
    d_loss2 = d_model.train_on_batch([X_realA, X_geni], y_geni)
    # A compiled model needs targets; the siamese update previously had none.
    # NOTE(review): label 1 ("source and its translation match") for every
    # siamese output is an assumption. With positive pairs only, the heads can
    # saturate — confirm the intended labels or add negative pairs.
    s_targets = [ones((n_batch, 1)) for _ in s_model.outputs]
    s_model.train_on_batch([X_realA, X_j, X_geni, X_genj], s_targets)
    # The composite has two inputs, so both source batches are passed.
    # Targets: discriminator map, target image, then one per extra output.
    n_extra = max(len(gan_model.outputs) - 2, 0)
    gan_targets = [y_real, X_realB] + s_targets[:n_extra]
    g_losses = gan_model.train_on_batch([X_realA, X_j], gan_targets)
    # the first entry is the total loss when per-output losses are returned
    g_loss = g_losses[0] if isinstance(g_losses, (list, tuple)) else g_losses
    # summarize performance
    print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))
    # Plot and checkpoint the generator every 100 epochs. s_model is not passed
    # here: it cannot predict from a single batch and would write to the same
    # filenames as the generator.
    if (i+1) % (bat_per_epo * 100) == 0:
      summarize_performance(i, g_model, dataset)

### Main Function

In [None]:
# Load the archive written earlier in the notebook; load_real_samples returns
# the two arrays rescaled with (x - 127.5) / 127.5.
dataset = load_real_samples('potr2hedc_256.npz')
print('Loaded', dataset[0].shape, dataset[1].shape)
# define input shape based on the loaded dataset (drop the leading sample axis)
image_shape = dataset[0].shape[1:]
# define the models
d_model = discriminator(image_shape)
print("D_MODEL")
g_model = generator(image_shape)
print("G_MODEL")
s_model = siamese(image_shape)
print("S_MODEL")
# define the composite model that chains generator, discriminator and siamese
gan_model = Tvel_gan(g_model, d_model, s_model, image_shape)
print("GAN_MODEL")
# train model with the defaults declared in train() (n_epochs=250, n_batch=1)
train(d_model, g_model, s_model, gan_model, dataset)