## In this Jupyter Notebook, I am practicing creating a CycleGAN deep learning model. I hope that this will give me a better understanding of making CycleGAN for usage in The FaRi Project.

### To-Do:
###     - Check each part of the architecture with print statements to ensure every part is working as expected to debug errors in executing correctly on Google Colaboratory.

In [None]:
# IMPORTS
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy  as np
import scipy.ndimage
from IPython.display import clear_output
# Make any tf.function run eagerly (op by op) instead of as a traced graph,
# which keeps print-based debugging of intermediate tensors possible.
tf.config.run_functions_eagerly(True)

# Training parameters
learning_rate = 0.0001  # Step size handed to both Adam optimizers
batch_size = 32  # Number of images drawn from each domain per training step
# Number of training iterations: the training loop runs one train_step (one
# batch per domain) per "epoch", not a full pass over the dataset.
epochs = 25000

# Network parameters
image_dimension = 784  # Image size is 28 x 28

# Hidden-layer width shared by the Discriminators and the Generators
"""
This hidden dimension dictates that we are making a neural network
with a hidden dimension with 128 nodes. This helps in creating
the architecture of the neural networks used for discrimination and
generation

For the Discriminators, this is the architecture of the neural network

              INPUT         HIDDEN LAYER          OUTPUT
            784 Nodes         128 Nodes           1 Node

For the Generators, this is the architecture of the neural networks

              INPUT         HIDDEN LAYER          OUTPUT
            784 Nodes         128 Nodes           784 Nodes*


We have 784 input nodes since we are taking in the pixels of the image,
similarly to a handwritten digit classifier. Similarly, the 784 output values
correspond to every pixel that makes up the generated output image.

For an actual implementation, we would want to make this architecture more
advanced using a CNN and not a neural network with one layer
"""
# Number of units in the single hidden layer of every network defined below
H_dim = 128

"""
This function is what we are using in order to initialize the Weights and
Biases for our Discriminator and our Generator.
"""
def xavier_init(shape):
    """Create a trainable variable filled with Glorot-normal random values.

    Args:
        shape: Shape of the variable to create, e.g. [rows, cols].

    Returns:
        A trainable tf.Variable of the requested shape.
    """
    initial_value = tf.keras.initializers.GlorotNormal()(shape=shape)
    return tf.Variable(initial_value, trainable=True)



"""
CycleGAN trains on two sets of inputs, one per image domain. X_A and X_B below
stand in for those inputs: each holds batch_size examples with 784 features,
one per pixel of a 28 x 28 image, and is filled with random normal values.
They feed the module-level forward passes and losses further down, while
train_step receives its real batches as arguments.
"""

# Random stand-ins for one batch of each domain, shaped
# (batch_size, image_dimension). They are input data, not model parameters, so
# they are marked non-trainable to keep gradient tapes and optimizers from
# treating them as weights.
X_A = tf.Variable(tf.random.normal([batch_size, image_dimension], dtype=tf.float32),
                  trainable=False)
X_B = tf.Variable(tf.random.normal([batch_size, image_dimension], dtype=tf.float32),
                  trainable=False)

"""
Here we define our weights and bias dictionaries for Discriminator A
"""

# xavier_init already returns a trainable tf.Variable, so each entry is stored
# directly; wrapping it in a second tf.Variable allocated every parameter twice.
Disc_A_W = {
    # Weights of the hidden layer: image_dimension inputs -> H_dim units
    "disc_H": xavier_init([image_dimension, H_dim]),
    # Weights of the output layer: H_dim units -> 1 score
    "disc_final": xavier_init([H_dim, 1]),
}

Disc_A_B = {
    # Biases of the hidden layer
    "disc_H": xavier_init([H_dim]),
    # Bias of the output layer
    "disc_final": xavier_init([1]),
}

"""
Here we define our weights and bias dictionaries for Discriminator B
"""

# xavier_init already returns a trainable tf.Variable, so each entry is stored
# directly; wrapping it in a second tf.Variable allocated every parameter twice.
Disc_B_W = {
    # Weights of the hidden layer: image_dimension inputs -> H_dim units
    "disc_H": xavier_init([image_dimension, H_dim]),
    # Weights of the output layer: H_dim units -> 1 score
    "disc_final": xavier_init([H_dim, 1]),
}

Disc_B_B = {
    # Biases of the hidden layer
    "disc_H": xavier_init([H_dim]),
    # Bias of the output layer
    "disc_final": xavier_init([1]),
}

"""
Here we define our weights and bias dictionaries for Generator transforming
placeholder A to B
"""

# xavier_init already returns a trainable tf.Variable, so each entry is stored
# directly; wrapping it in a second tf.Variable allocated every parameter twice.
Gen_AB_W = {
    # Weights of the hidden layer: image_dimension inputs -> H_dim units
    "Gen_H": xavier_init([image_dimension, H_dim]),
    # Weights of the output layer: H_dim units -> image_dimension pixels
    "Gen_final": xavier_init([H_dim, image_dimension]),
}

Gen_AB_B = {
    # Biases of the hidden layer
    "Gen_H": xavier_init([H_dim]),
    # Biases of the output layer
    "Gen_final": xavier_init([image_dimension]),
}

"""
Here we define our weights and bias dictionaries for Generator transforming
placeholder B to A
"""

# xavier_init already returns a trainable tf.Variable, so each entry is stored
# directly; wrapping it in a second tf.Variable allocated every parameter twice.
Gen_BA_W = {
    # Weights of the hidden layer: image_dimension inputs -> H_dim units
    "Gen_H": xavier_init([image_dimension, H_dim]),
    # Weights of the output layer: H_dim units -> image_dimension pixels
    "Gen_final": xavier_init([H_dim, image_dimension]),
}

Gen_BA_B = {
    # Biases of the hidden layer
    "Gen_H": xavier_init([H_dim]),
    # Biases of the output layer
    "Gen_final": xavier_init([image_dimension]),
}



"""
Now we want to actually create the single hidden layer Discriminators and
Generators using TensorFlow, so we do that using the functions below.
"""

# Built once so every call reuses the same layer; constructing it inside the
# function allocated a brand-new BatchNormalization (and fresh variables) on
# each forward pass.
_DISC_A_BATCH_NORM = tf.keras.layers.BatchNormalization()


def Disc_A(x):
    """Score a batch of flattened images with Discriminator A.

    Args:
        x: Batch of inputs, expected shape (batch, image_dimension); it is
            cast to float32 before use.

    Returns:
        Tensor with one sigmoid output in (0, 1) per input row. These are
        probabilities, not logits.
    """
    # Cast input to float32 to avoid type mismatch
    x = tf.cast(x, tf.float32)

    # Hidden layer: leaky_relu(x W + b)
    hidden = tf.nn.leaky_relu(
        tf.add(tf.matmul(x, Disc_A_W["disc_H"]), Disc_A_B["disc_H"]))

    # NOTE(review): the layer is called without a `training` argument and its
    # variables are not part of Disc_param, so train_step never updates them.
    hidden = _DISC_A_BATCH_NORM(hidden)

    # Output layer: sigmoid(h W + b)
    pre_activation = tf.add(tf.matmul(hidden, Disc_A_W["disc_final"]),
                            Disc_A_B["disc_final"])
    return tf.nn.sigmoid(pre_activation)

# Built once so every call reuses the same layer; constructing it inside the
# function allocated a brand-new BatchNormalization (and fresh variables) on
# each forward pass.
_DISC_B_BATCH_NORM = tf.keras.layers.BatchNormalization()


def Disc_B(x):
    """Score a batch of flattened images with Discriminator B.

    Args:
        x: Batch of inputs, expected shape (batch, image_dimension); it is
            cast to float32 before use.

    Returns:
        Tensor with one sigmoid output in (0, 1) per input row. These are
        probabilities, not logits.
    """
    # Cast input to float32 to avoid type mismatch
    x = tf.cast(x, tf.float32)

    # Hidden layer: leaky_relu(x W + b)
    hidden = tf.nn.leaky_relu(
        tf.add(tf.matmul(x, Disc_B_W["disc_H"]), Disc_B_B["disc_H"]))

    # NOTE(review): the layer is called without a `training` argument and its
    # variables are not part of Disc_param, so train_step never updates them.
    hidden = _DISC_B_BATCH_NORM(hidden)

    # Output layer: sigmoid(h W + b)
    pre_activation = tf.add(tf.matmul(hidden, Disc_B_W["disc_final"]),
                            Disc_B_B["disc_final"])
    return tf.nn.sigmoid(pre_activation)

# Built once so every call reuses the same layer; constructing it inside the
# function allocated a brand-new BatchNormalization (and fresh variables) on
# each forward pass.
_GEN_AB_BATCH_NORM = tf.keras.layers.BatchNormalization()


def Gen_AB(x):
    """Translate a batch of flattened domain-A images into domain B.

    Args:
        x: Batch of inputs, expected shape (batch, image_dimension); it is
            cast to float32 before use.

    Returns:
        Tensor of shape (batch, image_dimension) with sigmoid outputs in
        (0, 1), one value per generated pixel.
    """
    # Cast input to float32 to avoid type mismatch
    x = tf.cast(x, tf.float32)

    # Hidden layer: leaky_relu(x W + b)
    hidden = tf.nn.leaky_relu(
        tf.add(tf.matmul(x, Gen_AB_W["Gen_H"]), Gen_AB_B["Gen_H"]))

    # NOTE(review): the layer is called without a `training` argument and its
    # variables are not part of Gen_param, so train_step never updates them.
    hidden = _GEN_AB_BATCH_NORM(hidden)

    # Output layer: sigmoid(h W + b)
    pre_activation = tf.add(tf.matmul(hidden, Gen_AB_W["Gen_final"]),
                            Gen_AB_B["Gen_final"])
    return tf.nn.sigmoid(pre_activation)

# Built once so every call reuses the same layer; constructing it inside the
# function allocated a brand-new BatchNormalization (and fresh variables) on
# each forward pass.
_GEN_BA_BATCH_NORM = tf.keras.layers.BatchNormalization()


def Gen_BA(x):
    """Translate a batch of flattened domain-B images into domain A.

    Args:
        x: Batch of inputs, expected shape (batch, image_dimension); it is
            cast to float32 before use.

    Returns:
        Tensor of shape (batch, image_dimension) with sigmoid outputs in
        (0, 1), one value per generated pixel.
    """
    # Cast input to float32 to avoid type mismatch
    x = tf.cast(x, tf.float32)

    # Hidden layer: leaky_relu(x W + b)
    hidden = tf.nn.leaky_relu(
        tf.add(tf.matmul(x, Gen_BA_W["Gen_H"]), Gen_BA_B["Gen_H"]))

    # NOTE(review): the layer is called without a `training` argument and its
    # variables are not part of Gen_param, so train_step never updates them.
    hidden = _GEN_BA_BATCH_NORM(hidden)

    # Output layer: sigmoid(h W + b)
    pre_activation = tf.add(tf.matmul(hidden, Gen_BA_W["Gen_final"]),
                            Gen_BA_B["Gen_final"])
    return tf.nn.sigmoid(pre_activation)

"""
Now let us actually build the CycleGAN Network

First, we begin by creating the GAN (Generative Adversarial Network)
for approximating A's distribution. In order to do this, we use B to A
Generator that we created, trained on input B. Then, we train the
Discriminator on real input from input A as well as fake input from the
B to A input we generated.
"""
# Module-level forward pass on the X_A / X_B stand-ins: translate B -> A, then
# score the A batch and the translated batch with Discriminator A. train_step
# recomputes its own versions of these tensors from the batches it is given.
X_BA = Gen_BA(X_B)
Disc_A_real = Disc_A(X_A)
Disc_A_fake = Disc_A(X_BA)

"""
Then, we create the GAN (Generative Adversarial Network) for approximating
B's distribution. In order to do this, we use A to B Generator that we
created, trained on input A. Then, we train the Discriminator on real input
from input B as well as fake input from the A to B input we generated.
"""
# Mirror of the block above for the other direction: translate A -> B, then
# score the B batch and the translated batch with Discriminator B.
X_AB = Gen_AB(X_A)
Disc_B_real = Disc_B(X_B)
Disc_B_fake = Disc_B(X_AB)

"""
Now we calculate the Discriminator Loss Function.
"""
# Inject label noise while keeping the targets valid probabilities: real
# targets are drawn from [0.95, 1] and fake targets from [0, 0.05]. Adding the
# noise to the real labels would push them above 1.
real_labels = tf.ones_like(Disc_A_real) - 0.05 * tf.random.uniform(tf.shape(Disc_A_real))
fake_labels = tf.zeros_like(Disc_A_fake) + 0.05 * tf.random.uniform(tf.shape(Disc_A_fake))

# Disc_A / Disc_B already end in a sigmoid, so their outputs are probabilities.
# The cross-entropy is therefore computed on probabilities; a logits-based loss
# would apply the sigmoid a second time.
Loss_Disc_A = tf.reduce_mean(tf.keras.losses.binary_crossentropy(real_labels, Disc_A_real)) + \
              tf.reduce_mean(tf.keras.losses.binary_crossentropy(fake_labels, Disc_A_fake))

# Repeat similarly for Disc_B:
real_labels_B = tf.ones_like(Disc_B_real) - 0.05 * tf.random.uniform(tf.shape(Disc_B_real))
fake_labels_B = tf.zeros_like(Disc_B_fake) + 0.05 * tf.random.uniform(tf.shape(Disc_B_fake))

Loss_Disc_B = tf.reduce_mean(tf.keras.losses.binary_crossentropy(real_labels_B, Disc_B_real)) + \
              tf.reduce_mean(tf.keras.losses.binary_crossentropy(fake_labels_B, Disc_B_fake))

# Total discriminator loss is the sum over both discriminators
Disc_Loss = Loss_Disc_A + Loss_Disc_B

# Image Reconstruction
"""
We need to reconstruct the images back to their original input through the
Generators once more to figure out the reconstruction loss to close the cycle
of the CycleGAN. This in conjunction with the Generator's own loss will make
up the overall Generator loss
"""
X_BAB = Gen_AB(X_BA)  # B -> A -> B round trip
X_ABA = Gen_BA(X_AB)  # A -> B -> A round trip

# Generator Loss Function
"""
The Generator Loss Function is the Mean Squared Error of the output of the
Discriminator discriminating against fake input minus a vector of the same
shape made up of 1s.
"""
# Loss_Gen_A uses Disc_B_fake, i.e. Discriminator B's score of Gen_AB's output;
# Loss_Gen_B uses Disc_A_fake, i.e. Discriminator A's score of Gen_BA's output.
Loss_Gen_A = tf.reduce_mean(tf.square(Disc_B_fake - tf.ones_like(Disc_B_fake)))
Loss_Gen_B = tf.reduce_mean(tf.square(Disc_A_fake - tf.ones_like(Disc_A_fake)))
Loss_total = Loss_Gen_A + Loss_Gen_B

# Reconstruction Loss for CycleGAN
"""
Here we use the L1 norm for reconstruction loss.
"""
# Mean absolute difference between each input and its round trip, weighted by 10
Loss_recon_A = tf.reduce_mean(10 * tf.abs(X_A - X_ABA))
Loss_recon_B = tf.reduce_mean(10 * tf.abs(X_B - X_BAB))
Loss_recon_total = Loss_recon_A + Loss_recon_B

# Overall generator objective: adversarial term plus reconstruction term
Gen_Loss = Loss_total + Loss_recon_total

# Parameters list of Discriminator
# Every weight and bias variable of both discriminators. train_step
# differentiates Disc_Loss with respect to this list and passes it to
# Disc_optimizer. Variables created by the BatchNormalization layers inside
# the network functions are not included.
Disc_param = [Disc_A_W["disc_H"], Disc_A_W["disc_final"], Disc_A_B["disc_H"],
              Disc_A_B["disc_final"], Disc_B_W["disc_H"], Disc_B_W["disc_final"],
              Disc_B_B["disc_H"], Disc_B_B["disc_final"]]

# Parameters list of Generator
# Every weight and bias variable of both generators. train_step differentiates
# Gen_Loss with respect to this list and passes it to Gen_optimizer.
Gen_param = [Gen_AB_W["Gen_H"], Gen_AB_W["Gen_final"], Gen_AB_B["Gen_H"],
             Gen_AB_B["Gen_final"], Gen_BA_W["Gen_H"], Gen_BA_W["Gen_final"],
             Gen_BA_B["Gen_H"], Gen_BA_B["Gen_final"]]

# Optimizers
# Separate Adam instances, so generator and discriminator updates each keep
# their own optimizer state; both use the same learning_rate.
Gen_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
Disc_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)



def _bce_on_probabilities(probabilities, labels):
    """Return the mean binary cross-entropy between `labels` and `probabilities`.

    `probabilities` must already be sigmoid outputs. Unlike
    tf.nn.sigmoid_cross_entropy_with_logits, no extra sigmoid is applied.
    """
    return tf.reduce_mean(
        tf.keras.losses.binary_crossentropy(labels, probabilities))


def train_step(X_A_batch, X_B_batch, verbose=False):
    """Run one optimisation step for both generators and both discriminators.

    Args:
        X_A_batch: Batch of domain-A images; cast to float32 and flattened to
            (batch, image_dimension).
        X_B_batch: Batch of domain-B images, handled the same way.
        verbose: When True, print the input batches and the clipped gradients.
            Off by default because printing full tensors on every step floods
            the output over a long training run.

    Returns:
        Tuple (Disc_Loss, Gen_Loss) with the scalar loss tensors of this step.
    """
    # Flatten each image to image_dimension values. The batch size is inferred
    # from the data instead of being tied to the global batch_size.
    X_A_batch = tf.reshape(tf.cast(X_A_batch, tf.float32), [-1, image_dimension])
    X_B_batch = tf.reshape(tf.cast(X_B_batch, tf.float32), [-1, image_dimension])

    if verbose:
        print("X_A_batch:", X_A_batch)
        print("X_B_batch:", X_B_batch)

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Generator forward pass
        X_BA = Gen_BA(X_B_batch)
        X_AB = Gen_AB(X_A_batch)

        # Discriminator forward pass
        Disc_A_real = Disc_A(X_A_batch)
        Disc_A_fake = Disc_A(X_BA)
        Disc_B_real = Disc_B(X_B_batch)
        Disc_B_fake = Disc_B(X_AB)

        # Discriminator losses. Disc_A / Disc_B already end in a sigmoid, so
        # the cross-entropy is taken on probabilities; a logits-based loss
        # would squash the scores a second time.
        Loss_Disc_A = (
            _bce_on_probabilities(Disc_A_real, tf.ones_like(Disc_A_real))
            + _bce_on_probabilities(Disc_A_fake, tf.zeros_like(Disc_A_fake)))
        Loss_Disc_B = (
            _bce_on_probabilities(Disc_B_real, tf.ones_like(Disc_B_real))
            + _bce_on_probabilities(Disc_B_fake, tf.zeros_like(Disc_B_fake)))

        # Apply gradient penalty, weighted by lambda_gp
        lambda_gp = 1
        gp_A = gradient_penalty(Disc_A, X_A_batch, X_BA)
        gp_B = gradient_penalty(Disc_B, X_B_batch, X_AB)

        Loss_Disc_A += lambda_gp * gp_A
        Loss_Disc_B += lambda_gp * gp_B

        Disc_Loss = Loss_Disc_A + Loss_Disc_B

        # Cycle reconstruction: B -> A -> B and A -> B -> A
        X_BAB = Gen_AB(X_BA)
        X_ABA = Gen_BA(X_AB)

        # Adversarial generator losses: each generator wants the opposing
        # discriminator to label its output as real (target 1).
        Loss_Gen_A = _bce_on_probabilities(Disc_B_fake, tf.ones_like(Disc_B_fake))
        Loss_Gen_B = _bce_on_probabilities(Disc_A_fake, tf.ones_like(Disc_A_fake))
        Loss_total = Loss_Gen_A + Loss_Gen_B

        # L1 cycle-consistency loss, weighted by 10
        Loss_recon_A = tf.reduce_mean(10 * tf.abs(X_A_batch - X_ABA))
        Loss_recon_B = tf.reduce_mean(10 * tf.abs(X_B_batch - X_BAB))
        Loss_recon_total = Loss_recon_A + Loss_recon_B

        Gen_Loss = Loss_total + Loss_recon_total

    # Compute gradients; each tape is used once, for its own parameter list
    Gen_gradients = gen_tape.gradient(Gen_Loss, Gen_param)
    Disc_gradients = disc_tape.gradient(Disc_Loss, Disc_param)

    # Clip gradients to avoid them growing too large
    Gen_gradients, _ = tf.clip_by_global_norm(Gen_gradients, 5.0)
    Disc_gradients, _ = tf.clip_by_global_norm(Disc_gradients, 5.0)

    if verbose:
        print("Generator gradients:", Gen_gradients)
        print("Discriminator gradients:", Disc_gradients)

    # Apply gradients
    Gen_optimizer.apply_gradients(zip(Gen_gradients, Gen_param))
    Disc_optimizer.apply_gradients(zip(Disc_gradients, Disc_param))

    return Disc_Loss, Gen_Loss

In [None]:
# Download the Fashion MNIST dataset
# Lines starting with `!` are IPython shell escapes, so this cell only runs
# inside a notebook environment.
# NOTE(review): nothing in the visible code reads the files placed in
# MNIST_Fashion/; the training data comes from
# tf.keras.datasets.fashion_mnist.load_data() - confirm whether this download
# is still needed.
!wget http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
!wget http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
!wget http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
!wget http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz

# Check if the directory exists, create it if it doesn't
!if [ ! -d "MNIST_Fashion" ]; then mkdir MNIST_Fashion; fi

# Copy the downloaded files into the directory (`cp` leaves the originals in
# the working directory)
!cp *.gz MNIST_Fashion/

# Clearing the output
# With wait=True the clear is deferred until the next output arrives, so the
# message below stays visible until something else is printed.
print("Dataset Download Complete! Please wait, the CycleGAN will begin training momentarily...")
clear_output(wait=True)

# Load Fashion MNIST data using TensorFlow 2.x API
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

# Assembling training data from two domains: the first half of the training
# set is domain A, the second half (rotated) is domain B.
mid = X_train.shape[0] // 2

# Real images Dataset 1
# Pixels are scaled from [0, 255] to float32 [0, 1]. The generators end in a
# sigmoid and can only emit values in (0, 1), so unscaled inputs could never
# be reconstructed and the L1 cycle loss could not approach zero.
X_train_real = X_train[:mid].astype(np.float32) / 255.0

# Rotated images Dataset 2
# The rotation runs on the original integer pixels; scaling happens afterwards.
X_train_rot = X_train[mid:].reshape(-1, 28, 28)
X_train_rot = scipy.ndimage.rotate(X_train_rot, 90, axes=(1, 2))
X_train_rot = X_train_rot.astype(np.float32) / 255.0

# Random batch selection
def shuffle_data(x, size):
    """Return a random contiguous window of `size` items from `x`.

    Despite its name, the function does not permute `x`: it picks a random
    start index and returns the slice of `size` consecutive items from there.

    Args:
        x: Array-like with a `.shape` attribute, sliced along axis 0.
        size: Number of consecutive items to return.

    Returns:
        The slice x[start:start + size] for a uniformly drawn start.

    Raises:
        ValueError: If `size` exceeds the number of items in `x`.
    """
    n_items = x.shape[0]
    if size > n_items:
        raise ValueError(
            f"Cannot draw {size} items from an array of length {n_items}")
    # randint's upper bound is exclusive, so `+ 1` makes the last valid window
    # reachable and lets size == n_items return the whole array.
    start_index = np.random.randint(0, n_items - size + 1)
    return x[start_index:start_index + size]

def gradient_penalty(discriminator, real_data, fake_data):
    """Penalise discriminator input-gradients whose norm differs from 1.

    Random per-sample mixes of `real_data` and `fake_data` are fed through
    `discriminator`; the penalty is the mean squared deviation of the gradient
    norm (taken over axis 1) from 1.

    Args:
        discriminator: Callable mapping a (batch, features) tensor to scores.
        real_data: Real samples, shape (batch, features).
        fake_data: Generated samples with the same shape as `real_data`.

    Returns:
        Scalar tensor holding the penalty.
    """
    # Use the batch size of the data actually passed in, so the helper does
    # not depend on the global batch_size.
    n_samples = tf.shape(real_data)[0]
    alpha = tf.random.uniform(shape=tf.stack([n_samples, 1]), minval=0., maxval=1.)
    interpolated = alpha * real_data + (1 - alpha) * fake_data

    with tf.GradientTape() as tape:
        # `interpolated` is a plain tensor, so it must be watched explicitly
        tape.watch(interpolated)
        pred = discriminator(interpolated)
    grads = tape.gradient(pred, interpolated)

    # The small epsilon keeps the square root differentiable when a sample's
    # gradient is exactly zero (d sqrt(u)/du is infinite at u = 0).
    slopes = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=1) + 1e-12)
    return tf.reduce_mean((slopes - 1.) ** 2)

print("CycleGAN Training:")
# Training loop, using the train_step function
for epoch in range(epochs):
    # Draw a random batch from each domain
    X_A_batch = shuffle_data(X_train_real, batch_size)
    X_B_batch = shuffle_data(X_train_rot, batch_size)

    # Perform a single training step and keep the losses it returns. Without
    # this assignment the names below would still hold the values computed
    # once at module level and never change.
    Disc_Loss, Gen_Loss = train_step(X_A_batch, X_B_batch)

    # Print the losses every 500 steps
    if epoch % 500 == 0:
        tf.print(f"Epoch: {epoch}, Discriminator Loss: ", Disc_Loss, ", Generator Loss: ", Gen_Loss)

In [None]:
# Testing
# Builds two n x n grids of 28 x 28 tiles: canvas1 shows B -> A translations,
# canvas2 shows A -> B translations.

n = 6
canvas1 = np.empty((28 * n, 28 * n))
canvas2 = np.empty((28 * n, 28 * n))

for i in range(n):
    # Draw a batch from each domain and flatten every image to one row, which
    # is the input layout the generators expect.
    test_A = shuffle_data(X_train_real, batch_size)
    test_B = shuffle_data(X_train_rot, batch_size)
    test_A = np.reshape(test_A, (test_A.shape[0], -1))
    test_B = np.reshape(test_B, (test_B.shape[0], -1))

    # Generate A images from B by calling the generator directly; TF2 runs
    # eagerly, so there is no session or feed_dict.
    out_A = Gen_BA(test_B).numpy()
    # Generate B images from A
    out_B = Gen_AB(test_A).numpy()

    for j in range(n):
        # Draw the first n generated images of this batch into row i
        rows = slice(i * 28, (i + 1) * 28)
        cols = slice(j * 28, (j + 1) * 28)
        canvas1[rows, cols] = out_A[j].reshape([28, 28])
        canvas2[rows, cols] = out_B[j].reshape([28, 28])

# One way of displaying
plt.figure(figsize=(n, n))
plt.imshow(canvas1, origin="upper", cmap="gray")
plt.show()

plt.figure(figsize=(n, n))
plt.imshow(canvas2, origin="upper", cmap="gray")
plt.show()

# Second way of displaying
f, ax = plt.subplots(1, 2)
ax[0].imshow(canvas1, origin="upper", cmap="gray")
ax[1].imshow(canvas2, origin="upper", cmap="gray")
plt.show()