**Required imports**

In [None]:
import os
import matplotlib.pyplot as plt
import random
import itertools
import pandas as pd
import numpy as np

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow.keras.backend as K
from tensorflow.keras import (
    layers,
    models,
    callbacks,
    utils,
    metrics,
    losses,
    optimizers,
)

from google.colab import drive

Downloading CelebA dataset using kaggle API and unpacking it on local machine.

In [None]:
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
os.chdir('/content/drive/MyDrive/kaggle')
os.environ['KAGGLE_CONFIG_DIR'] = '/content/drive/MyDrive/kaggle'
!kaggle datasets download -d jessicali9530/celeba-dataset
os.chdir('/content')
!cp '/content/drive/MyDrive/kaggle/celeba-dataset.zip' .
!unzip -q 'celeba-dataset.zip'

celeba-dataset.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
IMG_SIZE = 64
CHANNELS = 3

BATCH_SIZE = 256
SEED = 42
MAX_EPOCHS = 50
PATIENCE = 5

LEARNING_RATE = 1e-4

Z_DIM = 50
BETA = 10000

In [None]:
MODEL_SAVE_PATH = '/content/drive/MyDrive/ML_DL/FaceGeneration/Models'
BEST_MODEL_SAVE_PATH = '/content/drive/MyDrive/ML_DL/FaceGeneration'
LOG_SAVE_PATH = '/content/drive/MyDrive/ML_DL/FaceGeneration'
RECONSTRUCTION_IMAGES_SAVE_PATH = '/content/drive/MyDrive/ML_DL/FaceGeneration/Reconstructions'
GENERATED_IMAGES_SAVE_PATH = '/content/drive/MyDrive/ML_DL/FaceGeneration/Generated'

random.seed(SEED)

# Data generators

In [None]:
partition_df = pd.read_csv('/content/list_eval_partition.csv')
partition_df['partition'] = partition_df['partition'].replace({0: 'train'})
partition_df['partition'] = partition_df['partition'].replace({1: 'validation'})
partition_df['partition'] = partition_df['partition'].replace({2: 'test'})

train_df = partition_df[partition_df['partition'] == 'train']
val_df = partition_df[partition_df['partition'] == 'validation']
test_df = partition_df[partition_df['partition'] == 'test']

print(f'Whole dataframe length: {len(partition_df)}')
print(f'Train dataframe length: {len(train_df)}')
print(f'Validation dataframe length: {len(val_df)}')
print(f'Test dataframe length: {len(test_df)}')

Whole dataframe length: 202599
Train dataframe length: 162770
Validation dataframe length: 19867
Test dataframe length: 19962


In [None]:
datagen = ImageDataGenerator(rescale=1./255.)

In [None]:
train_generator = datagen.flow_from_dataframe(
  dataframe=train_df,
  directory='/content/img_align_celeba/img_align_celeba',
  x_col='image_id',
  y_col=None,
  target_size=(IMG_SIZE, IMG_SIZE),
  batch_size=BATCH_SIZE,
  class_mode=None,
  shuffle=True,
  seed=SEED,
  interpolation="bilinear",
)

val_generator = datagen.flow_from_dataframe(
  dataframe=val_df,
  directory='/content/img_align_celeba/img_align_celeba',
  x_col='image_id',
  y_col=None,
  target_size=(IMG_SIZE, IMG_SIZE),
  batch_size=BATCH_SIZE,
  class_mode=None,
  shuffle=True,
  seed=SEED,
  interpolation="bilinear",
)

test_generator = datagen.flow_from_dataframe(
  dataframe=test_df,
  directory='/content/img_align_celeba/img_align_celeba',
  x_col='image_id',
  y_col=None,
  target_size=(IMG_SIZE, IMG_SIZE),
  batch_size=BATCH_SIZE,
  class_mode=None,
  shuffle=True,
  seed=SEED,
  interpolation="bilinear",
)

Found 162770 validated image filenames.
Found 19867 validated image filenames.
Found 19962 validated image filenames.


# Model architecture

In [None]:
class Sampling(layers.Layer):
  def call(self, inputs):
    z_mean, z_log_var = inputs
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = K.random_normal(shape=(batch, dim))

    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [None]:
encoder_input = layers.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS), name="encoder_input")

x = layers.Conv2D(32, kernel_size=3, strides=2, padding="same")(    encoder_input)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2D(64, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2D(128, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2D(256, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

shape_before_flattening = K.int_shape(x)[1:]

x = layers.Flatten()(x)
z_mean = layers.Dense(Z_DIM, name="z_mean")(x)
z_log_var = layers.Dense(Z_DIM, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, 64, 64, 3)]          0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 32, 32, 32)           896       ['encoder_input[0][0]']       
                                                                                                  
 batch_normalization (Batch  (None, 32, 32, 32)           128       ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 leaky_re_lu (LeakyReLU)     (None, 32, 32, 32)           0         ['batch_normalization[0]

In [None]:
decoder_input = layers.Input(shape=(Z_DIM,), name="decoder_input")

x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Reshape(shape_before_flattening)(x)

x = layers.Conv2DTranspose(256, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2DTranspose(128, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2DTranspose(64, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

x = layers.Conv2DTranspose(32, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

decoder_output = layers.Conv2DTranspose(3, kernel_size=3, strides=1, activation="sigmoid", padding="same")(x)

decoder = models.Model(decoder_input, decoder_output)
decoder.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 50)]              0         
                                                                 
 dense (Dense)               (None, 4096)              208896    
                                                                 
 batch_normalization_4 (Bat  (None, 4096)              16384     
 chNormalization)                                                
                                                                 
 leaky_re_lu_4 (LeakyReLU)   (None, 4096)              0         
                                                                 
 reshape (Reshape)           (None, 4, 4, 256)         0         
                                                                 
 conv2d_transpose (Conv2DTr  (None, 8, 8, 256)         590080    
 anspose)                                                    

In [None]:
class VAE(models.Model):
  def __init__(self, encoder, decoder, **kwargs):
    super(VAE, self).__init__(**kwargs)

    self.encoder = encoder
    self.decoder = decoder

    self.total_loss_tracker = metrics.Mean(name="total_loss")
    self.reconstruction_loss_tracker = metrics.Mean(name="reconstruction_loss")
    self.kl_loss_tracker = metrics.Mean(name="kl_loss")

  @property
  def metrics(self):
    return [
      self.total_loss_tracker,
      self.reconstruction_loss_tracker,
      self.kl_loss_tracker,
    ]

  def call(self, inputs):
    z_mean, z_log_var, z = encoder(inputs)
    reconstruction = decoder(z)
    return z_mean, z_log_var, reconstruction

  def train_step(self, data):
    with tf.GradientTape() as tape:
      z_mean, z_log_var, reconstruction = self(data)

      reconstruction_loss =  BETA * tf.reduce_mean(
        losses.binary_crossentropy(
          data, reconstruction, axis=(1, 2, 3)
        )
      )

      kl_loss = tf.reduce_mean(
        tf.reduce_sum(-0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), axis=1)
        )

      total_loss = reconstruction_loss + kl_loss

      grads = tape.gradient(total_loss, self.trainable_weights)
      self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

      self.total_loss_tracker.update_state(total_loss)
      self.reconstruction_loss_tracker.update_state(reconstruction_loss)
      self.kl_loss_tracker.update_state(kl_loss)

    return {m.name: m.result() for m in self.metrics}

  def test_step(self, data):
    if isinstance(data, tuple):
      data = data[0]

    z_mean, z_log_var, reconstruction = self(data)

    reconstruction_loss =  BETA * tf.reduce_mean(
        losses.binary_crossentropy(
          data, reconstruction, axis=(1, 2, 3)
        )
      )

    kl_loss = tf.reduce_mean(
        tf.reduce_sum(-0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), axis=1)
        )

    total_loss = reconstruction_loss + kl_loss

    return {
      "loss": total_loss,
      "reconstruction_loss": reconstruction_loss,
      "kl_loss": kl_loss,
    }

  def get_config(self):
    config = super().get_config().copy()
    config.update({
      'encoder': self.encoder.get_config(),
      'decoder': self.decoder.get_config(),
      'total_loss_tracker': self.total_loss_tracker.get_config(),
      'reconstruction_loss_tracker': self.reconstruction_loss_tracker.get_config(),
      'kl_loss_tracker': self.kl_loss_tracker.get_config(),
    })
    return config

  @classmethod
  def from_config(cls, config):
    encoder = models.Sequential.from_config(config.pop('encoder'))
    decoder = models.Sequential.from_config(config.pop('decoder'))

    return cls(encoder=encoder, decoder=decoder, **config)

In [None]:
vae = VAE(encoder, decoder)

# Model training

In [None]:
class ImageGeneratorCallback(callbacks.Callback):
  def __init__(self, vae, filepath):
    super().__init__()
    self.vae = vae
    self.filepath = filepath

    self.samples = np.random.normal(size=(25, Z_DIM))

  def on_epoch_end(self, epoch, logs=None):
    self.vae = self.model
    images = self.vae.decoder.predict(self.samples, verbose=0)
    filename = f"{self.filepath}/image_generated_epoch_{epoch}.png"
    self._plot_images(images, filename, epoch)

  def _plot_images(self, images, filename, epoch):
    fig, axs = plt.subplots(5, 5, figsize=(15, 15))
    plt.suptitle(f'Generated images - epoch {epoch}', fontsize=20, fontweight='bold')

    for i in range(5):
      for j in range(5):
        axs[i, j].imshow(images[i*5+j])
        axs[i, j].axis('off')

    plt.tight_layout()
    plt.subplots_adjust(top=0.9)
    plt.savefig(f"{self.filepath}/image_generated_epoch_{epoch}.png")
    plt.close(fig)

class ImageReconstructorCallback(callbacks.Callback):
  def __init__(self, vae, filepath):
    super().__init__()
    self.vae = vae
    self.filepath = filepath

    self.original_images = []

    while len(self.original_images) < 15:
      self.original_images.append(next(test_generator)[0])

    self.original_images = np.stack(self.original_images, axis=0)

  def on_epoch_end(self, epoch, logs=None):
    self.vae = self.model
    reconstructed_images = self.vae.predict(self.original_images, verbose=0)[2]
    filename = f"{self.filepath}/image_reconstructed_epoch_{epoch}.png"
    self._plot_images(reconstructed_images, filename, epoch)

  def _plot_images(self, reconstructed_images, filename, epoch):
    fig, axs = plt.subplots(5, 6, figsize=(15, 15))
    plt.suptitle(f'Reconstructed images - epoch {epoch}', fontsize=20, fontweight='bold')

    for i in range(5):
      for j in range(3):
        axs[i, j*2].imshow(self.original_images[i*3+j])
        axs[i, j*2].set_title('Original Image')
        axs[i, j*2].axis('off')

        axs[i, j*2 + 1].imshow(reconstructed_images[i*3+j])
        axs[i, j*2 + 1].set_title('Reconstructed Image')
        axs[i, j*2 + 1].axis('off')

    plt.tight_layout()
    plt.subplots_adjust(top=0.9)
    plt.savefig(f"{self.filepath}/image_reconstructed_epoch_{epoch}.png")
    plt.close(fig)

In [None]:
eary_stopping = callbacks.EarlyStopping(monitor='val_loss', patience=PATIENCE, restore_best_weights=True)
checkpoint = callbacks.ModelCheckpoint(f'{MODEL_SAVE_PATH}/VAE_' + '{epoch:03d}.tf', verbose=1, monitor='val_loss',save_best_only=False, mode='auto', save_format='tf')
best_model_save = callbacks.ModelCheckpoint(f'{BEST_MODEL_SAVE_PATH}/best_model.tf', save_best_only=True, monitor='val_loss', mode='min', save_format='tf')
csv_logger = callbacks.CSVLogger(f'{LOG_SAVE_PATH}/training_log.csv', append=True, separator=';')

image_generator = ImageGeneratorCallback(vae, GENERATED_IMAGES_SAVE_PATH)
image_reconstructor = ImageReconstructorCallback(vae, RECONSTRUCTION_IMAGES_SAVE_PATH)

callbacks_list = [checkpoint,
                  best_model_save,
                  eary_stopping,
                  csv_logger,
                  image_generator,
                  image_reconstructor
                 ]

optimizer = optimizers.Adam(learning_rate=LEARNING_RATE)
vae.compile(optimizer=optimizer)

In [None]:
vae.fit(
  train_generator,
  validation_data=val_generator,
  epochs=MAX_EPOCHS,
  shuffle=True,
  callbacks=callbacks_list
  )

Epoch 1/50
Epoch 1: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_001.tf
Epoch 2/50
Epoch 2: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_002.tf
Epoch 3/50
Epoch 3: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_003.tf
Epoch 4/50
Epoch 4: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_004.tf
Epoch 5/50
Epoch 5: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_005.tf
Epoch 6/50
Epoch 6: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_006.tf
Epoch 7/50
Epoch 7: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_007.tf
Epoch 8/50
Epoch 8: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_008.tf
Epoch 9/50
Epoch 9: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_009.tf
Epoch 10/50
Epoch 10: saving model to /content/drive/MyDrive/ML_DL/FaceGeneration/Models/VAE_010.tf
Epoch 11/50
Epoch 