# Exam Solution: VAE for Image Reconstruction and Generation

**Student:** Rimoldi

**Exam Session:** July 15, 2025

This notebook implements the proposed solution for the exam. The goal is to design a deep neural network capable of encoding images into a probabilistic latent space, then reconstructing or generating new images, closely following the structure of the provided example code and the numerical order of the exam sheet.

Notes :
- This notebook takes around 6 minutes to run.
- I underlined the differences between the exam in the following code like this: <font color="red">**CHANGE**</font>.

In [16]:
%pip install tensorflow

import warnings
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, ops, Model
import matplotlib.pyplot as plt
import seaborn as sns
import string
import unicodedata
import nltk
from nltk.corpus import stopwords
import itertools
import pickle
import requests

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, confusion_matrix

warnings.filterwarnings("ignore")
np.random.seed(42)
tf.random.set_seed(42)

Note: you may need to restart the kernel to use updated packages.


# 0 - Introduction

This section introduces the dataset loading process, utilizing the requests library to download the necessary data from the GitHub repository.

In [17]:
url = "https://raw.githubusercontent.com/PaRi29/DeepLearningExam_VAE/main/assets/input_data.pkl"
print(f"Downloading data from {url}...")
response = requests.get(url)
response.raise_for_status()
with open("input_data.pkl", "wb") as f:
    f.write(response.content)

Downloading data from https://raw.githubusercontent.com/PaRi29/DeepLearningExam_VAE/main/assets/input_data.pkl...


# 1: MODEL

**Model Choice:** Variational Autoencoder (VAE).


# 2: INPUT AND PREPROCESSING

Normalization: Since we are working with grayscale images, pixel values are normalized from the [0, 255] range to the [0, 1] range by dividing each value by 255.0. This process helps stabilize and speed up model training.

Input Format (Shape): An additional channel dimension is added to the images (since they are grayscale, there is only one channel). The resulting input shape is (batch_size, 28, 28, 1), which is required by convolutional layers.

In [18]:
try:
    with open("input_data.pkl", "rb") as f:
        dd = pickle.load(f)
    print("Data loaded successfully.")
    data = dd['data']
    labels = dd['labels']
except Exception as e:
    print(f"Error loading data: {e}")
    raise

def preprocess_images(images):
    images = images.astype("float32") / 255.0
    images = np.expand_dims(images, axis=-1)
    return images

data = preprocess_images(data)

# Manually split data and labels into training and validation sets
val_fraction = 0.2
num_samples = data.shape[0]
num_val = int(num_samples * val_fraction)

# Shuffle the data before splitting
indices = np.arange(num_samples)
np.random.seed(42)
np.random.shuffle(indices)
data = data[indices]
labels = labels[indices]

x_val = data[:num_val]
x_train = data[num_val:]

INPUT_SHAPE = x_train.shape[1:]

print(f"Training data shape: {x_train.shape}")
print(f"Validation data shape: {x_val.shape}")

Data loaded successfully.
Training data shape: (5600, 28, 28, 1)
Validation data shape: (1400, 28, 28, 1)


# 3: MODEL CONFIGURATION

### a. Model Composition
The VAE consists of an Encoder, a Decoder, and a specialized `Sampling` layer. 
**Conceptual Outline:** `Input Image` -> **[Encoder]** -> `Latent Distribution` -> **[Sampling]** -> `Latent Vector` -> **[Decoder]** -> `Reconstructed Image`

### b. Image Projection (Encoder)
The Encoder is a CNN that maps an image to a probabilistic latent space. It reduces the image's dimensionality through convolutional and pooling layers, finally outputting the mean (`z_mean`) and log-variance (`z_log_var`) of the learned distribution.

### c. Image Reconstruction (Decoder)
The Decoder is a deconvolutional network, symmetric to the encoder. It takes a latent vector `z` (sampled from the distribution defined by `z_mean` and `z_log_var`), upsamples it through transposed convolutions (or upsampling layers), and reconstructs the image.

### d. Generation of New Images
To generate new images, we discard the encoder and use only the trained decoder. A random vector `z` is sampled from a standard normal distribution N(0,1) and passed to the decoder, which then generates a new, coherent image that was not in the original dataset.

### e. Hyperparameters
Key hyperparameters to tune include `learning_rate`, `batch_size`, `latent_dim` (the size of the latent space), and `kl_reg` (the weight of the KL divergence loss). A random search is performed to find a good combination of these values.

In [19]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z (reparameterization trick)."""
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim))
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon

def vae_enc(input_shape, latent_dim, filter_base=32, dense_units=16):
    """Encoder model (Projection), configurable via hyperparameters."""
    encoder_inputs = keras.Input(shape=input_shape)
    
    # Block 1
    x = layers.Conv2D(filter_base, 3, activation="relu", padding="same")(encoder_inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding="same")(x)  # 28x28 -> 14x14
    
    # Block 2
    x = layers.Conv2D(filter_base * 2, 3, activation="relu", padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding="same")(x)  # 14x14 -> 7x7
    
    x = layers.Flatten()(x)
    x = layers.Dense(dense_units, activation="relu")(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])
    
    encoder = Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    return encoder

def vae_dec(latent_dim, filter_base=32, dense_units=16):
    """Decoder model (Reconstruction), configurable via hyperparameters."""
    latent_inputs = keras.Input(shape=(latent_dim,))
    
    # Initial part to prepare for reconstruction
    x = layers.Dense(7 * 7 * (filter_base * 2), activation="relu")(latent_inputs)
    x = layers.Reshape((7, 7, filter_base * 2))(x)
    
    # Block 1
    x = layers.UpSampling2D(2)(x)  # 7x7 -> 14x14
    x = layers.Conv2D(filter_base * 2, 3, activation="relu", padding="same")(x)
    x = layers.BatchNormalization()(x)
    
    # Block 2
    x = layers.UpSampling2D(2)(x)  # 14x14 -> 28x28
    x = layers.Conv2D(filter_base, 3, activation="relu", padding="same")(x)
    x = layers.BatchNormalization()(x)
    
    decoder_outputs = layers.Conv2D(1, 3, activation="sigmoid", padding="same")(x)
    decoder = Model(latent_inputs, decoder_outputs, name="decoder")
    return decoder


In [20]:
# Define the parameter grid and search function for hyperparameters (Point 3e)
param_grid = {
    'learning_rate': [1e-4, 1e-3],
    'batch_size':    [64, 128],
    'latent_dim':    [2, 8],
    'kl_reg':        [0.5, 1.0, 2.0],
    'optimizer':     ['adam', 'rmsprop'],
    'filter_base':   [16, 32],
    'dense_units':   [16, 32],
    'patience':      [2, 3, 5],
    'min_delta':     [0.0, 1e-4, 1e-3],
    'restore_best_weights': [True, False],
    'monitor':       ['val_loss', 'val_reconstruction_loss']
}
num_samples = 5 # Number of random combinations to test


def random_search_vae(VAE_MODEL, INPUT_SHAPE, param_grid, x_train, x_val, samples=num_samples):
    """
    Performs a random search over the defined parameter grid.
    
    Args:
        VAE_MODEL: The VAE model class (which must be defined).
        INPUT_SHAPE: The shape of the input images.
        param_grid: A dictionary of hyperparameters to search over.
        x_train: Training data.
        x_val: Validation data.
        samples: The number of random combinations to try.
    
    Returns:
        The dictionary containing the best hyperparameter configuration found.
    """
    combos = list(itertools.product(*param_grid.values()))
    sampled_combos = random.sample(combos, min(samples, len(combos)))
    configs = [dict(zip(param_grid.keys(), c)) for c in sampled_combos]
    best_val_loss = np.inf
    best_cfg = None

    for idx, cfg in enumerate(configs):
        print(f"\n=== Training config {idx+1}/{len(configs)} ===")
        print("Config:", cfg)
        
        # Build model with current config
        encoder = vae_enc(INPUT_SHAPE, cfg['latent_dim'], cfg['filter_base'], cfg['dense_units'])
        decoder = vae_dec(cfg['latent_dim'], cfg['filter_base'], cfg['dense_units'])
        vae_model = VAE_MODEL(encoder, decoder, reg=cfg['kl_reg'])
        
        # Select and configure optimizer
        if cfg['optimizer'] == 'adam':
            optimizer = keras.optimizers.Adam(learning_rate=cfg['learning_rate'])
        else: # rmsprop
            optimizer = keras.optimizers.RMSprop(learning_rate=cfg['learning_rate'])
            
        vae_model.compile(optimizer=optimizer)
        
        # Define Early Stopping callback with all relevant hyperparameters
        early_stopping = keras.callbacks.EarlyStopping(
            monitor=cfg['monitor'],
            patience=cfg['patience'],
            min_delta=cfg['min_delta'],
            verbose=1,
            restore_best_weights=cfg['restore_best_weights']
        )

        # Train the model
        history = vae_model.fit(
            x_train, x_train,
            epochs=50, # Set a high number; Early Stopping will terminate training
            batch_size=cfg['batch_size'],
            validation_data=(x_val, x_val),
            callbacks=[early_stopping],
            verbose=0 # Suppress epoch-by-epoch output for cleaner search
        )
        
        # Get the best validation loss from this run (thanks to restore_best_weights)
        # Use the same metric as monitored
        monitor_metric = cfg['monitor']
        if monitor_metric in history.history:
            final_val_loss = min(history.history[monitor_metric])
        else:
            # fallback to val_loss if custom metric not found
            final_val_loss = min(history.history['val_loss'])
        print(f"Best {monitor_metric} for this config: {final_val_loss:.4f}")
        
        if final_val_loss < best_val_loss:
            best_val_loss = final_val_loss
            best_cfg = cfg
            print(f"*** New best model found! ***")

    print("\n=== Best Results Summary ===")
    print(f"Best validation loss: {best_val_loss:.4f}")
    print("Best configuration:", best_cfg)
    return best_cfg


# 4: OUTPUT

The output layer of the decoder is a `Conv2D` layer designed as follows:
- **Filters:** `1`. This is because the output images are grayscale and have only one channel.
- **Activation Function:** `sigmoid`. The input images were normalized to the range [0, 1]. The sigmoid function outputs values in this same [0, 1] range, making it the natural choice for the final layer. This ensures the reconstructed pixel values are on the same scale as the input, which is essential for calculating the reconstruction loss.

### 5. LOSS

CHANGE with respect to exam: In the exam, I **mistakenly used Mean Absolute Error (MAE)** as the reconstruction loss.  
While MAE is sometimes used in **standard autoencoders** due to its robustness to outliers, it is **not the canonical choice for Variational Autoencoders (VAEs)**.

The loss function for a **VAE** consists of two main components, which reflect its **probabilistic generative nature**:

---

**1. Reconstruction Loss**  
For VAEs, the reconstruction term is derived from the assumption that pixels are generated from a Gaussian distribution. Under this assumption, the appropriate reconstruction loss is the **Mean Squared Error (MSE)**:

$$
\mathcal{L}_{\text{recon}} = \frac{1}{N} \sum_{i=1}^{N} (x_i - \hat{x}_i)^2
$$

Where:
- $x_i$ is the original pixel value,
- $\hat{x}_i$ is the reconstructed pixel value,
- $N$ is the total number of pixels.


**2. Kullback-Leibler (KL) Divergence**  
This term ensures that the latent space distribution learned by the encoder remains close to a prior distribution (usually a standard normal $\mathcal{N}(0, I)$):

$$
\mathcal{L}_{\text{KL}} = -\frac{1}{2} \sum_{j=1}^{d} \left(1 + \log(\sigma_j^2) - \mu_j^2 - \sigma_j^2 \right)
$$

Where:
- $\mu_j$ and $\sigma_j$ are the predicted mean and standard deviation for latent dimension $j$,
- $d$ is the dimensionality of the latent space.

This term is **crucial** for enabling the model to generalize and generate new data by sampling from the latent space.

---

**3. Total Loss Function**

The final VAE loss combines the two components:

$$
\mathcal{L}_{\text{total}} = \mathcal{L}_{\text{recon}} + \beta \cdot \mathcal{L}_{\text{KL}}
$$

Where:
- $\beta$ is a hyperparameter that balances reconstruction accuracy with latent space regularization.  
  In the standard VAE, $\beta = 1$, but it can be tuned (e.g., in $\beta$-VAEs) to control disentanglement.

---

**Summary**  
My initial response mistakenly applied a loss formulation more suited to standard autoencoders.  
A correct VAE implementation must:
- Use **MSE** for reconstruction (derived from Gaussian likelihood),
- Include the **KL divergence** for latent regularization,
- Combine both in a total loss that supports both reconstruction and generative capabilities.

This correction reflects a deeper understanding of **why** these loss terms are used in VAEs and how they relate to the model’s probabilistic foundations.

In [21]:
class VAE(Model):
    def __init__(self, encoder, decoder, reg=1.0, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.reg = reg
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    @tf.function
    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = ops.mean(
                ops.sum(keras.losses.mean_squared_error(data, reconstruction), axis=(1, 2))
            )
            kl_loss = -0.5 * (1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var))
            kl_loss = ops.mean(ops.sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + self.reg * kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
    
    def test_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        z_mean, z_log_var, z = self.encoder(data, training=False)
        reconstruction = self.decoder(z, training=False)
        reconstruction_loss = ops.mean(
            ops.sum(keras.losses.mean_squared_error(data, reconstruction), axis=(1, 2))
        )
        kl_loss = -0.5 * (1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var))
        kl_loss = ops.mean(ops.sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + self.reg * kl_loss
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def call(self, inputs):
        _, _, z = self.encoder(inputs)
        return self.decoder(z)

# 6: MODEL EVALUATION

The model is evaluated through a comprehensive process that includes hyperparameter tuning, final training, and both quantitative and qualitative assessment.


### Hyperparameter Search Execution
First, we execute the random search defined in Point 3e to find the best hyperparameters using a validation set.

In [22]:
# Create a validation set for hyperparameter tuning
x_train, x_val = train_test_split(x_train_full, test_size=0.2, random_state=42)

# Execute the search
best_config = random_search_vae(param_grid, x_train, x_val, VAE_MODEL=VAE)

NameError: name 'x_train_full' is not defined

### Final Model Training

Using the best hyperparameters found, we instantiate and train a final model on the entire training dataset for a larger number of epochs.

In [None]:
# Imposta i migliori iperparametri trovati o usa i default se la ricerca non ha avuto successo
if best_config is None:
    print("Random search fallita o saltata. Uso valori di default.")
    best_config = {
        'learning_rate': 1e-3,
        'batch_size': 128,
        'latent_dim': 2,
        'kl_reg': 1.0,
        'filter_base': 16,
        'dense_units': 16,
        'optimizer': 'adam',
        'patience': 3,
        'min_delta': 1e-4,
        'restore_best_weights': True,
        'monitor': 'val_loss'
    }

LATENT_DIM = best_config.get('latent_dim', 2)
BATCH_SIZE = best_config.get('batch_size', 128)
LEARNING_RATE = best_config.get('learning_rate', 1e-3)
KL_REG = best_config.get('kl_reg', 1.0)
FILTER_BASE = best_config.get('filter_base', 16)
DENSE_UNITS = best_config.get('dense_units', 16)
OPTIMIZER = best_config.get('optimizer', 'adam')
EPOCHS = 20

print("\n--- Training Final Model with Best Hyperparameters ---")
print(f"Config: {best_config}")

# Crea encoder e decoder con la struttura aggiornata
final_encoder = vae_enc(INPUT_SHAPE, LATENT_DIM, filter_base=FILTER_BASE, dense_units=DENSE_UNITS)
final_decoder = vae_dec(LATENT_DIM, filter_base=FILTER_BASE, dense_units=DENSE_UNITS)
final_vae = VAE(final_encoder, final_decoder, reg=KL_REG)

if OPTIMIZER == 'adam':
    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
elif OPTIMIZER == 'rmsprop':
    optimizer = keras.optimizers.RMSprop(learning_rate=LEARNING_RATE)
else:
    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)

final_vae.compile(optimizer=optimizer)

final_encoder.summary()
final_decoder.summary()

# EarlyStopping callback se richiesto
callbacks = []
if 'patience' in best_config and 'monitor' in best_config:
    callbacks.append(
        keras.callbacks.EarlyStopping(
            monitor=best_config['monitor'],
            patience=best_config['patience'],
            min_delta=best_config.get('min_delta', 0.0),
            restore_best_weights=best_config.get('restore_best_weights', True)
        )
    )

history = final_vae.fit(
    x_train_full, x_train_full,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(x_test, x_test),
    callbacks=callbacks
)

### Final Model Assessment
The reconstruction capabilities are evaluated both quantitatively and qualitatively.

1.  **Quantitative Evaluation:** We use the trained model to reconstruct the test set images and calculate the **Mean Squared Error (MSE)** and **Mean Absolute Error (MAE)** between the original and reconstructed images. Lower values indicate better performance.
2.  **Qualitative Evaluation:** We visually inspect the results by plotting original test images against their reconstructions. This helps assess the fidelity and sharpness of the output. We also visualize the generative latent space (if 2D) to confirm that the model has learned a smooth manifold for generating new, coherent images.

In [None]:
# Quantitative Evaluation of Reconstruction
reconstructed_images = final_vae.predict(x_test)
test_mse = mean_squared_error(x_test.flatten(), reconstructed_images.flatten())

print(f"\nTest Set Evaluation:")
print(f"Mean Squared Error (MSE): {test_mse:.4f}")

# Qualitative Evaluation
def plot_reconstructions(original, reconstructed, n=10):
    plt.figure(figsize=(20, 4))
    plt.suptitle("Reconstruction Evaluation: Original vs Reconstructed")
    for i in range(n):
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(original[i].reshape(28, 28), cmap='gray')
        ax.set_title("Original")
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(reconstructed[i].reshape(28, 28), cmap='gray')
        ax.set_title("Reconstructed")
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

def plot_latent_space(vae, n=15, figsize=15):
    digit_size = 28
    scale = 1.5
    figure = np.zeros((digit_size * n, digit_size * n))
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]
    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = np.array([[xi, yi]])
            x_decoded = vae.decoder.predict(z_sample, verbose=0)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[i * digit_size : (i + 1) * digit_size, j * digit_size : (j + 1) * digit_size] = digit
    plt.figure(figsize=(figsize, figsize))
    plt.title("Generative Latent Space Exploration")
    plt.imshow(figure, cmap="Greys_r")
    plt.show()

print("\n--- Qualitative Evaluation ---")
plot_reconstructions(x_test, reconstructed_images)

# The latent space plot is only meaningful for 2D latent spaces
if LATENT_DIM == 2:
    print("\n--- Latent Space Visualization (for LATENT_DIM=2) ---")
    plot_latent_space(final_vae)
