# L11: Generative Models and GANs


### Problem Statement:

`You are working as a Data Scientist at Kyoto Animation, a Japanese animation studio`


* In 2020, about 179 new anime were released, and more than 4505 anime have been released in total.  

- Anime industry wants to develop an automated system to generate newer anime characters.



<img src="https://drive.google.com/uc?export=view&id=1mHR9Ud9bFz2PtsPRnlfGIQlp11fODR5N" style="width:480px; margin-bottom:32px"/>

#### How would a human create new anime characters ?
1. Character Profile: Choose the artistic style, skin tone, hairstyle and gesture.
2. Rough Character Sketches
3. Developing the Character Design
4. Coloring an Anime Character

## How does the data look?

### Does the dataset have any labels?
- No, since we are going to work with unsupervised machine learning, we won't be needing any labels to train our network.


In [None]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!gdown 1tkKn01cnF3MH7-8mQzIay7ShMcgdZXF7

In [None]:
# unzip the dataset in local directory
!unzip '/content/animefacedataset.zip' -d '/content/animefacedataset'

### How many samples in the data?


In [None]:
import os
import gdown
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt

# Every image is resized to this (height, width) when it is loaded.
image_dimensions = (64, 64)
# Number of images per batch yielded by the dataset.
batch_size = 256
# Path to the extracted dataset, relative to the current working directory.
dataset_path = "./animefacedataset"

# label_mode=None: the dataset yields image batches only (no labels), which is
# all the unsupervised GAN training below needs.
dataset = keras.preprocessing.image_dataset_from_directory(
    dataset_path, label_mode=None, image_size=image_dimensions, batch_size=batch_size
)

# Rescale pixels to [-1, 1] so real images share the range of the generator's
# tanh output (assumes the loader yields values in [0, 255] -- TODO confirm).
dataset = dataset.map(lambda x: (x - 127.5) / 127.5)




### Is the data enough?

### What is the dimension of each sample image?
- Let’s visualize some samples and check the image dimension


In [None]:
from matplotlib import pyplot

# Display grid of images from dataset
def display_images(total=9): # default total images to display = 9
    """Show the first image of each of the first `total` batches in a grid.

    Reads the module-level `dataset`, whose batches are expected to be scaled
    to [-1, 1]; each image is mapped back to 0..255 integers for display.

    Args:
        total: Number of images (one per batch) to display. Values <= 0 draw
            nothing. The grid is 3 columns wide and grows in rows as needed,
            so values above 9 work as well.
    """
    if total <= 0:
        # The original countdown never hit its stop condition for total <= 0.
        return

    n_cols = 3
    # Keep the familiar 3x3 layout for total <= 9; add rows beyond that
    # (the old `330 + k` shorthand cannot address more than 9 cells).
    n_rows = max(3, -(-total // n_cols))

    for position, batch in enumerate(dataset, start=1):
        plt.subplot(n_rows, n_cols, position)
        # Undo the [-1, 1] scaling and show the first image of the batch.
        plt.imshow(((batch.numpy() * 0.5 + 0.5) * 255).astype("int32")[0])
        if position == total:
            break

In [None]:
display_images()

In [None]:
def display_single(dataloader):
    """Print the shape of the first batch and show its first image.

    Args:
        dataloader: Iterable of image batches exposing `.numpy()`; values are
            assumed to be scaled to [-1, 1] (they are mapped back to 0..255
            integers for display). Nothing happens if the iterable is empty.
    """
    first_batch = next(iter(dataloader), None)
    if first_batch is None:
        return

    batch_array = first_batch.numpy()
    plt.axis("off")
    print("Image Dimensions: ", batch_array.shape)
    pixels = ((batch_array * 0.5 + 0.5) * 255).astype("int32")
    plt.imshow(pixels[0])

display_single(dataset)

In [None]:
import numpy as np
from sklearn.utils import shuffle
import time
import cv2
from tqdm.notebook import tqdm
from PIL import Image
from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.layers.core import Activation
from keras.layers.core import Flatten, Dropout
from keras.layers import Input, merge
from keras.layers.pooling import MaxPooling2D
from keras.layers.convolutional import Conv2D, Conv2DTranspose
import matplotlib.pyplot as plt
import keras.backend as K
from keras.initializers import RandomNormal

img_shape = (64, 64, 3)

## discriminative model


In [None]:
def get_disc_normal(image_shape=(64,64,3)):
    """Build the convolutional discriminator and print its summary.

    The network is a stack of four stride-2, 4x4 convolutions with 64, 128,
    256 and 512 filters, each followed by LeakyReLU(0.2). Every convolution
    except the first is batch-normalised before its activation. The feature
    map is then flattened and reduced to a single sigmoid output.

    Args:
        image_shape: Shape of one input image, channels last.

    Returns:
        The (uncompiled) Keras `Model` mapping an image to one sigmoid score.
    """
    kernel_init = 'glorot_uniform'

    def strided_conv(tensor, n_filters):
        # One downsampling convolution; all four share these settings.
        return Conv2D(filters = n_filters, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(tensor)

    dis_input = Input(shape = image_shape)

    # First block: no batch normalisation directly on the input convolution.
    features = strided_conv(dis_input, 64)
    features = LeakyReLU(0.2)(features)

    # Remaining blocks: conv -> batch norm -> LeakyReLU.
    for n_filters in (128, 256, 512):
        features = strided_conv(features, n_filters)
        features = BatchNormalization(momentum = 0.5)(features)
        features = LeakyReLU(0.2)(features)

    # Classification head: a single sigmoid unit.
    features = Flatten()(features)
    score = Dense(1)(features)
    score = Activation('sigmoid')(score)

    discriminator_model = Model(dis_input, score)
    discriminator_model.summary()
    return discriminator_model

In [None]:
discriminator = get_disc_normal()

## Generative model

In [None]:
latent_dim = 128

def get_gen_normal(noise_shape = (1,1,128)):
    """Build the transposed-convolution generator and print its summary.

    The noise tensor goes through a stride-1 'valid' transposed convolution
    (512 filters), three stride-2 'same' transposed convolutions (256, 128,
    64 filters), a 3x3 convolution (64 filters) and a final stride-2
    transposed convolution producing 3 channels with a tanh activation.
    Every layer except the last is followed by batch normalisation and
    LeakyReLU(0.2).

    Note from the original author: changing the first layer's padding to
    'same' makes a lot of difference.

    Args:
        noise_shape: Shape of one latent sample, channels last, so it can be
            fed directly to a convolutional layer.

    Returns:
        The (uncompiled) Keras `Model` mapping a latent sample to an image
        with values in the tanh range [-1, 1].
    """
    kernel_init = 'glorot_uniform'

    def transposed_conv(tensor, n_filters, stride, pad):
        # 4x4 transposed convolution; only filters/stride/padding vary.
        return Conv2DTranspose(filters = n_filters, kernel_size = (4,4), strides = (stride, stride), padding = pad, data_format = "channels_last", kernel_initializer = kernel_init)(tensor)

    def norm_then_activate(tensor):
        # Batch norm followed by LeakyReLU, shared by every hidden block.
        tensor = BatchNormalization(momentum = 0.5)(tensor)
        return LeakyReLU(0.2)(tensor)

    gen_input = Input(shape = noise_shape)

    # Project the latent sample with the only 'valid'-padded layer.
    features = norm_then_activate(transposed_conv(gen_input, 512, 1, "valid"))

    # Three stride-2 upsampling blocks.
    for n_filters in (256, 128, 64):
        features = norm_then_activate(transposed_conv(features, n_filters, 2, "same"))

    # Extra 3x3 convolution at constant resolution.
    features = Conv2D(filters = 64, kernel_size = (3,3), strides = (1,1), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(features)
    features = norm_then_activate(features)

    # Final upsampling to 3 channels; tanh bounds the output to [-1, 1].
    image = transposed_conv(features, 3, 2, "same")
    image = Activation('tanh')(image)

    generator_model = Model(gen_input, image)
    generator_model.summary()

    return generator_model

In [None]:
generator = get_gen_normal()

In [None]:
tf.keras.utils.plot_model(generator)

## Model Training

In [None]:
class GAN(keras.Model):
    """Keras model that trains a generator/discriminator pair adversarially.

    Args:
        discriminator: Model mapping an image batch to one sigmoid score per
            image.
        generator: Model mapping latent samples of shape
            (batch, 1, 1, latent_dim) to image batches.
        latent_dim: Size of the last axis of the latent samples.
    """

    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store one optimizer per sub-model, the loss, and the loss trackers.

        Args:
            d_optimizer: Optimizer applied to the discriminator's weights.
            g_optimizer: Optimizer applied to the generator's weights.
            loss_fn: Callable `loss_fn(labels, predictions)` used for both
                the discriminator and the generator step.
        """
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        """Loss trackers exposed to Keras."""
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        """Run one discriminator update followed by one generator update.

        Both sub-models are called with `training=True`. They are invoked
        directly from this custom step, so the flag is passed explicitly
        rather than left to Keras to infer; this makes their
        BatchNormalization layers normalise with the current batch
        statistics during training.

        Args:
            real_images: Batch of real images, in the same value range as the
                generator's output.

        Returns:
            Dict with the running means of the discriminator loss ("d_loss")
            and the generator loss ("g_loss").
        """
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, 1, 1, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors, training=True)

        # Combine them with real images (fakes first, then reals)
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Labels in the same order as the images: 0 = fake, 1 = real
        labels = tf.concat(
            [tf.zeros((batch_size, 1)), tf.ones((batch_size, 1))], axis=0
        )

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images, training=True)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample fresh random points in the latent space for the generator step
        random_latent_vectors = tf.random.normal(shape=(batch_size, 1, 1, self.latent_dim))

        # Labels that say "all real images": the generator is rewarded when
        # the discriminator scores its fakes as real
        misleading_labels = tf.ones((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)! Only generator weights receive gradients.
        with tf.GradientTape() as tape:
            fake_images = self.generator(random_latent_vectors, training=True)
            predictions = self.discriminator(fake_images, training=True)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }

The function `save_img_batch` saves a grid of images generated by the GAN to disk.

In [None]:
import matplotlib.gridspec as gridspec

def save_img_batch(img_batch,img_save_dir):
    """Plot up to 16 randomly chosen images in a 4x4 grid, save and show it.

    Args:
        img_batch: Batch of images indexed as (image, height, width,
            channels); either a tensor exposing `.numpy()` or anything
            `np.asarray` accepts. Values must be in a range `imshow` can
            display.
        img_save_dir: Path of the image file to write (despite the name, this
            is a file path, not a directory).
    """
    # Work on a plain array; the original discarded the result of .numpy().
    images = img_batch.numpy() if hasattr(img_batch, "numpy") else np.asarray(img_batch)

    grid_side = 4
    # Sampling without replacement needs at most as many picks as images,
    # so smaller batches simply leave part of the grid empty.
    n_shown = min(grid_side * grid_side, images.shape[0])

    figure = plt.figure(figsize=(4,4))
    gs1 = gridspec.GridSpec(grid_side, grid_side)
    gs1.update(wspace=0, hspace=0)
    rand_indices = np.random.choice(images.shape[0], n_shown, replace=False)

    for cell, rand_index in enumerate(rand_indices):
        ax1 = plt.subplot(gs1[cell])
        ax1.set_aspect('equal')
        ax1.imshow(images[rand_index, :, :, :])
        ax1.axis('off')
        ax1.get_xaxis().set_visible(False)
        ax1.get_yaxis().set_visible(False)

    plt.tight_layout()
    plt.savefig(img_save_dir,bbox_inches='tight',pad_inches=0)
    plt.show()
    # Release the figure explicitly: this runs once per epoch.
    plt.close(figure)

- GANs are very difficult to train, so we will monitor the generator's output after every epoch and save a batch of generated images.

In [None]:
class GANMonitor(keras.callbacks.Callback):
    """Callback that snapshots generator output at the end of every epoch.

    The same latent vectors are reused for every epoch, so successive
    snapshots show how the output for fixed inputs evolves during training.

    Args:
        num_img: Number of latent vectors (and so images) generated per epoch.
        latent_dim: Size of the last axis of each latent vector; must match
            the generator's input.
        file_writer: Optional summary writer. When given, the generated
            images are also logged as an image summary; when None, only the
            image file is written.
    """

    def __init__(self, num_img=16, latent_dim=128, file_writer=None):
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim
        self.file_writer = file_writer
        # Fixed noise, sampled once, shared by all epochs
        self.random_latent_vectors = tf.random.normal(shape=(self.num_img, 1, 1, self.latent_dim))
        # Directory to save generated outputs after each epoch
        self.path = './generate_per_epoch'
        # exist_ok avoids the check-then-create race and makes re-runs safe
        os.makedirs(self.path, exist_ok=True)

    def on_epoch_end(self, epoch, logs=None):
        """Generate images from the fixed noise, save them, optionally log them."""
        # Inference mode: this is a preview, not a training step
        generated_images = self.model.generator(self.random_latent_vectors, training=False)
        # Map the generator's tanh range [-1, 1] to [0, 1] for display
        generated_images = (generated_images*0.5 + 0.5)
        save_img_batch(generated_images, self.path + '/' + 'generated_img_%03d.png' % (epoch) )

        # Log to the summary writer only when one was provided
        if self.file_writer is not None:
            with self.file_writer.as_default():
                # max_outputs defaults to 3; raise it so every image is logged
                tf.summary.image(
                    "Epoch end generated data",
                    generated_images,
                    step=epoch,
                    max_outputs=self.num_img,
                )

- You need to save the logs in the tensorboard so that it can be viewed and loaded any time

In [None]:
training_log_dir = './logs/training_logs'
epoch_end_logdir = './logs/epoch_end_logs'

In [None]:
epochs = 60  #  try ~100 epochs
# Must match the last axis of the generator's noise input (built above with
# the default noise_shape of (1, 1, 128)).
latent_dim = 128
# Alternative: keep the training logs on Google Drive so they outlive the session
# training_log_dir = "./drive/MyDrive/Datasets - DSML/IntroToGANs/logs/training_logs"

# Alternative: keep the per-epoch image logs on Google Drive as well
# epoch_end_logdir = "./drive/MyDrive/Datasets - DSML/IntroToGANs/logs/epoch_end_logs"
# Creates a file writer for the per-epoch image log directory
# (the directory name is fixed, not timestamped).
file_writer = tf.summary.create_file_writer(epoch_end_logdir)

# Wrap the two networks; each gets its own Adam optimizer (the generator uses
# a slightly lower learning rate). Binary cross-entropy is applied to the
# discriminator's sigmoid output.
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.00015, beta_1=0.5),
    loss_fn=keras.losses.BinaryCrossentropy(),
)

# Writes the training logs (losses, plus histograms every epoch) for TensorBoard.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=training_log_dir, histogram_freq=1)

- Start training the GANS and observe the generated output after every epoch

In [None]:
%%time
history = gan.fit(
                    dataset, epochs=epochs,
                    callbacks=[GANMonitor(num_img=16, latent_dim=latent_dim, file_writer=file_writer),
                             tensorboard_callback])

**How does the loss change over time?**
- Visualizing losses is quite useful for debugging the training process.
 > For GANs, we expect: the generator's loss to reduce over time, without the discriminator's loss getting too high.

In [None]:
from tensorflow.keras.callbacks import TensorBoard
training_log_dir = './logs/training_logs/train'

In [None]:
%reload_ext tensorboard
%tensorboard --logdir={training_log_dir}

### Why did discriminator loss increase?

- The discriminator loss consists of two parts (1st: classifying real images as real; 2nd: classifying fake images as fake). The 'full discriminator loss' is the sum of these two parts.

- The loss should be as small as possible for both the generator and the discriminator. But there is a catch: the smaller the discriminator loss becomes, the more the generator loss increases and vice versa.

- The images are getting more realistic (which is all we really care about) so the generator's loss is improving, while the discriminator is doing the same quality job, but getting tougher data.

In [None]:
epoch_end_logdir = './logs/epoch_end_logs'

%reload_ext tensorboard
%tensorboard --logdir={epoch_end_logdir}

In [None]:

# Directory where the trained generator and discriminator are saved.
pretrained_weights = "/content/pretrained_weights"
# exist_ok=True replaces the check-then-create pattern: no race between the
# existence test and the creation, and the cell is safe to re-run.
os.makedirs(pretrained_weights, exist_ok=True)


In [None]:
# Save the trained generator and discriminator (HDF5 files) so they can be
# reloaded later.
generator.save(os.path.join(pretrained_weights,'generator.h5'))
discriminator.save(os.path.join(pretrained_weights,'discriminator.h5'))

# To start from previously saved models instead, load them back, e.g. from Drive:
#generator     = tf.keras.models.load_model("/content/drive/MyDrive/DSML_Course_Curriculum/Intro_to_GANs/pretrained_weights/generator.h5")
#discriminator = tf.keras.models.load_model("/content/drive/MyDrive/DSML_Course_Curriculum/Intro_to_GANs/pretrained_weights/discriminator.h5")

### Full Training as a GIF:
Here's how the generated images look, after every epoch of training.



In [None]:
import imageio
# Collect the per-epoch snapshots; the zero-padded epoch number in each file
# name makes a plain lexicographic sort equal to epoch order.
gen_img_path =  '/content/generate_per_epoch/'
filenames = sorted(
    os.path.join(gen_img_path, f) for f in os.listdir(gen_img_path) if 'generated' in f
)

# Read every snapshot and write them out as a single animated GIF
images = [imageio.imread(filename) for filename in filenames]
imageio.mimsave('/content/movie.gif', images)

In [None]:
from IPython.display import Image
# Display gif
Image(open('/content/movie.gif','rb').read())


## Finetuning

In [None]:
# Hyperparameters
batch_size = 512
epochs = 60
latent_dim = 100
image_dimensions = (64, 64)

# dataloader
dataset_path = "./animefacedataset"
dataset = keras.preprocessing.image_dataset_from_directory(
    dataset_path, label_mode=None, image_size=image_dimensions, batch_size=batch_size
)
# Rescale pixels to [-1, 1], exactly as in the first training run. Without
# this the discriminator sees raw real images next to fakes in the
# generator's tanh range [-1, 1] and can separate them on value range alone.
dataset = dataset.map(lambda x: (x - 127.5) / 127.5)

# Model initialization: fresh networks; the generator's noise input is sized
# to the new latent_dim
generator = get_gen_normal(noise_shape = (1,1,latent_dim))
discriminator = get_disc_normal()

# path to log folder
training_log_dir = './mode_collapse_logs/training_logs'
epoch_end_logdir = './mode_collapse_logs/epoch_end_logs'

# Creates a file writer for the per-epoch image log directory
# (the directory name is fixed, not timestamped).
file_writer = tf.summary.create_file_writer(epoch_end_logdir)

# GAN wrapper and optimizers (same settings as the first run)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.00015, beta_1=0.5),
    loss_fn=keras.losses.BinaryCrossentropy(),
)

# callbacks
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=training_log_dir, histogram_freq=1)

In [None]:
history = gan.fit(
                  dataset, epochs=epochs,
                  callbacks=[GANMonitor(num_img=16, latent_dim=latent_dim, file_writer=file_writer),
                             tensorboard_callback])