<a href="https://colab.research.google.com/github/alex-parisi/DCGAN-Face-Generator/blob/main/DCGAN_Face_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Generating Faces with a Deep Convolutional Generative Adversarial Network (DCGAN)

A DCGAN uses two networks (discriminator and generator) working against one another in attempt to generate images that could pass as "authentic". A discriminator network is trained to determine whether or not an inputted image is a genuine image or an image generated by the generator network - which is attempting to generate images that will deceive the discriminator.


 

The discriminator has a relatively standard layout in image recognition, and consists of an input layer, three convolution layers, a dropout layer, and then a fully connected layer. The convolution layers use a leakly ReLu activation function, and the fully connected layer uses a sigmoid activation function.

<img src='https://drive.google.com/uc?id=1JOLpN18ANTYuiz6c5An_T5body47yFZw'>

The above image shows the layout of the generator in this DCGAN. A vector of random noise is upscaled through convolution layers until the appropriate image size is reached.

# Imports

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import os
import gdown
from zipfile import ZipFile
import random
import glob
import imageio
import cv2
from google.colab import auth

# Authenticate and Initiate TPU's
In order to connect to Google Cloud Services (GCS) to load the dataset, you must authenticate your Google account. Run the snippet below and follow the link, then paste the access key into the input box and press Enter.

In [1]:
auth.authenticate_user()

Before running this, ensure the Google Colab notebook is set to use TPU's. Go to "Edit", then "Notebook settings", and set the Hardware Accelerator to "TPU".

<br>This will initiate the TPU's, which will offer this program almost an 800% increase in speed compared to running locally on a GTX 970.

In [None]:
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])

tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)

strategy = tf.distribute.experimental.TPUStrategy(resolver)

# Prepare dataset

In order to use TPU's on Google Colab, you cannot use a local filesystem for data - you **must** use a GCS bucket to hold your images. Note that also when using Keras, the dataset object cannot be used to load images directly from a GCS bucket. Therefore, we must first convert the dataset to a .tfrecords format file and upload that to the GCS bucket, which Keras can then convert to a dataset object and use for training. This only needs to be performed once, as once the .tfrecords file is uploaded to the GCS bucket, we can refer to it as long as the training set doesn't change.

First, you must sign up for GCS [here](https://cloud.google.com/). There are free options available - personally I am using the 90 day free trial.

Then, access the "Storage" section [here](https://cloud.google.com/storage) and create a bucket. This is where you will upload your .tfrecords file.

Remember the name that you use for the bucket, as the link you will use throughout is "gs://< bucket_name>"

Download celeb-a dataset

In [None]:
os.makedirs("celeba_gan")
url = "https://drive.google.com/uc?id=1O7m1010EJjLE5QxLZiM9Fpjs7Oj6e684"
output = "celeba_gan/data.zip"
gdown.download(url, output, quiet=True)

Extract to local filesystem in runtime environment

In [None]:
with ZipFile("celeba_gan/data.zip", "r") as zipobj:
    zipobj.extractall("celeba_gan")

Assemble list of filenames in celeb-a dataset

In [None]:
in_pics = []
for path, subdirs, files in os.walk(os.path.join(os.getcwd(), 'celeba_gan')):
  for name in files:
    if name.startswith('.'):
      continue
    if '.png' in name or '.jpg' in name:
      in_pics.append(os.path.join(path, name))

Shuffle filenames

In [None]:
random.shuffle(in_pics)
print('enumerated pics: ', len(in_pics))

Write image data in each filename to a .tfrecords file

In [None]:
TFRecord_write_file = os.path.join(os.getcwd(), 'celeba_gan.tfrecords')
print('Writing TFRecord', TFRecord_write_file)
with tf.io.TFRecordWriter(TFRecord_write_file) as writer:
    for i in range(len(in_pics)):
        with tf.io.gfile.GFile(in_pics[i], 'rb') as fid:
            img = fid.read()
        example = tf.train.Example(
                        features=tf.train.Features(
                            feature={
                                'image': tf.train.Feature(bytes_list = tf.train.BytesList(value=[img])),
                            }))
        writer.write(example.SerializeToString())
print('Writing TFRecord done')

Upload .tfrecords file to the GCS bucket you created.
<br>Replace: ```gs://celeba-alexp/celeba_gan.tfrecords```
<br>With: ```gs://< bucket_name>/celeba_gan.tfrecords```


In [None]:
!gsutil cp /content/celeba_gan.tfrecords gs://celeba-alexp/celeba_gan.tfrecords

# Assemble dataset from GCS bucket

Define the extract function to parse the .tfrecords file and load the dataset within the TPU scope

Replace: ```gs://celeba-alexp/celeba_gan.tfrecords```
<br> With: ```gs://< bucket_name>/celeba_gan.tfrecords```

In [4]:
def TFRecord_extract_fn(data_record):
    features = {
        'image': tf.io.FixedLenFeature([], tf.string),
    }
    sample = tf.io.parse_single_example(data_record, features)
    sample = tf.image.decode_jpeg(sample['image'], channels=3)
    sample = tf.image.convert_image_dtype(sample, tf.float32)
    sample = tf.image.resize(sample, [64, 64])
    return sample

with strategy.scope():
  dataset = tf.data.TFRecordDataset('gs://celeba-alexp/celeba_gan.tfrecords')
  dataset = dataset.map(TFRecord_extract_fn)
  dataset = dataset.batch(32)

# Define Models

As stated above, the DCGAN is split into two competing neural networks: a discriminator and a generator. The discriminator attempts to determine whether or not an inputted image is authentic, i.e. a member of the original dataset, or is a fake generated by the generator. The generator attempts to create an image authentic enough to trick the discriminator into making an incorrect classification.

The discriminator has the shape:

*   (None, 64, 64, 3)
*   (None, 32, 32, 64)
*   (None, 16, 16, 128)
*   (None, 8, 8, 128)
*   (None, 8192)
*   (None, 1)

The generator has the shape:

*   (None, 128)
*   (None, 4, 4, 1024)
*   (None, 8, 8, 512)
*   (None, 16, 16, 256)
*   (None, 32, 32, 128)
*   (None, 64, 64, 3)

<br>Note that "None" is the batch size, which in this case is 32




In [12]:
with strategy.scope():
  discriminator = keras.Sequential(
      [
          keras.Input(shape=(64, 64, 3)),
          layers.Conv2D(64, kernel_size=4, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(128, kernel_size=4, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(128, kernel_size=4, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Flatten(),
          layers.Dropout(0.2),
          layers.Dense(1, activation="sigmoid"),
      ],
      name="discriminator",
  )

  latent_dim = 128

  generator = keras.Sequential(
      [
          keras.Input(shape=(latent_dim,)),
          layers.Dense(4 * 4 * 1024),
          layers.Reshape((4, 4, 1024)),
          layers.Conv2DTranspose(512, kernel_size=5, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2DTranspose(256, kernel_size=5, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2DTranspose(128, kernel_size=5, strides=2, padding="same"),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2DTranspose(3, kernel_size=5, strides=2, padding="same", activation="sigmoid"),
      ],
      name="generator",
  )

Create the GAN Network. This step is complicated, as I am using a custom training loop. This is a necessary step because we need to establish a shared loss function - this ensures that as the discriminator gets better at discriminating, the generator will concurrently get better at generating.

In [14]:
with strategy.scope():  
  class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
      super(GAN, self).__init__()
      self.discriminator = discriminator
      self.generator = generator
      self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
      super(GAN, self).compile()
      self.d_optimizer = d_optimizer
      self.g_optimizer = g_optimizer
      self.loss_fn = loss_fn
      self.d_loss_metric = keras.metrics.Mean(name="d_loss")
      self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
      return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
      # Sample random points in the latent space
      batch_size = tf.shape(real_images)[0]
      random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

      # Decode them to fake images
      generated_images = self.generator(random_latent_vectors)

      # Combine them with real images
      combined_images = tf.concat([generated_images, real_images], axis=0)

      # Assemble labels discriminating real from fake images
      labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
      )
      # Add random noise to the labels
      labels += 0.05 * tf.random.uniform(tf.shape(labels))

      # Train the discriminator
      with tf.GradientTape() as tape:
        predictions = self.discriminator(combined_images)
        d_loss = self.loss_fn(labels, predictions)
      grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
      self.d_optimizer.apply_gradients(
        zip(grads, self.discriminator.trainable_weights)
      )

      # Sample random points in the latent space
      random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

      # Assemble labels that say "all real images"
      misleading_labels = tf.zeros((batch_size, 1))

      # Train the generator
      with tf.GradientTape() as tape:
        predictions = self.discriminator(self.generator(random_latent_vectors))
        g_loss = self.loss_fn(misleading_labels, predictions)
      grads = tape.gradient(g_loss, self.generator.trainable_weights)
      self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

      # Update metrics
      self.d_loss_metric.update_state(d_loss)
      self.g_loss_metric.update_state(g_loss)
      return {
        "d_loss": self.d_loss_metric.result(),
        "g_loss": self.g_loss_metric.result(),
      }

Create a monitor function that executes at the end of each epoch. This will save a checkpoint of the model (if the epoch is a multiple of 5) and will generate 10 images from the generator and save them. The seed used to generate these images is saved alongside model information when a checkpoint occurs.

In [15]:
with strategy.scope():  
  class GANMonitor(keras.callbacks.Callback):
      def __init__(self, num_img=3, latent_dim=128):
          self.num_img = num_img
          self.latent_dim = latent_dim

      def on_epoch_end(self, epoch, logs=None):
          if (epoch + 1) % 5 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix, options=local_device_option)
          generated_images = self.model.generator(random_latent_vectors_monitor)
          generated_images *= 255
          generated_images.numpy()
          for i in range(self.num_img):
              img = keras.preprocessing.image.array_to_img(generated_images[i])
              img.save("generated_img_%03d_%d.png" % (epoch + 0, i))

Finally, create the GAN model

In [16]:
with strategy.scope():
  gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)

# Training

Setup checkpoints that will save the model information and generator seed for later use, or in case training gets interrupted.

In [17]:
random_latent_vectors_monitor = tf.random.normal(shape=(10, latent_dim))
gen_var = tf.Variable(random_latent_vectors_monitor)

discriminator_optimizer = keras.optimizers.Adam(learning_rate=0.0001)
generator_optimizer = keras.optimizers.Adam(learning_rate=0.0001)

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator,
                                 gen_var=gen_var,
                                 gan=gan)

local_device_option = tf.train.CheckpointOptions(experimental_io_device="/job:localhost")
ckpt_manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=3)

Begin training

In [None]:
epochs = 100  # In practice, use ~100 epochs

with strategy.scope():
  random_latent_vectors_monitor = tf.constant(gen_var.numpy())

  if ckpt_manager.latest_checkpoint:
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir), options=local_device_option)
  gan.compile(
      d_optimizer=discriminator_optimizer,
      g_optimizer=generator_optimizer,
      loss_fn=keras.losses.BinaryCrossentropy(reduction=tf.keras.losses.Reduction.SUM),
  )

  gan.fit(
      dataset, epochs=epochs, callbacks=[GANMonitor(num_img=10, latent_dim=latent_dim)]
  )

Epoch 1/100
Instructions for updating:
use `experimental_local_results` instead.


Instructions for updating:
use `experimental_local_results` instead.


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100

# Save to GIF

In [None]:
anim_file = 'dcgan0.gif'

with imageio.get_writer(anim_file, mode='I') as writer:
    filenames = glob.glob('generated_img*_0.png')
    filenames = sorted(filenames)
    for filename in filenames:
        image = imageio.imread(filename)
        image = cv2.resize(image, dsize=(512, 512), interpolation=cv2.INTER_CUBIC)
        writer.append_data(image)
    image = imageio.imread(filename)
    writer.append_data(image)