In [1]:
import os

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import (
    layers,
    models,
    datasets,
    callbacks,
    losses,
    optimizers,
    metrics,
    utils
)

from scipy.stats import norm
import pandas as pd

from vae_utils import get_vector_from_label, add_vector_to_images, morph_faces

In [9]:
def preprocess_vae_faces(images, scale=255.0):
    r"""
    Cast images to float32 and divide them by `scale`.

    Parameters
    ----------
    images :
        Tensor (or tensor-like value accepted by tf.cast) holding pixel data.
    scale :
        Divisor applied after the cast. With the default of 255.0, 8-bit pixel
        values end up in the range [0, 1].

    Returns
    -------
    A float32 tensor with the same shape as `images`.
    """
    as_float = tf.cast(images, "float32")
    return as_float / scale

In [3]:
class Sampling(layers.Layer):
    r"""
    Keras layer that draws a latent vector z from the normal distribution
      described by (z_mean, z_log_var).

    Custom Keras layers are written by subclassing Layer and implementing `call`,
      which defines how the incoming tensors are transformed.

    Reparametrization Trick:
        Instead of sampling z directly from N(z_mean, exp(z_log_var)), draw epsilon
        from a standard normal and shift/scale it:

            z = z_mean + exp(0.5 * z_log_var) * epsilon

        All randomness is confined to epsilon, so the output is a deterministic,
        differentiable function of z_mean and z_log_var and gradients can flow
        back through this layer.
    """
    def call(self, inputs):
        r"""
        Parameters
        ----------
        inputs :
            Pair (z_mean, z_log_var). The first two axes of z_mean are used as
            (batch, dim) for the shape of the noise tensor.

        Returns
        -------
        A sample z = z_mean + exp(0.5 * z_log_var) * epsilon.
        """
        mean, log_var = inputs
        # Dynamic shape, so this works when the batch size is unknown at build time
        mean_shape = tf.shape(mean)
        n_batch    = mean_shape[0]
        n_dim      = mean_shape[1]
        # Standard-normal noise, one value per latent coordinate
        noise = K.random_normal(shape=(n_batch, n_dim))
        # exp(0.5 * log(var)) is the standard deviation
        std_dev = tf.exp(0.5*log_var)
        return mean + std_dev*noise

In [4]:
class VAE(models.Model):
    r"""
    Variational Autoencoder class.
    Inherits from abstract Keras Model class

    Wraps an encoder (inputs -> z_mean, z_log_var, z) and a decoder (z -> reconstruction)
      and implements a custom training step whose loss is

        total_loss = recon_loss_beta * binary_crossentropy(inputs, reconstruction) + KL

    Parameters
    ----------
    encoder :
        Model returning the triple (z_mean, z_log_var, z) for a batch of inputs.
    decoder :
        Model mapping a batch of latent vectors z to reconstructions.
    recon_loss_beta :
        Weight applied to the reconstruction loss before it is added to the KL loss.
    **kwargs :
        Forwarded to the Keras Model constructor.
    """
    def __init__(
        self            , 
        encoder         , 
        decoder         , 
        recon_loss_beta = 500, 
        **kwargs
    ):
        super().__init__(**kwargs)
        self.encoder                     = encoder
        self.decoder                     = decoder
        self.recon_loss_beta             = recon_loss_beta
        # Running means of the three loss terms, reported by train_step
        self.total_loss_tracker          = metrics.Mean(name='total_loss')
        self.reconstruction_loss_tracker = metrics.Mean(name='reconstruction_loss')
        self.kl_loss_tracker             = metrics.Mean(name='kl_loss')

    @property
    def metrics(self):
        r"""
        The loss trackers updated by train_step, in the order
          total_loss, reconstruction_loss, kl_loss.
        """
        return [
            self.total_loss_tracker, 
            self.reconstruction_loss_tracker, 
            self.kl_loss_tracker
        ]

    def call(
        self, 
        inputs, 
        training = None
    ):
        r"""
        Describes what we would like returned when we call VAE on a particular image

        Parameters
        ----------
        inputs :
            Batch of inputs accepted by the encoder.
        training :
            Forwarded to the encoder and decoder so that layers which behave
            differently in training and inference (e.g. BatchNormalization)
            run in the requested mode. None leaves the decision to Keras.

        Returns
        -------
        (z_mean, z_log_var, reconstruction)
        """
        # Use the sub-models owned by this instance, not same-named notebook globals
        z_mean, z_log_var, z = self.encoder(inputs, training=training)
        reconstruction       = self.decoder(z, training=training)
        return z_mean, z_log_var, reconstruction

    def train_step(
        self, 
        data
    ):
        r"""
        Describes one training step of the VAE, including the calculation of the loss function

        TensorFlow's Gradient Tape is a mechanism that allows the computation of gradients of
          operations executed during a forward pass of the model.
        To use it, wrap the code that performs the operations you want to differentiate in a 
          tf.GradientTape() context.
        Once the operations are recorded, the gradient of the loss function can be calculated
          with respect to some variables by calling tape.gradient().

        tf.reduce_mean: Computes the mean of elements across dimensions of a tensor.

        Parameters
        ----------
        data :
            Batch of inputs; used both as model input and as reconstruction target.

        Returns
        -------
        dict mapping each tracker name to its current running mean.
        """
        with tf.GradientTape() as tape:
            # training=True: a custom train_step must request training mode itself,
            #   otherwise BatchNormalization layers run in inference mode.
            z_mean, z_log_var, reconstruction = self(data, training=True)
            #-----
            # Binary cross-entropy averaged over axes (1,2,3) of each sample,
            #   weighted by recon_loss_beta, then averaged over the batch.
            reconstruction_loss = tf.reduce_mean(
                self.recon_loss_beta * losses.binary_crossentropy(
                    y_true = data, 
                    y_pred = reconstruction, 
                    axis   = (1,2,3)
                )
            )
            #-----
            # KL divergence between N(z_mean, exp(z_log_var)) and a standard normal:
            #   summed over the latent axis, averaged over the batch.
            kl_loss = tf.reduce_mean(
                tf.reduce_sum(
                    -0.5*(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), 
                    axis = 1, 
                )
            )
            #-----
            total_loss = reconstruction_loss + kl_loss
        #-------------------------
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        #-------------------------
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        #-------------------------
        return {m.name: m.result() for m in self.metrics}

In [5]:
# --- Data ---
IMAGE_SIZE    = 32      # images are resized to IMAGE_SIZE x IMAGE_SIZE when loaded
CHANNELS      = 3       # channels of the encoder input and of the decoder output
BATCH_SIZE    = 128     # batch size handed to the dataset loader
# --- Model ---
NUM_FEATURES  = 128     # filter count of the hidden Conv2D / Conv2DTranspose layers
Z_DIM         = 200     # width of the z_mean / z_log_var layers (latent size)
# --- Training ---
LEARNING_RATE = 0.0005  # learning rate given to the Adam optimizer
EPOCHS        = 10      # presumably the number of training epochs - not used in the cells shown here
BETA          = 2000    # presumably the reconstruction-loss weight - TODO confirm where it is consumed
LOAD_MODEL    = False   # presumably toggles loading saved weights - not used in the cells shown here

In [6]:
# Load the data
# The image folder is read from the CELEBA_IMAGE_DIR environment variable so the
#   notebook is not tied to one machine; the original local path is kept as the
#   fallback so existing setups behave exactly as before.
CELEBA_IMAGE_DIR = os.environ.get(
    "CELEBA_IMAGE_DIR",
    r'C:\Users\buxto\Documents\LocalData\celeba-dataset\img_align_celeba\img_align_celeba',
)
train_data = utils.image_dataset_from_directory(
    directory     = CELEBA_IMAGE_DIR,
    labels        = None,                      # no labels: the dataset yields image batches only
    color_mode    = "rgb",
    image_size    = (IMAGE_SIZE, IMAGE_SIZE),  # resize every image on load
    batch_size    = BATCH_SIZE,
    shuffle       = True,
    seed          = 42,                        # fixed seed for a reproducible shuffle
    interpolation = "bilinear",
)

Found 202599 files.


In [10]:
# Rescale every image batch with preprocess_vae_faces (default scale).
train = train_data.map(preprocess_vae_faces)

In [11]:
# Encoder: image -> (z_mean, z_log_var, z)
encoder_input = layers.Input(
    name="encoder_input", shape=(IMAGE_SIZE, IMAGE_SIZE, CHANNELS)
)

# Four identical stride-2 stages: Conv2D -> BatchNormalization -> LeakyReLU
x = encoder_input
for _stage in range(4):
    x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)

# Feature-map shape without the batch axis; the decoder reshapes back to it
shape_before_flattening = K.int_shape(x)[1:]

# Two parallel Dense heads parameterise the latent distribution,
#   and Sampling draws z from it
x = layers.Flatten()(x)
z_mean = layers.Dense(Z_DIM, name="z_mean")(x)
z_log_var = layers.Dense(Z_DIM, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = models.Model(
    inputs=encoder_input, outputs=[z_mean, z_log_var, z], name="encoder"
)
encoder.summary()




In [12]:
# Decoder: latent vector z -> reconstructed image
decoder_input = layers.Input(name="decoder_input", shape=(Z_DIM,))

# Project z up to the size of the encoder's last feature map, then restore its shape
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Reshape(shape_before_flattening)(x)

# Four identical stride-2 stages: Conv2DTranspose -> BatchNormalization -> LeakyReLU
for _stage in range(4):
    x = layers.Conv2DTranspose(
        NUM_FEATURES, kernel_size=3, strides=2, padding="same"
    )(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)

# Final stride-1 layer maps to CHANNELS; sigmoid keeps every output value in (0, 1)
decoder_output = layers.Conv2DTranspose(
    CHANNELS, kernel_size=3, strides=1, activation="sigmoid", padding="same"
)(x)

decoder = models.Model(inputs=decoder_input, outputs=decoder_output)
decoder.summary()

In [13]:
# Create a variational autoencoder
# Pass the configured BETA explicitly: without it VAE falls back to its default
#   recon_loss_beta of 500 and the BETA constant has no effect.
vae = VAE(encoder, decoder, recon_loss_beta=BETA)

In [14]:
# Compile the variational autoencoder (this cell only configures the optimizer).
# No `loss` argument is passed: VAE.train_step computes the loss itself.
optimizer = optimizers.Adam(
    learning_rate=LEARNING_RATE,
)
vae.compile(
    optimizer=optimizer,
)