In [None]:
import tensorflow as tf
from tensorflow import keras
import tensorflow_datasets as tfds

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import os
import shutil
from shutil import copyfile
import random
import zipfile

# Load the dataset

In [None]:
# First try: load the dataset from TensorFlow Datasets => failed
# (the recorded output of this cell ends in NonMatchingChecksumError)
celeb_a_splits, ds_info = tfds.load(
    'celeb_a',
    split=['train', 'test'],
    shuffle_files=True,
    with_info=True,
)

# the splits come back in the order they were requested
ds_train_raw, ds_test_raw = celeb_a_splits

[1mDownloading and preparing dataset celeb_a/2.0.1 (download: 1.38 GiB, generated: 1.62 GiB, total: 3.00 GiB) to /root/tensorflow_datasets/celeb_a/2.0.1...[0m


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Completed...', max=1.0, style=Progre…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Size...', max=1.0, style=ProgressSty…





NonMatchingChecksumError: ignored

Second try:

Load the data from Google Drive => succeeded

Problem: the images come without attributes, so they cannot be loaded the same way as the TensorFlow image dataset

In [None]:
# Connect colab with GoogleDrive
# The zipped dataset is read from a path under /content/gdrive in the next cell,
# so the drive has to be mounted there first.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Archive with the images, stored on the mounted Google Drive
local_zip = '/content/gdrive/MyDrive/HSLU/DLV/img_align_celeba.zip'

# Unpack everything into /tmp/celeba; the context manager closes the archive
# even if extraction fails part-way through.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
  zip_ref.extractall('/tmp/celeba')

In [None]:
# Load data from zip file saved in the GoogleDrive (takes less than 2mins)
# not using it anymore
# !unzip gdrive/My\ Drive/HSLU/DLV/img_align_celeba

## Split the data into train and test datasets

In [None]:
# check that images are loaded: count the entries in the extracted folder
extracted_entries = os.listdir('/tmp/celeba/img_align_celeba/')
print(len(extracted_entries))

202599


In [None]:
# create file folders to store the data
# makedirs creates the missing parents and, with exist_ok=True, tolerates folders
# that are already there. Re-running the cell is therefore safe, while genuine
# failures (e.g. missing permissions) still raise instead of being swallowed.
for folder in ('/tmp/celeba/image/train', '/tmp/celeba/image/test'):
  os.makedirs(folder, exist_ok=True)

In [None]:
# check if folders are correctly created: list the train folder and count its entries
train_entries = os.listdir('/tmp/celeba/image/train')
print(len(train_entries))

0


In [None]:
# split the dataset into train and test sets with 80/20 ratio
def split_data(SOURCE, TRAIN, TEST, SPLIT_SIZE):
  """Randomly splits the files in SOURCE between TRAIN and TEST by copying them.

  Args:
    SOURCE -- directory holding the files to split
    TRAIN -- directory receiving the training share (expected to exist already)
    TEST -- directory receiving the remaining files (expected to exist already)
    SPLIT_SIZE -- fraction of the usable files that goes to TRAIN, e.g. 0.8

  Returns:
    None. Files are copied; SOURCE is left untouched.

  Zero-length files are reported and skipped, and so is anything in SOURCE that
  is not a regular file. The working directory of the process is not changed.
  """
  import math  # local import: only this function needs it

  # sort first so that seeding `random` beforehand gives a reproducible split
  usable_paths = []
  for file_name in sorted(os.listdir(SOURCE)):
    file_path = os.path.join(SOURCE, file_name)
    if not os.path.isfile(file_path):
      continue
    if os.path.getsize(file_path) == 0:
      print(file_name + ' is zero length, so ignoring')
      continue
    usable_paths.append(file_path)

  # shuffle in place; the previous random.sample() call threw its result away,
  # so the split simply followed the directory listing order
  random.shuffle(usable_paths)

  # ceil reproduces the old `index < len * SPLIT_SIZE` boundary
  train_count = math.ceil(len(usable_paths) * SPLIT_SIZE)

  for file_path in usable_paths[:train_count]:
    shutil.copy(file_path, TRAIN)
  for file_path in usable_paths[train_count:]:
    shutil.copy(file_path, TEST)

# Where the extracted images live and where each share of the split is copied to
IMAGE_SOURCE_DIR = '/tmp/celeba/img_align_celeba'
TRAIN_DIR = '/tmp/celeba/image/train'
TEST_DIR = '/tmp/celeba/image/test'

# fraction of the images that goes to the training folder
split_size = 0.8

split_data(
    SOURCE=IMAGE_SOURCE_DIR,
    TRAIN=TRAIN_DIR,
    TEST=TEST_DIR,
    SPLIT_SIZE=split_size,
)

In [None]:
# check the result: number of files copied into the train folder, then the test folder
for split_folder in ('/tmp/celeba/image/train/', '/tmp/celeba/image/test/'):
  print(len(os.listdir(split_folder)))

162080
40519


# Prepare the dataset

In [None]:
# Define global constants to be used in this notebook
batch_size = 128   # number of images per batch when the dataset is loaded

# Passed to the loader as image_size=(img_height, img_width).
# NOTE(review): the aligned CelebA images are usually described as 178 wide by
# 218 high, which would make these two names swapped -- confirm before relying
# on the aspect ratio.
img_height = 178
img_width = 218

latent_dim = 2     # size of the latent vector produced by the encoder

In [None]:
# Load the images as unlabeled tensors. Pointing image_dataset_from_directory at
# '/tmp/celeba/image/' made it read 'train' and 'test' as two class labels, so
# each split is read from its own folder with a plain tf.data pipeline instead.
def _load_image_batches(image_dir, shuffle):
  """Builds a batched dataset of unlabeled images scaled to [0, 1].

  Args:
    image_dir -- directory that directly contains the image files
    shuffle -- whether the file order is shuffled

  Returns:
    tf.data.Dataset yielding float32 batches of shape
    (batch, img_height, img_width, 3)
  """
  # assumes the files carry a .jpg extension -- TODO confirm; list_files raises
  # an error when nothing matches, so a wrong pattern does not go unnoticed
  file_pattern = os.path.join(image_dir, '*.jpg')
  paths = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)

  def _read_image(path):
    image = tf.io.decode_jpeg(tf.io.read_file(path), channels=3)
    # a fixed size is needed so the images can be stacked into batches;
    # resize returns float32 in [0, 255], and the decoder ends in a sigmoid,
    # so the pixels are scaled to [0, 1]
    return tf.image.resize(image, (img_height, img_width)) / 255.

  images = paths.map(_read_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
  return images.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

# training batches are shuffled; the test batches keep a fixed order
train_dataset = _load_image_batches('/tmp/celeba/image/train', shuffle=True)
test_ds = _load_image_batches('/tmp/celeba/image/test', shuffle=False)

Found 202599 files belonging to 2 classes.


# Build VAE model

## Sampling class

In [None]:
class Sampling(tf.keras.layers.Layer):
  """Layer that turns the encoder's two outputs into one random latent sample."""

  def call(self, inputs):
    """Draws a random latent vector for every example in the batch.

    Args:
      inputs -- pair (mu, sigma) of tensors produced by the encoder

    Returns:
      mu + exp(0.5 * sigma) * epsilon, where epsilon is random normal noise
      shaped like the first two dimensions of mu
    """
    mu, sigma = inputs

    # the batch size is only known at run time, so read the shape dynamically
    mu_shape = tf.shape(mu)
    epsilon = tf.keras.backend.random_normal(shape=(mu_shape[0], mu_shape[1]))

    # sigma enters through exp(0.5 * sigma), i.e. it is used like a log-variance
    scale = tf.exp(0.5 * sigma)
    return mu + scale * epsilon

## Encoder

In [None]:
def encoder_layers(inputs, latent_dim):
  """Builds the encoder stack on top of `inputs`.

  Two stride-2 convolutions (each followed by batch normalization) are
  flattened into a 20-unit dense layer, from which two parallel dense heads
  of size `latent_dim` are computed.

  Args:
    inputs -- symbolic input tensor for a batch of images
    latent_dim -- dimensionality of the latent space

  Returns:
    mu -- output of the 'latent_mu' head
    sigma -- output of the 'latent_sigma' head (the Sampling layer applies
             exp(0.5 * sigma) to it)
    shape of the feature map right before flattening
  """
  # both convolutions share everything except the filter count and the name
  conv_options = dict(kernel_size=3, strides=2, padding='same', activation='relu')

  features = tf.keras.layers.Conv2D(filters=32, name="encode_conv1", **conv_options)(inputs)
  features = tf.keras.layers.BatchNormalization()(features)
  features = tf.keras.layers.Conv2D(filters=64, name="encode_conv2", **conv_options)(features)

  # kept under its own name because its shape is part of the return value
  batch_2 = tf.keras.layers.BatchNormalization()(features)

  flat = tf.keras.layers.Flatten(name="encode_flatten")(batch_2)

  # 20 units is an arbitrary choice that can be tuned
  hidden = tf.keras.layers.Dense(20, activation='relu', name="encode_dense")(flat)
  hidden = tf.keras.layers.BatchNormalization()(hidden)

  # two heads of size latent_dim computed from the same hidden features
  mu = tf.keras.layers.Dense(latent_dim, name='latent_mu')(hidden)
  sigma = tf.keras.layers.Dense(latent_dim, name='latent_sigma')(hidden)

  return mu, sigma, batch_2.shape

In [None]:
# define the encoder model that includes the Sampling layer
def encoder_model(latent_dim, input_shape):
  """Wraps the encoder layers and the Sampling layer into a Keras model.

  Args:
    latent_dim -- dimensionality of the latent space
    input_shape -- shape of one input image (without the batch dimension)

  Returns:
    model -- Keras model mapping an image batch to [mu, sigma, z]
    conv_shape -- shape of the encoder features before flattening
  """
  image_in = tf.keras.layers.Input(shape=input_shape)

  # the encoder stack also reports the feature-map shape before flattening
  mu, sigma, conv_shape = encoder_layers(image_in, latent_dim=latent_dim)

  # z is the random latent sample derived from mu and sigma
  z = Sampling()((mu, sigma))

  return tf.keras.Model(image_in, outputs=[mu, sigma, z]), conv_shape

## Decoder

In [None]:
# Decoder expands the latent representations back to the original image dimensions
def decoder_layers(inputs, conv_shape):
  """Builds the decoder stack on top of `inputs`.

  A dense layer expands the latent vector to the size of the encoder's last
  feature map, which is then upsampled by two stride-2 transposed convolutions
  and mapped to a single sigmoid-activated channel.

  Args:
    inputs -- latent tensor fed to the decoder
    conv_shape -- shape of the encoder features before flattening
                  (indices 1..3 are used)

  Returns:
    tensor containing the decoded output

  NOTE(review): the final layer has one filter, while the notebook builds the
  models with a 3-channel input shape, and each stride-2 transposed
  convolution exactly doubles height and width -- confirm that the output
  shape matches the images it is compared against.
  """
  rows, cols, channels = conv_shape[1], conv_shape[2], conv_shape[3]

  # expand the latent vector to as many units as the encoder feature map holds
  units = rows * cols * channels
  expanded = tf.keras.layers.Dense(units, activation = 'relu', name="decode_dense1")(inputs)
  expanded = tf.keras.layers.BatchNormalization()(expanded)

  # back to a (rows, cols, channels) feature map
  feature_map = tf.keras.layers.Reshape((rows, cols, channels), name="decode_reshape")(expanded)

  # settings shared by all transposed convolutions
  deconv_options = dict(kernel_size=3, padding='same')

  upsampled = tf.keras.layers.Conv2DTranspose(
      filters=64, strides=2, activation='relu', name="decode_conv2d_2", **deconv_options)(feature_map)
  upsampled = tf.keras.layers.BatchNormalization()(upsampled)
  upsampled = tf.keras.layers.Conv2DTranspose(
      filters=32, strides=2, activation='relu', name="decode_conv2d_3", **deconv_options)(upsampled)
  upsampled = tf.keras.layers.BatchNormalization()(upsampled)

  # stride 1 keeps the spatial size; sigmoid bounds the output to (0, 1)
  return tf.keras.layers.Conv2DTranspose(
      filters=1, strides=1, activation='sigmoid', name="decode_final", **deconv_options)(upsampled)

In [None]:
# define the Decoder model
def decoder_model(latent_dim, conv_shape):
  """Wraps the decoder layers into a Keras model.

  Args:
    latent_dim -- dimensionality of the latent space
    conv_shape -- shape of the encoder features before flattening

  Returns:
    model -- Keras model mapping a latent vector to the decoded output
  """
  # one latent vector of length latent_dim per example
  latent_in = tf.keras.layers.Input(shape=(latent_dim,))

  decoded = decoder_layers(latent_in, conv_shape)

  return tf.keras.Model(latent_in, decoded)

## Kullback-Leibler Divergence for loss function

In [None]:
# Kullback_leibler Divergence is added to the reconstruction loss,
# to consider the random normal distribution introduced in the latent layer
def kl_reconstruction_loss(inputs, outputs, mu, sigma):
  """Computes the Kullback-Leibler Divergence (KLD) term from mu and sigma.

  Args:
    inputs -- batch from the dataset (not used in the computation)
    outputs -- output of the Sampling layer (not used in the computation)
    mu -- 'latent_mu' output of the encoder
    sigma -- 'latent_sigma' output of the encoder

  Returns:
    scalar tensor: -0.5 * mean(1 + sigma - mu^2 - exp(sigma))
  """
  kl_terms = 1 + sigma - tf.square(mu) - tf.math.exp(sigma)

  # average over every element, then apply the -0.5 factor
  return -0.5 * tf.reduce_mean(kl_terms)

## VAE Model

In [None]:
def vae_model(encoder, decoder, input_shape):
  """Chains the encoder and decoder into the full VAE and attaches the KL loss.

  Args:
    encoder -- the encoder model, returning (mu, sigma, z)
    decoder -- the decoder model, consuming z
    input_shape -- shape of one input image (without the batch dimension)

  Returns:
    the complete VAE model; its KL term is registered via add_loss
  """
  vae_inputs = tf.keras.layers.Input(shape=input_shape)

  # encode, then decode the sampled latent vector
  mu, sigma, z = encoder(vae_inputs)
  reconstructed = decoder(z)

  model = tf.keras.Model(inputs=vae_inputs, outputs=reconstructed)

  # register the KL term on the model so it shows up in model.losses
  model.add_loss(kl_reconstruction_loss(vae_inputs, z, mu, sigma))

  return model

In [None]:
# add a function to setup and get the different models
def get_models(input_shape, latent_dim):
  """Builds and returns the encoder, decoder, and vae models.

  Args:
    input_shape -- shape of one input image (without the batch dimension)
    latent_dim -- dimensionality of the latent space

  Returns:
    tuple (encoder, decoder, vae)
  """
  # the decoder needs the encoder's pre-flatten feature shape to mirror it
  enc, conv_shape = encoder_model(input_shape=input_shape, latent_dim=latent_dim)
  dec = decoder_model(conv_shape=conv_shape, latent_dim=latent_dim)
  full_vae = vae_model(enc, dec, input_shape=input_shape)
  return enc, dec, full_vae

In [None]:
# Get the encoder, decoder and 'master' model (called vae)
# The input shape is derived from the notebook constants (currently 178 x 218,
# 3 channels) so the models always agree with the size the images are loaded at.
encoder, decoder, vae = get_models(input_shape=(img_height, img_width, 3), latent_dim=latent_dim)

# Train the model

In [None]:
# define loss function, optimizer, and metrics
# optimizer applies the gradients in the training loop
optimizer = tf.keras.optimizers.Adam()
# running mean of the loss values passed to it during training
loss_metric = tf.keras.metrics.Mean()
# reconstruction loss between the flattened inputs and reconstructions
bce_loss = tf.keras.losses.BinaryCrossentropy()

In [None]:
# add a function to show the progress of the image generation
def generate_and_save_images(model, epoch, step, test_input):
  """Plots up to 16 generated images in a 4x4 grid, saves the plot and shows it.

  Args:
    model -- the decoder model
    epoch -- current epoch number during training
    step -- current step number during training
    test_input -- random tensor with shape (16, latent_dim)

  The figure is written to 'image_at_epoch_<epoch>_step<step>.png' in the
  current working directory. Only the first channel of each prediction is drawn.
  """
  # generate images from the test input
  predictions = model.predict(test_input)

  fig = plt.figure(figsize=(4,4))

  # the grid has 16 cells, so never try to draw more than that
  for i in range(min(predictions.shape[0], 16)):
    ax = fig.add_subplot(4, 4, i+1)
    ax.imshow(predictions[i, :, :, 0])
    ax.axis('off')

  fig.suptitle("epoch: {}, step: {}".format(epoch, step))
  fig.savefig('image_at_epoch_{:04d}_step{:04d}.png'.format(epoch, step))
  plt.show()

  # this function is called repeatedly during training; release the figure so
  # open figures do not pile up
  plt.close(fig)

Below is the training loop.

In [None]:

# generate random vector as test input to the decoder
random_vector_for_generation = tf.random.normal(shape=[16, latent_dim])

# number of epochs
epochs = 50

# redraw the sample images and report the loss every this many steps
display_every = 100

# the reconstruction loss is scaled by the pixel count of one image (178 * 218 = 38804)
pixels_per_image = img_height * img_width

# initialize the helper function to display outputs from an untrained model
generate_and_save_images(decoder, 0, 0, random_vector_for_generation)

for epoch in range(epochs):
  print('Start of epoch %d' % (epoch,))

  # iterate over the batches of the dataset.
  # train_dataset must yield image batches only (no labels), scaled to [0, 1];
  # make sure an earlier cell defines it.
  for step, x_batch_train in enumerate(train_dataset):
    with tf.GradientTape() as tape:

      # feed a batch to the VAE model; training=True makes the
      # BatchNormalization layers use and update the batch statistics
      reconstructed = vae(x_batch_train, training=True)

      # compute reconstruction loss
      # NOTE(review): both flattened tensors must have the same length; the
      # decoder ends in a 1-filter layer while the models are built for
      # 3-channel inputs -- confirm the shapes agree.
      flattened_inputs = tf.reshape(x_batch_train, shape=[-1])
      flattened_outputs = tf.reshape(reconstructed, shape=[-1])
      loss = bce_loss(flattened_inputs, flattened_outputs) * pixels_per_image

      # add KLD regularization loss
      loss += sum(vae.losses)

    # get the gradients and update the weights
    grads = tape.gradient(loss, vae.trainable_weights)
    optimizer.apply_gradients(zip(grads, vae.trainable_weights))

    # compute the loss metric (running mean over all steps so far)
    loss_metric(loss)

    # display outputs every `display_every` steps
    if step % display_every == 0:
      display.clear_output(wait=False)
      generate_and_save_images(decoder, epoch, step, random_vector_for_generation)
      print('Epoch: %s step: %s mean loss = %s' % (epoch, step, loss_metric.result().numpy()))