# GANs with Python

## Setup

### 1. Go to this link: bit.ly/45svhk0
### 2. Log in to your Google Drive account
### 3. Make sure your files are in the right place

In [None]:
import os
import tensorflow as tf
import numpy as np

import cv2
import matplotlib.pyplot as plt
from IPython import display
import pylab as pl

%matplotlib inline

# Set to True to run the image-resizing cell further down; the resize has to
# happen at least once before training.
do_preprocess = True
# Set to True to resume from a previously saved model file instead of starting
# from scratch; must be False on the very first run.
from_checkpoint = False

- `do_preprocess` - Determines if we run or skip the code for resizing our image files. This will need to be done at least once.
- `from_checkpoint` - Determines if the GAN starts from scratch, or picks up where it left off by loading a model file. For the first run this will have to be set to False.

## Check the GPU

In [None]:
# Report how many GPU devices TensorFlow can see; 0 means none were detected.
print("Num GPUs Available: ", len(tf.config.list_physical_devices("GPU")))

Google knows to use the T4 Runtime when I run GPU code.

## Prepare the Folder Names

- One folder for the resized images that we will feed into the GAN
- Another folder for the model files that act as snapshots of how far the GAN has come along
- And another for the computer-generated images

The `try-except` is there so that this code can work on Colab or on a desktop.

In [None]:
# Name of the workshop folder and of the dataset folder inside it.
GANS_WORKSHOP_FOLDER = "gans-workshop-files"
data_folder_name = "pokemon"

# Only the import can tell us whether we are on Colab, so keep the `try`
# limited to that single line.
try:
    from google.colab import drive
except ModuleNotFoundError:
    # Not on Colab: look for the workshop folder in the current directory.
    project_dir = os.getcwd()
else:
    # Drive is mounted at an absolute path, so refer to it absolutely instead
    # of depending on the current working directory being /content.
    drive.mount("/content/drive")
    project_dir = "/content/drive/MyDrive"

# Original images, resized images, model checkpoints and generated samples all
# live side by side inside the workshop folder.
workshop_dir = os.path.join(project_dir, GANS_WORKSHOP_FOLDER)
data_dir = os.path.join(workshop_dir, data_folder_name)
data_resized_dir = os.path.join(workshop_dir, f"{data_folder_name}-resized")
models_folder = os.path.join(workshop_dir, f"{data_folder_name}-models")
image_folder = os.path.join(workshop_dir, f"{data_folder_name}-gans-images")

## Preprocess the Files

This code is designed to work with 128x128 images, so we're going to resize the images and place them in a new folder.

In [None]:
if do_preprocess:
    # Make a folder for the resized images (and any missing parent folders);
    # this is a no-op when the folder already exists.
    os.makedirs(data_resized_dir, exist_ok=True)

    # Go through each of our input images, resize them, and then save them to the new folder
    for image_filename in os.listdir(data_dir):
        # cv2.imread signals an unreadable file by returning None rather than
        # raising, so check for that explicitly and name the offending file.
        image = cv2.imread(os.path.join(data_dir, image_filename))
        if image is None:
            print(f"Skipping {image_filename}: could not be read as an image")
            continue

        try:
            resized = cv2.resize(image, (128, 128))
            saved = cv2.imwrite(os.path.join(data_resized_dir, image_filename), resized)
        except cv2.error as e:
            # Best effort: report the problem and carry on with the next file.
            print(f"Skipping {image_filename}: {e}")
            continue

        # cv2.imwrite reports failure through its return value.
        if not saved:
            print(f"Skipping {image_filename}: could not write the resized image")

## Setup Helper Functions

These are some simple functions for getting image data. This helps with keeping the code modular.

In [None]:
# This part was taken from Udacity Face generator project
def get_image(image_path, mode):
    """Load an image file and return its pixels as a numpy array.

    Args:
        image_path (str): The path of the image to load.
        mode (str): The mode passed to the image's ``convert`` method
            (this notebook uses "RGB").

    Returns:
        numpy.ndarray: ``np.array`` of the image after conversion to ``mode``.
    """
    # Imported here so the helper works even if the cell that imports PIL at
    # module level has not been run yet.
    from PIL import Image

    # The context manager closes the underlying file as soon as the pixel data
    # has been copied into the array, instead of leaving the handle open.
    with Image.open(image_path) as image:
        return np.array(image.convert(mode))


def get_batch(image_files, mode):
    """Load several image files into one float32 array.

    Args:
        image_files (list): Paths of the images to load, in batch order.
        mode (str): The mode forwarded to ``get_image`` for every file.

    Returns:
        numpy.ndarray: The loaded images stacked along a new first axis as
        float32. If the stacked array has fewer than 4 dimensions, a trailing
        axis of length 1 is appended.
    """
    loaded = []
    for path in image_files:
        loaded.append(get_image(path, mode))

    batch = np.array(loaded).astype(np.float32)

    # Make sure the images are in 4 dimensions
    if batch.ndim < 4:
        batch = batch[..., np.newaxis]

    return batch

This will create a function that will plot several images in a grid. We'll use this to monitor the progress of the GAN.

In [None]:
import math
from PIL import Image


def images_square_grid(images, mode):
    """
    Arrange a batch of images into one square grid image
    :param images: 4-D array of images, indexed as (image, row, column, channel)
    :param mode: The PIL mode to use for images ("RGB" or "L")
    :return: PIL Image containing the largest square grid that fits; any images
             beyond grid_side * grid_side are left out
    """
    # Get maximum size for square grid of images
    grid_side = math.floor(np.sqrt(images.shape[0]))

    # Array axis 1 is rows (height) and axis 2 is columns (width), whereas PIL
    # sizes and offsets are (x, y) = (width, height).
    tile_height, tile_width = images.shape[1], images.shape[2]

    # Scale to 0-255. A batch where every value is identical has zero range;
    # map it to all zeros instead of dividing by zero.
    lowest = images.min()
    value_range = images.max() - lowest
    if value_range == 0:
        images = np.zeros(images.shape, dtype=np.uint8)
    else:
        images = (((images - lowest) * 255) / value_range).astype(np.uint8)

    # Put images in a square arrangement
    images_in_square = np.reshape(
        images[: grid_side * grid_side],
        (grid_side, grid_side, tile_height, tile_width, images.shape[3]),
    )
    if mode == "L":
        # Single-channel PIL images are 2-D, so drop the channel axis
        images_in_square = np.squeeze(images_in_square, 4)

    # Combine images to grid image
    new_im = Image.new(mode, (tile_width * grid_side, tile_height * grid_side))
    for col_i, col_images in enumerate(images_in_square):
        for image_i, image in enumerate(col_images):
            im = Image.fromarray(image, mode)
            # The outer index selects the horizontal position and the inner
            # index the vertical one, so the batch fills the grid column by column.
            new_im.paste(im, (col_i * tile_width, image_i * tile_height))

    return new_im

This creates a `Dataset` object that serves batches of images to TensorFlow.

In [None]:
class Dataset:
    """
    A list of image files together with the shape of the data they hold,
    served in fixed-size batches
    """

    def __init__(self, data_files):
        """
        Initialize the class
        :param data_files: List of image file paths in the dataset
        """
        IMAGE_WIDTH = 128
        IMAGE_HEIGHT = 128

        self.image_mode = "RGB"
        image_channels = 3

        self.data_files = data_files
        # (number of files, width, height, channels)
        self.shape = len(data_files), IMAGE_WIDTH, IMAGE_HEIGHT, image_channels

    def get_batches(self, batch_size):
        """
        Generate batches
        :param batch_size: Batch Size, must be positive
        :return: Generator yielding one array per full batch, with values
                 divided by 255 and shifted down by 0.5; files left over after
                 the last full batch are not yielded
        :raises ValueError: If batch_size is not positive (raised when the
                            generator is first advanced)
        """
        # A non-positive batch size would never advance through the file list
        # and the loop would not terminate.
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        IMAGE_MAX_VALUE = 255

        # Start offsets of every batch that fits completely in the file list
        last_start = self.shape[0] - batch_size
        for start in range(0, last_start + 1, batch_size):
            data_batch = get_batch(
                self.data_files[start : start + batch_size],
                self.image_mode,
            )

            yield data_batch / IMAGE_MAX_VALUE - 0.5

This will display 9 of our resized images by using the `images_square_grid` function that was defined earlier.

In [None]:
# os.listdir returns entries in arbitrary order; sorting makes the preview and
# the order in which the files are later batched the same on every run.
resized_data_filenames = [
    os.path.join(data_resized_dir, filename)
    for filename in sorted(os.listdir(data_resized_dir))
]
# Preview the first few resized images as a square grid
show_n_images = 9
train_images = get_batch(resized_data_filenames[:show_n_images], "RGB")
plt.imshow(images_square_grid(train_images, "RGB"))

Use TensorFlow's `tf.compat.v1.placeholder` to define the model inputs.

In [None]:
def model_inputs(real_dim, z_dim):
    """
    Build the placeholders that are fed into the graph
    :param real_dim: tuple containing width, height and channels
    :param z_dim: The dimension of Z
    :return: Tuple of (tensor of real input images, tensor of z data, learning rate G, learning rate D)
    """
    make_placeholder = tf.compat.v1.placeholder

    # The leading None leaves the batch dimension unspecified
    image_shape = (None,) + tuple(real_dim)
    noise_shape = (None, z_dim)

    real_images = make_placeholder(tf.float32, image_shape, name="inputs_real")
    noise = make_placeholder(tf.float32, noise_shape, name="input_z")
    generator_lr = make_placeholder(tf.float32, name="learning_rate_G")
    discriminator_lr = make_placeholder(tf.float32, name="learning_rate_D")

    return real_images, noise, generator_lr, discriminator_lr

## Creating a Generator

In [None]:
def _generator_upsample_block(inputs, filters, index, is_train, alpha):
    """Apply stage `index` of the generator: transposed conv --> BatchNorm --> LeakyReLU."""
    trans_conv = tf.compat.v1.layers.conv2d_transpose(
        inputs=inputs,
        filters=filters,
        kernel_size=[5, 5],
        strides=[2, 2],
        padding="SAME",
        kernel_initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.02),
        name=f"trans_conv{index}",
    )

    batch_trans_conv = tf.compat.v1.layers.batch_normalization(
        inputs=trans_conv,
        training=is_train,
        epsilon=1e-5,
        name=f"batch_trans_conv{index}",
    )

    return tf.nn.leaky_relu(
        batch_trans_conv, alpha=alpha, name=f"trans_conv{index}_out"
    )


def generator(z, output_channel_dim, is_train=True, alpha=None):
    """Build the generator network.

    Arguments
    ---------
    z : Input tensor for the generator
    output_channel_dim : Number of channels in the generated image
    is_train : True when building the training graph. False reuses the
        existing "generator" variables and runs batch normalization with
        training=False.
    alpha : leak parameter for leaky ReLU. When None, the module-level
        `alpha` hyperparameter is used if it is defined, otherwise 0.2.

    Returns
    -------
    out: tanh output of the final transposed convolution
    """
    if alpha is None:
        # Existing callers do not pass alpha and rely on the notebook-level
        # `alpha` hyperparameter; keep honouring it, but fall back to the
        # discriminator's default instead of failing when it is missing.
        alpha = globals().get("alpha", 0.2)

    with tf.compat.v1.variable_scope("generator", reuse=not is_train):
        # First FC layer --> 8x8x1024
        fc1 = tf.compat.v1.layers.dense(z, 8 * 8 * 1024)

        # Reshape it
        fc1 = tf.reshape(fc1, (-1, 8, 8, 1024))

        # Leaky ReLU
        fc1 = tf.nn.leaky_relu(fc1, alpha=alpha)

        # Four stride-2 stages, each doubling the spatial size:
        # 8x8x1024 --> 16x16x512 --> 32x32x256 --> 64x64x128 --> 128x128x64
        # Layer names match the hand-written version (trans_conv1..4), so
        # variables keep the same names.
        features = fc1
        for index, filters in enumerate((512, 256, 128, 64), start=1):
            features = _generator_upsample_block(
                features, filters, index, is_train, alpha
            )

        # Transposed conv 5 --> tanh
        # 128x128x64 --> 128x128x(output_channel_dim)
        logits = tf.compat.v1.layers.conv2d_transpose(
            inputs=features,
            # Honour the requested channel count instead of a hard-coded 3
            filters=output_channel_dim,
            kernel_size=[5, 5],
            strides=[1, 1],
            padding="SAME",
            kernel_initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.02),
            name="logits",
        )

        out = tf.tanh(logits, name="out")

        return out

## Creating a Discriminator

In [None]:
def discriminator(x, is_reuse=False, alpha=0.2):
    """Build the discriminator network.

    Arguments
    ---------
    x : Input tensor for the discriminator
    is_reuse : Passed as `reuse` to the "discriminator" variable scope
    alpha : leak parameter for leaky ReLU

    Returns
    -------
    out, logits: sigmoid of the logits, and the logits themselves
    """
    # (filters, stride) for each Conv --> BatchNorm --> LeakyReLU stage:
    # 128x128x3 --> 64x64x64 --> 32x32x128 --> 16x16x256 --> 16x16x512 --> 8x8x1024
    stage_specs = ((64, 2), (128, 2), (256, 2), (512, 1), (1024, 2))

    with tf.compat.v1.variable_scope("discriminator", reuse=is_reuse):
        features = x
        for stage, (n_filters, stride) in enumerate(stage_specs, start=1):
            conv = tf.compat.v1.layers.conv2d(
                inputs=features,
                filters=n_filters,
                kernel_size=[5, 5],
                strides=[stride, stride],
                padding="SAME",
                kernel_initializer=tf.compat.v1.truncated_normal_initializer(
                    stddev=0.02
                ),
                name=f"conv{stage}",
            )

            # Batch normalization always runs with training=True here
            normalized = tf.compat.v1.layers.batch_normalization(
                conv, training=True, epsilon=1e-5, name=f"batch_norm{stage}"
            )

            features = tf.nn.leaky_relu(
                normalized, alpha=alpha, name=f"conv{stage}_out"
            )

        # Flatten it
        flat = tf.reshape(features, (-1, 8 * 8 * 1024))

        # Logits
        logits = tf.compat.v1.layers.dense(inputs=flat, units=1, activation=None)

        out = tf.sigmoid(logits)

        return out, logits

## Compute the Loss

The loss tells us how well the GAN is doing.

In [None]:
def model_loss(input_real, input_z, output_channel_dim, alpha):
    """
    Build the discriminator and generator losses
    :param input_real: Images from the real dataset
    :param input_z: Z input
    :param output_channel_dim: The number of channels in the output image
    :param alpha: Leak parameter passed to both discriminator calls
    :return: A tuple of (discriminator loss, generator loss)
    """
    cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits

    # Generator output for the given noise
    fake_images = generator(input_z, output_channel_dim)

    # One discriminator, applied to the real images and then (reusing the same
    # variables) to the generated ones
    real_prob, real_logits = discriminator(input_real, alpha=alpha)
    fake_prob, fake_logits = discriminator(fake_images, is_reuse=True, alpha=alpha)

    # Discriminator: real images are labelled 1, generated images 0
    real_term = tf.reduce_mean(
        cross_entropy(logits=real_logits, labels=tf.ones_like(real_prob))
    )
    fake_term = tf.reduce_mean(
        cross_entropy(logits=fake_logits, labels=tf.zeros_like(fake_prob))
    )
    discriminator_loss = real_term + fake_term

    # Generator: generated images are labelled 1
    generator_loss = tf.reduce_mean(
        cross_entropy(logits=fake_logits, labels=tf.ones_like(fake_prob))
    )

    return discriminator_loss, generator_loss

In [None]:
def model_optimizers(d_loss, g_loss, lr_D, lr_G, beta1):
    """
    Build the training operations
    :param d_loss: Discriminator loss Tensor
    :param g_loss: Generator loss Tensor
    :param lr_D: Learning rate for the discriminator optimizer
    :param lr_G: Learning rate for the generator optimizer
    :param beta1: The exponential decay rate for the 1st moment in the optimizer
    :return: A tuple of (discriminator training operation, generator training operation)
    """
    # Split the trainable variables into G and D parts by name prefix
    g_vars = []
    d_vars = []
    for variable in tf.compat.v1.trainable_variables():
        if variable.name.startswith("generator"):
            g_vars.append(variable)
        if variable.name.startswith("discriminator"):
            d_vars.append(variable)

    # Keep only the update ops whose name starts with "generator"
    all_update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
    gen_updates = []
    for update_op in all_update_ops:
        if update_op.name.startswith("generator"):
            gen_updates.append(update_op)

    # Both training ops are created under a dependency on those update ops
    with tf.control_dependencies(gen_updates):
        d_optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=lr_D, beta1=beta1)
        d_train_opt = d_optimizer.minimize(d_loss, var_list=d_vars)

        g_optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=lr_G, beta1=beta1)
        g_train_opt = g_optimizer.minimize(g_loss, var_list=g_vars)

    return d_train_opt, g_train_opt

This will show an image of our fake data while the GAN is running.

In [None]:
def show_generator_output(
    sess, n_images, input_z, out_channel_dim, image_mode, image_path, save, show
):
    """
    Show and/or save example output for the generator
    :param sess: TensorFlow session
    :param n_images: Number of Images to display
    :param input_z: Input Z Tensor
    :param out_channel_dim: The number of channels in the output image
    :param image_mode: The mode to use for images ("RGB" or "L")
    :param image_path: Path to save the image (only used when save is true)
    :param save: If true, write the image grid to image_path as a JPEG
    :param show: If true, clear the cell output and display the image grid
    """
    cmap = None if image_mode == "RGB" else "gray"
    z_dim = input_z.get_shape().as_list()[-1]
    example_z = np.random.uniform(-1, 1, size=[n_images, z_dim])

    # Calling generator(...) adds a new set of ops to the graph every time, so
    # doing it on each call makes the graph grow for as long as training runs.
    # Build the sampling tensor once per graph and input tensor, remember it in
    # a graph collection, and reuse it afterwards.
    graph = sess.graph
    collection_key = "show_generator_output/" + input_z.name.replace(":", "_")
    cached = graph.get_collection(collection_key)
    if cached:
        sampler = cached[0]
    else:
        with graph.as_default():
            sampler = generator(input_z, out_channel_dim, False)
        graph.add_to_collection(collection_key, sampler)

    samples = sess.run(sampler, feed_dict={input_z: example_z})

    images_grid = images_square_grid(samples, image_mode)

    if save:
        # Save image
        images_grid.save(image_path, "JPEG")

    if show:
        # Replace the previous output so the notebook shows only the latest grid
        display.clear_output(wait=True)
        plt.imshow(images_grid, cmap=cmap)
        display.display(plt.gcf())

In [None]:
# Session configuration with GPU option allow_growth enabled
config = tf.compat.v1.ConfigProto(
    gpu_options=tf.compat.v1.GPUOptions(allow_growth=True)
)

## Define the Training Function

In [None]:
def train(
    epoch_count,
    batch_size,
    z_dim,
    learning_rate_D,
    learning_rate_G,
    beta1,
    get_batches,
    data_shape,
    data_image_mode,
    alpha,
):
    """
    Train the GAN
    :param epoch_count: Number of epochs
    :param batch_size: Batch Size
    :param z_dim: Z dimension
    :param learning_rate_D: Learning rate fed to the discriminator optimizer
    :param learning_rate_G: Learning rate fed to the generator optimizer
    :param beta1: The exponential decay rate for the 1st moment in the optimizer
    :param get_batches: Function to get batches
    :param data_shape: Shape of the data
    :param data_image_mode: The image mode to use for images ("RGB" or "L")
    :param alpha: Leak parameter passed on to model_loss
    :return: A tuple of (discriminator losses, generator losses), one value per epoch
    :raises ValueError: If get_batches yields no batches during an epoch

    Also reads the notebook-level names `config`, `image_folder`,
    `models_folder` and `from_checkpoint`.
    """
    # NOTE(review): Dataset.get_batches yields values in [-0.5, 0.5] while the
    # generator ends in tanh ([-1, 1]) - confirm this range mismatch is intended.

    # Create our input placeholders
    input_images, input_z, lr_G, lr_D = model_inputs(data_shape[1:], z_dim)

    # Losses
    d_loss, g_loss = model_loss(input_images, input_z, data_shape[3], alpha)

    # Optimizers
    d_opt, g_opt = model_optimizers(d_loss, g_loss, lr_D, lr_G, beta1)

    g_losses = []
    d_losses = []

    with tf.compat.v1.Session(config=config) as sess:
        sess.run(tf.compat.v1.global_variables_initializer())

        # Saver
        saver = tf.compat.v1.train.Saver()

        # Create the output folders, including missing parents; no-op if present
        os.makedirs(image_folder, exist_ok=True)
        os.makedirs(models_folder, exist_ok=True)
        model_save_path = os.path.join(models_folder, "model.cpkt")

        if from_checkpoint:
            # Resume from the last saved model and preview one generated image
            saver.restore(sess, model_save_path)
            image_path = "new_train/new_gen_image.jpg"  # not written: save=False
            show_generator_output(
                sess,
                1,
                input_z,
                data_shape[3],
                data_image_mode,
                image_path,
                False,
                True,
            )

        for epoch_i in range(epoch_count):
            # Overwrite the main checkpoint at the start of every 5th epoch
            if (epoch_i + 1) % 5 == 0:
                saver.save(sess, model_save_path)
                print("Model saved")

            # Additionally keep a numbered checkpoint at epochs 100, 150, 200, ...
            if epoch_i > 50 and epoch_i % 50 == 0:
                saver.save(
                    sess, model_save_path, global_step=epoch_i, write_meta_graph=False
                )

            # Reset so an epoch without any batch is detected below instead of
            # failing on an unbound variable (or silently reusing old data).
            batch_images = None
            batch_z = None
            for batch_images in get_batches(batch_size):
                # Random noise
                batch_z = np.random.uniform(-1, 1, size=(batch_size, z_dim))
                # Run optimizers
                _ = sess.run(
                    d_opt,
                    feed_dict={
                        input_images: batch_images,
                        input_z: batch_z,
                        lr_D: learning_rate_D,
                    },
                )
                _ = sess.run(
                    g_opt,
                    feed_dict={
                        input_images: batch_images,
                        input_z: batch_z,
                        lr_G: learning_rate_G,
                    },
                )

            if batch_z is None:
                raise ValueError(
                    f"get_batches({batch_size}) yielded no batches, "
                    "so there is nothing to train on"
                )

            # Record the losses on the last batch of the epoch and generate an image

            train_loss_d = d_loss.eval({input_z: batch_z, input_images: batch_images})
            train_loss_g = g_loss.eval({input_z: batch_z})
            g_losses.append(train_loss_g)
            d_losses.append(train_loss_d)
            # Save it
            image_name = str(epoch_i) + ".jpg"
            image_path = os.path.join(image_folder, image_name)

            plt.title(f"Epoch {epoch_i + 1}")
            show_generator_output(
                sess, 9, input_z, data_shape[3], data_image_mode, image_path, True, True
            )
            print(
                "Epoch {}/{} |".format(epoch_i + 1, epoch_count),
                "Discriminator Loss: {:.4f} |".format(train_loss_d),
                "Generator Loss: {:.4f}".format(train_loss_g),
            )

    return d_losses, g_losses

## Set the Parameters

In [None]:
# Shape of the images fed to the discriminator
real_size = (128, 128, 3)

# Length of the latent vector fed to the generator
z_dim = 100

# Learning rates for the discriminator and the generator
# Thanks to Alexia Jolicoeur Martineau https://ajolicoeur.wordpress.com/cats/
learning_rate_D = 5e-6
learning_rate_G = 2e-5

# Images per training batch and number of passes over the dataset
batch_size = 32
epochs = 2000

# Leak parameter for the leaky ReLU activations
alpha = 0.2

# Exponential decay rate for the 1st moment in the Adam optimizers
beta1 = 0.5

In [None]:
# Wrap the resized image paths in a Dataset. No image is read here; files are
# loaded when batches are requested. Training happens in a later cell.
dataset = Dataset(resized_data_filenames)

In [None]:
# Display the dataset shape: (number of files, width, height, channels)
dataset.shape

## Train the Model

In [None]:
# Build the model in a fresh graph and train it, keeping the per-epoch losses
with tf.Graph().as_default():
    d_losses, g_losses = train(
        epoch_count=epochs,
        batch_size=batch_size,
        z_dim=z_dim,
        learning_rate_D=learning_rate_D,
        learning_rate_G=learning_rate_G,
        beta1=beta1,
        get_batches=dataset.get_batches,
        data_shape=dataset.shape,
        data_image_mode=dataset.image_mode,
        alpha=alpha,
    )

In [None]:
# Plot the per-epoch discriminator and generator losses on one set of axes
fig, ax = plt.subplots()
d_losses = np.array(d_losses)
g_losses = np.array(g_losses)
ax.plot(d_losses, label="Discriminator", alpha=0.5)
ax.plot(g_losses, label="Generator", alpha=0.5)
ax.set_title("Training Losses")
ax.legend()

## Improving the Generated Images

- Give it a larger dataset (~10K images)
- Run for a larger number of epochs