# Evaluation of the Adaptive latent space augmentation encoder

This notebook consists of the python scripts evaluating the trained model on face images.
The evaluation notebook consists two out of  three subsections, each focusing on different aspects of the model's performance.

5.1 Trained Model Performance
  - MSE
  - Perceptual Metrics - IS, FID
  - Sample quality assessment (Reconstruction Examples)

5.2 Latent space Exploration
  - Visualizing Latent space using t-SNE or PCA
  - Interpolation in Latent Space

5.3 Computational Efficiency
  - FLOPS calculations
  - Model parameters estimation.





### Importing Libraries

The below snippet imports necessary libraries including Tensorflow, scikit-learn, OpenCV, and Matplotlib for various tasks such as data preprocessing, model evaluation, and visualization.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import pickle
import os
import cv2
import time
#import tensorflow_probability as tfp

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, Model, losses
from tensorflow.keras.layers import Layer, Input, Conv2D, Dense, Flatten, Reshape, Lambda, Dropout
from tensorflow.keras.layers import Conv2DTranspose, MaxPooling2D, UpSampling2D, LeakyReLU, BatchNormalization
from tensorflow.keras.activations import relu
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

### Custom layers and Model definition

The below code snippet defines custom layers for the VAE model such as GaussianSampling, DownConvBlock, UpConvBlock, Encoder, Decoder, and the VAE model itself.

In [None]:
class GaussianSampling(Layer):
    def call(self, inputs):
        means, logvar = inputs
        epsilon = tf.random.normal(shape=tf.shape(means), mean=0., stddev=1.)
        samples = means + tf.exp(0.5*logvar)*epsilon

        return samples

class DownConvBlock(Layer):
    count = 0
    def __init__(self, filters, kernel_size=(3,3), strides=1, padding='same'):
        super(DownConvBlock, self).__init__(name=f"DownConvBlock_{DownConvBlock.count}")
        DownConvBlock.count+=1
        self.forward = Sequential([Conv2D(filters, kernel_size, strides, padding)])
        self.forward.add(BatchNormalization())
        self.forward.add(layers.LeakyReLU(0.2))

    def call(self, inputs):
        return self.forward(inputs)

class UpConvBlock(Layer):
    count = 0
    def __init__(self, filters, kernel_size=(3,3), padding='same'):
        super(UpConvBlock, self).__init__(name=f"UpConvBlock_{UpConvBlock.count}")
        UpConvBlock.count += 1
        self.forward = Sequential([Conv2D(filters, kernel_size, 1, padding),])
        self.forward.add(layers.LeakyReLU(0.2))
        self.forward.add(UpSampling2D((2,2)))

    def call(self, inputs):
        return self.forward(inputs)

class Encoder(Layer):
    def __init__(self, z_dim, name='encoder'):
        super(Encoder, self).__init__(name=name)

        self.features_extract = Sequential([
            DownConvBlock(filters = 32, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 32, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 64, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 64, kernel_size=(3,3), strides=2),
            Flatten()])

        self.dense_mean = Dense(z_dim, name='mean')
        self.dense_logvar = Dense(z_dim, name='logvar')
        self.sampler = GaussianSampling()

    def call(self, inputs):
        x = self.features_extract(inputs)
        mean = self.dense_mean(x)
        logvar = self.dense_logvar(x)
        z = self.sampler([mean, logvar])
        return z, mean, logvar

class Decoder(Layer):
    def __init__(self, z_dim, name='decoder'):
        super(Decoder, self).__init__(name=name)

        self.forward = Sequential([
                        Dense(8*8*64, activation='relu'),
                        Reshape((8,8,64)),
                        UpConvBlock(filters=64, kernel_size=(3,3)),
                        UpConvBlock(filters=64, kernel_size=(3,3)),
                        UpConvBlock(filters=32, kernel_size=(3,3)),
                        UpConvBlock(filters=32, kernel_size=(3,3)),
                        Conv2D(filters=3, kernel_size=(3,3), strides=1, padding='same', activation='sigmoid'),

        ])

    def call(self, inputs):
        return self.forward(inputs)


class VAE(Model):
    def __init__(self, z_dim, name='VAE'):
        super(VAE, self).__init__(name=name)
        self.encoder = Encoder(z_dim)
        self.decoder = Decoder(z_dim)
        self.mean = None
        self.logvar = None

    def call(self, inputs):
        z, self.mean, self.logvar = self.encoder(inputs)
        out = self.decoder(z)
        return out

def log_normal_pdf(sample, mean, logvar, raxis=1):
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(
        -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
        axis=raxis)

def compute_loss(model, x):
    z, mean, logvar = model.encoder(x)
    x_logit = model.decoder(z)

    # Reconstruction loss
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])

    # KL divergence loss
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)

    # Total loss
    loss = -tf.reduce_mean(logpx_z + logpz - logqz_x)

    return loss


### Data and Model Loading

This snippet loads the data, preprocesses by resizing and normalizing the images.
It loads a pre-trained VAE model from a checkpoint file to evaluates its performance.

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
# Instantiate the model
loaded_model = VAE(z_dim=64)  # Assuming z_dim is 128

# Call the model once to build its variables
sample_input = tf.ones((1, 128,128,3))  # Adjust input shape as per your model
_ = loaded_model(sample_input)

# Load weights from the checkpoint
checkpoint_path = "../data/ouput/checkpoints/final_model.h5"
loaded_model.load_weights(checkpoint_path)

In [None]:
num_files = 20000 # Define the desired number of files
celebA_dir = '/content/drive/MyDrive/Master_project/CelebA_data/data/img_align_celeba'
# List all JPEG files in the directory
image_filenames = glob.glob(os.path.join(celebA_dir, '*.jpg'))

# Take only the first 'num_files' files
image_filenames = image_filenames[:num_files]

# Check if there are any JPEG files found
if not image_filenames:
    raise ValueError("No JPEG files found in the directory.")

# Split the dataset into training and testing sets
train_files, test_files = train_test_split(image_filenames, test_size=0.9, random_state=42)

# Print the number of images in each split
print("Number of train images:", len(train_files))
print("Number of test images:", len(test_files))

In [None]:
def preprocess_images(image_filenames, target_size=(128, 128)):
    # Load, resize, and convert images to RGB
    images = [cv2.cvtColor(cv2.resize(cv2.imread(filename), target_size), cv2.COLOR_BGR2RGB) for filename in image_filenames if cv2.imread(filename) is not None]
    # Convert images to float32 and normalize
    preprocessed_images = np.array(images).astype('float32') / 255.0
    return preprocessed_images

In [None]:
target_size = (128,128)

test_images = preprocess_images(test_files, target_size)
print(f"test_images:{test_images.shape}")

## 5.1 Trained Model performance

We evaluate the trained model's effectiveness of generative models by employing various evaluation metrics. These metrics include mean squared error (MSE), perceptual metrics such as Incpetion Score (IS) and Frechet Inception distance (FID), as well as sample quality assessment.

### Trained Model Performance metrics:

- Mean Squared error:

- Perceptuual metrics:
  - Inception Score:
  - Frechet Inception distance:

- Sample quality evaluation


In [None]:
def compute_mse(images1, images2):
    if images1.shape != images2.shape:
        raise ValueError("Shapes of input images must be the same.")
    mse = np.mean((images1 - images2)**2)
    rmse = np.sqrt(mse)
    lmse = np.log(mse + 1e-9) # Adding a small epsilon to avoid log(0)
    return mse, rmse, lmse

# Function to compute validation loss
def compute_validation_loss(model, validation_images):
    z_test, encoded_imgs_mean, encoded_imgs_logvar = model.encoder(validation_images)
    #z_test_augmented = latent_space_augmentations(z_test, cutout_mask_size, mixup_alpha)
    # Generate fake images using the trained model
    decoded_imgs = model.decoder(z_test)

    # Convert images to NumPy arrays
    test_images_np = validation_images
    decoded_imgs_np = decoded_imgs.numpy()

    # Compute mean squared error (MSE) as the validation loss
    val_loss, rmse, lmse = compute_mse(test_images_np, decoded_imgs_np)
    # Convert to TensorFlow tensor
    val_loss_tf = tf.constant(val_loss, dtype=tf.float32)
    rmse_tf = tf.constant(rmse, dtype = tf.float32)
    lmse_tf = tf.constant(lmse, dtype = tf.float32)
    return val_loss_tf, rmse_tf, lmse_tf

In [None]:
MSE, RMSE, LMSE = compute_validation_loss(loaded_model, test_images)
print("Trained Model Performance:")
print(f"MSE:{MSE}, RMSE:{RMSE}, LMSE:{LMSE}")

In [None]:
def generate_images(model, images):
    z, mean, logvar = model.encoder(images)
    images = model.decoder(z)
    return images
generated_images = generate_images(loaded_model, test_images)

### Inception Score, FID

In [None]:
## Inception Score, FID
from scipy.stats import entropy
from skimage.transform import resize
from tqdm import tqdm
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input

from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K

In [None]:
def inception_score(images, inception_model, batch_size=16):
    scores = []
    for i in tqdm(range(0, len(images), batch_size)):
        batch = images[i:i+batch_size]
        batch = preprocess_input(batch)
        preds = inception_model.predict(batch)
        p_yx = preds.mean(axis=0)
        kl_divs = []
        for pred in preds:
            kl_divs.append(entropy(pred, p_yx))
        scores.append(np.exp(np.mean(kl_divs)))  # Append to the list instead of extending
    return np.mean(scores), np.std(scores)

In [None]:
from scipy.linalg import sqrtm

def calculate_fid(real_images, generated_images, inception_model):
    # Resize images to 299x299 as required by InceptionV3
    real_resized = np.array([resize(image, (128, 128, 3)) for image in real_images])
    gen_resized = np.array([resize(image, (128, 128, 3)) for image in generated_images])

    # Preprocess images
    real_preprocessed = preprocess_input(real_resized)
    gen_preprocessed = preprocess_input(gen_resized)

    # Get feature representations
    real_features = inception_model.predict(real_preprocessed)
    gen_features = inception_model.predict(gen_preprocessed)

    # Calculate mean and covariance
    mu_real, sigma_real = real_features.mean(axis=0), np.cov(real_features, rowvar=False)
    mu_gen, sigma_gen = gen_features.mean(axis=0), np.cov(gen_features, rowvar=False)

    # Calculate FID
    diff = mu_real - mu_gen
    cov_sqrt = sqrtm(sigma_real.dot(sigma_gen))  # Use scipy's square root function
    if np.iscomplexobj(cov_sqrt):
        cov_sqrt = cov_sqrt.real
    fid = np.dot(diff, diff) + np.trace(sigma_real + sigma_gen - 2 * cov_sqrt)
    return fid

In [None]:
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input

# Load the InceptionV3 model
inception_model = InceptionV3(include_top=False, pooling='avg', input_shape=(128, 128, 3))

# Calculate and print Inception Score
inception_score_mean, inception_score_std = inception_score(generated_images, inception_model)
print("Inception Score:", inception_score_mean, "+/-", inception_score_std)

# Calculate and print FID
fid = calculate_fid(test_images, generated_images, inception_model)
print("FID:", fid)

### Sample quality generation

This code snippet visualizes pairs of original images and their reconstructions generated by a Variational Autoencoder (VAE) model. It defines a function to plot the original and reconstructed images side by side, and then selects a subset of test images to visualize. Finally, it calls the visualization function with the VAE model and the selected subset of images.


In [None]:
# Function to visualize input images and their reconstructions
def visualize_reconstructions(model, images, num_images=10):
    # Generate reconstructions
    reconstructed_images = model(images)

    # Plot original images and their reconstructions
    plt.figure(figsize=(15, 6))
    for i in range(num_images):
        # Original image
        plt.subplot(2, num_images, i + 1)
        plt.imshow(images[i])
        plt.title('Original')
        plt.axis('off')

        # Reconstructed image
        plt.subplot(2, num_images, num_images + i + 1)
        plt.imshow(reconstructed_images[i])
        plt.title('Reconstructed')
        plt.axis('off')
    plt.show()

# Choose the number of images to visualize (e.g., 10 or 16)
num_images_to_visualize = 10

# Select a subset of test images for visualization
subset_images_for_visualization = test_images[:num_images_to_visualize]

# Visualize input images and their reconstructions
visualize_reconstructions(loaded_model, subset_images_for_visualization)

## Latent Space Analysis

### T-SNE of latent space of VAE trained model

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

# Load data and encode it using the encoder
# Assuming you have a dataset named 'x_train'
encoded_data, _, _ = loaded_model.encoder(test_images)

# Apply t-SNE to reduce the dimensionality of the latent space representations to two dimensions
tsne = TSNE(n_components=2, random_state=42)
latent_space_tsne = tsne.fit_transform(encoded_data)

# Visualize the t-SNE embeddings using a scatter plot
plt.figure(figsize=(8, 6))
plt.scatter(latent_space_tsne[:, 0], latent_space_tsne[:, 1], c='b', alpha=0.5)
plt.title('t-SNE Visualization of Latent Space')
plt.xlabel('t-SNE Component 1')
plt.ylabel('t-SNE Component 2')
plt.show()


### Exploring latent space for generated faces

In [None]:
# example of loading the generator model and generating images
from numpy import asarray
from numpy.random import randn
from numpy.random import randint
from keras.models import load_model
from matplotlib import pyplot

# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
 # generate points in the latent space
 x_input = randn(latent_dim * n_samples)
 # reshape into a batch of inputs for the network
 z_input = x_input.reshape(n_samples, latent_dim)
 return z_input

# create a plot of generated images
def plot_generated(examples, n):
 # plot images
 for i in range(n * n):
  # define subplot
  pyplot.subplot(n, n, 1 + i)
  # turn off axis
  pyplot.axis('off')
  # plot raw pixel data
  pyplot.imshow(examples[i, :, :])
pyplot.show()

# generate images
latent_points = generate_latent_points(64, 25)
# generate images
X  = loaded_model.decoder(latent_points)
# scale from [-1,1] to [0,1]
X = (X + 1) / 2.0
# plot the result
plot_generated(X, 5)

In [None]:
from numpy import linspace

# uniform interpolation between two points in latent space
def interpolate_points(p1, p2, n_steps=9):
  # interpolate ratios between the points
  ratios = linspace(0, 1, num=n_steps)
  # linear interpolate vectors
  vectors = list()
  for ratio in ratios:
    v = (1.0 - ratio) * p1 + ratio * p2
    vectors.append(v)
  return asarray(vectors)

pts = generate_latent_points(64, 2)
# interpolate points in latent space
interpolated = interpolate_points(pts[0], pts[1])
# generate images
try:
    X = loaded_model.decoder(interpolated)
    # Scale from [-1,1] to [0,1]
    X = (X + 1) / 2.0
    # Print shapes
    print("Shape of interpolated images:", X.shape)
    # Plot the result
    plot_generated(X, len(interpolated))
except Exception as e:
    print("Error occurred during interpolation:", e)

## Computational analysis 

Many deep learning research papers specifically report the following metrics to compare Time complexity(Speed of inference) and space complexity (Model size).

1. Time complexity in terms of FLOPS (floating-point operations) 
2. Model size in terms of the number of parameters 

### FLOPS Calculation


In [None]:
import tensorflow as tf
import numpy as np

def get_flops(model, model_inputs) -> float:
        """
        Calculate FLOPS [GFLOPs] for a tf.keras.Model or tf.keras.Sequential model
        in inference mode. It uses tf.compat.v1.profiler under the hood.
        """
        # if not hasattr(model, "model"):
        #     raise wandb.Error("self.model must be set before using this method.")

        if not isinstance(
            model, (tf.keras.models.Sequential, tf.keras.models.Model)
        ):
            raise ValueError(
                "Calculating FLOPS is only supported for "
                "`tf.keras.Model` and `tf.keras.Sequential` instances."
            )

        from tensorflow.python.framework.convert_to_constants import (
            convert_variables_to_constants_v2_as_graph,
        )

        # Compute FLOPs for one sample
        batch_size = 1
        inputs = [
            tf.TensorSpec([batch_size] + inp.shape[1:], inp.dtype)
            for inp in model_inputs
        ]

        # convert tf.keras model into frozen graph to count FLOPs about operations used at inference
        real_model = tf.function(model).get_concrete_function(inputs)
        frozen_func, _ = convert_variables_to_constants_v2_as_graph(real_model)

        # Calculate FLOPs with tf.profiler
        run_meta = tf.compat.v1.RunMetadata()
        opts = (
            tf.compat.v1.profiler.ProfileOptionBuilder(
                tf.compat.v1.profiler.ProfileOptionBuilder().float_operation()
            )
            .with_empty_output()
            .build()
        )

        flops = tf.compat.v1.profiler.profile(
            graph=frozen_func.graph, run_meta=run_meta, cmd="scope", options=opts
        )

        tf.compat.v1.reset_default_graph()

        # convert to GFLOPs
        return flops.total_float_ops, (flops.total_float_ops / 1e9)/2

In [None]:
#Usage
if __name__ =="__main__":
    image_model = tf.keras.applications.EfficientNetB0(include_top=False, weights=None)

    x = tf.constant(np.random.randn(1,128,128,3))

    print(get_flops(loaded_model, [x]))

### Model size in terms of parameters


In [None]:
import tensorflow as tf

# Count total number of parameters
total_params = loaded_model.count_params()

# Count trainable parameters
trainable_params = sum([tf.keras.backend.count_params(w) for w in loaded_model.trainable_weights])

print("Total number of parameters:", total_params)
print("Total number of trainable parameters:", trainable_params)
