# Variational AutoEncoder

## Setup

In [2]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import numpy as np
import tensorflow as tf
import keras
from keras import layers




## Create a sampling layer

In [3]:

class Sampling(layers.Layer):
    """Reparameterization layer: turns (z_mean, z_log_var) into a sample z."""

    def call(self, inputs):
        mean, log_var = inputs
        mean_shape = tf.shape(mean)
        # One standard-normal draw per (row, column) entry of the mean tensor.
        noise = tf.random.normal(shape=(mean_shape[0], mean_shape[1]))
        std = tf.exp(0.5 * log_var)
        return mean + std * noise

def sampling(args):
    """Sample a latent vector with the reparameterization trick.

    Args:
        args: A pair ``(z_mean, z_log_var)`` of tensors with the same shape.

    Returns:
        ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
        standard-normal noise shaped like ``z_mean``.
    """
    z_mean, z_log_var = args
    # Take the noise shape from z_mean itself rather than from the global
    # `latent_dim`, which is only assigned in a later cell and is reassigned
    # to a different value further down the notebook.
    epsilon = tf.random.normal(shape=tf.shape(z_mean))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

## Build the encoder

In [None]:
# Size of the latent vector and width of the one-hot label input.
latent_dim = 2
num_classes = 10

# Image branch: two stride-2 convolutions, then flatten and a 16-unit dense layer.
encoder_inputs = keras.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
# Conditional input: the label vector is appended to the image features
# before the latent statistics are computed.
label = keras.Input(shape=(num_classes,))
x = layers.concatenate([x, label])
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
# Draw z through the module-level `sampling` function wrapped in a Lambda layer.
z = layers.Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
# The encoder takes [image, label] and returns [z_mean, z_log_var, z].
encoder = keras.Model([encoder_inputs, label], [z_mean, z_log_var, z], name="encoder")
# Unconditional variant (image input only), kept for reference:
#encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

## Build the decoder

In [None]:
# Decoder: maps a latent vector plus the label input back to a 28x28x1 image.
latent_inputs = keras.Input(shape=(latent_dim,))
# `label` is the conditional Input created in the encoder cell.
x = layers.concatenate([latent_inputs, label])
# Feed the concatenated tensor (not `latent_inputs` alone) into the dense
# layer; otherwise the label is dropped and never conditions the output.
x = layers.Dense(7 * 7 * 64, activation="relu")(x)
x = layers.Reshape((7, 7, 64))(x)
# Two stride-2 transposed convolutions upsample 7x7 -> 14x14 -> 28x28.
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
# Sigmoid keeps every output pixel in (0, 1).
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model([latent_inputs, label], decoder_outputs, name="decoder")
decoder.summary()

## Define the VAE as a `Model` with a custom `train_step`

In [None]:

class VAE(keras.Model):
    """Keras model that pairs an encoder and a decoder with a custom train step.

    The training step minimizes a summed binary cross-entropy reconstruction
    term plus a KL term and reports running means of all three losses.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported at the end of every training step.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Return the three loss trackers."""
        trackers = [self.total_loss_tracker]
        trackers.append(self.reconstruction_loss_tracker)
        trackers.append(self.kl_loss_tracker)
        return trackers

    def train_step(self, data):
        """Run one optimization step on `data` and return the tracked losses."""
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            decoded = self.decoder(z)

            # Cross-entropy summed over axes 1 and 2, averaged over the batch.
            pixel_bce = keras.losses.binary_crossentropy(data, decoded)
            reconstruction_loss = tf.reduce_mean(tf.reduce_sum(pixel_bce, axis=(1, 2)))

            # KL term summed over axis 1, averaged over the batch.
            kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

            total_loss = reconstruction_loss + kl_loss

        gradients = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))

        tracked = (
            ("loss", self.total_loss_tracker, total_loss),
            ("reconstruction_loss", self.reconstruction_loss_tracker, reconstruction_loss),
            ("kl_loss", self.kl_loss_tracker, kl_loss),
        )
        for _, tracker, value in tracked:
            tracker.update_state(value)
        return {key: tracker.result() for key, tracker, _ in tracked}


In [None]:
class CVAE(keras.Model):
    """Conditional VAE wrapper around a label-aware encoder and decoder.

    `train_step` expects each batch as an ``(images, labels)`` pair and feeds
    the labels to both sub-models.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means of the individual loss terms.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Return the three loss trackers."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimization step on an ``(images, labels)`` batch."""
        images, labels = data

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder([images, labels])
            decoded = self.decoder([z, labels])

            # Reconstruction: cross-entropy summed over axes 1 and 2, batch-averaged.
            pixel_bce = keras.losses.binary_crossentropy(images, decoded)
            reconstruction_loss = tf.reduce_mean(tf.reduce_sum(pixel_bce, axis=(1, 2)))

            # KL divergence: summed over axis 1, batch-averaged.
            kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

            total_loss = reconstruction_loss + kl_loss

        # Backpropagate through every trainable weight of the model.
        weights = self.trainable_weights
        self.optimizer.apply_gradients(zip(tape.gradient(total_loss, weights), weights))

        # Refresh the running means before reporting them.
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        report = {}
        report["loss"] = self.total_loss_tracker.result()
        report["reconstruction_loss"] = self.reconstruction_loss_tracker.result()
        report["kl_loss"] = self.kl_loss_tracker.result()
        return report

## Train the VAE

In [None]:
# Inspect `mnist_digits`: number of samples, shape of one sample, full shape and dtype.
# NOTE(review): `mnist_digits` is assigned in the cells below — presumably this cell is run after them.
len(mnist_digits), mnist_digits[0].shape, mnist_digits.shape, mnist_digits.dtype

In [None]:
# Load MNIST once (the original cell loaded it twice and discarded the first result).
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Pool train and test images; the labels are not used in this cell.
mnist_digits = np.concatenate([x_train, x_test], axis=0)
# Add a trailing channel axis and divide by 255 after casting to float32.
mnist_digits = np.expand_dims(mnist_digits, -1).astype("float32") / 255

# NOTE(review): VAE.train_step calls the encoder with images only, while the
# `encoder`/`decoder` built above also take a label input — confirm the pairing.
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(mnist_digits, epochs=30, batch_size=128)

In [None]:
# Load MNIST dataset
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Concatenate train and test sets (raw, un-normalized images)
mnist_digits = np.concatenate([x_train, x_test], axis=0)

# Add a channel axis to the training images and normalize pixel values
x_train = np.expand_dims(x_train, -1).astype("float32") / 255.0

# One-hot encode labels for conditional input
num_classes = 10
y_train_one_hot = keras.utils.to_categorical(y_train, num_classes)
y_test_one_hot = keras.utils.to_categorical(y_test, num_classes)

# Create CVAE model
cvae = CVAE(encoder, decoder)
cvae.compile(optimizer=keras.optimizers.Adam())

# Train the CVAE model
train_data = (x_train, y_train_one_hot)
# Pass images as `x` and labels as `y`. Handing the whole pair to `x` makes
# Keras wrap it in a one-element tuple, which CVAE.train_step cannot unpack
# with `images, labels = data`.
cvae.fit(x_train, y_train_one_hot, epochs=30, batch_size=128)

In [None]:
import tensorflow as tf

# The training data is an (images, one-hot labels) pair.
train_data = (x_train, y_train_one_hot)

# Slice the pair into per-sample elements, shuffle across the whole training
# set, and group into batches of 128.
dataset = (
    tf.data.Dataset.from_tensor_slices(train_data)
    .shuffle(buffer_size=len(x_train))
    .batch(128)
)
cvae.fit(dataset, epochs=30)

In [None]:
# Pair the images with their one-hot labels.
train_data = (x_train, y_train_one_hot)

# Build the input pipeline step by step: slice, shuffle, batch.
dataset = tf.data.Dataset.from_tensor_slices(train_data)
dataset = dataset.shuffle(buffer_size=len(x_train))
dataset = dataset.batch(128)

# Each dataset element is an (images, labels) batch consumed by the model.
cvae.fit(dataset, epochs=30)

In [None]:
def vae_loss(x, x_decoded_mean, z_log_var, z_mean):
    """Return binary cross-entropy on the flattened inputs plus a KL term.

    The KL term is ``-0.5 * mean(1 + z_log_var - z_mean**2 - exp(z_log_var))``.
    """
    flat_target = tf.keras.backend.flatten(x)
    flat_prediction = tf.keras.backend.flatten(x_decoded_mean)
    reconstruction_term = keras.losses.binary_crossentropy(flat_target, flat_prediction)
    kl_inner = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
    kl_term = -0.5 * tf.reduce_mean(kl_inner)
    return reconstruction_term + kl_term

# NOTE(review): Keras invokes a compiled loss as loss(y_true, y_pred), whereas
# `vae_loss` takes four positional arguments — confirm this cell runs as intended.
# NOTE(review): `x_test` is passed without the scaling/reshaping visible for
# `x_train` in other cells — verify its preprocessing.
vae.compile(optimizer='adam', loss=vae_loss)
vae.fit(x_train, x_train, epochs=10, batch_size=32, validation_data=(x_test, x_test))

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class CVAE(keras.Model):
    """Conditional VAE that builds its own encoder and decoder sub-models.

    Args:
        latent_dim: Number of latent dimensions.
        num_classes: Width of the label vector fed to both sub-models.
    """

    def __init__(self, latent_dim, num_classes):
        super().__init__()

        self.latent_dim = latent_dim
        self.num_classes = num_classes

        # Build the encoder and decoder during instantiation
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    def build_encoder(self):
        """Return a model mapping ``[image, label]`` to ``[z_mean, z_log_var]``."""
        encoder_inputs = keras.Input(shape=(28, 28, 1))
        label_inputs = keras.Input(shape=(self.num_classes,))

        # Project the label to a 28x28x1 map so it can be stacked onto the
        # image as an additional channel.
        label_dense = layers.Dense(28 * 28)(label_inputs)
        label_reshaped = layers.Reshape((28, 28, 1))(label_dense)
        x = layers.concatenate([encoder_inputs, label_reshaped])

        x = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Conv2D(128, 3, strides=2, padding='same', activation='relu')(x)

        x = layers.Flatten()(x)

        z_mean = layers.Dense(self.latent_dim)(x)
        z_log_var = layers.Dense(self.latent_dim)(x)

        return keras.Model([encoder_inputs, label_inputs], [z_mean, z_log_var])

    def build_decoder(self):
        """Return a model mapping ``[latent, label]`` to a 28x28x1 image."""
        latent_inputs = keras.Input(shape=(self.latent_dim,))
        label_inputs = keras.Input(shape=(self.num_classes,))

        x = layers.concatenate([latent_inputs, label_inputs])

        x = layers.Dense(7 * 7 * 128, activation='relu')(x)
        x = layers.Reshape((7, 7, 128))(x)

        # Two stride-2 transposed convolutions upsample 7x7 -> 14x14 -> 28x28.
        x = layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu')(x)
        decoder_outputs = layers.Conv2DTranspose(1, 3, padding='same', activation='sigmoid')(x)

        return keras.Model([latent_inputs, label_inputs], decoder_outputs)

    def call(self, inputs, training=None, mask=None):
        """Encode ``(image, label)``, sample z and return the reconstruction.

        Args:
            inputs: A pair ``(encoder_inputs, label_inputs)``.
            training: Unused; accepted for Keras API compatibility.
            mask: Unused; accepted for Keras API compatibility.

        Returns:
            The decoder output for the sampled latent vector and the label.
        """
        encoder_inputs, label_inputs = inputs

        # The encoder is a two-input model, so the label has to be passed
        # along with the image.
        z_mean, z_log_var = self.encoder([encoder_inputs, label_inputs])

        # Reparameterization trick. The sample stays a tensor (no `.numpy()`)
        # so the call works on symbolic inputs and gradients reach the encoder.
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        z = z_mean + tf.exp(0.5 * z_log_var) * epsilon

        return self.decoder([z, label_inputs])

# Instantiate the model
latent_dim = 20
num_classes = 10
cvae = CVAE(latent_dim, num_classes)

# Display model summary
# CVAE.call unpacks two inputs (image, label), so build with one batch shape
# per input instead of the image shape alone.
cvae.build([(None, 28, 28, 1), (None, num_classes)])
cvae.summary()

In [5]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class Sampling(layers.Layer):
    """Draws z = z_mean + exp(0.5 * z_log_var) * epsilon with normal epsilon."""

    def call(self, inputs):
        mean, log_var = inputs
        rows = tf.shape(mean)[0]
        cols = tf.shape(mean)[1]
        noise = tf.keras.backend.random_normal(shape=(rows, cols))
        std = tf.exp(0.5 * log_var)
        return mean + std * noise

class CVAE(keras.Model):
    """Conditional VAE whose encoder and decoder graphs are built in `__init__`.

    Args:
        latent_dim: Number of latent dimensions.
        num_classes: Width of the label vector fed to both sub-models.
    """

    def __init__(self, latent_dim, num_classes):
        super().__init__()
        self.latent_dim = latent_dim
        self.num_classes = num_classes

        # Encoder
        self.encoder_inputs = layers.Input(shape=(28, 28, 1))
        self.label_inputs = layers.Input(shape=(num_classes,))

        # Project the label to a 28x28x1 map
        label_dense = layers.Dense(28 * 28)(self.label_inputs)
        label_reshaped = layers.Reshape((28, 28, 1))(label_dense)

        # Stack the label map onto the image as an extra channel
        x = layers.concatenate([self.encoder_inputs, label_reshaped])
        x = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Flatten()(x)

        self.z_mean = layers.Dense(latent_dim, name='z_mean')(x)
        self.z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
        self.z = Sampling()(inputs=[self.z_mean, self.z_log_var])

        # Decoder (shares the label Input with the encoder graph)
        self.decoder_inputs = layers.Input(shape=(latent_dim,))
        x = layers.concatenate([self.decoder_inputs, self.label_inputs])
        x = layers.Dense(7 * 7 * 64, activation="relu")(x)
        x = layers.Reshape((7, 7, 64))(x)
        x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
        self.decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)

        # Build the models
        self.encoder = keras.Model([self.encoder_inputs, self.label_inputs], [self.z_mean, self.z_log_var, self.z], name='encoder')
        self.decoder = keras.Model([self.decoder_inputs, self.label_inputs], self.decoder_outputs, name='decoder')

    def call(self, inputs, training=None, mask=None):
        """Encode the image, sample z and decode it together with the label.

        Args:
            inputs: Either ``(image, label)`` or the legacy three-element form
                ``(image, decoder_inputs, label)``. In the legacy form the
                middle element is accepted but ignored.
            training: Unused; accepted for Keras API compatibility.
            mask: Unused; accepted for Keras API compatibility.

        Returns:
            The decoder output for the sampled latent vector and the label.
        """
        if len(inputs) == 3:
            encoder_inputs, _, label_inputs = inputs
        else:
            encoder_inputs, label_inputs = inputs
        _, _, z = self.encoder([encoder_inputs, label_inputs])
        # Decode the encoder's sample. Decoding an externally supplied latent
        # instead would leave the encoder disconnected from the output.
        return self.decoder([z, label_inputs])

# Instantiate the model
latent_dim = 20
num_classes = 10
cvae = CVAE(latent_dim, num_classes)

# Display model summary
# Build with three batch shapes, in order: image, latent vector, label.
cvae.build([(None, 28, 28, 1), (None, latent_dim), (None, num_classes)])
cvae.summary()


Model: "cvae_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder (Functional)        [(None, 20),              153208    
                              (None, 20),                        
                              (None, 20)]                        
                                                                 
 decoder (Functional)        (None, 28, 28, 1)         152897    
                                                                 
Total params: 306105 (1.17 MB)
Trainable params: 306105 (1.17 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Load MNIST dataset (training split only; the test split is discarded)
(x_train, y_train), (_, _) = keras.datasets.mnist.load_data()

# Preprocess the data: cast to float32, divide by 255, add a channel axis
x_train = x_train.astype('float32') / 255.0
x_train = x_train.reshape((x_train.shape[0], 28, 28, 1))

# One-hot encode labels
y_train_one_hot = keras.utils.to_categorical(y_train, num_classes)

# Compile the model
# Only a reconstruction loss is configured here; no KL term is added in this cell.
cvae.compile(optimizer=keras.optimizers.Adam(), loss='binary_crossentropy')

# Train the model
# NOTE(review): the second list element sits in the slot that `cvae.build` declared
# as (None, latent_dim), yet a one-hot label array with `num_classes` columns is
# passed there — confirm this is intended.
cvae.fit([x_train, y_train_one_hot, y_train_one_hot], x_train, epochs=30, batch_size=128)

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Load the MNIST training split; the test split is discarded.
(x_train, y_train), (_, _) = keras.datasets.mnist.load_data()
# Cast to float32, divide by 255 and add a trailing channel axis.
x_train = x_train.astype('float32') / 255.0
x_train = x_train.reshape((x_train.shape[0], 28, 28, 1))

# One-hot encode labels (depth hard-coded to 10; the result is a tf tensor)
y_train_one_hot = tf.one_hot(y_train, 10)

# Define the sampling layer
class Sampling(layers.Layer):
    """Sampling layer: combines z_mean and z_log_var with standard-normal noise."""

    def call(self, inputs):
        mean, log_var = inputs
        noise_shape = (tf.shape(mean)[0], tf.shape(mean)[1])
        noise = tf.keras.backend.random_normal(shape=noise_shape)
        scaled_noise = tf.exp(0.5 * log_var) * noise
        return mean + scaled_noise

# Define the CVAE model
class CVAE(keras.Model):
    """Conditional VAE that appends the label to the flattened image features.

    Args:
        latent_dim: Number of latent dimensions.
        num_classes: Width of the label vector fed to both sub-models.
    """

    def __init__(self, latent_dim, num_classes):
        super().__init__()
        self.latent_dim = latent_dim
        self.num_classes = num_classes

        # Encoder
        self.encoder_inputs = layers.Input(shape=(28, 28, 1))
        self.label_inputs = layers.Input(shape=(self.num_classes,))
        x = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(self.encoder_inputs)
        x = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Flatten()(x)
        # Condition the latent statistics on the label.
        x = layers.concatenate([x, self.label_inputs])
        z_mean = layers.Dense(latent_dim, name="z_mean")(x)
        z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)

        # Sampler
        z = Sampling()([z_mean, z_log_var])

        # Encoder Model
        self.encoder = keras.Model([self.encoder_inputs, self.label_inputs], [z_mean, z_log_var, z], name="encoder")

        # Decoder (shares the label Input with the encoder graph)
        decoder_inputs = layers.Input(shape=(latent_dim,))
        x = layers.concatenate([decoder_inputs, self.label_inputs])
        x = layers.Dense(7 * 7 * 64, activation='relu')(x)
        x = layers.Reshape((7, 7, 64))(x)
        x = layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu')(x)
        x = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu')(x)
        decoder_outputs = layers.Conv2DTranspose(1, 3, padding='same', activation='sigmoid')(x)

        # Decoder Model
        self.decoder = keras.Model([decoder_inputs, self.label_inputs], decoder_outputs, name="decoder")

    def call(self, inputs, training=None, mask=None):
        """Encode, sample and decode; return the reconstruction and latent stats.

        Args:
            inputs: Either ``(image, label)`` or the legacy three-element form
                ``(image, decoder_inputs, label)``. The middle element was
                never used by this method and is ignored.
            training: Unused; accepted for Keras API compatibility.
            mask: Unused; accepted for Keras API compatibility.

        Returns:
            A tuple ``(decoder_outputs, z_mean, z_log_var)``.
        """
        # Accept the two-element form as well, so callers do not have to
        # supply a dummy latent tensor that is thrown away.
        if len(inputs) == 3:
            encoder_inputs, _, label_inputs = inputs
        else:
            encoder_inputs, label_inputs = inputs
        z_mean, z_log_var, z = self.encoder([encoder_inputs, label_inputs])
        decoder_outputs = self.decoder([z, label_inputs])
        return decoder_outputs, z_mean, z_log_var

# Define the loss function for CVAE
def vae_loss(x, x_decoded_mean, z_mean, z_log_var):
    """Return the mean of a scaled reconstruction loss plus a summed KL term.

    The cross-entropy on the flattened inputs is multiplied by 28 * 28; the KL
    term is ``-0.5 * sum(1 + z_log_var - z_mean**2 - exp(z_log_var))`` over the
    last axis.
    """
    backend = tf.keras.backend

    flat_target = backend.flatten(x)
    flat_prediction = backend.flatten(x_decoded_mean)
    # Scale the cross-entropy by the 28 * 28 image size.
    reconstruction_loss = keras.losses.binary_crossentropy(flat_target, flat_prediction) * (28 * 28)

    kl_inner = 1 + z_log_var - backend.square(z_mean) - backend.exp(z_log_var)
    kl_loss = backend.sum(kl_inner, axis=-1) * -0.5

    return backend.mean(reconstruction_loss + kl_loss)

# Instantiate the model with specified latent dimension and number of classes
latent_dim = 20
num_classes = 10
cvae = CVAE(latent_dim, num_classes)

# Compile the model with the custom loss function
# NOTE(review): Keras invokes a compiled loss as loss(y_true, y_pred), whereas
# `vae_loss` takes four positional arguments — confirm this cell runs as intended.
cvae.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss=vae_loss)

# Train the model: inputs are [images, one-hot labels], targets are the images
cvae.fit([x_train, y_train_one_hot], x_train, epochs=100, batch_size=128)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def generate_images(model, num_images, latent_dim, num_classes):
    """Decode random latent vectors with random class labels and plot them.

    Args:
        model: Object exposing a `decoder` with a `predict` method that takes
            ``[latent_vectors, one_hot_labels]``.
        num_images: Number of images to generate and show in one row.
        latent_dim: Length of each random latent vector.
        num_classes: Number of classes used for the one-hot labels.
    """
    # Standard-normal latent vectors, one row per image.
    latent_samples = np.random.normal(size=(num_images, latent_dim))

    # Pick a random class per image and look up its one-hot row.
    class_ids = np.random.choice(num_classes, num_images)
    random_labels = np.eye(num_classes)[class_ids]
    print(random_labels)

    images = model.decoder.predict([latent_samples, random_labels])

    # Show the images side by side in a single row, without axes.
    plt.figure(figsize=(10, 10))
    for position, image in enumerate(images, start=1):
        plt.subplot(1, num_images, position)
        plt.imshow(image[:, :, 0], cmap='gray')
        plt.axis('off')
    plt.show()

# Assuming your trained model is named 'cvae'
# Generates five samples; latent_dim and num_classes are passed as literals here
# and presumably must match the values the model was built with.
generate_images(cvae, num_images=5, latent_dim=20, num_classes=10)

In [None]:
# Display the `dataset` object (notebook cell output).
dataset

In [None]:
# Display the shapes of the training images and their one-hot labels.
x_train.shape, y_train_one_hot.shape

## Display a grid of sampled digits

In [None]:
import matplotlib.pyplot as plt


def plot_latent_space(vae, n=30, figsize=15):
    """Plot an n-by-n grid of digits decoded from a 2D latent grid.

    Args:
        vae: Object exposing a `decoder` with a `predict` method that accepts
            a ``(1, 2)`` latent array.
        n: Number of grid points per axis.
        figsize: Width and height of the matplotlib figure.
    """
    digit_size = 28
    scale = 1.0
    canvas = np.zeros((digit_size * n, digit_size * n))

    # Evenly spaced latent coordinates; the y axis is reversed so that the
    # largest value ends up in the top row.
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    for row, yi in enumerate(grid_y):
        row_span = slice(row * digit_size, (row + 1) * digit_size)
        for col, xi in enumerate(grid_x):
            col_span = slice(col * digit_size, (col + 1) * digit_size)
            decoded = vae.decoder.predict(np.array([[xi, yi]]), verbose=0)
            canvas[row_span, col_span] = decoded[0].reshape(digit_size, digit_size)

    plt.figure(figsize=(figsize, figsize))

    # Put one tick at the centre of every tile, labelled with its rounded
    # latent coordinate.
    first_tick = digit_size // 2
    tick_positions = np.arange(first_tick, n * digit_size + first_tick, digit_size)
    plt.xticks(tick_positions, np.round(grid_x, 1))
    plt.yticks(tick_positions, np.round(grid_y, 1))
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(canvas, cmap="Greys_r")
    plt.show()


# Render the decoded latent grid with the default n and figsize.
# NOTE(review): plot_latent_space feeds the decoder a latent array only — confirm
# `vae.decoder` does not also require a label input.
plot_latent_space(vae)

## Display how the latent space clusters different digit classes

In [None]:

def plot_label_clusters(vae, data, labels):
    """Scatter the first two latent-mean coordinates of `data`, colored by label.

    Args:
        vae: Object exposing an `encoder` whose `predict` returns three outputs,
            the first being the latent means.
        data: Inputs passed to the encoder.
        labels: Values used to color the scatter points.
    """
    encoded = vae.encoder.predict(data, verbose=0)
    z_mean, _, _ = encoded

    plt.figure(figsize=(12, 10))
    first_axis, second_axis = z_mean[:, 0], z_mean[:, 1]
    plt.scatter(first_axis, second_axis, c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


# Reload the MNIST training split (the test split is discarded), add a channel
# axis and divide by 255 after casting to float32.
(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = np.expand_dims(x_train, -1).astype("float32") / 255

# Color each encoded training image by its integer digit label.
plot_label_clusters(vae, x_train, y_train)