# Variational Autoencoder: _Functional API_ with MNIST & TensorFlow 2

In [1]:
import tensorflow as tf
import tensorflow
from tensorflow import keras
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.layers import Conv2D, MaxPooling2D, ReLU, LeakyReLU, Dense, Flatten, Reshape, Input, InputLayer, Activation
from tensorflow.keras.layers import BatchNormalization, Dropout, Conv2DTranspose, Reshape, Lambda, Activation
from tensorflow.keras import models, layers, datasets
from tensorflow.keras import backend as K
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.initializers import RandomNormal
import numpy as np

from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import plot_model


In [2]:
print(f"TF version: {tf.__version__}")

TF version: 2.6.0


In [3]:
# Check GPU availibility-
gpu_devices = tf.config.list_physical_devices('GPU')
print(f"GPU: {gpu_devices}")

GPU: []


In [4]:
if gpu_devices:
    print(f"GPU: {gpu_devices}")
    details = tf.config.experimental.get_device_details(gpu_devices[0])
    print(f"GPU details: {details.get('device_name', 'Unknown GPU')}")
else:
    print("No GPU found")

No GPU found


#### Data preprocessing and cleaning:

In [5]:
# input image dimensions
img_rows, img_cols = 28, 28

In [6]:
# Load MNIST dataset-
(X_train, y_train), (X_test, y_test) = mnist.load_data()

In [7]:
if tf.keras.backend.image_data_format() == 'channels_first':
    X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
    X_test = X_test.reshape(X_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1)
    X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

In [8]:
print(f"\ninput_shape to be used: {input_shape}")


input_shape to be used: (28, 28, 1)


In [9]:
# Specify hyper-parameters-
batch_size = 64
num_classes = 10
num_epochs = 200

In [10]:
# Convert datasets to floating point types-
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')

By default the image data consists of integers between 0 and 255 for each pixel channel. Neural networks work best when each input is inside the range –1 to 1, so we need to divide by 255.

In [11]:
# Normalize the training and testing datasets-
X_train /= 255.0
X_test /= 255.0

In [12]:
# convert class vectors/target to binary class matrices or one-hot encoded values-
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

In [13]:
print("\nDimensions of training and testing sets are:")
print(f"X_train.shape: {X_train.shape}, y_train.shape: {y_train.shape}")
print(f"X_test.shape: {X_test.shape}, y_test.shape: {y_test.shape}")


Dimensions of training and testing sets are:
X_train.shape: (60000, 28, 28, 1), y_train.shape: (60000, 10)
X_test.shape: (10000, 28, 28, 1), y_test.shape: (10000, 10)


### Define Variational Autoencoder using _Functional API_ & _Convolutional_ layers

In [14]:
# Specify latent space dimensions-
latent_space_dim = 3

In [48]:
class VAE():
    def __init__(self, latent_dim):
        self.latent_dim = latent_dim
        
        # Define encoder-
        encoder_input = Input(shape = (28, 28, 1))

        x = Conv2D(
            filters = 32, kernel_size = 3,
            strides = 2, padding = 'same')(encoder_input)
        x = LeakyReLU()(x)

        x = Conv2D(
            filters = 64, kernel_size = 3,
            strides = 2, padding = 'same')(x)
        x = LeakyReLU()(x)

        x = Conv2D(
            filters = 64, kernel_size = 3,
            strides = 1, padding = 'same')(x)
        x = LeakyReLU()(x)

        x = Conv2D(
            filters = 64, kernel_size = 3,
            strides = 1, padding = 'same')(x)
        x = LeakyReLU()(x)

        shape_before_flattening = K.int_shape(x)[1:]
        x = Flatten()(x)

        # Instead of connecting the flattened layer directly to the 3-D latent space, we connect
        # it to layers 'mu' and 'log_var'-
        self.mu = Dense(units = self.latent_dim)(x)
        self.log_var = Dense(units = self.latent_dim)(x)

        # The Keras model that outputs the values of 'mu' & 'log_var' for a given input image-
        self.encoder_mu_log = Model(encoder_input, (self.mu, self.log_var))
        
        def sampling(args):
            mu, log_var = args
            epsilon = K.random_normal(shape = K.shape(mu), mean = 0.0, stddev = 1.0)
            return mu + K.exp(log_var / 2) * epsilon

        # This Lambda layer samples a point 'z' in the latent space from the normal distribution
        # defined by the parameters 'mu' and 'log_var'-
        encoder_output = Lambda(sampling, name = 'encoder_output')([self.mu, self.log_var])

        self.encoder = Model(encoder_input, encoder_output)
        
        
        # Define decoder-
        decoder_input = Input(shape = (self.latent_dim))
        
        x = Dense(np.prod(shape_before_flattening))(decoder_input)
        x = Reshape(shape_before_flattening)(x)

        x = Conv2DTranspose(
            filters = 64, kernel_size = (3, 3),
            strides = (1, 1), padding = 'same')(x)
        x = LeakyReLU()(x)

        x = Conv2DTranspose(
            filters = 64, kernel_size = (3, 3),
            strides = (2, 2), padding = 'same')(x)
        x = LeakyReLU()(x)

        x = Conv2DTranspose(
            filters = 32, kernel_size = (3, 3),
            strides = (2, 2), padding = 'same')(x)
        x = LeakyReLU()(x)

        x = Conv2DTranspose(
            filters = 1, kernel_size = (3, 3),
            strides = (1, 1), padding = 'same')(x)
        x = Activation('sigmoid')(x)

        decoder_output = x

        self.decoder = Model(decoder_input, decoder_output)
        
   
        # The complete autoencoder-

        # The input to the autoencoder is the same as the input to the encoder.
        model_input = encoder_input

        # The output from the autoencoder is the output from the encoder passed through
        # the decoder.
        model_output = self.decoder(encoder_output)

        # The Keras model that defines the full autoencoder — a model that takes an image,
        # and passes it through the encoder and back out through the decoder to generate
        # a reconstruction of the original image.
        self.model = Model(model_input, model_output)

        
    def compile(self, learning_rate, r_loss_factor):
        self.learning_rate = learning_rate

        def vae_r_loss(y_true, y_pred):
            r_loss = K.mean(K.square(y_true - y_pred), axis = [1,2,3])
            return r_loss_factor * r_loss

        def vae_kl_loss(y_true, y_pred):
            kl_loss =  -0.5 * K.sum(1 + self.log_var - K.square(self.mu) - K.exp(self.log_var), axis = 1)
            return kl_loss

        def vae_loss(y_true, y_pred):
            r_loss = vae_r_loss(y_true, y_pred)
            kl_loss = vae_kl_loss(y_true, y_pred)
            return  r_loss + kl_loss

        self.model.compile(
            optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate),
            loss = vae_loss,
            metrics = [vae_r_loss, vae_kl_loss]
        )

        return self.model
    

In [49]:
model = VAE(latent_dim = latent_space_dim)

In [50]:
model = model.compile(learning_rate = 0.003, r_loss_factor = 1000)

In [51]:
model.summary()

Model: "model_22"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_12 (InputLayer)           [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
conv2d_28 (Conv2D)              (None, 14, 14, 32)   320         input_12[0][0]                   
__________________________________________________________________________________________________
leaky_re_lu_40 (LeakyReLU)      (None, 14, 14, 32)   0           conv2d_28[0][0]                  
__________________________________________________________________________________________________
conv2d_29 (Conv2D)              (None, 7, 7, 64)     18496       leaky_re_lu_40[0][0]             
___________________________________________________________________________________________

In [52]:
model(X_train[:2, :]).shape

TensorShape([2, 28, 28, 1])

In [53]:
# Define early stopping criterion-
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss', min_delta = 0.0001,
    patience = 4,
    restore_best_weights = True
)
# Adjust 'min_delta' according to training loss for early stopping
# to effectively happen.

In [54]:
print(f"number of training epochs = {num_epochs}")

number of training epochs = 200


In [None]:
# Train autoencoder-
training_hist = model.fit(
    x = X_train, y = X_train,
    batch_size = batch_size, shuffle = True,
    validation_data = (X_test, X_test),
    epochs = num_epochs, callbacks = [early_stopping]
    )

The last conv layer is flattened and connected to a Dense layer of size 2, which represents our 2-D latent space.

In [15]:
# Define encoder-
encoder_input = Input(shape = (28, 28, 1))

x = Conv2D(
    filters = 32, kernel_size = 3,
    strides = 2, padding = 'same')(encoder_input)
x = LeakyReLU()(x)

x = Conv2D(
    filters = 64, kernel_size = 3,
    strides = 2, padding = 'same')(x)
x = LeakyReLU()(x)

x = Conv2D(
    filters = 64, kernel_size = 3,
    strides = 1, padding = 'same')(x)
x = LeakyReLU()(x)

x = Conv2D(
    filters = 64, kernel_size = 3,
    strides = 1, padding = 'same')(x)
x = LeakyReLU()(x)

# shape_before_flattening = K.int_shape(x)[1:]
x = Flatten()(x)

# Instead of connecting the flattened layer directly to the 3-D latent space, we connect
# it to layers 'mu' and 'log_var'-
mu = Dense(units = latent_space_dim)(x)
log_var = Dense(units = latent_space_dim)(x)

# The Keras model that outputs the values of 'mu' & 'log_var' for a given input image-
encoder_mu_log = Model(encoder_input, (mu, log_var))

In [None]:
# print(f"shape_before_flattening: {shape_before_flattening}")
# shape_before_flattening: (7, 7, 64)

In [16]:
def sampling(args):
    mu, log_var = args
    epsilon = K.random_normal(shape = K.shape(mu), mean = 0.0, stddev = 1.0)
    return mu + K.exp(log_var / 2) * epsilon

#### Lambda layer:

A Lambda layer simple wraps any function into Keras layer. For example, the following layer squares its input:

```Lambda(lambda x: x ** 2)```

They are useful when you want to apply a function to a tensor that isn’t already included as one of the out-of-the-box Keras layer types.

In [17]:
# This Lambda layer samples a point 'z' in the latent space from the normal distribution
# defined by the parameters 'mu' and 'log_var'-
encoder_output = Lambda(sampling)([mu, log_var])

In [18]:
# The Keras model that defines the encoder — a model that takes an input image and encodes it
# into the 2D latent space, by sampling a point from the multivariate normal distribution
# defined by 'mu' and 'log_var'-
encoder = Model(encoder_input, encoder_output)

In [19]:
# Sanity check-
encoder.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 14, 14, 32)   320         input_1[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu (LeakyReLU)         (None, 14, 14, 32)   0           conv2d[0][0]                     
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 7, 7, 64)     18496       leaky_re_lu[0][0]                
____________________________________________________________________________________________

In [20]:
# Print randomly initialized weights of first
# filter of first conv layer-
encoder.weights[0][:, :, :, 0]

<tf.Tensor: shape=(3, 3, 1), dtype=float32, numpy=
array([[[ 0.00384198],
        [-0.03363007],
        [-0.05510987]],

       [[ 0.01851153],
        [-0.02809589],
        [-0.12507896]],

       [[-0.08668809],
        [ 0.04580538],
        [ 0.1180629 ]]], dtype=float32)>

In [21]:
# Further sanity check-
encoder(X_train[:2, :]).shape

TensorShape([2, 3])

In [22]:
print(f"Encoder output for latent space = {latent_space_dim}:\n{encoder(X_train[:2, :]).numpy()}")

Encoder output for latent space = 3:
[[ 1.7884558  -0.26782405 -0.92898864]
 [-2.579872    1.31416     0.49726126]]


#### Define decoder-
The decoder is a mirror image of the encoder, except instead of convolutional layers, we use convolutional transpose layers.
Note that the decoder doesn’t have to be a mirror image of the encoder. It can be anything you want, as long as the output from the last layer of the decoder is the same size as the input to the encoder (since our loss function will be comparing these pixelwise).


- __The decoder of a variational autoencoder is identical to the decoder of a plain autoencoder__.

- __The only other part we need to change is the loss function__.

#### Convolutional Transpose Layers:

Standard convolutional layers allow us to halve the size of an input tensor in both height and width, by setting strides = 2.

The convolutional transpose layer uses the same principle as a standard convolutional layer (passing a filter across the image), but is different in that setting strides = 2 'doubles' the size of the input tensor in both height and width.

In a convolutional transpose layer, the 'strides' parameter determines the internal zero padding between pixels in the image.

In Keras, the 'Conv2DTranspose' layer allows us to perform convolutional transpose operations on tensors. By stacking these layers, we can gradually expand the size of each layer, using strides = 2, until we get back to the original image dimension of
28 × 28.


In [25]:
decoder_input = Input(shape = (latent_space_dim))

x = Dense(np.prod((7, 7, 64)))(decoder_input)
x = Reshape((7, 7, 64))(x)

x = Conv2DTranspose(
    filters = 64, kernel_size = (3, 3),
    strides = (1, 1), padding = 'same')(x)
x = LeakyReLU()(x)

x = Conv2DTranspose(
    filters = 64, kernel_size = (3, 3),
    strides = (2, 2), padding = 'same')(x)
x = LeakyReLU()(x)

x = Conv2DTranspose(
    filters = 32, kernel_size = (3, 3),
    strides = (2, 2), padding = 'same')(x)
x = LeakyReLU()(x)

x = Conv2DTranspose(
    filters = 1, kernel_size = (3, 3),
    strides = (1, 1), padding = 'same')(x)
x = Activation('sigmoid')(x)

decoder_output = x

decoder = Model(decoder_input, decoder_output)

In [26]:
# Sanity check-
decoder.summary()

Model: "model_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 3)]               0         
_________________________________________________________________
dense_3 (Dense)              (None, 3136)              12544     
_________________________________________________________________
reshape (Reshape)            (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_transpose (Conv2DTran (None, 7, 7, 64)          36928     
_________________________________________________________________
leaky_re_lu_4 (LeakyReLU)    (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_transpose_1 (Conv2DTr (None, 14, 14, 64)        36928     
_________________________________________________________________
leaky_re_lu_5 (LeakyReLU)    (None, 14, 14, 64)        0   

In [27]:
# Sanity check-
print(f"Decoder output using encoder output for latent space = {latent_space_dim}:\n"
      f"{decoder(encoder(X_train[:2, :])).shape}")

Decoder output using encoder output for latent space = 3:
(2, 28, 28, 1)


#### Joining the Encoder to the Decoder
To train the encoder and decoder simultaneously, we need to define a model that will represent the flow of an image through the encoder and back out through the decoder.


In [28]:
# The complete autoencoder-

# The input to the autoencoder is the same as the input to the encoder.
model_input = encoder_input

# The output from the autoencoder is the output from the encoder passed through
# the decoder.
model_output = decoder(encoder_output)

# The Keras model that defines the full autoencoder—a model that takes an image,
# and passes it through the encoder and back out through the decoder to generate
# a reconstruction of the original image.
model = Model(model_input, model_output)

In [28]:
# Final sanity check-
model.summary()

Model: "model_10"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_6 (InputLayer)            [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
conv2d_16 (Conv2D)              (None, 14, 14, 32)   320         input_6[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu_19 (LeakyReLU)      (None, 14, 14, 32)   0           conv2d_16[0][0]                  
__________________________________________________________________________________________________
conv2d_17 (Conv2D)              (None, 7, 7, 64)     18496       leaky_re_lu_19[0][0]             
___________________________________________________________________________________________

In [29]:
# Loop through each trainable layer and print it's shape-
tot_params = 0
for layer in model.weights:
    loc_param = tf.math.count_nonzero(layer, axis = None).numpy()
    tot_params += loc_param
    print(f"param.shape: {layer.shape} has {loc_param} params")

param.shape: (3, 3, 1, 32) has 288 params
param.shape: (32,) has 0 params
param.shape: (3, 3, 32, 64) has 18432 params
param.shape: (64,) has 0 params
param.shape: (3, 3, 64, 64) has 36864 params
param.shape: (64,) has 0 params
param.shape: (3, 3, 64, 64) has 36864 params
param.shape: (64,) has 0 params
param.shape: (3136, 3) has 9408 params
param.shape: (3,) has 0 params
param.shape: (3136, 3) has 9408 params
param.shape: (3,) has 0 params
param.shape: (3, 3136) has 9408 params
param.shape: (3136,) has 0 params
param.shape: (3, 3, 64, 64) has 36864 params
param.shape: (64,) has 0 params
param.shape: (3, 3, 64, 64) has 36864 params
param.shape: (64,) has 0 params
param.shape: (3, 3, 32, 64) has 18432 params
param.shape: (32,) has 0 params
param.shape: (3, 3, 1, 32) has 288 params
param.shape: (1,) has 0 params


In [30]:
print(f"Total number of trainable parameters in VAE model = {tot_params}")

Total number of trainable parameters in VAE model = 213120


In [31]:
# Sanity check-
model(X_train[:2, :]).shape

TensorShape([2, 28, 28, 1])

### VAE _loss_ function:

Previously, our loss function only consisted of the RMSE loss between images and their reconstruction after being passed through the encoder and decoder. This reconstruction loss also appears in a variational autoencoder, but we require one extra component: The Kullback–Leibler (KL) divergence.

- KL divergence is a way of measuring how much one probability distribution differs from another.
- In a VAE, we want to measure how different our normal distribution with parameters mu and log_var is from the standard normal distribution.
- The sum is taken over all the dimensions in the latent space.
- KL loss is minimized to 0 when mu = 0 and log_var = 0 for all dimensions. As these two terms start to differ from 0, KL loss increases.
- In summary, the KL divergence term penalizes the network for encoding observations to mu and log_var variables that differ significantly from the parameters of a standard normal distribution, namely mu = 0 and log_var = 0.


#### Why does this addition to the loss function help?

1. First, we now have a well-defined distribution that we can use for choosing points in the latent space—the standard normal distribution. If we sample from this distribution, we know that we’re very likely to get a point that lies within the limits of what the VAE is used to seeing.

1. Secondly, since this term (KL divergence) tries to force all encoded distributions toward the standard normal distribution, there is less chance that large gaps will form between point clusters. Instead, the encoder will try to use the space around the origin symmetrically and efficiently.


- In the code, the loss function for a VAE is simply the addition of the reconstruction loss and the KL divergence loss term; loss = reconstruction loss + KL divergence loss.

- We weight the reconstruction loss with a term, r_loss_factor, that ensures that it is well balanced with the KL divergence loss.

- If we weight the reconstruction loss too heavily, the KL loss will not have the desired regulatory effect and we will see the same problems that we experienced with the plain autoencoder.

- If the weighting term is too small, the KL divergence loss will dominate and the reconstructed images will be poor.

- This weighting term is one of the parameters to tune when you’re training your VAE.


In [33]:
# Weight the reconstruction loss 'r_loss_factor' to ensure that it is well balanced with the KL divergence loss-
r_loss_factor = 1000

In [34]:
def vae_r_loss(y_true, y_pred):
    # Reconstruction loss-
    r_loss = K.mean(K.square(y_true - y_pred), axis = [1,2,3])
    return r_loss_factor * r_loss


In [35]:
def vae_kl_loss(y_true, y_pred):
    # KL-Divergence loss-
    kl_loss = -0.5 * K.sum(1 + log_var - K.square(mu) - K.exp(log_var), axis = 1)
    return kl_loss


In [36]:
def vae_loss(y_true, y_pred):
    # VAE loss = Reconstruction loss + KL-Divergence loss
    r_loss = vae_r_loss(y_true, y_pred)
    kl_loss = vae_kl_loss(y_true, y_pred)
    return r_loss + kl_loss


In [37]:
K.square(mu), K.exp(log_var)

(<KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'tf.math.square')>,
 <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'tf.math.exp')>)

In [38]:
# Compile model-
model.compile(
    optimizer = tf.keras.optimizers.Adam(learning_rate = 0.003),
    loss = vae_loss,
    metrics = [vae_r_loss, vae_kl_loss]
)

In [32]:
# Define early stopping criterion-
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss', min_delta = 0.0001,
    patience = 4,
    restore_best_weights = True
)
# Adjust 'min_delta' according to training loss for early stopping
# to effectively happen.

In [33]:
print(f"number of training epochs = {num_epochs}")

number of training epochs = 200


In [None]:
# Train autoencoder-
training_hist = model.fit(
    x = X_train, y = X_train,
    batch_size = batch_size, shuffle = True,
    validation_data = (X_test, X_test),
    epochs = num_epochs, callbacks = [early_stopping]
    )