## Importing libraries and dataset

In [None]:
"""Importing the required datasets"""

import os
import numpy as np
import time
import datetime
import itertools
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_datasets as tfds
from IPython.display import clear_output
from keras.utils.vis_utils import plot_model

from EfficientNet import * #importing generator and discriminator models

tfds.disable_progress_bar() #disabling the progress bar for training
#load the tensorboard extension
%load_ext tensorboard

In [None]:
load_train_pepsi = np.load('datasets/train_pepsi.npy')
load_train_coke = np.load('datasets/train_coke.npy')
load_test_pepsi = np.load('datasets/test_pepsi.npy')
load_test_coke = np.load('datasets/test_coke.npy')

load_train_pepsi_label = np.load('datasets/train_pepsi_label.npy')
load_train_coke_label = np.load('datasets/train_coke_label.npy')
load_test_pepsi_label = np.load('datasets/test_pepsi_label.npy')
load_test_coke_label = np.load('datasets/test_coke_label.npy')


#creating tensorflow datasets with numpy arrays
trainB = tf.data.Dataset.from_tensor_slices((load_train_pepsi, load_train_pepsi_label))
trainA = tf.data.Dataset.from_tensor_slices((load_train_coke, load_train_coke_label))
testB = tf.data.Dataset.from_tensor_slices((load_test_pepsi, load_test_pepsi_label))
testA = tf.data.Dataset.from_tensor_slices((load_test_coke, load_test_coke_label))

In [None]:
print(load_train_pepsi.shape)
print(load_train_pepsi_label.shape)

print(load_train_pepsi_label[3])

## Dataset preprocessing

In [None]:
# Setting the original image size - high resolution images can be used and cropped below
orig_img_size = (286, 286)
# Size of the random crops that will be applied to images
input_img_size = (256, 256, 3)

#setting batch size for the application of the preprocessing operations to the images in the dataset
batch_size = 1

def normalise_img(img):
    img = tf.cast(img, dtype=tf.float32)
    # Map values in the range [-1, 1]
    return (img / 127.5) - 1.0


def preprocess_train_image(img, label):
    # Random flip
    img = tf.image.random_flip_left_right(img)
    # Resize to the original size first
    img = tf.image.resize(img, [*orig_img_size])
    # Random crop to 256X256
    img = tf.image.random_crop(img, size=[*input_img_size])
    # Normalise the pixel values in the range [-1, 1]
    img = normalise_img(img)
    return img

def preprocess_test_image(img, label):
    # Only resizing and normalization for the test images.
    img = tf.image.resize(img, [input_img_size[0], input_img_size[1]])
    img = normalise_img(img)
    return img


autotune = tf.data.experimental.AUTOTUNE
# Apply the preprocessing operations to the training data
trainA = (trainA.map(preprocess_train_image, num_parallel_calls=autotune).cache().shuffle(256).batch(batch_size))
trainB = (trainB.map(preprocess_train_image, num_parallel_calls=autotune).cache().shuffle(256).batch(batch_size))

# Apply the preprocessing operations to the test data
testA = (testA.map(preprocess_test_image, num_parallel_calls=autotune).cache().shuffle(256).batch(batch_size))
testB = (testB.map(preprocess_test_image, num_parallel_calls=autotune).cache().shuffle(256).batch(batch_size))

## Model Compiling

complete_efficientnet_generator() - Uses entire EfficientNet model with upsampling layers </br>


In [None]:
"""RESNET BASELINE IMPLEMENTATION"""

coke_to_pepsi_gen = resnet_generator(
    filters=64,
    num_resnet_blocks=9)
pepsi_to_coke_gen = resnet_generator(
    filters=64,
    num_resnet_blocks=9)

In [None]:
"""COMPLETE EFFICIENTNET IMPLEMENTATION"""

coke_to_pepsi_gen = complete_efficientnet_generator()
pepsi_to_coke_gen = complete_efficientnet_generator()

In [None]:
"""SPLIT EFFICIENTNET IMPLEMENTATION"""

coke_to_pepsi_gen = efficientnet_generator(name='coke_to_pepsi_gen')
pepsi_to_coke_gen = efficientnet_generator(name='pepsi_to_coke_gen')

In [None]:
coke_discriminator = discriminator(name="coke_discriminator")
pepsi_discriminator = discriminator(name="pepsi_discriminator")

## Display examples

In [None]:
coke_sample = next(itertools.cycle(trainA))
pepsi_sample = next(itertools.cycle(trainB))

In [None]:
plt.figure(figsize=(10, 10))

images = [coke_sample, pepsi_sample]
title = ['Coke Sample', 'Pepsi Sample']

for i in range(len(images)):
    plt.subplot(1, 2, i+1)
    plt.title(title[i])
    plt.imshow(images[i][0] * 0.5 + 0.5)
plt.show()

In [None]:
plt.figure(figsize=(8, 8))

plt.subplot(121)
plt.title('Is an image from Coke dataset?')
plt.imshow(pepsi_discriminator(coke_sample)[0, ..., -1], cmap='RdBu_r')

plt.subplot(122)
plt.title('Is an image from Pepsi dataset?')
plt.imshow(coke_discriminator(pepsi_sample)[0, ..., -1], cmap='RdBu_r')

plt.show()

## Defining losses

In [None]:
LAMBDA = 10 #additional weight for cycle loss
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def discriminator_loss(real, generated):
  real_loss = loss_obj(tf.ones_like(real), real)
  
  generated_loss = loss_obj(tf.zeros_like(generated), generated)

  total_disc_loss = real_loss + generated_loss

  return total_disc_loss * 0.5

In [None]:
def generator_loss(generated):
  return loss_obj(tf.ones_like(generated), generated)

In [None]:
def mae(real_image, cycled_image):
  loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
  
  return LAMBDA * loss1

In [None]:
def identity_loss(real_image, same_image):
  loss = tf.reduce_mean(tf.abs(real_image - same_image))
  return LAMBDA * 0.5 * loss

In [None]:
coke_to_pepsi_gen_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5) #learning rate - 2e-4 or 0.0002
pepsi_to_coke_gen_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

coke_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
pepsi_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

## Saving checkpoints

In [None]:
"""Initialisation of the checkpoint system that is used to save the models ouputs every 5 epochs. Checkpoints are also loaded for the model if they exist in the directory"""

checkpoint_path = "./checkpoints/train" #Setting the checkpoint path

ckpt = tf.train.Checkpoint(coke_to_pepsi_gen=coke_to_pepsi_gen,
                           pepsi_to_coke_gen=pepsi_to_coke_gen,
                           coke_discriminator=coke_discriminator,
                           pepsi_discriminator=pepsi_discriminator,
                           coke_to_pepsi_gen_optimizer=coke_to_pepsi_gen_optimizer,
                           pepsi_to_coke_gen_optimizer=pepsi_to_coke_gen_optimizer,
                           coke_discriminator_optimizer=coke_discriminator_optimizer,
                           pepsi_discriminator_optimizer=pepsi_discriminator_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# if a checkpoint exists, restore the latest checkpoint and display message .
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print ('Latest checkpoint has been restored')

In [None]:
def img_gen(model, test_input):
  """Returns the generated image after a sample has been through the generator model. This is used in the training loop to display the cyclic transformation"""

  prediction = model(test_input)

  display_list = [test_input[0], prediction[0]]
  
  return prediction

In [None]:
def generate_images(model, test_input):
  """Creates 1x2 plot containing the input image and that image after it has been through the generator specified"""

  prediction = model(test_input)
    
  plt.figure(figsize=(12, 12))

  display_list = [test_input[0], prediction[0]]
  title = ['Input Image', 'Predicted Image']

  for i in range(2):
    plt.subplot(1, 2, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()

In [None]:
"""Setting log directory and creating summary writer for tensorboard result tracking"""

log_dir="logs/fit/"
summary_writer = tf.summary.create_file_writer(log_dir + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
"""Paste into terminal to host tensorboard"""

tensorboard --logdir logs/fit 

## Training

In [None]:
@tf.function
def training_iteration(true_coke, true_pepsi, epoch):
  # persistent is set to True because the tape is used more than once to calculate the gradients.
  with tf.GradientTape(persistent=True) as tape:
    # Generator G translates X -> Y
    # Generator F translates Y -> X.
    
    """TRAINING"""

    fake_coke = pepsi_to_coke_gen(true_pepsi, training=True)
    fake_pepsi = coke_to_pepsi_gen(true_coke, training=True)
    
    coke_cycle = pepsi_to_coke_gen(fake_pepsi, training=True)
    pepsi_cycle = coke_to_pepsi_gen(fake_coke, training=True)

    # coke_identity and pepsi_identity are used for identity loss.
    coke_identity = pepsi_to_coke_gen(true_coke, training=True)
    pepsi_identity = coke_to_pepsi_gen(true_pepsi, training=True)

    real_coke_disc = coke_discriminator(true_coke, training=True)
    real_pepsi_disc = pepsi_discriminator(true_pepsi, training=True)
    
    fake_coke_disc = coke_discriminator(fake_coke, training=True)
    fake_pepsi_disc = pepsi_discriminator(fake_pepsi, training=True)

    """CALCULATING LOSSES"""

    # calculate generator loss with binary crossentropy
    coke_to_pepsi_gen_loss = generator_loss(fake_pepsi_disc)
    pepsi_to_coke_gen_loss = generator_loss(fake_coke_disc)
    
    #calculating separate mean absolute errors for x and y images
    coke_cycle_loss = mae(true_coke, coke_cycle)
    pepsi_cycle_loss = mae(true_pepsi, pepsi_cycle)

    #calculating the total mean absolute error (both x and y images)
    complete_cycle = mae(true_coke, coke_cycle) + mae(true_pepsi, pepsi_cycle)

    #total generator loss = adversarial loss + cycle loss
    total_coke_to_pepsi_gen_loss = coke_to_pepsi_gen_loss + complete_cycle + identity_loss(true_pepsi, pepsi_identity)
    total_pepsi_to_coke_gen_loss = pepsi_to_coke_gen_loss + complete_cycle + identity_loss(true_coke, coke_identity)

    coke_disc_loss = discriminator_loss(real_coke_disc, fake_coke_disc)
    pepsi_disc_loss = discriminator_loss(real_pepsi_disc, fake_pepsi_disc)

  """CALCULATING GRADIENT UPDATES"""

  #gradients for generator and discriminator
  coke_to_pepsi_gen_gradients = tape.gradient(total_coke_to_pepsi_gen_loss, coke_to_pepsi_gen.trainable_variables)
  pepsi_to_coke_gen_gradients = tape.gradient(total_pepsi_to_coke_gen_loss, pepsi_to_coke_gen.trainable_variables)
  coke_discriminator_gradients = tape.gradient(coke_disc_loss, coke_discriminator.trainable_variables)
  pepsi_discriminator_gradients = tape.gradient(pepsi_disc_loss, pepsi_discriminator.trainable_variables)

  """APPLYING GRADIENT UPDATES"""
  
  coke_to_pepsi_gen_optimizer.apply_gradients(zip(coke_to_pepsi_gen_gradients, coke_to_pepsi_gen.trainable_variables))
  pepsi_to_coke_gen_optimizer.apply_gradients(zip(pepsi_to_coke_gen_gradients, pepsi_to_coke_gen.trainable_variables))
  coke_discriminator_optimizer.apply_gradients(zip(coke_discriminator_gradients, coke_discriminator.trainable_variables))
  pepsi_discriminator_optimizer.apply_gradients(zip(pepsi_discriminator_gradients, pepsi_discriminator.trainable_variables))

  """WRITING LOSSES FOR TENSORBOARD"""
  with summary_writer.as_default():
    tf.summary.scalar('mae_x', coke_cycle_loss, step=epoch)
    tf.summary.scalar('mae_y', pepsi_cycle_loss, step=epoch)
    tf.summary.scalar('complete_cycle', complete_cycle, step=epoch)
    tf.summary.scalar('total_coke_to_pepsi_gen_loss', total_coke_to_pepsi_gen_loss, step=epoch)
    tf.summary.scalar('total_pepsi_to_coke_gen_loss', total_pepsi_to_coke_gen_loss, step=epoch)
    tf.summary.scalar('coke_disc_loss', coke_disc_loss, step=epoch)
    tf.summary.scalar('pepsi_disc_loss', pepsi_disc_loss, step=epoch)
 

In [None]:
epochs = 40

In [None]:
for i in range(epochs): #running the training loop for the number of epochs specified
  start_time = time.time()
  count = 0
  for coke, pepsi in tf.data.Dataset.zip((trainA, trainB)): #zipping both training datasets together to loop over together
    training_iteration(coke, pepsi, i) #for each epoch, run the training step on every image in the datasets
    if count % 10 == 0:
      print ('*', end=' ')
    count += 1

  clear_output(wait=True) #clearing output after each epoch to display new output
  
  # Using a consistent image (coke_sample) so that the progress of the model is clearly visible.
  generate_images(coke_to_pepsi_gen, coke_sample) #display generated image for each epoch
  
  img = img_gen(coke_to_pepsi_gen, coke_sample) #assign generated image to img for each epoch
  generate_images(pepsi_to_coke_gen, img) #display generated image back to original
  """
  if (i + 1) % 5 == 0: #every 5 epochs save the checkpoint and display message stating this update
    ckpt_save_path = ckpt_manager.save()
    print ('Saving checkpoint for epoch {} at {}'.format(i+1, ckpt_save_path))
  
  print ('Time taken for epoch {} is {} sec\n'.format(i + 1, time.time()-start_time)) #print the time taken for each epoch. This is taking current time from start
  """

## Testing

In [None]:
tensorboard dev upload --logdir logs/fit #add specific folder path 

### Final Tests

All Tests - https://tensorboard.dev/experiment/m0AUwKOdR2GM7PS55AZBdw/

In [None]:
!tensorboard dev delete --experiment_id #experiment_id_here

In [None]:
for img in testA.take(50):
  generate_images(coke_to_pepsi_generator, img)
  #generate_images(generator_f, translate_img)
