The setup:

TensorFlow is imported both for Keras and for its dataset-processing utilities (tf.data).


In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

Two folders are also needed: one for the images saved during training and another for the final results.

In [0]:
!mkdir -p generated_training_images
!mkdir -p generated_final_images

Dataset preparation:

The dataset needs little preparation: load it from the library and merge the training and test sets, since the goal is simply to have as much data as possible to train the generator.

In [0]:
batch_size = 64
# The idea is just to get lots of data to train
# the generator, so labels are not needed
(x_train, _), (x_test, _) = keras.datasets.fashion_mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
# Pixel values are integers from 0 to 255; scale them to [0, 1]
all_digits = all_digits.astype("float32") / 255
# -1 lets NumPy infer that dimension (the number of images);
# the last 1 is the number of channels
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
# shuffle draws from a buffer of 1024 images, batch groups them into batches,
# and prefetch prepares up to 32 batches ahead while the current one is processed,
# improving latency and throughput at the cost of extra memory
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size).prefetch(32)
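
As a quick sanity check of the scaling and reshape steps, the following sketch applies them to a small synthetic array instead of the real download. The array fake_images and its size are illustrative assumptions, not part of the notebook.

```python
import numpy as np

# Hypothetical stand-in for the Fashion-MNIST images: 10 random 28x28 uint8 arrays
rng = np.random.default_rng(0)
fake_images = rng.integers(0, 256, size=(10, 28, 28), dtype=np.uint8)

# Same steps as the dataset cell: scale to [0, 1], then add a channel axis
scaled = fake_images.astype("float32") / 255
# -1 asks NumPy to infer the first dimension (the number of images)
reshaped = np.reshape(scaled, (-1, 28, 28, 1))

print(reshaped.shape)  # (10, 28, 28, 1)
print(reshaped.dtype)  # float32
```

The result has the (samples, height, width, channels) layout that the convolutional layers expect.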


This is the discriminator model, which decides whether the data presented to it comes from the generator or from the original dataset.


In [0]:
discriminator = keras.Sequential(
    [
     keras.Input(shape=(28, 28, 1)),
     # 64 filters with 3x3 kernels; "same" padding with stride 2
     # halves the resolution to 14 x 14
     layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
     # In DCGAN the first discriminator layer is not batch normalized,
     # so that it can learn the statistics of the data.
     # Leaky ReLU trains better here; alpha is the negative slope coefficient
     layers.LeakyReLU(alpha=0.2),
     # Normally depth ramps up
     layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
     layers.BatchNormalization(),
     layers.LeakyReLU(alpha=0.2),
     # Takes the max over the whole 7x7 map in each of the 128 channels;
     # the output shape is (samples, channels).
     # The DCGAN paper reports that global (average) pooling instead of a
     # fully connected layer improves model stability but hurts convergence speed.
     # As this is a small model, speed is not a problem.
     # The middle ground would be to flatten and feed directly into a sigmoid.
     # Dense defaults to a linear activation, so the output is a logit
     layers.GlobalMaxPooling2D(),
     layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()

Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_5 (Conv2D)            (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu_6 (LeakyReLU)    (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_6 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
batch_normalization_4 (Batch (None, 7, 7, 128)         512       
_________________________________________________________________
leaky_re_lu_7 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d_2 (Glob (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 1)               
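
The output shapes and parameter counts of the two convolutional layers in the summary can be checked by hand. This is a minimal sketch; the helper names same_conv_output_size and conv2d_params are made up for this illustration and are not Keras functions.

```python
import math

def same_conv_output_size(size, stride):
    """Spatial output size of a convolution with padding="same"."""
    return math.ceil(size / stride)

def conv2d_params(kernel_h, kernel_w, in_channels, filters):
    """One weight per kernel position and input channel, plus one bias, per filter."""
    return (kernel_h * kernel_w * in_channels + 1) * filters

# First Conv2D: 28x28x1 input, 64 filters of 3x3, stride 2
size_1 = same_conv_output_size(28, 2)
params_1 = conv2d_params(3, 3, 1, 64)

# Second Conv2D: 14x14x64 input, 128 filters of 3x3, stride 2
size_2 = same_conv_output_size(size_1, 2)
params_2 = conv2d_params(3, 3, 64, 128)

print(size_1, params_1)  # 14 640
print(size_2, params_2)  # 7 73856
```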

This is the generator model, the one in charge of generating images that look like those in the dataset


In [0]:
# The dimension of the latent space, like the feature space of convnets
latent_dim = 128

generator = keras.Sequential(
    [
     # From latent space
     keras.Input(shape=(latent_dim,)),
     # No dense in DCGAN paper
     layers.Dense(7 * 7 * 128),
     layers.Reshape((7, 7, 128)),
     # Via a transposed convolution, to image
     layers.Conv2DTranspose(128, (4, 4), strides=(2,2), padding="same"),
     layers.BatchNormalization(),
     layers.LeakyReLU(alpha=0.2),
     layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
     layers.BatchNormalization(),
     layers.LeakyReLU(alpha=0.2),
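     # No batch norm on the output layer.
     # The DCGAN paper uses tanh here (this notebook initially used sigmoid).
     # Note that tanh outputs lie in [-1, 1], while the training data is in [0, 1]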
     layers.Conv2D(1, (7, 7), padding="same", activation="tanh"),
    ],
    name="generator",
)

generator.summary()

Model: "generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_2 (Dense)              (None, 6272)              809088    
_________________________________________________________________
reshape (Reshape)            (None, 7, 7, 128)         0         
_________________________________________________________________
conv2d_transpose (Conv2DTran (None, 14, 14, 128)       262272    
_________________________________________________________________
batch_normalization_2 (Batch (None, 14, 14, 128)       512       
_________________________________________________________________
leaky_re_lu_4 (LeakyReLU)    (None, 14, 14, 128)       0         
_________________________________________________________________
conv2d_transpose_1 (Conv2DTr (None, 28, 28, 128)       262272    
_________________________________________________________________
batch_normalization_3 (Batch (None, 28, 28, 128)       51
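
The generator figures visible in the summary can be reproduced with the same kind of arithmetic. This sketch assumes the usual parameter formulas for dense and transposed-convolution layers; the helper names are hypothetical and only serve the illustration.

```python
def dense_params(inputs, units):
    """One weight per input plus one bias, per unit."""
    return (inputs + 1) * units

def conv2d_transpose_params(kernel_h, kernel_w, in_channels, filters):
    """Kernel weights plus one bias per filter."""
    return kernel_h * kernel_w * in_channels * filters + filters

def same_transpose_output_size(size, stride):
    """Spatial output size of a transposed convolution with padding="same"."""
    return size * stride

latent_dim = 128
dense = dense_params(latent_dim, 7 * 7 * 128)
tconv = conv2d_transpose_params(4, 4, 128, 128)
size_1 = same_transpose_output_size(7, 2)
size_2 = same_transpose_output_size(size_1, 2)

print(dense)           # 809088
print(tconv)           # 262272
print(size_1, size_2)  # 14 28
```

Each stride-2 transposed convolution doubles the resolution, which is how the 7x7 feature map grows back to the 28x28 image size.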

This class overrides the compile and train_step methods of the keras.Model class.

compile is overridden to receive the two optimizers and the loss function from the user.

train_step is overridden to use the GAN training algorithm with k = 1, where k is the number of discriminator training steps per generator training step.


In [0]:
class GAN(keras.Model):
  def __init__(self, discriminator, generator, latent_dim):
    # Initialize the keras.Model base class
    super(GAN, self).__init__()
    self.discriminator = discriminator
    self.generator = generator
    # Dimension of the latent space
    self.latent_dim = latent_dim

  # Override this to use own signature for receiving two optimizers
  def compile(self, d_optimizer, g_optimizer, loss_fn):
    super(GAN, self).compile()
    # Compiles the model and then just adds the optimizers and loss
    self.d_optimizer = d_optimizer
    self.g_optimizer = g_optimizer
    self.loss_fn = loss_fn

  # For custom learning algorithm, train_step(self, data)
  def train_step(self, real_images):
    # TODO: revisit batch normalization (see Salimans et al. 2016)
    # TODO: revisit regularization

    # If calling fit(x, y...), data will be tuple(x, y)
    # and we don't want labels
    # When calling from dataset it will be whatever dataset yields
    # per batch
    if isinstance(real_images, tuple):
      # Just want the x from (x, y)
      real_images = real_images[0]
    batch_size = tf.shape(real_images)[0]
    # Better to sample from gaussian than uniform
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
    generated_images = self.generator(random_latent_vectors)
    # Build a single batch with both generated and real images
    combined_images = tf.concat([generated_images, real_images], axis=0)
    # Build the matching labels by concatenating along the batch axis (axis 0):
    # 1 for generated images, 0 for real images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
    )
    # Add random noise to the labels.
    # Salimans et al. (2016) suggest one-sided smoothing instead, i.e.
    # replacing only one of the two targets with a smoothed value.
    # tf.random.uniform samples from [0, 1)
    labels += 0.05 * tf.random.uniform(tf.shape(labels))

    # Train the discriminator

    # This records the operations to automatically differentiate the 
    # variables inside the context
    with tf.GradientTape() as tape:
      predictions = self.discriminator(combined_images)
      d_loss = self.loss_fn(labels, predictions)
    # Gradient of the discriminator loss with respect to its weights
    gradients = tape.gradient(d_loss, self.discriminator.trainable_weights)
    # zip pairs each gradient with the weight it belongs to
    self.d_optimizer.apply_gradients(
        zip(gradients, self.discriminator.trainable_weights)
    )

    # Create new random vectors
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
    # Labels that claim every generated image is real (0 = real)
    made_real_labels = tf.zeros((batch_size, 1))

    # Train the generator
    with tf.GradientTape() as tape:
      predictions = self.discriminator(self.generator(random_latent_vectors))
      # The loss is low when the discriminator takes the generated images for real ones
      g_loss = self.loss_fn(made_real_labels, predictions)
    gradients = tape.gradient(g_loss, self.generator.trainable_weights)
    self.g_optimizer.apply_gradients(zip(gradients, self.generator.trainable_weights))
    return {"d_loss": d_loss, "g_loss": g_loss}
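
To make the label convention in train_step concrete (1 = generated, 0 = real), here is a small NumPy sketch of binary cross-entropy computed from logits. It is an illustration under the assumption that the loss is the standard numerically stable formulation; the function bce_from_logits and the example logits are made up for this sketch and are not taken from the notebook.

```python
import numpy as np

def bce_from_logits(labels, logits):
    """Mean binary cross-entropy computed directly from logits."""
    labels = np.asarray(labels, dtype=np.float64)
    logits = np.asarray(logits, dtype=np.float64)
    # Stable form of -[z*log(sigmoid(x)) + (1 - z)*log(1 - sigmoid(x))]
    per_example = (
        np.maximum(logits, 0) - logits * labels + np.log1p(np.exp(-np.abs(logits)))
    )
    return per_example.mean()

# Hypothetical discriminator logits for two generated images:
# positive values mean "confidently generated"
fake_logits = np.array([2.0, 3.0])

# Discriminator step: generated images are labeled 1, so the loss is low
d_loss_on_fakes = bce_from_logits(np.ones(2), fake_logits)
# Generator step: the same images are labeled 0 ("real"), so the loss is high
g_loss = bce_from_logits(np.zeros(2), fake_logits)

print(d_loss_on_fakes < g_loss)  # True
```

The generator can only lower its loss by producing images that push the discriminator's logits toward the "real" side.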

**For this to work there has to be a folder named generated_training_images**

This is just a keras callback that runs the function on_epoch_end whenever the model finishes an epoch. In this case this is used to save three images to see the model progress during training.

In [0]:
class GANMonitor(keras.callbacks.Callback):
  def __init__(self, num_img=3, latent_dim=128):
    # Initialize the keras Callback base class
    super().__init__()
    self.num_img = num_img
    self.latent_dim = latent_dim

  def on_epoch_end(self, epoch, logs=None):
    random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
    generated_images = self.model.generator(random_latent_vectors)
    generated_images *= 255
    # .numpy() returns the array, so the result has to be assigned
    generated_images = generated_images.numpy()
    for i in range(self.num_img):
      img = keras.preprocessing.image.array_to_img(generated_images[i])
      img.save("./generated_training_images/generated_img_{i}_{epoch}.png".format(i=i, epoch=epoch))
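
Assuming array_to_img is called with its default scale=True, it rescales each image so that its minimum maps to 0 and its maximum to 255. A rough NumPy equivalent of that rescaling might look like the sketch below; the helper rescale_to_uint8 is hypothetical and only stands in for the library call.

```python
import numpy as np

def rescale_to_uint8(image):
    """Min-max rescale an array to the 0..255 range and cast to uint8."""
    image = np.asarray(image, dtype=np.float32)
    image = image - image.min()
    peak = image.max()
    if peak != 0:
        image = image / peak
    return (image * 255).astype(np.uint8)

# Toy 2x2 "image" with values in the tanh range [-1, 1]
tanh_like = np.array([[-1.0, 0.0], [0.5, 1.0]], dtype=np.float32)
pixels = rescale_to_uint8(tanh_like)

print(pixels.min(), pixels.max())  # 0 255
```

Under this assumption, multiplying the generator output by a positive constant beforehand does not change the saved image, because the min-max rescaling cancels it out.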


This is the model instantiation, compilation and training.
30 epochs produce fine results, although 100 were used to generate the images in the report.

In [0]:
epochs = 30

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    # Labels are binary and the discriminator outputs raw logits
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
gan.fit(
    dataset, epochs=epochs, callbacks=[GANMonitor(num_img=3, latent_dim=latent_dim)]
)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
 199/1094 [====>.........................] - ETA: 15s - d_loss: 0.1754 - g_loss: 2.7475

KeyboardInterrupt: ignored
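
The step count of 1094 shown in the progress bar follows from the dataset size and the batch size. A short check, assuming the standard Fashion-MNIST split of 60,000 training and 10,000 test images:

```python
import math

train_images = 60_000  # standard Fashion-MNIST training set size
test_images = 10_000   # standard Fashion-MNIST test set size
batch_size = 64

# The last, partial batch still counts as a step, hence the ceiling
steps_per_epoch = math.ceil((train_images + test_images) / batch_size)

print(steps_per_epoch)  # 1094
```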

You might want to save the model results or configuration to Google Drive.

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**For this to work there has to be a folder named generated_final_images**

This cell saves the generator, reloads it, and generates the final images.

In [0]:
generator.save('gen2')
generator2 = keras.models.load_model('gen2')
num_images = 10
random_latent_vectors = tf.random.normal(shape=(num_images, latent_dim))
generated_images = generator2(random_latent_vectors)
for i in range(0, num_images):
  img = keras.preprocessing.image.array_to_img(generated_images[i])
  img.save("./generated_final_images/generated_img_{i}.png".format(i=i))

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: gen2/assets
