## DEEP CONVOLUTIONAL GENERATIVE ADVERSARIAL NETWORK

#### The Discriminator and Generator

The purpose of the generator in a GAN is to learn to generate synthetic data that resembles real data samples. It does so by transforming the random noise vectors into meaningful output data, such as images. Through the adversarial training process, where it competes against the discriminator, the generator learns to produce outputs that are indistinguishable from real data to the discriminator.

The discriminator, on the other hand, is responsible for distinguishing between real and fake data samples. It receives both real and fake data samples as input during training and learns to differentiate between them.

By training the generator to fool the discriminator into classifying its generated samples as real, the generator indirectly learns the distribution of real data. It never sees real data samples directly during training; its only learning signal is the feedback passed back through the discriminator.
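
To make the two competing objectives concrete, here is a minimal sketch in plain Python. The helper `binary_cross_entropy` and the two example prediction values are illustrative assumptions, not part of the model code in this notebook.

```python
import math

def binary_cross_entropy(label, prediction, eps=1e-7):
    """Binary cross-entropy for a single prediction (hypothetical helper)."""
    p = min(max(prediction, eps), 1.0 - eps)  # clip to avoid log(0)
    return -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))

# Assumed discriminator outputs: the probability that an image is real.
pred_on_real = 0.9  # fairly sure the real image is real
pred_on_fake = 0.2  # fairly sure the generated image is fake

# Discriminator objective: real images -> 1, generated images -> 0.
d_loss = 0.5 * (
    binary_cross_entropy(1.0, pred_on_real)
    + binary_cross_entropy(0.0, pred_on_fake)
)

# Generator objective: push the discriminator towards 1 on generated images.
g_loss = binary_cross_entropy(1.0, pred_on_fake)

print(f"d_loss={d_loss:.3f}, g_loss={g_loss:.3f}")
```

With these assumed values the discriminator's loss is low and the generator's loss is high, which is the situation the generator tries to reverse during training.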

In [56]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import pandas as pd
import tensorflow as tf
from tensorflow.keras import (
    layers,
    models,
    callbacks,
    losses,
    utils,
    metrics,
    optimizers,
    backend
)
from IPython.display import display
from collections import OrderedDict, namedtuple
from itertools import product
import time
import json

# 0. Parameters

In [24]:
IMAGE_SIZE = 64
CHANNELS = 1
BATCH_SIZE = 128
Z_DIM = 100
EPOCHS = 100
LOAD_MODEL = True
ADAM_BETA_1 = 0.5
ADAM_BETA_2 = 0.999
LEARNING_RATE = 0.0002
DROPOUT_P = 0.25
NOISE_PARAM = 0.1

# 1. Data

In [25]:
train_data = utils.image_dataset_from_directory(
    "C:/Users/jorda/Documents/GenDL_datasets/lego_bricks_dl_dataset",
    labels = None,
    color_mode = "grayscale",
    image_size = (IMAGE_SIZE, IMAGE_SIZE),
    batch_size = BATCH_SIZE,
    shuffle = True,
    seed = 42,
    interpolation = "bilinear",
)

Found 46384 files belonging to 1 classes.


In [26]:
def preprocess(img):
    """Scale pixel values from [0, 255] to [-1, 1], the range of the generator's tanh output."""
    img = (tf.cast(img, "float32") - 127.5) / 127.5
    return img

train = train_data.map(preprocess)
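
As a quick sanity check of the scaling, the sketch below applies the same arithmetic to single pixel values in plain Python. The function names `to_model_range` and `to_pixel_range` are hypothetical; the second one mirrors the `* 127.5 + 127.5` step used later when saving generated images.

```python
def to_model_range(pixel):
    """Map a pixel value in [0, 255] to [-1, 1] (same formula as preprocess)."""
    return (float(pixel) - 127.5) / 127.5

def to_pixel_range(value):
    """Map a model-range value in [-1, 1] back to [0, 255]."""
    return value * 127.5 + 127.5

for pixel in (0, 64, 200, 255):
    scaled = to_model_range(pixel)
    restored = to_pixel_range(scaled)
    print(f"{pixel:>3} -> {scaled:+.4f} -> {restored:.1f}")
```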

# 2. Model Architecture

In [27]:
### The discriminator
discriminator = tf.keras.Sequential([
    layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, CHANNELS)),
    layers.Conv2D(filters = 64, kernel_size = 4, strides = 2, padding = "same", use_bias = False, name="Conv2D_1"),
    layers.LeakyReLU(0.2),
    layers.Dropout(0.3),
    layers.Conv2D(filters = 128, kernel_size = 4, strides = 2, padding = "same", use_bias = False),
    layers.BatchNormalization(momentum = 0.9),
    layers.LeakyReLU(0.2),
    layers.Dropout(0.3),
    layers.Conv2D(filters = 256, kernel_size = 4, strides = 2, padding = "same", use_bias = False),
    layers.BatchNormalization(momentum = 0.9),
    layers.LeakyReLU(0.2),
    layers.Dropout(0.3),
    layers.Conv2D(filters = 512, kernel_size = 4, strides = 2, padding="same", use_bias = False),
    layers.BatchNormalization(momentum = 0.9),
    layers.LeakyReLU(0.2),
    layers.Dropout(0.3),
    layers.Conv2D(filters = 1, 
                  kernel_size = 4, 
                  strides = 1, 
                  padding = "valid", 
                  use_bias = False, 
                  activation = 'sigmoid'),
    layers.Flatten()
])

discriminator.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Conv2D_1 (Conv2D)           (None, 32, 32, 64)        1024      
                                                                 
 leaky_re_lu_16 (LeakyReLU)  (None, 32, 32, 64)        0         
                                                                 
 dropout_8 (Dropout)         (None, 32, 32, 64)        0         
                                                                 
 conv2d_8 (Conv2D)           (None, 16, 16, 128)       131072    
                                                                 
 batch_normalization_14 (Ba  (None, 16, 16, 128)       512       
 tchNormalization)                                               
                                                                 
 leaky_re_lu_17 (LeakyReLU)  (None, 16, 16, 128)       0         
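
The output shapes and parameter counts in the summary can be checked by hand. The sketch below is only an illustration: the helper names `conv_output_size` and `conv_params` are hypothetical, and it assumes the usual Keras rules for `"same"` and `"valid"` padding with `use_bias = False`.

```python
import math

def conv_output_size(size, kernel, stride, padding):
    """Spatial output size of a Conv2D layer along one axis."""
    if padding == "same":
        return math.ceil(size / stride)
    return (size - kernel) // stride + 1  # "valid" padding

def conv_params(kernel, in_channels, filters):
    """Number of weights in a Conv2D layer without a bias term."""
    return kernel * kernel * in_channels * filters

# (filters, kernel, stride, padding) for the five Conv2D layers of the discriminator.
layer_specs = [
    (64, 4, 2, "same"),
    (128, 4, 2, "same"),
    (256, 4, 2, "same"),
    (512, 4, 2, "same"),
    (1, 4, 1, "valid"),
]

size, channels = 64, 1  # IMAGE_SIZE and CHANNELS
shapes = []
for filters, kernel, stride, padding in layer_specs:
    params = conv_params(kernel, channels, filters)
    size = conv_output_size(size, kernel, stride, padding)
    shapes.append((size, size, filters, params))
    print(f"Conv2D -> ({size}, {size}, {filters}), params = {params}")
    channels = filters
```

The first two rows reproduce the `(32, 32, 64)` / 1024 and `(16, 16, 128)` / 131072 entries shown in the summary.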
                                                      

In [28]:
### The generator
generator = tf.keras.Sequential([
    layers.Input(shape=(Z_DIM,)),  # one latent vector per sample, as sampled in train_step
    layers.Reshape((1, 1, Z_DIM)),
    layers.Conv2DTranspose(512, kernel_size=4, strides=1, padding="valid", use_bias = False),
    layers.BatchNormalization(momentum=0.9),
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(256, kernel_size=4, strides=2, padding="same", use_bias = False),
    layers.BatchNormalization(momentum=0.9),
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding="same", use_bias = False),
    layers.BatchNormalization(momentum=0.9),
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(64, kernel_size=4, strides=2, padding="same", use_bias = False),
    layers.BatchNormalization(momentum=0.9),
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(
        1,
        kernel_size=4,
        strides=2,
        padding="same",
        use_bias = False,
        activation = 'tanh'
    )
])

generator.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape_2 (Reshape)         (None, 1, 1, 100)         0         
                                                                 
 conv2d_transpose_10 (Conv2  (None, 4, 4, 512)         819200    
 DTranspose)                                                     
                                                                 
 batch_normalization_17 (Ba  (None, 4, 4, 512)         2048      
 tchNormalization)                                               
                                                                 
 leaky_re_lu_20 (LeakyReLU)  (None, 4, 4, 512)         0         
                                                                 
 conv2d_transpose_11 (Conv2  (None, 8, 8, 256)         2097152   
 DTranspose)                                                     
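
The generator runs the same arithmetic in reverse: each transposed convolution grows the feature map until it reaches the image size. The sketch below is a rough check under the assumption that Keras computes the `Conv2DTranspose` output size as `size * stride` for `"same"` padding and `size * stride + max(kernel - stride, 0)` for `"valid"` padding; the helper names are hypothetical.

```python
def conv_transpose_output_size(size, kernel, stride, padding):
    """Spatial output size of a Conv2DTranspose layer along one axis."""
    if padding == "same":
        return size * stride
    return size * stride + max(kernel - stride, 0)  # "valid" padding

def conv_transpose_params(kernel, in_channels, filters):
    """Number of weights in a Conv2DTranspose layer without a bias term."""
    return kernel * kernel * in_channels * filters

# (filters, kernel, stride, padding) for the five Conv2DTranspose layers.
layer_specs = [
    (512, 4, 1, "valid"),
    (256, 4, 2, "same"),
    (128, 4, 2, "same"),
    (64, 4, 2, "same"),
    (1, 4, 2, "same"),
]

size, channels = 1, 100  # the latent vector reshaped to (1, 1, 100)
gen_shapes = []
for filters, kernel, stride, padding in layer_specs:
    params = conv_transpose_params(kernel, channels, filters)
    size = conv_transpose_output_size(size, kernel, stride, padding)
    gen_shapes.append((size, size, filters, params))
    print(f"Conv2DTranspose -> ({size}, {size}, {filters}), params = {params}")
    channels = filters
```

The first two rows match the `(4, 4, 512)` / 819200 and `(8, 8, 256)` / 2097152 entries in the summary, and the final shape is the 64 x 64 single-channel image the discriminator expects.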
                                                      

# 3. Compiling the GAN

In [57]:
class DCGAN(models.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(DCGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer):
        super(DCGAN, self).compile()
        self.loss_fn = losses.BinaryCrossentropy()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_metric = metrics.Mean(name="d_loss")
        self.d_real_acc_metric = metrics.BinaryAccuracy(name="d_real_acc")
        self.d_fake_acc_metric = metrics.BinaryAccuracy(name="d_fake_acc")
        self.d_acc_metric = metrics.BinaryAccuracy(name="d_acc")
        self.g_loss_metric = metrics.Mean(name="g_loss")
        self.g_acc_metric = metrics.BinaryAccuracy(name="g_acc")

    @property
    def metrics(self):
        return [
            self.d_loss_metric,
            self.d_real_acc_metric,
            self.d_fake_acc_metric,
            self.d_acc_metric,
            self.g_loss_metric,
            self.g_acc_metric,
        ]

    def train_step(self, real_images):
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(
            shape=(batch_size, self.latent_dim)
        )

        # Run the generator and discriminator once, recording gradients for both networks
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            generated_images = self.generator(
                random_latent_vectors, training=True
            )
            real_predictions = self.discriminator(real_images, training=True)
            fake_predictions = self.discriminator(
                generated_images, training=True
            )

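            real_labels = tf.ones_like(real_predictions)
            # Add random noise that keeps the soft targets inside [0, 1]:
            # real targets fall in [1 - NOISE_PARAM, 1]
            real_noisy_labels = real_labels - NOISE_PARAM * tf.random.uniform(
                tf.shape(real_predictions)
            )
            fake_labels = tf.zeros_like(fake_predictions)
            # fake targets fall in [0, NOISE_PARAM]
            fake_noisy_labels = fake_labels + NOISE_PARAM * tf.random.uniform(
                tf.shape(fake_predictions)
            )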

            d_real_loss = self.loss_fn(real_noisy_labels, real_predictions)
            d_fake_loss = self.loss_fn(fake_noisy_labels, fake_predictions)
            d_loss = (d_real_loss + d_fake_loss) / 2.0

            g_loss = self.loss_fn(real_labels, fake_predictions)

        gradients_of_discriminator = disc_tape.gradient(
            d_loss, self.discriminator.trainable_variables
        )
        gradients_of_generator = gen_tape.gradient(
            g_loss, self.generator.trainable_variables
        )

        # Apply the updates to the networks owned by this model, not to global variables
        self.d_optimizer.apply_gradients(
            zip(gradients_of_discriminator, self.discriminator.trainable_variables)
        )
        self.g_optimizer.apply_gradients(
            zip(gradients_of_generator, self.generator.trainable_variables)
        )

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.d_real_acc_metric.update_state(real_labels, real_predictions)
        self.d_fake_acc_metric.update_state(fake_labels, fake_predictions)
        self.d_acc_metric.update_state(
            [real_labels, fake_labels], [real_predictions, fake_predictions]
        )
        self.g_loss_metric.update_state(g_loss)
        self.g_acc_metric.update_state(real_labels, fake_predictions)

        return {m.name: m.result() for m in self.metrics}

In [58]:
# Create a DCGAN
dcgan = DCGAN(
    discriminator=discriminator, generator=generator, latent_dim=Z_DIM
)

##### Crucial checkpointing
Checkpointing allows me to save a model's weights during training. I set the checkpoint to be written at the end of every epoch, so if the training process is long I can stop and resume training from the last saved checkpoint at a later time.
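
Loading weights fails if no checkpoint has been written yet, for example on the very first run. A minimal guard is sketched below; the helper `checkpoint_exists` is hypothetical and assumes the TensorFlow checkpoint format, which writes an `.index` file next to the weight shards for a given prefix.

```python
import os

def checkpoint_exists(prefix):
    """Return True if a TensorFlow-format checkpoint index exists for this prefix."""
    return os.path.exists(prefix + ".index")

CHECKPOINT_PREFIX = "checkpoint/checkpoint.ckpt"

if checkpoint_exists(CHECKPOINT_PREFIX):
    print("Checkpoint found - weights can be loaded before resuming training.")
else:
    print("No checkpoint found - training starts from scratch.")
```

In the cell below, the same check could be combined with the `LOAD_MODEL` flag so that `load_weights` is only called when there is something to load.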

In [59]:
if LOAD_MODEL:
    dcgan.load_weights("checkpoint/checkpoint.ckpt")

# 4. Training the GAN

In [60]:
# Compile it with the two optimizers, using the parameters defined in section 0
dcgan.compile(
    d_optimizer=optimizers.Adam(
        learning_rate=LEARNING_RATE, beta_1=ADAM_BETA_1, beta_2=ADAM_BETA_2
    ),
    g_optimizer=optimizers.Adam(
        learning_rate=LEARNING_RATE, beta_1=ADAM_BETA_1, beta_2=ADAM_BETA_2
    ),
)

In [61]:
# Create a model save checkpoint
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="checkpoint/checkpoint.ckpt",
    save_weights_only=True,
    save_freq="epoch",
    verbose=1
)

tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")

class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img, latent_dim):
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim
        self.run_count = 0
        self.run_data = []
        self.run_start_time = None
        self.loader = None
        self.tb = None

    @staticmethod
    def save_image(image, path):
        # Drop the channel axis: PIL expects a 2-D array for a grayscale image
        image = np.squeeze(image, axis=-1)
        # Clip to the valid pixel range and convert the NumPy array to a PIL Image
        image = Image.fromarray(np.clip(image, 0, 255).astype("uint8"))
        # Save the image
        image.save(path)

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        if logs.get('loss') is not None:
            print(f"Train Loss: {logs.get('loss')}")
        if logs.get("accuracy") is not None:
            print(f"Train Accuracy: {logs.get('accuracy')}")
        if logs.get("val_loss") is not None:
            print(f"Validation Loss: {logs.get('val_loss')}")
        if logs.get("val_accuracy") is not None:
            print(f"Validation Accuracy: {logs.get('val_accuracy')}")
        print("\n")

        random_latent_vectors = tf.random.normal(
            shape=(self.num_img, self.latent_dim)
        )
        generated_images = self.model.generator(random_latent_vectors)
        # Rescale from the tanh output range [-1, 1] back to [0, 255]
        generated_images = generated_images * 127.5 + 127.5
        generated_images = generated_images.numpy()
        # Save each image to its own file; the ./output directory must already exist
        for i, image in enumerate(generated_images):
            self.save_image(image, f"./output/generated_img_{epoch:03d}_{i}.png")

In [62]:
dcgan.fit(
    train,
    epochs = EPOCHS,
    initial_epoch = 1,
    callbacks = [
        model_checkpoint_callback,
        tensorboard_callback,
        ImageGenerator(num_img = 10, latent_dim = Z_DIM)
    ],
)

Epoch 2/100


KeyboardInterrupt: 

In [None]:
# Save the final models
generator.save("./models/generator")
discriminator.save("./models/discriminator")

# Generate New Images

In [ ]:
# Sample some points in the latent space, from the standard normal distribution
grid_width, grid_height = (10, 3)
z_sample = np.random.normal(size=(grid_width * grid_height, Z_DIM))

In [ ]:
# Generate images from the sampled points
reconstructions = generator.predict(z_sample)

In [ ]:
# Draw a plot of generated images
fig = plt.figure(figsize=(18, 5))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

# Output the grid of generated images
for i in range(grid_width * grid_height):
    ax = fig.add_subplot(grid_height, grid_width, i + 1)
    ax.axis("off")
    # Select channel 0 so imshow receives a 2-D grayscale array
    ax.imshow(reconstructions[i, :, :, 0], cmap="Greys")

# My Takeaways

The discriminator can overpower the generator. 'Overpower' means being able to discern the real from the fake with close to 100% accuracy, which usually indicates that the discriminator has overfit the training data. When that happens the generator can't learn, because the loss gradients it receives are too weak to show it how to make its images look more real. If the discriminator effectively says "You can't fool me - I know all your images are fake", the generator has no useful feedback.

To prevent this from happening and to create smoother decision boundaries, I can:

- I can increase the dropout rate - effectively dampening the amount of information that flows through the discriminator. With a higher dropout rate the discriminator can't memorise everything about the real images it is trained on, so it is less likely to discern with 100% certainty (it may eventually get very close, but there will still be feedback for the generator to improve). The goal is always to improve the generator to the point where its images look nearly identical to the real ones.

- I can reduce the discriminator's learning rate - so it takes longer for the discriminator to reach its highest level of discernment. This gives the generator more time to improve, as it receives useful feedback for longer.

- I can reduce the number of convolutional filters in the discriminator - this just means that the discriminator is inspecting the real and fake images at reduced granularity. Reduced granularity means that it is less likely to overfit to the training data as it knows less about the real images. Knowing less about the real images makes it easier for the generator to produce fake images that are harder to distinguish from the real. Again, there'll always be feedback for the generator. This can have the unintended effect of capping the generator's learning capacity as it will receive less complex/detailed feedback on how to improve. It'll reach a certain point where it'll produce real-ish images but can't improve anymore without better feedback (which the discriminator won't be able to provide because it can't inspect the real images at the level of granularity required). 

- I can add noise to the labels when training the discriminator - adding noise to the labels prevents the discriminator from becoming too confident and overfitting to the data. Noisy labels disrupt the discriminator's learning process, forcing it to learn more robust features and decision boundaries. This can lead to more stable training dynamics and, in some cases, faster convergence of the GAN. The crucial part is that the noise has to be random and varying. I'm effectively adding a small and varying amount of loss to the model - encouraging the discriminator to dig deeper and wider in finding features in the dataset.

- I could flip the labels of some images at random when training the discriminator. This seems heavy-handed, as it could confuse the discriminator or, more likely, slow down convergence. However, if done on a very small scale, it also forces the discriminator to dig deeper and wider when learning new features.
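
The last two ideas (noisy labels and occasional label flips) can be sketched in plain Python. This is only an illustration: the helper names `noisy_labels` and `flip_some` are hypothetical, the flip probability is an arbitrary example value, and the sketch assumes the noise is applied so that every target stays inside [0, 1].

```python
import random

NOISE_PARAM = 0.1

def noisy_labels(is_real, count, rng, noise=NOISE_PARAM):
    """Soft targets: within [1 - noise, 1] for real images, [0, noise] for fakes."""
    if is_real:
        return [1.0 - noise * rng.random() for _ in range(count)]
    return [noise * rng.random() for _ in range(count)]

def flip_some(labels, flip_prob, rng):
    """Flip each label (t -> 1 - t) independently with probability flip_prob."""
    return [1.0 - t if rng.random() < flip_prob else t for t in labels]

rng = random.Random(42)
real_targets = noisy_labels(True, 5, rng)
fake_targets = noisy_labels(False, 5, rng)

# Assumed small flip rate, so only a few labels are ever swapped.
flipped_real = flip_some(real_targets, flip_prob=0.05, rng=rng)

print("real targets:", [round(t, 3) for t in real_targets])
print("fake targets:", [round(t, 3) for t in fake_targets])
print("after flips: ", [round(t, 3) for t in flipped_real])
```

Because the targets vary from batch to batch, the discriminator is never rewarded for pushing its outputs all the way to exactly 0 or 1.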

If the discriminator is not powerful enough, it can lead to mode collapse. 'Mode collapse' occurs when the generator finds a small set of outputs that reliably trick the discriminator and keeps producing only those, so it never covers the same range and quality as the real dataset. It'll be able to produce the less detailed images but has no way to improve enough to produce the most detailed ones. This follows from the above - a discriminator with reduced capacity caps the generator's abilities, which can lead to mode collapse - stasis.

## Wasserstein GAN with Gradient Penalty (WGAN-GP) (improved model)

- Uses the Wasserstein loss function for both the discriminator and the generator. Using this loss function instead of binary cross-entropy results in a more stable convergence of the GAN. 
- The Wasserstein loss requires that we use 1 and -1 as labels (real or fake), rather than 1 and 0. We also remove the sigmoid activation from the final layer of the discriminator, so that predictions are no longer constrained to fall in the range [0, 1] but can instead be any real number in (-∞, ∞). For this reason, the discriminator in a WGAN is usually referred to as a **critic** that outputs a score rather than a probability. The loss trains the critic to score real images as high as possible and generated images as low as possible, so its score reflects the **quality** of a sample relative to the real data distribution.
- Wasserstein distance is a more meaningful metric for comparing probability distributions. It measures the minimum amount of work needed to transform one distribution into another, which can be advantageous when dealing with complex data distributions.
- The WGAN generator tries to produce images that are scored as highly as possible by the critic (i.e., the critic is fooled into thinking they are real).
- Because the critic's scores are no longer constrained to the [0, 1] range, they can grow into very large numbers, so an additional constraint on the critic is required: it must be a 1-Lipschitz function, meaning its output can't change faster than its input does.
- In WGAN-GP, a gradient penalty term is added to the loss function to enforce the Lipschitz constraint on the critic (discriminator) network. This term penalizes the critic when the norm of its gradient with respect to the input image deviates from the target value of 1.0, helping to ensure smoother training dynamics and better convergence.
- A key addition is the gradient penalty loss included as part of the overall loss function, alongside the Wasserstein loss from the real and fake images.
- The interpolation path is a technique used specifically for evaluating the gradient penalty term. It involves generating interpolated images along straight lines between pairs of real and fake images and computing the gradients of the critic's output with respect to these interpolated images. By enforcing smoothness along these paths, the gradient penalty term helps stabilize training and ensures the critic adheres to the Lipschitz constraint.
- By providing more meaningful gradients to both the generator and discriminator, Wasserstein loss encourages the model to capture the entire data distribution more effectively.
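
To make the loss concrete, here is a minimal sketch in plain Python. The function names and the example critic scores are assumptions for illustration only, and the gradient penalty term is left out here.

```python
def mean(values):
    return sum(values) / len(values)

def wasserstein_loss(labels, scores):
    """Wasserstein loss with labels +1 (real) and -1 (fake): -mean(label * score)."""
    return -mean([y * s for y, s in zip(labels, scores)])

# Assumed critic scores (unbounded, not probabilities).
real_scores = [2.0, 1.5, 3.0]
fake_scores = [-1.0, 0.5, -2.5]

# Critic: score real images high and generated images low.
critic_loss = (
    wasserstein_loss([1.0] * 3, real_scores)
    + wasserstein_loss([-1.0] * 3, fake_scores)
)

# Generator: make the critic score generated images as high as possible.
generator_loss = wasserstein_loss([1.0] * 3, fake_scores)

print(f"critic_loss={critic_loss:.4f}")
print(f"generator_loss={generator_loss:.4f}")
```

With these labels the critic loss reduces to `mean(fake_scores) - mean(real_scores)` and the generator loss to `-mean(fake_scores)`.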

##### Training the WGAN-GP

- The critic and generator don't need to be carefully balanced - the critic can be trained a lot more - and this is a key benefit. When using the Wasserstein loss, the critic should be trained close to convergence before each generator update, to ensure that the gradients for the generator update are accurate. This is in contrast to a standard GAN, where it is important not to let the discriminator get too strong.

- Therefore, with Wasserstein GANs, we can simply train the critic several times between generator updates, to ensure it is close to convergence. A typical ratio used is three to five critic updates per generator update.
- In theory, allowing the critic (discriminator) to be trained more effectively in WGAN-GP compared to a regular GAN can potentially lead to better identification of detailed features in the input dataset. However, it's important to note that the effectiveness of WGAN-GP depends on various factors, including the dataset, network architecture, and training parameters.
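
The update schedule described above can be sketched as a plain Python loop. The callables `critic_step` and `generator_step` are hypothetical stand-ins for the real update functions, and the default of three critic updates is just one value from the typical range mentioned above.

```python
def train_wgan(batches, critic_step, generator_step, critic_steps=3):
    """Run several critic updates for every single generator update."""
    for batch in batches:
        for _ in range(critic_steps):
            critic_step(batch)
        generator_step(batch)

# Stand-in update functions that only count how often they are called.
counts = {"critic": 0, "generator": 0}

def count_critic_step(batch):
    counts["critic"] += 1

def count_generator_step(batch):
    counts["generator"] += 1

train_wgan(range(5), count_critic_step, count_generator_step, critic_steps=3)
print(counts)
```

In a real training step, each critic update would also draw fresh latent vectors and include the gradient penalty in its loss.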

##### The gradient penalty loss function

In [1]:
def gradient_penalty(self, batch_size, real_images, fake_images):
    # Method of the WGAN-GP model class; self.critic is the critic network.
    # 1. Each image in the batch gets a random number, sampled uniformly
    #    between 0 and 1, stored as the vector alpha.
    alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
    diff = fake_images - real_images

    # 2. A set of interpolated images is calculated.
    interpolated = real_images + alpha * diff

    # 3. The current critic is asked to score each of these
    #    interpolated images.
    with tf.GradientTape() as gp_tape:
        gp_tape.watch(interpolated)
        pred = self.critic(interpolated, training=True)

    # 4. The gradient of the predictions is calculated with
    #    respect to the input images.
    grads = gp_tape.gradient(pred, [interpolated])[0]

    # 5. The L2 norm of this vector is calculated.
    norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))

    # 6. The function returns the average squared distance between the L2 norm and 1.
    gp = tf.reduce_mean((norm - 1.0) ** 2)
    return gp
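
To check the arithmetic of steps 1 to 6 without training anything, the sketch below repeats them in NumPy for a hypothetical linear critic `f(x) = sum(weights * x)`. For such a critic the gradient with respect to the input is simply `weights` at every point, so the penalty is known in advance. The function name, the weights and the random images are all assumptions for illustration.

```python
import numpy as np

def gradient_penalty_linear(weights, real_images, fake_images, rng):
    """Gradient penalty for the linear critic f(x) = sum(weights * x)."""
    batch_size = real_images.shape[0]
    # Steps 1-2: one uniform alpha per image, then interpolate real -> fake.
    alpha = rng.uniform(0.0, 1.0, size=(batch_size, 1, 1, 1))
    interpolated = real_images + alpha * (fake_images - real_images)
    # Steps 3-4: for a linear critic, df/dx equals the weights everywhere.
    grads = np.broadcast_to(weights, interpolated.shape)
    # Step 5: L2 norm of the gradient for each image.
    norm = np.sqrt(np.sum(np.square(grads), axis=(1, 2, 3)))
    # Step 6: average squared distance between the norm and 1.
    return float(np.mean((norm - 1.0) ** 2)), interpolated

rng = np.random.default_rng(0)
real = rng.uniform(-1.0, 1.0, size=(8, 4, 4, 1))
fake = rng.uniform(-1.0, 1.0, size=(8, 4, 4, 1))

unit_weights = np.full((4, 4, 1), 0.25)   # gradient norm = sqrt(16 * 0.25**2) = 1
steep_weights = np.full((4, 4, 1), 0.5)   # gradient norm = sqrt(16 * 0.5**2) = 2

gp_unit, interpolated = gradient_penalty_linear(unit_weights, real, fake, rng)
gp_steep, _ = gradient_penalty_linear(steep_weights, real, fake, rng)
print(f"gp_unit={gp_unit:.4f}, gp_steep={gp_steep:.4f}")
```

A critic whose gradient norm is exactly 1 pays no penalty, while one that is twice as steep pays `(2 - 1) ** 2 = 1`, which is the behaviour the penalty term is meant to encourage.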