<h1>üß† Practical 10 ‚Äî Implementing & Understanding <b>Variational Autoencoder (VAE)</b></h1>
<hr>

<h2>üéØ Objective</h2>
<p>
To implement a <b>Variational Autoencoder (VAE)</b> and understand how it works. A VAE is a generative model that learns probabilistic latent representations and can generate new data similar to the training samples.
</p>

<hr>

<h2>üß© 1. Theory</h2>
<p>
A <b>Variational Autoencoder (VAE)</b> is a <b>generative deep learning model</b> that can generate new samples resembling the training data (e.g., handwritten digits, faces).  
Unlike a standard autoencoder, which encodes each input as a single latent vector, a VAE encodes each input as a <b>distribution</b> characterized by a mean (<b>μ</b>) and variance (<b>σ²</b>).
</p>

<hr>

<h2>⚙️ 2. Structure of a VAE</h2>

<table>
<tr><th>Component</th><th>Description</th></tr>
<tr>
<td><b>Encoder (Recognition / Inference Network)</b></td>
<td>Maps input <b>x</b> to latent variables <b>z</b>, represented by mean (<b>μ</b>) and standard deviation (<b>σ</b>): <br>
q<sub>φ</sub>(z|x) = N(μ(x), σ²(x))</td>
</tr>
<tr>
<td><b>Latent Space</b></td>
<td>A lower-dimensional probabilistic space from which <b>z</b> is sampled. Continuous and smooth, enabling interpolation.</td>
</tr>
<tr>
<td><b>Decoder (Generative Network)</b></td>
<td>Maps a sampled <b>z</b> back to a reconstruction <b>x̂</b> that approximates the original input <b>x</b>.</td>
</tr>
</table>

<hr>

<h2>üßÆ 3. Mathematical Formulation (HTML version)</h2>

<ul>
<li><b>Encoder Outputs:</b> μ = f<sub>μ</sub>(x), &nbsp; log σ² = f<sub>σ</sub>(x)</li>
<li><b>Sampling Layer (Reparameterization Trick):</b> z = μ + σ ⊙ ε, &nbsp; ε ∼ N(0, I)</li>
<li><b>Decoder Output:</b> x̂ = g(z)</li>
</ul>
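
<p>
The encoder predicts a log-variance rather than σ directly. The following is a minimal sketch in plain Python of the conversion the sampling step needs, assuming the encoder has already produced its outputs. The helper name <code>sigma_from_log_var</code> and the numbers are illustrative and not part of the practical's code.
</p>

```python
import math

def sigma_from_log_var(log_var):
    """Convert a predicted log-variance into a standard deviation.

    sigma = sqrt(exp(log_var)) = exp(0.5 * log_var), which is positive
    for any real-valued network output.
    """
    return math.exp(0.5 * log_var)

# Made-up encoder outputs for a 3-dimensional latent space.
log_vars = [0.0, math.log(4.0), -2.0]
sigmas = [sigma_from_log_var(lv) for lv in log_vars]
print(sigmas)
```

<p>
Because the exponential is always positive, the output layer can stay linear and unconstrained while σ remains valid.
</p>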

<hr>

<h2>üß© 4. Reparameterization Trick</h2>
<p>
Sampling z directly from N(μ, σ²) is a random operation with no gradient, so backpropagation cannot reach the encoder parameters through it.
To make the sampling step differentiable, we reparameterize:
</p>
<p style="margin-left:20px;">
<b>z = μ + σ × ε</b>, &nbsp; where &nbsp; ε ∼ N(0, I)
</p>
<p>This ensures gradients can flow through μ and σ during training.</p>
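
<p>
As a rough illustration of the trick, the sketch below uses only Python's standard library and scalar values. The values of <code>mu</code> and <code>sigma</code> are invented, and the finite-difference check is only a stand-in for what automatic differentiation does in a real framework.
</p>

```python
import random
import statistics

def reparameterize(mu, sigma, eps):
    """Return z = mu + sigma * eps for scalar inputs."""
    return mu + sigma * eps

rng = random.Random(0)          # fixed seed so the run is repeatable
mu, sigma = 1.5, 0.5            # illustrative values, not from a trained model

# All randomness lives in eps ~ N(0, 1); mu and sigma enter deterministically.
eps_samples = [rng.gauss(0.0, 1.0) for _ in range(20000)]
z_samples = [reparameterize(mu, sigma, e) for e in eps_samples]

print("sample mean:", statistics.fmean(z_samples))   # close to mu
print("sample std: ", statistics.pstdev(z_samples))  # close to sigma

# For a fixed eps, z is an ordinary function of mu and sigma,
# so its partial derivatives exist: dz/dmu = 1 and dz/dsigma = eps.
eps = eps_samples[0]
h = 1e-6
dz_dmu = (reparameterize(mu + h, sigma, eps) - reparameterize(mu, sigma, eps)) / h
dz_dsigma = (reparameterize(mu, sigma + h, eps) - reparameterize(mu, sigma, eps)) / h
print("dz/dmu:", dz_dmu, " dz/dsigma:", dz_dsigma, " eps:", eps)
```

<p>
The samples still follow N(μ, σ²), but the noise is now an input to the computation rather than a step inside it.
</p>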

<hr>

<h2>⚖️ 5. VAE Loss Function</h2>

<p>The total loss combines two components:</p>

<ul>
<li><b>Reconstruction Loss:</b> Measures how closely the reconstruction x̂ matches the input x.
Usually <b>MSE</b> or <b>Binary Cross-Entropy</b>.</li>
<li><b>KL Divergence Loss:</b> Regularizes the latent space so that q<sub>φ</sub>(z|x) ≈ p(z), where p(z) = N(0, I).</li>
</ul>
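
<p>
To make the reconstruction term concrete, here is a small sketch of both candidate losses in plain Python. The four pixel values and the two reconstructions are invented for illustration. Both helpers sum over pixels, which is one common convention and an assumption of this sketch.
</p>

```python
import math

def binary_cross_entropy(x, x_hat, eps=1e-7):
    """Sum of per-pixel BCE between targets x and predictions x_hat (both in [0, 1])."""
    total = 0.0
    for t, p in zip(x, x_hat):
        p = min(max(p, eps), 1.0 - eps)   # clip to avoid log(0)
        total += -(t * math.log(p) + (1.0 - t) * math.log(1.0 - p))
    return total

def squared_error(x, x_hat):
    """Sum of squared per-pixel differences."""
    return sum((t - p) ** 2 for t, p in zip(x, x_hat))

# Four made-up pixel intensities and two candidate reconstructions.
x = [0.0, 1.0, 1.0, 0.0]
good = [0.1, 0.9, 0.8, 0.2]
bad = [0.6, 0.4, 0.5, 0.7]

print("BCE good/bad:", binary_cross_entropy(x, good), binary_cross_entropy(x, bad))
print("SSE good/bad:", squared_error(x, good), squared_error(x, bad))
```

<p>
Under either loss, the reconstruction closer to the input scores lower. Summing over pixels keeps this term on a per-image scale, like the KL term.
</p>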

<p><b>Total Objective:</b></p>
<p style="margin-left:20px;">
L<sub>VAE</sub> = L<sub>reconstruction</sub> + β × D<sub>KL</sub>(q<sub>φ</sub>(z|x) || p(z))
</p>

<p>
β (beta) is a weight used in <b>β-VAE</b> variants to control the trade-off between reconstruction accuracy and regularization; β = 1 recovers the standard VAE objective.
</p>
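
<p>
For a diagonal Gaussian posterior and a standard normal prior, the KL term has the closed form 0.5 × Σ (μ² + σ² − log σ² − 1). The sketch below evaluates it in plain Python and combines it with a reconstruction value. The function names and inputs are illustrative assumptions.
</p>

```python
import math

def kl_to_standard_normal(mu, log_var):
    """Closed-form KL( N(mu, diag(exp(log_var))) || N(0, I) ), summed over latent dims."""
    return 0.5 * sum(m * m + math.exp(lv) - lv - 1.0 for m, lv in zip(mu, log_var))

def vae_loss(reconstruction_loss, mu, log_var, beta=1.0):
    """Total objective: reconstruction + beta * KL."""
    return reconstruction_loss + beta * kl_to_standard_normal(mu, log_var)

# Posterior equal to the prior: the KL term vanishes.
print(kl_to_standard_normal([0.0, 0.0], [0.0, 0.0]))     # 0.0

# Shifting one mean by 1 costs 0.5 * 1**2 = 0.5 nats.
print(kl_to_standard_normal([1.0, 0.0], [0.0, 0.0]))     # 0.5

# A larger beta penalises the same posterior more heavily.
print(vae_loss(10.0, [1.0, 0.0], [0.0, 0.0], beta=4.0))  # 12.0
```

<p>
This is the same quantity as −0.5 × Σ (1 + log σ² − μ² − σ²), with the sign distributed into the sum.
</p>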

<hr>

<h2>üß† 6. Intuitive Understanding</h2>

<ul>
<li>The <b>KL term</b> encourages a smooth and continuous latent space.</li>
<li>The <b>Reconstruction term</b> ensures input features are captured effectively.</li>
<li>Together, they enable:
  <ul>
    <li>Smooth interpolation between inputs</li>
    <li>Generation of new, realistic samples</li>
    <li>Structured, interpretable latent features</li>
  </ul>
</li>
</ul>
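
<p>
Smooth interpolation can be sketched as a straight-line walk between two latent codes. The two 2-D vectors below are invented stand-ins for the encodings of two inputs, and <code>interpolate</code> is a hypothetical helper. In practice each point on the path would be passed to the trained decoder.
</p>

```python
def interpolate(z_a, z_b, steps):
    """Return `steps` evenly spaced latent vectors from z_a to z_b (inclusive)."""
    if steps < 2:
        raise ValueError("steps must be at least 2")
    path = []
    for k in range(steps):
        t = k / (steps - 1)   # t runs from 0.0 to 1.0
        path.append([(1.0 - t) * a + t * b for a, b in zip(z_a, z_b)])
    return path

# Two made-up 2-D latent codes.
z_start, z_end = [-2.0, 0.0], [2.0, 1.0]
path = interpolate(z_start, z_end, steps=5)
for z in path:
    print(z)
```

<p>
Because the KL term keeps nearby latent points meaningful, decoding the intermediate vectors tends to give gradual transitions rather than abrupt jumps.
</p>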

<hr>

<h2>üîç 7. Model Architecture Summary</h2>

<table>
<tr><th>Component</th><th>Purpose</th><th>Activation</th></tr>
<tr><td>Encoder Hidden Layers</td><td>Extract nonlinear features</td><td>ReLU / LeakyReLU</td></tr>
<tr><td>Encoder Output Layers</td><td>Compute mean (μ) and log variance (log σ²)</td><td>Linear</td></tr>
<tr><td>Sampling Layer</td><td>Generate latent vector via reparameterization</td><td>None</td></tr>
<tr><td>Decoder Hidden Layers</td><td>Map z back to reconstruction</td><td>ReLU / LeakyReLU</td></tr>
<tr><td>Decoder Output Layer</td><td>Generate reconstructed sample</td><td>Sigmoid (data in [0, 1]) / Tanh (data in [−1, 1]) / Linear (unbounded continuous data)</td></tr>
</table>
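
<p>
The choice of decoder output activation comes down to matching the range of the data. The sketch below compares the three options in plain Python. The pre-activation values are invented, and <code>sigmoid</code> is written by hand rather than taken from a framework.
</p>

```python
import math

def sigmoid(a):
    """Logistic function; output lies strictly between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-a))

# Values a decoder's last layer might produce before the activation (made-up).
pre_activations = [-4.0, -1.0, 0.0, 1.0, 4.0]

for a in pre_activations:
    print(f"a={a:+.1f}  sigmoid={sigmoid(a):.3f}  tanh={math.tanh(a):+.3f}  linear={a:+.1f}")
```

<p>
Sigmoid suits inputs scaled to [0, 1], tanh suits inputs scaled to [−1, 1], and a linear output leaves values unbounded.
</p>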

<hr>

<h2>üß© 8. Simplified VAE Flow</h2>

<pre>
        Input (x)
            ↓
     [ Encoder Network ]
            ↓
      μ(x), σ(x)
            ↓
     z = μ + σ * ε
            ↓
     [ Decoder Network ]
            ↓
     Reconstructed Output (x̂)
</pre>

<hr>

<h2>üß∞ 9. Difference: Autoencoder vs. Variational Autoencoder</h2>

<table>
<tr><th>Aspect</th><th>Autoencoder</th><th>Variational Autoencoder (VAE)</th></tr>
<tr><td>Latent Representation</td><td>Deterministic vector</td><td>Probabilistic (μ, σ²)</td></tr>
<tr><td>Sampling</td><td>No sampling</td><td>Uses reparameterization</td></tr>
<tr><td>Regularization</td><td>None</td><td>KL divergence</td></tr>
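<tr><td>Generative Ability</td><td>Not designed for generation; decoding arbitrary latent points often gives unrealistic outputs</td><td>Can generate new samples by decoding z drawn from the prior</td></tr>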
<tr><td>Loss Function</td><td>Reconstruction only</td><td>Reconstruction + KL Divergence</td></tr>
</table>

<hr>

<h2>üìä 10. Advantages</h2>
<ul>
<li>✅ Learns a continuous latent space</li>
<li>✅ Can generate new data</li>
<li>✅ Enables interpolation and anomaly detection</li>
<li>✅ More stable training than GANs</li>
</ul>

<h2>⚠️ 11. Disadvantages</h2>
<ul>
<li>⚠️ Blurry outputs due to Gaussian assumptions</li>
<li>⚠️ May underfit complex datasets</li>
<li>⚠️ Requires careful β balancing</li>
</ul>

<hr>

<h2>üß™ 12. Common Applications</h2>

<table>
<tr><th>Domain</th><th>Example</th></tr>
<tr><td>Image Generation</td><td>Generate digits (MNIST), faces (CelebA)</td></tr>
<tr><td>Anomaly Detection</td><td>Detect deviations from learned patterns</td></tr>
<tr><td>Data Compression</td><td>Compress high-dimensional inputs</td></tr>
<tr><td>Semi-supervised Learning</td><td>Leverage latent structure for classification</td></tr>
<tr><td>Representation Learning</td><td>Extract meaningful low-dimensional features</td></tr>
</table>
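
<p>
As one hedged example of the anomaly detection row, a simple scoring rule flags inputs whose reconstruction error is unusually high. The rule used here (mean plus three standard deviations) and all error values are assumptions made for illustration. Real errors would come from a trained VAE.
</p>

```python
import statistics

def anomaly_threshold(train_errors, k=3.0):
    """Threshold = mean + k * std of reconstruction errors on normal training data."""
    return statistics.fmean(train_errors) + k * statistics.pstdev(train_errors)

def is_anomaly(error, threshold):
    """Flag a sample whose reconstruction error exceeds the threshold."""
    return error > threshold

# Made-up per-sample reconstruction errors for "normal" training data.
train_errors = [0.9, 1.1, 1.0, 1.2, 0.8, 1.0, 0.95, 1.05]
threshold = anomaly_threshold(train_errors)

print("threshold:", threshold)
print(is_anomaly(1.1, threshold), is_anomaly(3.0, threshold))
```

<p>
The multiplier <code>k</code> trades false alarms against missed anomalies and would normally be tuned on held-out data.
</p>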

<hr>

<h2>üßæ 13. Implementation Notes</h2>

<p>If your <b>Basic Autoencoder</b> uses:</p>

<pre>
encoder: Linear + ReLU
decoder: Linear + Sigmoid
</pre>

<p>Then your <b>VAE</b> should use:</p>

<pre>
# Encoder
encoder_hidden: ReLU
encoder_output (μ, log σ²): Linear

# Sampling (Reparameterization)
σ = exp(0.5 * log σ²)
z = μ + σ * ε

# Decoder
decoder_hidden: ReLU
decoder_output: Sigmoid (for normalized data)
</pre>

<hr>

<h2>üìö References</h2>
<ol>
<li>Kingma, D.P. & Welling, M. (2014). <i>Auto-Encoding Variational Bayes</i>.</li>
<li>Doersch, C. (2016). <i>Tutorial on Variational Autoencoders</i>.</li>
<li>Goodfellow et al. (2016). <i>Deep Learning</i> ‚Äî MIT Press.</li>
</ol>

<hr>

<h2>✅ Summary</h2>
<p>
A <b>Variational Autoencoder</b> merges deep learning with probabilistic modeling to learn a <b>structured, continuous latent space</b> that supports both <b>generation</b> and <b>representation learning</b>.
Its encoder, latent prior, and decoder structure recurs in many later generative models.
</p>

<hr>



In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers  # same Keras as the `keras` import above
import matplotlib.pyplot as plt

# Dimensionality reduction imports
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import umap

# VAE sampling layer
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

latent_dim = 1000  # high-dimensional latent space; projected to 2D below for plotting

# Encoder
encoder_inputs = keras.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")

# Decoder
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
x = layers.Reshape((7, 7, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")

# VAE model definition
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                keras.losses.binary_crossentropy(data, reconstruction)
            ) * 28 * 28
            kl_loss = -0.5 * tf.reduce_sum(
                1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1
            )
            kl_loss = tf.reduce_mean(kl_loss)
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
        return reconstruction

# Prepare and normalize MNIST
(x_train, _), (x_test, y_test) = keras.datasets.mnist.load_data()
x_test = np.expand_dims(x_test, -1).astype("float32") / 255.0
x_train = np.expand_dims(x_train, -1).astype("float32") / 255.0

vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x_train, epochs=50, batch_size=128, verbose=0)

# Get latent vectors (means) for test set
z_mean, _, _ = encoder.predict(x_test, batch_size=128, verbose=0)

# For scalability, sample 2000 points
n_samples = 2000
indices = np.random.choice(len(z_mean), size=n_samples, replace=False)
z_subset = z_mean[indices]
y_subset = y_test[indices]

# Apply PCA
pca = PCA(n_components=2)
z_pca = pca.fit_transform(z_subset)

# Apply t-SNE
tsne = TSNE(n_components=2, perplexity=30, max_iter=1000, random_state=42)  # `max_iter` replaced `n_iter` in scikit-learn 1.5
z_tsne = tsne.fit_transform(z_subset)

# Apply UMAP
z_umap = umap.UMAP(n_components=2, random_state=42).fit_transform(z_subset)

# Plot all three projections for comparison
fig, axs = plt.subplots(1, 3, figsize=(21, 7))
methods = [('PCA', z_pca), ('t-SNE', z_tsne), ('UMAP', z_umap)]
for i, (name, z_vis) in enumerate(methods):
    sc = axs[i].scatter(z_vis[:,0], z_vis[:,1], c=y_subset, cmap='tab10', s=7, alpha=0.7)
    axs[i].set_title(name)
    axs[i].set_xlabel("Dim 1")
    axs[i].set_ylabel("Dim 2")
fig.colorbar(sc, ax=axs, ticks=range(10))
plt.suptitle("MNIST VAE Latent Space Projected to 2D: PCA vs t-SNE vs UMAP")
plt.tight_layout()
plt.show()
# Display original and reconstructed images
n = 10 # Number of images to display
plt.figure(figsize=(20, 4))
for i in range(n):
    # Original images
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(x_test[i].reshape(28, 28), cmap="gray")
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # Reconstructed images
    ax = plt.subplot(2, n, i + 1 + n)
    reconstructed_img = vae.predict(x_test[i].reshape(1, 28, 28, 1), verbose=0)
    plt.imshow(reconstructed_img.reshape(28, 28), cmap="gray")
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.suptitle("Original vs. Reconstructed Images")
plt.show()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers  # same Keras as the `keras` import above
import matplotlib.pyplot as plt

# Dimensionality reduction imports
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import umap

# -------------------------
# VAE Sampling layer
# -------------------------
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# -------------------------
# Model hyperparams
# -------------------------
latent_dim = 2  # 2 allows the latent-grid plot below; use a larger value for richer representations

# -------------------------
# Encoder
# -------------------------
encoder_inputs = keras.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

# -------------------------
# Decoder
# -------------------------
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
x = layers.Reshape((7, 7, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

# -------------------------
# VAE model definition
# -------------------------
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

        # --- metrics to track (Keras will reset them each epoch) ---
        self.total_loss_tracker = keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

        # Validation metrics (optional separate trackers)
        self.val_total_loss_tracker = keras.metrics.Mean(name="val_loss")
        self.val_reconstruction_loss_tracker = keras.metrics.Mean(name="val_reconstruction_loss")
        self.val_kl_loss_tracker = keras.metrics.Mean(name="val_kl_loss")

    @property
    def metrics(self):
        # Keras uses this list to reset metrics at the start of each epoch.
        # Include both train and val trackers so fit() can show/record them.
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            self.val_total_loss_tracker,
            self.val_reconstruction_loss_tracker,
            self.val_kl_loss_tracker,
        ]

    def compute_losses(self, data, training=False):
        """Return (total_loss, reconstruction_loss, kl_loss) for a batch."""
        z_mean, z_log_var, z = self.encoder(data, training=training)
        reconstruction = self.decoder(z, training=training)
        # binary_crossentropy averages over the last (channel) axis, giving shape (batch, 28, 28);
        # the overall mean times 28 * 28 is the summed per-image loss.
        reconstruction_loss = tf.reduce_mean(
            keras.losses.binary_crossentropy(data, reconstruction)
        ) * 28 * 28
        kl_loss = -0.5 * tf.reduce_sum(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1
        )
        kl_loss = tf.reduce_mean(kl_loss)
        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]  # unsupervised, so ignore labels if any
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.compute_losses(data, training=True)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # update training metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        # return dict of metric name -> value (Keras uses this for History)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        """Called during validation: compute losses and update val metrics."""
        if isinstance(data, tuple):
            data = data[0]
        total_loss, reconstruction_loss, kl_loss = self.compute_losses(data, training=False)

        # Update validation metrics trackers
        self.val_total_loss_tracker.update_state(total_loss)
        self.val_reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.val_kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.val_total_loss_tracker.result(),
            "reconstruction_loss": self.val_reconstruction_loss_tracker.result(),
            "kl_loss": self.val_kl_loss_tracker.result(),
        }

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
        return reconstruction

# -------------------------
# Prepare and normalize MNIST
# -------------------------
(x_train, _), (x_test, y_test) = keras.datasets.mnist.load_data()
x_test = np.expand_dims(x_test, -1).astype("float32") / 255.0
x_train = np.expand_dims(x_train, -1).astype("float32") / 255.0

# -------------------------
# Training helper + plotting
# -------------------------
def smooth_array(x, window_len=5):
    """Simple moving-average smoothing. Returns original if window_len<=1 or too short."""
    x = np.asarray(x)
    if window_len is None or window_len <= 1 or x.size < window_len:
        return x
    s = np.r_[x[window_len-1:0:-1], x, x[-2:-window_len-1:-1]]
    w = np.ones(window_len)/window_len
    y = np.convolve(w, s, mode='valid')
    # align trimmed edges back to original length
    trim = (window_len // 2)
    return y[trim: trim + x.size]

def train_vae_and_plot(vae, x_train, epochs=50, batch_size=128,
                       validation_data=None, callbacks=None,
                       verbose=0, smooth_window=None, **fit_kwargs):
    """
    Trains the VAE (using model.fit) and plots loss curves.
    - smooth_window: int or None. If int>1, plot moving-average-smoothed curves too.
    Returns: history (keras History)
    """
    history = vae.fit(
        x_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=validation_data,
        callbacks=callbacks,
        verbose=verbose,
        **fit_kwargs
    )

    hist = history.history
    steps = range(1, len(hist["loss"]) + 1)

    plt.figure(figsize=(10,5))
    # raw curves
    plt.plot(steps, hist["loss"], label="Train Total Loss", alpha=0.6)
    if "reconstruction_loss" in hist:
        plt.plot(steps, hist["reconstruction_loss"], label="Train Reconstruction", alpha=0.6)
    if "kl_loss" in hist:
        plt.plot(steps, hist["kl_loss"], label="Train KL", alpha=0.6)

    if "val_loss" in hist:
        plt.plot(steps, hist["val_loss"], label="Val Total Loss", alpha=0.9)
    if "val_reconstruction_loss" in hist:
        plt.plot(steps, hist["val_reconstruction_loss"], label="Val Reconstruction", alpha=0.9)
    if "val_kl_loss" in hist:
        plt.plot(steps, hist["val_kl_loss"], label="Val KL", alpha=0.9)

    # optional smoothed curves (smooth_array returns its input unchanged when the series is too short)
    if isinstance(smooth_window, int) and smooth_window > 1:
        sm_loss = smooth_array(hist["loss"], window_len=smooth_window)
        plt.plot(steps, sm_loss, linestyle="--", label=f"Smoothed Train Total (w={smooth_window})")
        if "val_loss" in hist:
            sm_val = smooth_array(hist["val_loss"], window_len=smooth_window)
            plt.plot(steps, sm_val, linestyle="--", label=f"Smoothed Val Total (w={smooth_window})")

    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("VAE Training & Validation Losses")
    plt.legend()
    plt.grid(alpha=0.2)
    plt.show()

    return history

# -------------------------
# Instantiate, compile and train
# -------------------------
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adamax(learning_rate=1e-3))
# -------------------------
# Callbacks: EarlyStopping + ReduceLROnPlateau
# -------------------------
early_stop = keras.callbacks.EarlyStopping(
    monitor='val_loss',         # monitor validation total loss
    patience=6,                 # number of epochs with no improvement before stopping
    restore_best_weights=True,  # restore model weights from the epoch with best monitored value
    min_delta=1e-4,             # minimum change to qualify as improvement
    verbose=1
)

reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1
)

# Train and plot (this returns the History object)
#history = train_vae_and_plot(vae, x_train, epochs=50, batch_size=128, verbose=0)
history = train_vae_and_plot(
    vae,
    x_train,
    epochs=100,             # higher max epochs; early stopping will stop earlier if no improvement
    batch_size=128,
    validation_data=x_test, # validation inputs (no labels needed)
    callbacks=[early_stop, reduce_lr],
    verbose=0,
    smooth_window=7
)
print("Training ran for epochs:", len(history.history['loss']))
# -------------------------
# Visualize latent projections (PCA, t-SNE, UMAP)
# -------------------------
# Get latent vectors (means) for test set
z_mean, _, _ = encoder.predict(x_test, batch_size=128, verbose=0)

# For scalability, sample 2000 points
n_samples = 2000
indices = np.random.choice(len(z_mean), size=n_samples, replace=False)
z_subset = z_mean[indices]
y_subset = y_test[indices]

# Apply PCA
pca = PCA(n_components=2)
z_pca = pca.fit_transform(z_subset)

# Apply t-SNE
tsne = TSNE(n_components=2, perplexity=30, max_iter=1000, random_state=42)
z_tsne = tsne.fit_transform(z_subset)

# Apply UMAP
z_umap = umap.UMAP(n_components=2, random_state=42).fit_transform(z_subset)

# Plot all three projections for comparison
fig, axs = plt.subplots(1, 3, figsize=(21, 7))
methods = [('PCA', z_pca), ('t-SNE', z_tsne), ('UMAP', z_umap)]
for i, (name, z_vis) in enumerate(methods):
    sc = axs[i].scatter(z_vis[:,0], z_vis[:,1], c=y_subset, cmap='tab10', s=7, alpha=0.7)
    axs[i].set_title(name)
    axs[i].set_xlabel("Dim 1")
    axs[i].set_ylabel("Dim 2")
fig.colorbar(sc, ax=axs, ticks=range(10))
plt.suptitle("MNIST VAE 2D Latent Space: PCA vs t-SNE vs UMAP")
plt.show()

# -------------------------
# Display original and reconstructed images
# -------------------------
n = 10 # Number of images to display
plt.figure(figsize=(20, 4))
for i in range(n):
    # Original images
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(x_test[i].reshape(28, 28), cmap='gray')
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # Reconstructed images
    ax = plt.subplot(2, n, i + 1 + n)
    reconstructed_img = vae.predict(x_test[i].reshape(1, 28, 28, 1),verbose=0)
    plt.imshow(reconstructed_img.reshape(28, 28), cmap='gray')
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.suptitle("Original vs. Reconstructed Images")
plt.show()
# -------------------------
# Generate New Images from Random Latent Samples
# -------------------------

# Number of new images to generate
n_images = 20

# Sample random latent vectors from standard normal distribution
random_latent_vectors = np.random.normal(size=(n_images, latent_dim))

# Decode these latent points into new images
generated_images = decoder.predict(random_latent_vectors,verbose=0)

# Plot the generated images
plt.figure(figsize=(20, 4))
for i in range(n_images):
    ax = plt.subplot(2, n_images//2, i + 1)
    plt.imshow(generated_images[i].reshape(28, 28), cmap="gray")
    ax.axis("off")

plt.suptitle("New Images Generated by the VAE")
plt.show()
# -------------------------
# Generate 2D latent space grid
# -------------------------
grid_size = 15
figure = np.zeros((28 * grid_size, 28 * grid_size))
# Evenly spaced latent coordinates covering ±3 standard deviations of the N(0, I) prior (requires latent_dim == 2)
grid_x = np.linspace(-3, 3, grid_size)
grid_y = np.linspace(-3, 3, grid_size)

for i, yi in enumerate(grid_y):
    for j, xi in enumerate(grid_x):
        z_sample = np.array([[xi, yi]])
        x_decoded = decoder.predict(z_sample, verbose=0)
        digit = x_decoded[0].reshape(28, 28)
        figure[i * 28: (i + 1) * 28,
               j * 28: (j + 1) * 28] = digit

plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap="gray")
plt.axis("off")
plt.title("Latent Space Grid: Smooth Interpolation Between Digits")
plt.show()


In [None]:
# Full flexible VAE notebook cell: replace MNIST with your own dataset directory
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers  # same Keras as the `keras` import above

# Dimensionality reduction (visualization)
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap

# -------------------------
# USER CONFIG: modify these
# -------------------------
dataset_path = "/path/to/your/dataset"   # <-- set this: directory with subfolders per class OR flat (see note)
image_size = (100, 100)                  # desired H, W for model (must be divisible by 2**n_downsamples)
channels = 3                             # 1 for grayscale, 3 for RGB
batch_size = 64
latent_dim = 2                           # set 2 to visualize latent grid; >2 still works for generation
n_downsamples = 2                        # number of conv downsample blocks (each halves H,W)
base_filters = 32
use_mse = True                           # True -> MSE loss and linear decoder (recommended for RGB); False -> BCE + sigmoid
epochs = 100
validation_split = 0.15                  # fraction of data to hold out for validation
seed = 42
smooth_window = 7                        # None or int>1 for smoothing plotted curves
autotune = tf.data.AUTOTUNE

# -------------------------
# Helper: smoothing for plots (optional)
# -------------------------
def smooth_array(x, window_len=5):
    x = np.asarray(x)
    if window_len is None or window_len <= 1 or x.size < window_len:
        return x
    s = np.r_[x[window_len-1:0:-1], x, x[-2:-window_len-1:-1]]
    w = np.ones(window_len)/window_len
    y = np.convolve(w, s, mode='valid')
    trim = (window_len // 2)
    return y[trim: trim + x.size]

# -------------------------
# Data loading: image_dataset_from_directory
# - expects dataset_path with subfolders per class. If images are flat, 
#   put them under a single subfolder or use a custom loader.
# -------------------------
if not os.path.exists(dataset_path):
    raise FileNotFoundError(f"Dataset path not found: {dataset_path}")

# Use two calls to create training and validation splits reproducibly
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path,
    labels=None,                       # unsupervised VAE: ignore labels
    label_mode=None,
    color_mode="grayscale" if channels == 1 else "rgb",  # match the `channels` setting
    batch_size=batch_size,
    image_size=image_size,
    shuffle=True,
    seed=seed,
    validation_split=validation_split,
    subset="training",
)

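val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path,
    labels=None,
    label_mode=None,
    color_mode="grayscale" if channels == 1 else "rgb",  # match the `channels` setting
    batch_size=batch_size,
    image_size=image_size,
    shuffle=True,                      # same shuffle and seed as train_ds, so both calls split one file ordering
    seed=seed,
    validation_split=validation_split,
    subset="validation",
)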

# Normalize to [0,1] float32
def normalize_ds(ds):
    ds = ds.map(lambda x: tf.cast(x, tf.float32) / 255.0, num_parallel_calls=autotune)
    ds = ds.cache().prefetch(autotune)
    return ds

train_ds = normalize_ds(train_ds)
val_ds = normalize_ds(val_ds)

# Convert small subset of val to numpy arrays for visualization & encoder.predict later
def ds_to_numpy(ds, max_samples=5000):
    arr = []
    count = 0
    for batch in ds:
        # batch shape: (B, H, W, C)
        b = batch.numpy()
        arr.append(b)
        count += b.shape[0]
        if count >= max_samples:
            break
    if not arr:
        return np.zeros((0, *image_size, channels), dtype=np.float32)
    return np.concatenate(arr, axis=0)[:max_samples]

x_val_np = ds_to_numpy(val_ds, max_samples=2000)

# -------------------------
# Sampling layer
# -------------------------
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# -------------------------
# Dynamic encoder/decoder builder (for arbitrary input shape)
# -------------------------
def build_conv_vae(input_shape=(100,100,3), latent_dim=2, base_filters=32, n_downsamples=2, use_mse=False):
    H, W, C = input_shape
    down_factor = 2 ** n_downsamples
    if H % down_factor != 0 or W % down_factor != 0:
        raise ValueError(f"image H/W must be divisible by 2**{n_downsamples} = {down_factor}. Got {H}x{W}.")
    # Build encoder
    encoder_inputs = keras.Input(shape=input_shape)
    x = encoder_inputs
    filters = base_filters
    for i in range(n_downsamples):
        x = layers.Conv2D(filters, 3, strides=2, padding="same", activation="relu")(x)
        x = layers.BatchNormalization()(x)
        filters *= 2
    conv_h = H // down_factor
    conv_w = W // down_factor
    filters_last = filters // 2
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation="relu")(x)
    x = layers.BatchNormalization()(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])
    encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")

    # Build decoder (mirror)
    latent_inputs = keras.Input(shape=(latent_dim,))
    x = layers.Dense(conv_h * conv_w * filters_last, activation="relu")(latent_inputs)
    x = layers.Reshape((conv_h, conv_w, filters_last))(x)
    filters = filters_last
    for i in range(n_downsamples):
        # progressively reduce filters back toward base_filters
        out_filters = max(base_filters, filters // 2)
        x = layers.Conv2DTranspose(out_filters, 3, strides=2, padding="same", activation="relu")(x)
        x = layers.BatchNormalization()(x)
        filters = out_filters
    # final output
    dec_act = "linear" if use_mse else "sigmoid"
    x = layers.Conv2DTranspose(C, 3, padding="same", activation=dec_act)(x)
    decoder = keras.Model(latent_inputs, x, name="decoder")

    recon_scale = float(H * W * C)  # number of values per image; rescales mean BCE to a per-image sum
    return encoder, decoder, recon_scale

# Build models according to user config
input_shape = (image_size[0], image_size[1], channels)
encoder, decoder, recon_scale = build_conv_vae(
    input_shape=input_shape,
    latent_dim=latent_dim,
    base_filters=base_filters,
    n_downsamples=n_downsamples,
    use_mse=use_mse
)
encoder.summary(); decoder.summary()

# -------------------------
# VAE class with metrics (works with model.fit and callbacks)
# -------------------------
class VAE(keras.Model):
    def __init__(self, encoder, decoder, recon_scale=1.0, use_mse=False, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.recon_scale = recon_scale
        self.use_mse = use_mse

        # training metrics
        self.total_loss_tracker = keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

        # validation metrics
        self.val_total_loss_tracker = keras.metrics.Mean(name="val_loss")
        self.val_reconstruction_loss_tracker = keras.metrics.Mean(name="val_reconstruction_loss")
        self.val_kl_loss_tracker = keras.metrics.Mean(name="val_kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            self.val_total_loss_tracker,
            self.val_reconstruction_loss_tracker,
            self.val_kl_loss_tracker,
        ]

    def compute_losses(self, data, training=False):
        z_mean, z_log_var, z = self.encoder(data, training=training)
        reconstruction = self.decoder(z, training=training)

        if self.use_mse:
            # sum MSE per image, then mean over batch
            mse_per_image = tf.reduce_sum(tf.math.squared_difference(data, reconstruction), axis=[1,2,3])
            reconstruction_loss = tf.reduce_mean(mse_per_image)
        else:
            # binary_crossentropy averages over the channel axis and reduce_mean over the rest;
            # multiplying by recon_scale (H*W*C) turns that mean into a per-image sum
            reconstruction_loss = tf.reduce_mean(
                keras.losses.binary_crossentropy(data, reconstruction)
            ) * self.recon_scale

        kl_loss = -0.5 * tf.reduce_sum(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1
        )
        kl_loss = tf.reduce_mean(kl_loss)
        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.compute_losses(data, training=True)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        total_loss, reconstruction_loss, kl_loss = self.compute_losses(data, training=False)

        self.val_total_loss_tracker.update_state(total_loss)
        self.val_reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.val_kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.val_total_loss_tracker.result(),
            "reconstruction_loss": self.val_reconstruction_loss_tracker.result(),
            "kl_loss": self.val_kl_loss_tracker.result(),
        }

    def call(self, inputs):
        _, _, z = self.encoder(inputs)
        return self.decoder(z)
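
# --- Optional sanity check (illustrative sketch, not part of the model) ---
# compute_losses uses the closed-form KL divergence between the diagonal Gaussian
# N(mu, exp(logvar)) and the standard normal prior N(0, I):
#     KL = -0.5 * sum_j (1 + logvar_j - mu_j^2 - exp(logvar_j))
# The NumPy helper below evaluates the same expression on inputs that can be
# checked by hand. The name `kl_to_standard_normal` is an assumption made here.
import numpy as np

def kl_to_standard_normal(mu, logvar):
    mu = np.asarray(mu, dtype=float)
    logvar = np.asarray(logvar, dtype=float)
    return -0.5 * np.sum(1.0 + logvar - mu**2 - np.exp(logvar), axis=-1)

# Posterior identical to the prior: the divergence vanishes.
_kl_same = kl_to_standard_normal([0.0, 0.0], [0.0, 0.0])
# Mean shifted by 1 in one dimension with unit variance: KL = 0.5 * 1^2 = 0.5.
_kl_shift = kl_to_standard_normal([1.0, 0.0], [0.0, 0.0])
print("KL(prior || prior):", _kl_same, "| KL(shifted || prior):", _kl_shift)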

# -------------------------
# Instantiate, compile, callbacks
# -------------------------
vae = VAE(encoder, decoder, recon_scale=recon_scale, use_mse=use_mse)
vae.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3))

early_stop = keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=8, restore_best_weights=True, min_delta=1e-4, verbose=1
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1
)

# -------------------------
# Train and plot helper
# -------------------------
def train_vae_and_plot(vae, train_ds, val_ds, epochs=50, smooth_window=None, callbacks=None, verbose=1):
    history = vae.fit(
        train_ds,
        epochs=epochs,
        validation_data=val_ds,
        callbacks=callbacks,
        verbose=verbose
    )
    hist = history.history
    steps = range(1, len(hist["loss"]) + 1)

    plt.figure(figsize=(10,5))
    plt.plot(steps, hist["loss"], label="Train Total", alpha=0.6)
    if "reconstruction_loss" in hist:
        plt.plot(steps, hist["reconstruction_loss"], label="Train Recon", alpha=0.6)
    if "kl_loss" in hist:
        plt.plot(steps, hist["kl_loss"], label="Train KL", alpha=0.6)
    if "val_loss" in hist:
        plt.plot(steps, hist["val_loss"], label="Val Total", alpha=0.9)
    if "val_reconstruction_loss" in hist:
        plt.plot(steps, hist["val_reconstruction_loss"], label="Val Recon", alpha=0.9)
    if "val_kl_loss" in hist:
        plt.plot(steps, hist["val_kl_loss"], label="Val KL", alpha=0.9)

    if smooth_window and isinstance(smooth_window, int) and smooth_window > 1:
        try:
            plt.plot(steps, smooth_array(np.array(hist["loss"]), smooth_window), "--", label=f"Smoothed Train (w={smooth_window})")
            if "val_loss" in hist:
                plt.plot(steps, smooth_array(np.array(hist["val_loss"]), smooth_window), "--", label=f"Smoothed Val (w={smooth_window})")
        except Exception:
            pass

    plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training & Validation Losses")
    plt.legend(); plt.grid(alpha=0.2); plt.show()
    return history

# Run training
history = train_vae_and_plot(vae, train_ds, val_ds, epochs=epochs, smooth_window=smooth_window, callbacks=[early_stop, reduce_lr], verbose=1)

# -------------------------
# Useful outputs: reconstructions, generation, latent viz (if latent_dim==2)
# -------------------------
# Get some validation images for quick visualization (use x_val_np)
if x_val_np.shape[0] > 0:
    n_display = min(10, x_val_np.shape[0])
    sample_imgs = x_val_np[:n_display]
    # Reconstructions
    recon = vae.predict(sample_imgs, verbose=0)
    fig = plt.figure(figsize=(2*n_display,4))
    for i in range(n_display):
        ax = plt.subplot(2, n_display, i+1)
        img = sample_imgs[i]
        if channels == 1:
            plt.imshow(img.squeeze(), cmap="gray")
        else:
            plt.imshow(np.clip(img, 0, 1))
        ax.axis("off")
        ax = plt.subplot(2, n_display, i+1+n_display)
        r = recon[i]
        if channels == 1:
            plt.imshow(r.squeeze(), cmap="gray")
        else:
            plt.imshow(np.clip(r, 0, 1))
        ax.axis("off")
    plt.suptitle("Original (top) vs Reconstructed (bottom)")
    plt.show()

# Generate new images by sampling z ~ N(0,I)
n_gen = 12
z_rand = np.random.normal(size=(n_gen, latent_dim))
generated = decoder.predict(z_rand, verbose=0)
plt.figure(figsize=(12,4))
for i in range(n_gen):
    ax = plt.subplot(2, n_gen//2, i+1)
    img = generated[i]
    if channels == 1:
        plt.imshow(img.squeeze(), cmap="gray")
    else:
        plt.imshow(np.clip(img, 0, 1))
    ax.axis("off")
plt.suptitle("Generated samples from random z ~ N(0,I)")
plt.show()

# If latent_dim == 2: show latent grid
if latent_dim == 2:
    grid_size = 12
    grid_x = np.linspace(-3, 3, grid_size)
    grid_y = np.linspace(-3, 3, grid_size)
    figure = np.zeros((image_size[0] * grid_size, image_size[1] * grid_size, channels))
    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = np.array([[xi, yi]])
            x_decoded = decoder.predict(z_sample, verbose=0)[0]
            y0 = i * image_size[0]
            x0 = j * image_size[1]
            figure[y0:y0+image_size[0], x0:x0+image_size[1]] = x_decoded
    plt.figure(figsize=(10,10))
    if channels == 1:
        plt.imshow(figure.squeeze(), cmap="gray")
    else:
        plt.imshow(np.clip(figure,0,1))
    plt.axis("off")
    plt.title("Latent space grid (2D)")
    plt.show()

# Latent visualization (PCA, t-SNE, UMAP) using x_val_np if available
if x_val_np.shape[0] >= 50:
    # get z_mean for validation set via encoder
    z_means = []
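    batch_limit = 2000  # cap on the number of encoded samples, to bound compute
    count = 0
    for batch in val_ds:
        # val_ds may yield bare images or (images, labels) tuples; keep only the images
        b = batch[0] if isinstance(batch, (tuple, list)) else batch
        z_mean, _, _ = encoder.predict(b, verbose=0)
        z_means.append(z_mean)
        count += z_mean.shape[0]
        if count >= batch_limit:
            break
    if z_means:
        z_all = np.concatenate(z_means, axis=0)[:batch_limit]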
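        # PCA gives a linear 2D projection when latent_dim > 2; a 2D latent is plotted as-is
        if latent_dim > 2:
            pca = PCA(n_components=2)
            z_2d = pca.fit_transform(z_all)
        else:
            z_2d = z_all
        # t-SNE on the full latent vectors (the PCA projection has already discarded variance)
        try:
            z_tsne = TSNE(n_components=2, perplexity=30, random_state=seed).fit_transform(z_all)
        except Exception:
            z_tsne = None  # skip this panel if t-SNE is unavailable or fails
        # UMAP on the full latent vectors
        try:
            z_umap = umap.UMAP(n_components=2, random_state=seed).fit_transform(z_all)
        except Exception:
            z_umap = None  # skip this panel if umap-learn is unavailable or fails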

        plt.figure(figsize=(15,4))
        plt.subplot(1,3,1)
        plt.scatter(z_2d[:,0], z_2d[:,1], s=5, alpha=0.7)
        plt.title("PCA -> 2D latent")
        if z_tsne is not None:
            plt.subplot(1,3,2)
            plt.scatter(z_tsne[:,0], z_tsne[:,1], s=5, alpha=0.7)
            plt.title("t-SNE on latent")
        if z_umap is not None:
            plt.subplot(1,3,3)
            plt.scatter(z_umap[:,0], z_umap[:,1], s=5, alpha=0.7)
            plt.title("UMAP on latent")
        plt.suptitle("Latent Visualizations")
        plt.show()

print("Done. Trained epochs:", len(history.history['loss']))


In [None]:
# Minimal VAE (memorable exam version), TensorFlow / Keras
import numpy as np, tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers  # same Keras as `keras` above; avoids mixing with a standalone keras install
import matplotlib.pyplot as plt

# Hyperparams
latent_dim = 2
epochs = 10
batch_size = 128

# --- Sampling layer (reparameterization) ---
class Sampling(layers.Layer):
    def call(self, inputs):
        mu, logvar = inputs
        eps = tf.random.normal(shape=tf.shape(mu))
        return mu + tf.exp(0.5 * logvar) * eps

# --- Encoder ---
inp = keras.Input(shape=(28,28,1))
x = layers.Conv2D(32,3, strides=2, padding='same', activation='relu')(inp)
x = layers.Flatten()(x)
x = layers.Dense(64, activation='relu')(x)
mu = layers.Dense(latent_dim)(x)
logvar = layers.Dense(latent_dim)(x)
z = Sampling()([mu, logvar])
encoder = keras.Model(inp, [mu, logvar, z], name='enc')

# --- Decoder ---
latent_in = keras.Input(shape=(latent_dim,))
x = layers.Dense(7*7*32, activation='relu')(latent_in)
x = layers.Reshape((7,7,32))(x)
x = layers.Conv2DTranspose(32,3, strides=2, padding='same', activation='relu')(x)
out = layers.Conv2DTranspose(1,3, padding='same', activation='sigmoid')(x)
decoder = keras.Model(latent_in, out, name='dec')

# --- VAE model with custom train_step (simple) ---
class VAE(keras.Model):
    def __init__(self, enc, dec, **kwargs):
        super().__init__(**kwargs)
        self.enc = enc; self.dec = dec
    def train_step(self, data):
        if isinstance(data, tuple): data = data[0]
        with tf.GradientTape() as tape:
            mu, logvar, z = self.enc(data, training=True)
            recon = self.dec(z, training=True)
            # reconstruction (BCE per-pixel) and KL
            recon_loss = tf.reduce_mean(keras.losses.binary_crossentropy(data, recon)) * 28 * 28
            kl = -0.5 * tf.reduce_mean(tf.reduce_sum(1 + logvar - tf.square(mu) - tf.exp(logvar), axis=1))
            loss = recon_loss + kl
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {"loss": loss, "recon": recon_loss, "kl": kl}
    def call(self, inputs):
        _,_,z = self.enc(inputs)
        return self.dec(z)

# --- data (MNIST) ---
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
x_train = np.expand_dims(x_train, -1).astype("float32")/255.
x_test  = np.expand_dims(x_test, -1).astype("float32")/255.

# --- compile & train ---
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x_train, epochs=epochs, batch_size=batch_size, verbose=1)

# --- generate some samples ---
z_rand = np.random.normal(size=(16, latent_dim))
gen = decoder.predict(z_rand)
plt.figure(figsize=(6,6))
for i in range(16):
    plt.subplot(4,4,i+1); plt.imshow(gen[i].squeeze(), cmap='gray'); plt.axis('off')
plt.suptitle('Generated samples'); plt.show()
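
# --- Optional sanity check (illustrative sketch in NumPy, not part of the model) ---
# The Sampling layer draws z = mu + exp(0.5 * logvar) * eps with eps ~ N(0, I).
# If that formula is right, many draws for a fixed (mu, logvar) should have a
# sample mean close to mu and a sample std close to exp(0.5 * logvar).
# The helper name `reparameterize_np` and the chosen values are assumptions.
import numpy as np

def reparameterize_np(mu_val, logvar_val, n_samples, rng):
    # one independent standard-normal draw per sample and latent dimension
    eps = rng.standard_normal(size=(n_samples,) + np.shape(mu_val))
    return np.asarray(mu_val) + np.exp(0.5 * np.asarray(logvar_val)) * eps

_rng = np.random.default_rng(0)
_mu_demo = np.array([1.0, -2.0])
_logvar_demo = np.array([0.0, np.log(4.0)])  # variances 1 and 4, so stds 1 and 2
_z_demo = reparameterize_np(_mu_demo, _logvar_demo, 100_000, _rng)
print("sample mean:", _z_demo.mean(axis=0), "| sample std:", _z_demo.std(axis=0))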
