<div style="width: 100%; clear: both;">
<div style="float: left; width: 50%;">
<img src="http://www.uoc.edu/portal/_resources/common/imatges/marca_UOC/UOC_Masterbrand.jpg" align="left">
</div>
<div style="float: right; width: 50%;">
<p style="margin: 0; padding-top: 22px; text-align:right;">M0.532 · Pattern Recognition</p>
<p style="margin: 0; text-align:right;">Computational Engineering and Mathematics Master</p>
<p style="margin: 0; text-align:right; padding-bottom: 100px;">Computers, Multimedia and Telecommunications Department</p>
</div>
</div>
<div style="width:100%;">&nbsp;</div>

Reference Notebook: <br>



https://keras.io/examples/generative/vae/

## Setup

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

## Create a sampling layer

In [None]:
# Variational autoencoders are characterized by a sampling layer: the encoder
# predicts the mean and log-variance of a Gaussian over the latent space, and
# this layer draws a latent vector from that Gaussian.
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit.

    The sample is computed as ``z_mean + exp(0.5 * z_log_var) * epsilon`` with
    ``epsilon`` drawn from a standard normal distribution. The randomness is
    confined to ``epsilon``, so the output is a differentiable function of
    both inputs.
    """

    def call(self, inputs):
        """Draw one latent sample per element of the batch.

        Args:
            inputs: pair ``(z_mean, z_log_var)`` of tensors with identical
                shape, holding the mean and the log-variance of the latent
                Gaussian.

        Returns:
            A tensor with the same shape as ``z_mean`` containing the samples.
        """
        z_mean, z_log_var = inputs

        # epsilon is the random part of the sample. Its shape and dtype are
        # taken from z_mean itself, so the layer is not restricted to
        # (batch, dim) inputs or to the default float type.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)

        # exp(0.5 * log_var) is the standard deviation: shift and scale the
        # unit-normal noise by the predicted distribution parameters.
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


## Build the encoder

In [None]:
# The latent space is two-dimensional so that it can be drawn on a plane in
# the plotting cells at the end of the notebook.
latent_dim = 2

# Input: one single-channel 28x28 image per sample.
encoder_inputs = keras.Input(shape=(28, 28, 1))

# Two stride-2 convolutions with "same" padding halve the spatial resolution
# each time while increasing the number of filters:
# (28, 28, 1) -> (14, 14, 32) -> (7, 7, 64).
x = layers.Conv2D(
    filters=32, kernel_size=3, strides=2, padding="same", activation="relu"
)(encoder_inputs)
x = layers.Conv2D(
    filters=64, kernel_size=3, strides=2, padding="same", activation="relu"
)(x)

# Flatten (https://keras.io/api/layers/reshaping_layers/flatten/) the feature
# maps into a vector of 7 * 7 * 64 = 3136 values, then compress it with a
# small fully connected layer of 16 units.
x = layers.Flatten()(x)
x = layers.Dense(units=16, activation="relu")(x)

# The characteristic part of a variational autoencoder: two parallel linear
# heads predict the mean and the log-variance of the latent distribution ...
z_mean = layers.Dense(units=latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(units=latent_dim, name="z_log_var")(x)
# ... and the Sampling layer draws the latent vector handed to the decoder.
z = Sampling()([z_mean, z_log_var])

# The encoder exposes all three tensors: the distribution parameters are
# needed for the KL term of the loss, the sample feeds the decoder.
encoder = keras.Model(
    inputs=encoder_inputs, outputs=[z_mean, z_log_var, z], name="encoder"
)
encoder.summary()

## Build the decoder

In [None]:
# The decoder's only input is a point of the latent space.
latent_inputs = keras.Input(shape=(latent_dim,))

# Expand the latent vector to 7 * 7 * 64 = 3136 values and fold it back into
# a (7, 7, 64) volume: the inverse of the Flatten step in the encoder.
x = layers.Dense(units=7 * 7 * 64, activation="relu")(latent_inputs)
x = layers.Reshape(target_shape=(7, 7, 64))(x)

# Transposed convolutions
# (https://keras.io/api/layers/convolution_layers/convolution2d_transpose/)
# with stride 2 double the spatial resolution, mirroring the encoder's
# stride-2 convolutions in reverse order:
# (7, 7, 64) -> (14, 14, 64) -> (28, 28, 32).
x = layers.Conv2DTranspose(
    filters=64, kernel_size=3, strides=2, padding="same", activation="relu"
)(x)
x = layers.Conv2DTranspose(
    filters=32, kernel_size=3, strides=2, padding="same", activation="relu"
)(x)

# Output image of size (28, 28, 1); the sigmoid keeps every pixel in [0, 1].
decoder_outputs = layers.Conv2DTranspose(
    filters=1, kernel_size=3, padding="same", activation="sigmoid"
)(x)

# Build the decoder model.
decoder = keras.Model(inputs=latent_inputs, outputs=decoder_outputs, name="decoder")
decoder.summary()



## Define the VAE as a `Model` with a custom `train_step`

In [None]:
# The VAE needs a custom training step because its objective is the sum of
# two losses:
#   reconstruction loss: how well the decoder output reproduces the input
#   KL divergence loss: Kullback-Leibler divergence between the distribution
#     predicted by the encoder (mean + variance) and a standard normal
class VAE(keras.Model):
    """Variational autoencoder that trains an encoder/decoder pair jointly.

    Args:
        encoder: model mapping a batch of images to ``(z_mean, z_log_var, z)``.
        decoder: model mapping a batch of latent vectors ``z`` back to images.
        **kwargs: forwarded unchanged to ``keras.Model``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means of each loss term, reported in the training logs.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Loss trackers owned by this model.

        Listing them here hands them to Keras, which (per the Keras
        custom-``train_step`` guide) resets listed metrics between epochs.
        """
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimization step on a batch.

        Args:
            data: a batch of images, or a tuple whose first element is the
                batch of images (as produced when targets are passed to
                ``fit`` or a dataset yields ``(x, y)`` pairs). Only the
                images are used; training is unsupervised.

        Returns:
            Dict with the running means of ``loss``, ``reconstruction_loss``
            and ``kl_loss``.
        """
        # Tolerate (x, y, ...) batches: the labels are irrelevant here and
        # passing the whole tuple to the encoder would fail.
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            # binary_crossentropy averages over the last (channel) axis; sum
            # the remaining two image axes per sample, then average the batch.
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
                )
            )
            # Closed-form KL divergence between N(z_mean, exp(z_log_var)) and
            # the standard normal, summed over latent dimensions and averaged
            # over the batch.
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }


## Train the VAE

In [None]:
# Training is unsupervised, so the labels are discarded and the train and
# test images are merged into a single array.
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()

# Add a trailing channel axis and rescale the pixel values to [0, 1] so they
# are comparable with the decoder's sigmoid output.
mnist_digits = (
    np.concatenate((x_train, x_test), axis=0)[..., np.newaxis].astype("float32") / 255
)

# No loss is passed to compile(): VAE.train_step computes it.
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x=mnist_digits, batch_size=128, epochs=30)

## Display a grid of sampled digits

In [None]:
import matplotlib.pyplot as plt


def plot_latent_space(vae, n=30, figsize=15, scale=1.0):
    """Display an n x n mosaic of digits decoded from a regular 2D latent grid.

    Args:
        vae: object whose ``decoder.predict`` maps an array of 2D latent
            points to images that can be reshaped to 28x28.
        n: number of grid points along each latent axis.
        figsize: width and height of the (square) matplotlib figure.
        scale: the grid covers ``[-scale, scale]`` on both latent axes.
    """
    digit_size = 28

    # Linearly spaced coordinates of the grid. The y axis is reversed so that
    # larger z[1] values end up at the top of the image.
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    # Build every (z[0], z[1]) pair in row-major order (row i -> grid_y[i],
    # column j -> grid_x[j]) and decode them with a single predict() call
    # instead of one call per grid cell.
    mesh_x, mesh_y = np.meshgrid(grid_x, grid_y)
    z_grid = np.column_stack([mesh_x.ravel(), mesh_y.ravel()])
    decoded = np.asarray(vae.decoder.predict(z_grid))

    # (n*n, 28, 28, 1) -> (row, col, y, x) -> (row, y, col, x) -> 2D mosaic,
    # so that tile (i, j) occupies rows i*28..(i+1)*28 and the matching columns.
    figure = (
        decoded.reshape(n, n, digit_size, digit_size)
        .transpose(0, 2, 1, 3)
        .reshape(n * digit_size, n * digit_size)
    )

    plt.figure(figsize=(figsize, figsize))
    # Put one tick at the centre of every tile, labelled with its latent value.
    start_range = digit_size // 2
    end_range = n * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure, cmap="Greys_r")
    plt.show()


# Render the grid of decoded digits with the VAE trained above.
plot_latent_space(vae)

## Display how the latent space clusters different digit classes

In [None]:

def plot_label_clusters(vae, data, labels):
    """Scatter the encoded means of ``data`` in the latent plane, colored by label.

    Args:
        vae: object whose ``encoder.predict`` returns three arrays, the first
            of which holds the latent means.
        data: inputs to encode.
        labels: one value per input, used as the color of its point.
    """
    # Only the means are plotted; the log-variances and samples are ignored.
    latent_means, _log_vars, _samples = vae.encoder.predict(data)
    first_axis = latent_means[:, 0]
    second_axis = latent_means[:, 1]

    plt.figure(figsize=(12, 10))
    plt.scatter(first_axis, second_axis, c=labels)
    plt.colorbar()  # maps point colors back to label values
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


# Reload the training split, this time keeping the labels so each encoded
# point can be colored by its digit class.
(x_train, y_train), _ = keras.datasets.mnist.load_data()
# Same preprocessing as for training: trailing channel axis, pixels in [0, 1].
x_train = x_train[..., np.newaxis].astype("float32") / 255

plot_label_clusters(vae, data=x_train, labels=y_train)