# Convolutional Variational Autoencoder

In [1]:
from IPython import display

import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
import tensorflow_probability as tfp
import time

2025-05-07 08:18:10.040437: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746598690.108863   11227 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746598690.126251   11227 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1746598690.254926   11227 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1746598690.254961   11227 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1746598690.254964   11227 computation_placer.cc:177] computation placer alr

## 1 Load Dataset

In [2]:
# Load the MNIST train/test splits. Only the image arrays are kept; the label
# arrays returned alongside them are discarded by binding them to `_`.
(x_train, _), (x_test, _) = tf.keras.datasets.mnist.load_data()
# Trailing expression: the notebook displays both array shapes as the cell output.
x_train.shape, x_test.shape

((60000, 28, 28), (10000, 28, 28))

## 2 Preprocessing

In [3]:
import numpy.typing as npt


def preprocess_images(images: npt.NDArray[np.number]) -> npt.NDArray[np.float32]:
    """Reshape images to ``(N, 28, 28, 1)`` and binarize them.

    Pixel values are scaled by ``1 / 255`` and thresholded at ``0.5``: scaled
    values strictly greater than ``0.5`` become ``1.0``, all others ``0.0``.

    Parameters
    ----------
    images : npt.NDArray[np.number]
        Images to be preprocessed. Must contain ``N * 28 * 28`` elements with
        the sample axis first, e.g. shape ``(N, 28, 28)`` or ``(N, 28, 28, 1)``.

    Returns
    -------
    npt.NDArray[np.float32]
        Binarized images of shape ``(N, 28, 28, 1)`` holding only 0.0 and 1.0.

    Raises
    ------
    ValueError
        If ``images`` cannot be reshaped to ``(N, 28, 28, 1)``.
    """
    # The target shape is passed positionally: that is the calling convention
    # ``ndarray.reshape`` supports across NumPy releases, whereas a ``shape=``
    # keyword is not portable and can raise ``TypeError``.
    scaled = images.reshape((images.shape[0], 28, 28, 1)) / 255.0
    return np.where(scaled > 0.5, 1.0, 0.0).astype(np.float32)

In [4]:
# Number of images per batch for both the training and the test pipeline.
batch_size = 32

# The images are run through `preprocess_images` before the pipelines are
# built: the raw arrays have shape (N, 28, 28), whereas the encoder's input
# layer is declared as (28, 28, 1) and expects float inputs.
ds_train = (
    tf.data.Dataset.from_tensor_slices(tensors=preprocess_images(images=x_train))
    .shuffle(buffer_size=len(x_train))  # buffer spans the whole split
    .batch(batch_size=batch_size)
)
ds_test = (
    tf.data.Dataset.from_tensor_slices(tensors=preprocess_images(images=x_test))
    .shuffle(buffer_size=len(x_test))
    .batch(batch_size=batch_size)
)

I0000 00:00:1746598693.470790   11227 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 3414 MB memory:  -> device: 0, name: NVIDIA GeForce GTX 1050 Ti with Max-Q Design, pci bus id: 0000:01:00.0, compute capability: 6.1


## 3 Define Architecture

In [None]:
class CVAE(tf.keras.Model):
    """Convolutional Variational Autoencoder (CVAE).

    The encoder maps ``(28, 28, 1)`` images to a vector of size
    ``2 * latent_dim`` that is split into a mean and a log variance. The
    decoder maps latent vectors of size ``latent_dim`` back to ``(28, 28, 1)``
    images expressed as per-pixel logits.
    """

    def __init__(self, latent_dim: int) -> None:
        """Instantiate the CVAE.

        Parameters
        ----------
        latent_dim : int
            Size of the latent space.
        """
        super().__init__()
        self.latent_dim = latent_dim
        self.encoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
                tf.keras.layers.Conv2D(
                    filters=32,
                    kernel_size=3,
                    strides=(2, 2),
                    activation=tf.nn.relu,
                ),
                tf.keras.layers.Conv2D(
                    filters=64,
                    kernel_size=3,
                    strides=(2, 2),
                    activation=tf.nn.relu,
                ),
                tf.keras.layers.Flatten(),
                # No activation: the output is split into mean and log variance,
                # both of which must be free to take any real value.
                tf.keras.layers.Dense(units=latent_dim + latent_dim),
            ]
        )

        self.decoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                tf.keras.layers.Dense(units=7 * 7 * 32, activation=tf.nn.relu),
                tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
                # Two stride-2 "same" transposed convolutions: 7 -> 14 -> 28.
                tf.keras.layers.Conv2DTranspose(
                    filters=64,
                    kernel_size=3,
                    strides=2,
                    padding="same",
                    activation=tf.nn.relu,
                ),
                tf.keras.layers.Conv2DTranspose(
                    filters=32,
                    kernel_size=3,
                    strides=2,
                    padding="same",
                    activation=tf.nn.relu,
                ),
                # Output layer: a single channel at the same 28x28 resolution.
                # No activation, so the decoder emits logits; `decode` applies
                # the sigmoid on request.
                tf.keras.layers.Conv2DTranspose(
                    filters=1,
                    kernel_size=3,
                    strides=1,
                    padding="same",
                ),
            ]
        )

    @tf.function
    def sample(self, eps: tf.Tensor = None) -> tf.Tensor:
        """Sample with the decoder.

        Parameters
        ----------
        eps : tf.Tensor, optional
            Latent vectors of shape ``(n, latent_dim)``, e.g. independent
            samples from a standard normal distribution. If None, 100 such
            samples are drawn.

        Returns
        -------
        tf.Tensor
            Generated sample images as per-pixel probabilities.
        """
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
        """Encode the input images into the latent space.

        Parameters
        ----------
        x : tf.Tensor
            Input images.

        Returns
        -------
        tuple[tf.Tensor, tf.Tensor]
            Mean and log variance of the images in the latent space.
        """
        # The input is passed positionally: Keras models take their input as
        # the first positional argument (`inputs`), not as a keyword named `x`.
        mean, logvar = tf.split(value=self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean: tf.Tensor, logvar: tf.Tensor) -> tf.Tensor:
        """Apply the reparameterization trick.

        Computes ``mean + eps * exp(0.5 * logvar)`` with ``eps`` drawn from a
        standard normal distribution, producing the input for the decoder.

        Parameters
        ----------
        mean : tf.Tensor
            Mean values of the input images in the latent space.
        logvar : tf.Tensor
            Log variance of the input images in the latent space.

        Returns
        -------
        tf.Tensor
            Input for the decoder, with the same shape as ``mean``.
        """
        # Use the dynamic shape: under `tf.function` tracing the static
        # `mean.shape` may contain an unknown batch dimension, which
        # `tf.random.normal` cannot consume.
        eps = tf.random.normal(shape=tf.shape(mean), dtype=mean.dtype)
        return eps * tf.exp(logvar * 0.5) + mean

    def decode(self, z: tf.Tensor, apply_sigmoid: bool = False) -> tf.Tensor:
        """Generate images from the latent space.

        Parameters
        ----------
        z : tf.Tensor
            Reparameterized embeddings of the images from the latent space.
        apply_sigmoid : bool, optional
            Whether to apply a sigmoid to the output, by default False.

        Returns
        -------
        tf.Tensor
            Generated images with the value for each pixel as logits
            (``apply_sigmoid=False``) or probabilities (``apply_sigmoid=True``).
        """
        logits = self.decoder(z)
        if apply_sigmoid:
            return tf.sigmoid(logits)
        return logits

## 4 Define Loss Function and Optimizer

In [None]:
# Adam optimizer with a learning rate of 1e-4.
# NOTE(review): presumably consumed by a training step defined in a later cell — not visible here.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)


def log_normal_pdf(
    sample: tf.Tensor, mean: tf.Tensor, logvar: tf.Tensor, raxis: int = 1
) -> tf.Tensor:
    """Log-density of ``sample`` under an element-wise normal distribution.

    For every element the term
    ``-0.5 * ((sample - mean)**2 * exp(-logvar) + logvar + log(2 * pi))``
    is evaluated, and the terms are then summed along ``raxis``.

    Parameters
    ----------
    sample : tf.Tensor
        Points at which the density is evaluated.
    mean : tf.Tensor
        Mean of the normal distribution.
    logvar : tf.Tensor
        Log variance of the normal distribution.
    raxis : int, optional
        Axis along which the element-wise log-densities are summed, by default 1.

    Returns
    -------
    tf.Tensor
        Summed log-density, with ``raxis`` reduced away.
    """
    log2pi = tf.math.log(2.0 * np.pi)
    # Squared deviation scaled by the inverse variance, exp(-logvar) = 1 / var.
    scaled_sq_dev = (sample - mean) ** 2.0 * tf.exp(-logvar)
    elementwise = -0.5 * (scaled_sq_dev + logvar + log2pi)
    return tf.reduce_sum(input_tensor=elementwise, axis=raxis)
