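Install the packages needed to generate the training GIF (`imageio` writes it, `tensorflow_docs` embeds it in the notebook).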

In [1]:
!pip install -q git+https://github.com/tensorflow/docs 
!pip install imageio

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.python.keras import Input, Sequential
from tensorflow.python.keras.engine.functional import Functional
from tensorflow.python.keras.engine.keras_tensor import KerasTensor
from tensorflow.python.keras.models import Model

# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all
# up front. Nothing happens (and nothing is printed) when no GPU is visible.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # The memory-growth setting has to be identical on every GPU.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
except RuntimeError as e:
    # Raised when memory growth is configured after the GPUs were initialized.
    print(e)

from tensorflow.keras.layers import (Dense, Lambda, Conv2D, Conv2DTranspose, LeakyReLU, 
                                    BatchNormalization,ZeroPadding2D, Activation ,Flatten, Reshape, Cropping2D, Dropout)
from tensorflow.python.keras.utils.np_utils import to_categorical
from tensorflow.keras import backend as K

import glob
import imageio
import os
import PIL
import time

from IPython import display
from tensorflow.keras.optimizers import Adam, RMSprop

import tensorflow_docs.vis.embed as embed

## Import the MNIST data class and instantiate it.

In [3]:
# MNIST_dataset is not installed by the cells above — presumably a local module
# that must sit next to this notebook (TODO confirm).
import MNIST_dataset as mnist
# `data.x_train` is the only attribute the DCGAN class below reads.
# NOTE(review): `gan=True` presumably selects GAN-specific preprocessing
# (e.g. scaling pixels to [-1, 1]) — confirm in MNISTData.
data = mnist.MNISTData(gan=True)

## Deep Convolutional Generative Adversarial Network with TensorFlow

The aim of this exercise is to implement a DCGAN architecture and to test it on the MNIST dataset.

Code partially adapted from [TensorFlow Documentation](https://www.tensorflow.org/tutorials/generative/dcgan).

In [4]:
class DCGAN():
    """Deep Convolutional Generative Adversarial Network.

    Wraps a generator and a discriminator together with their optimizers, the
    training loop, and helpers to save sample images, build a GIF of the
    training progress and plot the losses.

    Typical usage::

        gan = DCGAN(data)
        gan.make_generator_model()
        gan.make_discriminator_model()
        gan.train_dcgan(epochs=50, save_interval=5)
        gan.plot_losses()
        gan.gif()
    """

    # Size of each batch of real images, and number of latent vectors sampled
    # per training step.
    batch_size = 256

    def __init__(self, data: mnist.MNISTData):
        """Prepare the training dataset and the (still empty) model slots.

        Args:
            data: Dataset wrapper; only ``data.x_train`` is read.
                NOTE(review): the images are assumed to have shape
                (N, 28, 28, 1) and to be scaled to [-1, 1] to match the
                discriminator input and the generator's tanh output —
                confirm against MNISTData.
        """
        self.x_train = data.x_train
        self.buffer_size = self.x_train.shape[0]

        # Shuffle over the whole training set, then split into batches.
        self.train_dataset = (
            tf.data.Dataset.from_tensor_slices(self.x_train)
            .shuffle(self.buffer_size)
            .batch(self.batch_size)
        )

        self.latent_dim = 100

        # Models and optimizers are created by make_generator_model() and
        # make_discriminator_model().
        self.generator = None
        self.g_optimizer = None
        self.discriminator = None
        self.d_optimizer = None

        # The discriminator ends with a sigmoid, i.e. it outputs probabilities,
        # so the cross entropy must NOT treat its output as logits.
        self.loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=False)

        # One row per trained epoch: [generator loss, discriminator loss].
        self.loss_record = np.empty((0, 2), dtype=float)

    def make_generator_model(self, optimizer=None):
        """Build the generator and set its optimizer.

        Args:
            optimizer: Optimizer used for the generator. When ``None`` a fresh
                ``Adam(learning_rate=1e-4)`` is created, so optimizer state is
                never shared between instances or between rebuilds.
        """
        self.g_optimizer = optimizer if optimizer is not None else Adam(learning_rate=1e-4)
        self.generator = Sequential(
            [
                Input(shape=(self.latent_dim,)),
                # Project the latent vector to 7*7*256 values and reshape
                # them into a 7x7x256 feature map.
                Dense(7 * 7 * 256, use_bias=False),
                BatchNormalization(momentum=0.8),
                LeakyReLU(alpha=0.2),
                Reshape((7, 7, 256)),

                # Stride 1 with 'same' padding: 7x7x256 -> 7x7x128
                Conv2DTranspose(128, kernel_size=5, padding='same', use_bias=False),
                BatchNormalization(momentum=0.8),
                LeakyReLU(alpha=0.2),

                # Stride 2 with 'same' padding: 7x7x128 -> 14x14x64
                Conv2DTranspose(64, kernel_size=5, strides=(2, 2), padding='same', use_bias=False),
                BatchNormalization(momentum=0.8),
                LeakyReLU(alpha=0.2),

                # Stride 2 with 'same' padding: 14x14x64 -> 28x28x1,
                # tanh keeps the pixel values in [-1, 1].
                Conv2DTranspose(1, kernel_size=5, strides=(2, 2), padding='same', use_bias=False),
                Activation("tanh"),
            ],
            name="generator",
        )

    def make_discriminator_model(self, optimizer=None):
        """Build the discriminator and set its optimizer.

        The network outputs the probability (sigmoid) that its input is real.

        Args:
            optimizer: Optimizer used for the discriminator. When ``None`` a
                fresh ``Adam(learning_rate=5e-5)`` is created, so optimizer
                state is never shared between instances or between rebuilds.
        """
        self.d_optimizer = optimizer if optimizer is not None else Adam(learning_rate=5e-5)
        self.discriminator = Sequential(
            [
                Input(shape=(28, 28, 1)),
                Conv2D(64, kernel_size=5, strides=(2, 2), padding='same'),
                LeakyReLU(alpha=0.2),
                Dropout(0.25),

                Conv2D(128, kernel_size=5, strides=(2, 2), padding='same'),
                LeakyReLU(alpha=0.2),
                Dropout(0.25),

                Flatten(),
                Dense(1),
                Activation('sigmoid'),
            ],
            name="discriminator"
        )

    def discriminator_loss(self, real_output, fake_output):
        """Cross entropy of real images against 1 plus fake images against 0."""
        real_loss = self.loss_fn(tf.ones_like(real_output), real_output)
        fake_loss = self.loss_fn(tf.zeros_like(fake_output), fake_output)
        return real_loss + fake_loss

    def generator_loss(self, fake_output):
        """Cross entropy of the fake images against 1 (generator wants them judged real)."""
        return self.loss_fn(tf.ones_like(fake_output), fake_output)

    def train_dcgan(self, epochs=100, save_interval = 10):
        """Training loop.

        Args:
            epochs: Number of passes over the training dataset.
            save_interval: A grid of generated samples is saved and shown
                every ``save_interval`` epochs (starting with the first one).
                ``0`` or ``None`` disables it.

        Raises:
            RuntimeError: If the generator or discriminator was not built yet.
            ValueError: If the training dataset yields no batch.
        """
        if self.generator is None or self.discriminator is None:
            raise RuntimeError(
                "Call make_generator_model() and make_discriminator_model() before train_dcgan()."
            )

        # Fixed latent vectors, so saved samples are comparable across epochs.
        seed = tf.random.normal([16, self.latent_dim])
        start_loop = time.time()
        for epoch in range(epochs):
            start_epoch = time.time()

            gen_loss = disc_loss = None
            for image_batch in self.train_dataset:
                gen_loss, disc_loss = self.train_step(tf.cast(image_batch, tf.float32))
            if gen_loss is None:
                raise ValueError("The training dataset is empty.")

            # Only the losses of the last batch of the epoch are recorded.
            gen_loss, disc_loss = float(gen_loss), float(disc_loss)

            print('Time for epoch {} is {} sec'.format(epoch + 1, time.time() - start_epoch))
            print('Generation loss: {} Discriminator loss: {} '.format(gen_loss, disc_loss))
            self.loss_record = np.vstack([self.loss_record, [gen_loss, disc_loss]])

            # At a save interval, save and display the generated image samples.
            if save_interval and epoch % save_interval == 0:
                self.generate_and_save_images(epoch + 1, seed)

        print("\nTotal time taken: %.2fs" % (time.time() - start_loop))
        print("\n\n\n")

    # The function to be "compiled".
    @tf.function
    def train_step(self, images):
        """Run one GAN training step on a batch of real images.

        Args:
            images: Batch of real images fed to the discriminator.

        Returns:
            Tuple ``(g_loss, d_loss)`` with the generator and discriminator
            losses of this step.
        """
        # Sample normally distributed points in the latent space.
        random_latent_vec = tf.random.normal([self.batch_size, self.latent_dim])

        # One tape per network, so each loss is differentiated only with
        # respect to its own network's variables.
        with tf.GradientTape() as g_tape, tf.GradientTape() as d_tape:
            gen_imgs = self.generator(random_latent_vec, training=True)

            real_output = self.discriminator(images, training=True)
            fake_output = self.discriminator(gen_imgs, training=True)

            g_loss = self.generator_loss(fake_output)
            d_loss = self.discriminator_loss(real_output, fake_output)

        gradients_of_generator = g_tape.gradient(g_loss, self.generator.trainable_variables)
        gradients_of_discriminator = d_tape.gradient(d_loss, self.discriminator.trainable_variables)

        self.g_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))
        self.d_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))

        return g_loss, d_loss

    def generate_and_save_images(self, epoch, test_input, save=True):
        """Generate images from ``test_input`` and show them in a square grid.

        Args:
            epoch: Epoch number used in the name of the saved file.
            test_input: Batch of latent vectors fed to the generator.
            save: When true, the figure is also written to
                ``./images/GAN/training_checkpoints``.
        """
        pred = self.generator(test_input, training=False)

        # Smallest square grid that fits every generated image.
        n_images = pred.shape[0]
        grid = max(1, int(np.ceil(np.sqrt(n_images))))

        fig = plt.figure(figsize=(5, 5))
        for i in range(n_images):
            plt.subplot(grid, grid, i + 1)
            # Map the tanh output from [-1, 1] back to [0, 255].
            plt.imshow(pred[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
            plt.axis('off')
        if save:
            self.dir()  # make sure the output folder exists
            plt.savefig('./images/GAN/training_checkpoints/image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()
        # Release the figure; one is created at every save interval.
        plt.close(fig)

    def gif(self):
        """Create and display a GIF of the images generated over the epochs.

        Raises:
            FileNotFoundError: If no checkpoint image has been saved yet.
        """
        filenames = sorted(glob.glob('./images/GAN/training_checkpoints/image*.png'))
        if not filenames:
            raise FileNotFoundError(
                "No checkpoint images found in ./images/GAN/training_checkpoints; "
                "run train_dcgan() first."
            )

        gif_path = './images/GAN/dcgan.gif'
        with imageio.get_writer(gif_path, mode='I') as writer:
            for filename in filenames:
                writer.append_data(imageio.imread(filename))
            # The last frame is appended a second time.
            writer.append_data(imageio.imread(filenames[-1]))

        # Embed only after the writer is closed, i.e. once the file is complete.
        display.display(embed.embed_file(gif_path))

    def plot_losses(self):
        """Plot the recorded generator and discriminator losses and save the figure."""
        epochs = np.arange(1, len(self.loss_record) + 1)
        plt.plot(epochs, self.loss_record)
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Generator Loss', 'Discriminator Loss'], loc='best')
        plt.title('Generator and Discriminator losses')
        self.dir()  # make sure the output folder exists
        plt.savefig('./images/GAN/g_d_losses.png')

        plt.show()

    def dir(self):
        """Create the folders used to store the results (no-op if they exist)."""
        # Creates ./images, ./images/GAN and the checkpoint folder in one call.
        os.makedirs("./images/GAN/training_checkpoints", exist_ok=True)



In [5]:
# Build the GAN on the MNIST data loaded above.
gan = DCGAN(data)
# Create the two networks with their default optimizers.
gan.make_generator_model()
gan.make_discriminator_model()
# Create the output folders before training: sample images are saved there during training.
gan.dir()
# Short run: 2 epochs, saving and showing a sample grid after every epoch.
gan.train_dcgan(epochs=2, save_interval=1)
# Plot and save the per-epoch losses, then assemble the saved samples into a GIF.
gan.plot_losses()
gan.gif()

KeyboardInterrupt: 