<a href="https://colab.research.google.com/github/MMaggieZhou/FunModels/blob/main/Draw_Anime_Faces_With_Generative_Adversarial_Network_Model_Training_with_WLoss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Draw Anime Faces With Generative Adversarial Network

- The model uses DCGAN architecture per https://arxiv.org/abs/1511.06434
- TensorFlow is used as the training framework
- The code is not yet robust: most validation checks are still left to be implemented

## Set up the environment 


In [1]:
# Root directory for this notebook: the dataset archive is downloaded and
# unzipped here, and the data loader reads images from it.
WORKSPACE_DIR = '.' 

### Download the dataset 
The dataset is collected by https://speech.ee.ntu.edu.tw/~hylee/ml/2021-spring.php

In [2]:
# a pypi package to download large file from google drive 
!gdown --id 1IGrTr308mGAaCKotpkkm8wTKlWs9Jq-p -O "{WORKSPACE_DIR}/crypko_data.zip"
!unzip -q -o "{WORKSPACE_DIR}/crypko_data.zip" -d "{WORKSPACE_DIR}/"

Downloading...
From: https://drive.google.com/uc?id=1IGrTr308mGAaCKotpkkm8wTKlWs9Jq-p
To: /content/crypko_data.zip
100% 452M/452M [00:02<00:00, 202MB/s]


### Imports 

In [3]:
import os 
import glob

import tensorflow as tf
from tensorflow.keras import layers

import matplotlib.pyplot as plt

## Data Preprocessing 
1. Load Dataset From Directory
2. Resize the image
3. **Normalize the image: it is very important that the pixel values are within [-1, 1], the same range as the generator's tanh output.**

In [4]:
def load_dataset(directory_path, batch_size, image_size): 
    """Build a shuffled, batched, unlabeled image dataset scaled to [-1, 1].

    Args:
        directory_path: directory that is scanned for image files.
        batch_size: number of images per batch.
        image_size: (height, width) every image is resized to.

    Returns:
        A dataset whose elements are image batches rescaled from the
        [0, 255] pixel range to [-1, 1].
    """
    # x * (2 / 255) - 1 maps 0 -> -1 and 255 -> 1.
    to_unit_range = tf.keras.layers.Rescaling(2.0/255, offset=-1)

    raw_batches = tf.keras.utils.image_dataset_from_directory(
        directory_path,
        labels=None,
        batch_size=batch_size,
        shuffle=True,
        image_size=image_size,
    )

    def _normalize(batch):
        return to_unit_range(batch)

    return raw_batches.map(_normalize)

In [5]:
def validate_data_loading(): 
  """Sanity-check the data pipeline and show the first 16 images.

  Loads one batch of 64x64 images from WORKSPACE_DIR, checks the per-image
  shape and the value range produced by `load_dataset`, then plots a 4x4 grid.

  Raises:
    AssertionError: if the batch has an unexpected shape or value range.
  """
  image_size = (64, 64)
  image_batches = load_dataset(WORKSPACE_DIR, 64, image_size)

  # A tf.data.Dataset cannot be subscripted, so pull the first batch through
  # an iterator instead of `image_batches[0]`.
  first_batch = next(iter(image_batches))

  # 1. validate dimension is (batch_size, height, width, 3); the batch axis is
  #    not checked because a small dataset may yield a partial batch.
  assert tuple(first_batch.shape[1:]) == image_size + (3,), first_batch.shape

  # 2. validate that all values are within [-1, 1] (small float tolerance).
  tolerance = 1e-3
  assert float(tf.reduce_min(first_batch)) >= -1.0 - tolerance
  assert float(tf.reduce_max(first_batch)) <= 1.0 + tolerance

  # 3. display 16 images, mapped back from [-1, 1] to the [0, 255] pixel range.
  plt.figure(figsize=(10, 10))
  to_pixels = tf.keras.layers.Rescaling(255/2.0, offset=127.5)
  data = to_pixels(first_batch[:16])
  # TODO: better way of display tensors 
  for i, image in enumerate(data):
    ax = plt.subplot(4, 4, i + 1)
    plt.imshow(image.numpy().astype("uint8"))
    plt.axis("off")

## Define Model Architecture 
Two models are defined with Keras layers: the generator and the discriminator.

DCGAN key points: 
- The generator consists of transposed-convolution layers that take a low-dimensional latent vector and generate a larger 2D image
- The discriminator consists of convolution layers that take the 2D image, downsample it step by step, and finally produce a single scalar score per image (with the Wasserstein loss used here there is no sigmoid on the output)
- Apply batch normalization after each layer, except for the output layer of the generator and the input layer of the discriminator
- Initialize the weights of the convolution (transpose) layers from a random normal distribution
- Apply ReLU activation after transposed-convolution layers and leaky ReLU after convolution layers





In [6]:
# Kernel initializer for the convolution / transposed-convolution layers:
# normal distribution with mean 0 and stddev 0.02.
w_init = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
# Initializer for the batch-normalization scale (gamma): normal distribution
# centred on 1 with stddev 0.02.
gamma_initializer = tf.keras.initializers.RandomNormal(mean=1.0, stddev=0.02)

class Clip(tf.keras.constraints.Constraint):
    """Weight constraint that clamps every value to [-clip_value, clip_value].

    Used on the discriminator's weights (WGAN-style weight clipping).

    Args:
        clip_value: non-negative clipping bound; defaults to 0.01.
    """

    def __init__(self, clip_value=0.01):
        if clip_value < 0:
            raise ValueError(f"clip_value must be non-negative, got {clip_value}")
        self.clip_value = clip_value

    def __call__(self, w):
        # Clamp from below first, then from above.
        return tf.math.minimum(self.clip_value, tf.math.maximum(w, -self.clip_value))

    def get_config(self):
        # Lets Keras serialize the bound together with the model.
        return {"clip_value": self.clip_value}

# Shared constraint instance applied to the discriminator layers.
constraint = Clip()

def add_dense_layer_for_noise(
    model,
    input_dim, 
    output_dim,
): 
    """Append the generator's input stack: Dense -> BatchNormalization -> ReLU.

    Args:
        model: Sequential model the layers are appended to.
        input_dim: length of the input noise vector.
        output_dim: number of units of the dense projection.
    """
    projection = layers.Dense(
        units=output_dim,
        input_shape=(input_dim,),
        use_bias=False,
    )
    batch_norm = layers.BatchNormalization(gamma_initializer=gamma_initializer)
    activation = layers.ReLU()

    for layer in (projection, batch_norm, activation):
        model.add(layer)

# image size will be doubled 
def add_conv2d_transpose(
    model,
    num_output_filters, 
    add_batch_norm=True
):
    """Append a 5x5, stride-2, 'same'-padded transposed convolution.

    When `add_batch_norm` is true the convolution is followed by batch
    normalization and ReLU; otherwise nothing is appended after it.

    Args:
        model: Sequential model the layers are appended to.
        num_output_filters: number of output channels.
        add_batch_norm: whether to append BatchNormalization + ReLU.
    """
    upsample = layers.Conv2DTranspose(
        filters=num_output_filters,
        kernel_size=5,
        strides=2,
        padding='same',
        use_bias=False,
        kernel_initializer=w_init,
    )
    model.add(upsample)

    if not add_batch_norm:
        return

    model.add(layers.BatchNormalization(gamma_initializer=gamma_initializer))
    model.add(layers.ReLU())

# shrink size by half
def add_conv2d_for_input(model, input_dim, num_output_filters):
    """Append the discriminator's first layer: Conv2D followed by LeakyReLU(0.2).

    The convolution uses a 5x5 kernel, stride 2 and 'same' padding, declares an
    (input_dim, input_dim, 3) input shape and has no batch normalization.

    Args:
        model: Sequential model the layers are appended to.
        input_dim: height and width of the square input image.
        num_output_filters: number of output channels.
    """
    input_conv = layers.Conv2D(
        filters=num_output_filters,
        kernel_size=5,
        strides=2,
        padding='same',
        input_shape=[input_dim, input_dim, 3],
        kernel_initializer=w_init,
        kernel_constraint=constraint,
    )
    for layer in (input_conv, layers.LeakyReLU(0.2)):
        model.add(layer)

# shrink size by half
def add_conv2d(
    model, num_output_filters, filter_size=5, 
    use_batch_norm=True, padding='same', stride=2
):
    """Append a weight-constrained Conv2D, optionally with BatchNorm + LeakyReLU.

    When `use_batch_norm` is true the convolution is followed by a batch
    normalization layer (gamma and beta constrained as well) and LeakyReLU(0.2);
    otherwise only the convolution is appended.

    Args:
        model: Sequential model the layers are appended to.
        num_output_filters: number of output channels.
        filter_size: convolution kernel size.
        use_batch_norm: whether to append BatchNormalization + LeakyReLU.
        padding: convolution padding mode.
        stride: convolution stride.
    """
    conv = layers.Conv2D(
        filters=num_output_filters,
        kernel_size=filter_size,
        strides=stride,
        padding=padding,
        kernel_initializer=w_init,
        kernel_constraint=constraint,
    )
    model.add(conv)

    if not use_batch_norm:
        return

    model.add(layers.BatchNormalization(
        gamma_initializer=gamma_initializer,
        beta_constraint=constraint,
        gamma_constraint=constraint,
    ))
    model.add(layers.LeakyReLU(0.2))

In [7]:
def create_unconditional_generator(
    noise_dim,
    image_dim, # output image
):
    """Build the generator: noise vector -> (image_dim, image_dim, 3) image.

    The noise is projected by a dense layer, reshaped to an
    (image_dim/16, image_dim/16, image_dim*8) feature map and upsampled by four
    stride-2 transposed convolutions; the output goes through tanh.

    Args:
        noise_dim: length of the input noise vector.
        image_dim: side length of the square output image; must be a positive
            multiple of 16 because the feature map is doubled four times.

    Returns:
        A `tf.keras.Sequential` generator model.

    Raises:
        ValueError: if `image_dim` is not a positive multiple of 16.
    """
    if image_dim <= 0 or image_dim % 16 != 0:
        raise ValueError(
            f"image_dim must be a positive multiple of 16, got {image_dim}"
        )

    # Integer arithmetic keeps the Dense unit count an int (true division
    # would hand Keras a float such as 8192.0).
    base_dim = image_dim // 16
    base_filters = image_dim * 8

    model = tf.keras.Sequential()
    add_dense_layer_for_noise(
        model, input_dim=noise_dim, 
        output_dim=base_filters * base_dim * base_dim
    )

    model.add(layers.Reshape(
        (base_dim, base_dim, base_filters))
    ) # image_dim/16 * image_dim/16 * filters

    add_conv2d_transpose(model, image_dim * 4) # image_dim/8 * image_dim/8 * filters
    add_conv2d_transpose(model, image_dim * 2) # image_dim/4 * image_dim/4 * filters
    add_conv2d_transpose(model, image_dim * 1) # image_dim/2 * image_dim/2 * filters

    # Output layer: no batch norm / ReLU, tanh keeps pixel values in [-1, 1].
    add_conv2d_transpose(model, 3, add_batch_norm=False) # image_dim * image_dim * 3
    model.add(layers.Activation("tanh"))
    return model

def create_discriminator(image_dim): 
    """Build the discriminator: (image_dim, image_dim, 3) image -> one score.

    Four stride-2 convolutions shrink the image to image_dim/16 per side while
    the filter count grows from image_dim to image_dim*8; a final 'valid'
    convolution whose kernel covers the whole remaining map reduces it to a
    single value, which is flattened. No activation follows the last layer.

    Args:
        image_dim: side length of the square input image.

    Returns:
        A `tf.keras.Sequential` discriminator model.
    """
    critic = tf.keras.Sequential()

    # (image_dim/2, image_dim/2, image_dim)
    add_conv2d_for_input(critic, image_dim, image_dim)

    # Each pass halves the spatial size: /4, /8, /16 with 2x, 4x, 8x filters.
    for width_multiplier in (2, 4, 8):
        add_conv2d(critic, image_dim * width_multiplier)

    # (1, 1, 1): the kernel spans the entire remaining feature map.
    add_conv2d(
        critic,
        1,
        filter_size=int(image_dim/16),
        use_batch_norm=False,
        padding='valid',
        stride=1,
    )

    critic.add(layers.Flatten())
    return critic

In [None]:
def validate_generator():
  """Check that the generator maps a noise batch to 64x64 RGB images.

  Raises:
    AssertionError: if the output shape is not (batch, 64, 64, 3).
  """
  noise_dim, image_dim, batch = 100, 64, 2
  generator = create_unconditional_generator(noise_dim, image_dim)
  images = generator(tf.random.normal([batch, noise_dim]), training=False)
  expected_shape = (batch, image_dim, image_dim, 3)
  assert tuple(images.shape) == expected_shape, images.shape
  # TODO: validate intermediate layers dimensions


def validate_discriminator():
  """Check that the discriminator maps 64x64 RGB images to one score each.

  Raises:
    AssertionError: if the output shape is not (batch, 1).
  """
  image_dim, batch = 64, 2
  discriminator = create_discriminator(image_dim)
  scores = discriminator(tf.zeros([batch, image_dim, image_dim, 3]), training=False)
  assert tuple(scores.shape) == (batch, 1), scores.shape
  # TODO: validate intermediate layers dimensions

def test_output_values():
    """Print the discriminator's scores (and their mean) for 10 generated images.

    Builds fresh, untrained 64x64 models, so the printed numbers only show the
    scale of the initial outputs.
    """
    sample_count, latent_dim, side = 10, 100, 64

    gen = create_unconditional_generator(latent_dim, side)
    disc = create_discriminator(side)

    latent = tf.random.normal([sample_count, latent_dim])
    scores = disc(gen(latent, training=True), training=True)

    print(tf.reduce_mean(scores))
    print(scores)

## Training

In [8]:
# loss functions 
def discriminator_loss_wasserstein(real_output, fake_output): 
    """Wasserstein discriminator loss: mean(fake scores) - mean(real scores).

    Minimizing it pushes scores of real images up and scores of generated
    images down.

    Args:
        real_output: discriminator scores for real images.
        fake_output: discriminator scores for generated images.
    """
    mean_fake_score = tf.reduce_mean(fake_output)
    mean_real_score = tf.reduce_mean(real_output)
    return mean_fake_score - mean_real_score

def generator_loss_wasserstein(fake_output):
    """Wasserstein generator loss: the negated mean score of generated images.

    Args:
        fake_output: discriminator scores for generated images.
    """
    mean_fake_score = tf.reduce_mean(fake_output)
    return -mean_fake_score

In [47]:
# training step 

# may use @tf.function for optimization, but have to deal with dynamic variable step
def train_step(
    image_batch, batch_size, noise_dim, generator, discriminator, 
    generator_optimizer, discriminator_optimizer, discriminator_loss_func, 
    generator_loss_func, step, n_critic=5
):
    """Run one training step: always update the discriminator, sometimes the generator.

    The discriminator is updated on every call. The generator is updated only
    when `(step + 1)` is a multiple of `n_critic`, i.e. once per `n_critic`
    discriminator updates.

    Args:
        image_batch: batch of real images.
        batch_size: number of noise vectors sampled for each update.
        noise_dim: length of each noise vector.
        generator: generator model.
        discriminator: discriminator model.
        generator_optimizer: optimizer applied to the generator's variables.
        discriminator_optimizer: optimizer applied to the discriminator's variables.
        discriminator_loss_func: callable `(real_output, fake_output) -> loss`.
        generator_loss_func: callable `(fake_output) -> loss`.
        step: zero-based global step counter.
        n_critic: number of discriminator updates per generator update
            (default 5, the previously hard-coded ratio).

    Returns:
        dict with `'D_loss'` on every call and `'G_loss'` on the calls that
        also update the generator.

    Raises:
        ValueError: if `n_critic` is smaller than 1.
    """
    if n_critic < 1:
        raise ValueError(f"n_critic must be >= 1, got {n_critic}")

    # TODO: validate that image_batch size is same as batch_size
    # TODO: fine tune ratio of frequency that generator and discriminator are trained
    metrics = {}

    # 1. update discriminator 
    noise = tf.random.normal([batch_size, noise_dim])
    with tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)
        real_output = discriminator(image_batch, training=True)
        fake_output = discriminator(generated_images, training=True)
        disc_loss = discriminator_loss_func(real_output, fake_output)
    # Only the discriminator's variables are differentiated and updated here.
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
    metrics['D_loss'] = disc_loss

    # 2. update generator, once every n_critic steps, with a fresh noise batch
    if (step + 1) % n_critic == 0:
        noise = tf.random.normal([batch_size, noise_dim])
        with tf.GradientTape() as gen_tape:
            generated_images = generator(noise, training=True)
            fake_output = discriminator(generated_images, training=True)
            gen_loss = generator_loss_func(fake_output)
        # Only the generator's variables are differentiated and updated here.
        gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
        generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
        metrics['G_loss'] = gen_loss

    return metrics

def save_plot(examples, epoch, n):
    """Save an n x n grid of images to samples/generated_plot_epoch-<epoch+1>.png.

    Args:
        examples: indexable batch of at least n*n images; values are shifted
            from [-1, 1] (presumably the generator's tanh range) to [0, 1]
            before plotting.
        epoch: zero-based epoch index; the file name uses `epoch + 1`.
        n: side length of the image grid.
    """
    filename = f"samples/generated_plot_epoch-{epoch+1}.png"
    # plt.savefig does not create missing directories, so make sure the
    # output folder exists before plotting.
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    examples = (examples + 1) / 2.0
    for i in range(n * n):
        plt.subplot(n, n, i+1)
        plt.axis("off")
        plt.imshow(examples[i])  
    plt.savefig(filename)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close()

In [9]:
# Number of images per training batch (also the number of noise vectors
# sampled per update).
BATCH_SIZE = 64
# Side length of the square images the models consume / produce.
IMAGE_DIM = 64
# Length of the generator's input noise vector.
NOISE_DIM = 100
# Side length of the sample-image grid saved after each epoch.
TEST_IMAGE_GRID_SIZE = 4

# NOTE(review): not referenced anywhere else in this notebook; the clipping
# bound actually applied is the one defined in `Clip`.
CLIP_VALUE_FOR_WGAN = 0.01
# Learning rate shared by both RMSprop optimizers.
LEARNING_RATE = 1e-4
# Number of passes over the dataset.
NUM_EPOCH = 50

def train(save_model=True):
    """Train the generator / discriminator pair with the Wasserstein loss.

    Loads the images under WORKSPACE_DIR, builds both models and runs
    NUM_EPOCH epochs of `train_step`. After every epoch a grid of images
    generated from a fixed noise batch is written with `save_plot`, and, when
    `save_model` is true, both models are saved as
    `saved_model/<name>_epoch-<N>.h5` with N counting from 1 (the same
    numbering `save_plot` uses).

    Args:
        save_model: whether to write per-epoch model checkpoints.
    """
    # Fixed noise, so the per-epoch sample grids are comparable.
    test_noises = tf.random.normal([TEST_IMAGE_GRID_SIZE ** 2, NOISE_DIM])
    image_batches = load_dataset(WORKSPACE_DIR, BATCH_SIZE, (IMAGE_DIM, IMAGE_DIM))
    num_batches = int(image_batches.cardinality())

    generator = create_unconditional_generator(NOISE_DIM, IMAGE_DIM)
    generator.summary()
    discriminator = create_discriminator(IMAGE_DIM)
    discriminator.summary()
    generator_loss_func = generator_loss_wasserstein
    discriminator_loss_func = discriminator_loss_wasserstein
    generator_optimizer = tf.keras.optimizers.RMSprop(LEARNING_RATE)
    discriminator_optimizer = tf.keras.optimizers.RMSprop(LEARNING_RATE)

    # Global step across epochs; train_step uses it to schedule generator updates.
    step = 0
    for epoch in range(NUM_EPOCH):
        pbar = tf.keras.utils.Progbar(target=num_batches, stateful_metrics=[])
        metrics = {'epoch': epoch}
        # Per-epoch batch counter that drives the progress bar.
        batches_seen = 0
        for image_batch in image_batches:
            metrics.update(train_step(
                image_batch=image_batch, 
                batch_size=BATCH_SIZE, 
                noise_dim=NOISE_DIM, 
                generator=generator, 
                discriminator=discriminator, 
                generator_optimizer=generator_optimizer, 
                discriminator_optimizer=discriminator_optimizer, 
                discriminator_loss_func=discriminator_loss_func, 
                generator_loss_func=generator_loss_func,
                step=step,
            ))
            pbar.update(batches_seen, values=metrics.items(), finalize=False)
            batches_seen += 1
            step += 1

        # Finalize with the per-epoch count: the global `step` keeps growing
        # and would overshoot the bar's target from the second epoch on.
        pbar.update(batches_seen, values=metrics.items(), finalize=True)
        save_plot(generator(test_noises, training=False), epoch, TEST_IMAGE_GRID_SIZE)
        if save_model:
            os.makedirs("saved_model", exist_ok=True)
            # 1-based epoch number: the final checkpoint is epoch-NUM_EPOCH.
            generator.save(f"saved_model/generator_epoch-{epoch + 1}.h5")
            discriminator.save(f"saved_model/discriminator_epoch-{epoch + 1}.h5")

In [51]:
# download models
# Colab-only cell: sends a saved generator checkpoint to the browser.
# NOTE: the file must already exist under saved_model/, so run this only
# after train() has finished.
from google.colab import files
files.download('saved_model/generator_epoch-50.h5')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Run the full training loop with the default save_model=True.
train()