## Imports

In [None]:
from tensorflow import keras
from keras.models import Model
from keras import Input
from keras.layers import Dense, Lambda, Concatenate, Reshape
from keras.utils import plot_model
from keras import backend as K
from tensorflow.python.keras.engine.keras_tensor import KerasTensor
from sklearn.model_selection import train_test_split
from keras.callbacks import History
import tensorflow_probability as tfp
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import graphviz
import plotly
import plotly.express as px
import sys
import os
from typing import List, Tuple
import tensorflow as tf
import random


In [None]:
from google.colab import drive

In [None]:
# Project folder inside the mounted Google Drive
gdrive_path='/content/gdrive/MyDrive/CM'

# Mount Google Drive at '/content/gdrive' (its files then appear under 'MyDrive')
drive.mount('/content/gdrive', force_remount=True)
# Switch the working directory to the project folder so that relative file paths resolve there
os.chdir(gdrive_path)
# Print the folder contents so it can be checked manually that all files are present
print(sorted(os.listdir()))

### Preprocessing & defining data

In [None]:
from gensim.models import KeyedVectors

In [None]:
# # Specify the path to your Word2Vec dataset
# dataset_path = "GoogleNews-vectors-negative300.bin.gz"

# # Load the Word2Vec model
# model = KeyedVectors.load_word2vec_format(dataset_path, binary=True)


In [None]:
def load_input(dataset_path: str) -> np.ndarray:
    """
    Loads a binary word2vec file and returns its embedding matrix.

    Args:
        dataset_path (str): Path to the word2vec file (read with binary=True).

    Returns:
        np.ndarray: The `vectors` attribute of the loaded KeyedVectors object.
    """
    keyed_vectors = KeyedVectors.load_word2vec_format(dataset_path, binary=True)
    return keyed_vectors.vectors

In [None]:
# Load the word2vec vectors and keep only the first 100,000 rows to cut down the amount of data

cut_vectors = load_input("GoogleNews-vectors-negative300.bin.gz")[0:100000]

In [None]:
# Inspect the shape of the reduced vector array
cut_vectors.shape

In [None]:
def normalise_abs_input(unnormalised_dataset: np.ndarray) -> np.ndarray:
    """
    Shifts a dataset by the magnitude of its global minimum and rescales it by its global maximum.

    Args:
        unnormalised_dataset (np.ndarray): Numeric array to normalise.

    Returns:
        np.ndarray: The shifted data divided by the largest shifted value. When the global
        minimum is negative or zero the result lies in [0, 1]. If every shifted value is zero
        (constant non-positive input) an all-zero array is returned instead of NaN.

    Raises:
        ValueError: If the input is empty (raised by the NumPy min/max reduction).
    """
    data = np.asarray(unnormalised_dataset)

    # Shift by |global min| so that negative values become non-negative.
    # A single vectorised reduction replaces the per-row Python loop.
    shifted = data + abs(np.min(data))

    # Normalise by the largest shifted value; guard against 0/0 when all values are zero
    largest = np.max(shifted)
    scale = largest if largest != 0 else 1
    return shifted / scale

In [None]:
# Shift and rescale the reduced vectors with normalise_abs_input
normalized = normalise_abs_input(cut_vectors)

In [None]:
# Hold out 20% of the normalised data as the test set (fixed random_state so the split is repeatable)
x_train, x_test = train_test_split(normalized, test_size=0.2, random_state=42)

# Hold out 10% of the remaining training data as the validation set
x_train, x_val = train_test_split(x_train, test_size=0.1, random_state=42)

In [None]:
# Inspect the shape of the training split
x_train.shape

In [None]:
# Inspect the shape of the test split
x_test.shape

In [None]:
# Inspect the shape of the validation split
x_val.shape

In [None]:
# Check the container type of the training split
type(x_train)

In [None]:
# number of neurons at the input layer, i.e. the length of one input vector
# (presumably the 300-dimensional word2vec vectors loaded above - confirm against cut_vectors.shape)
original_dim = 300
# latent space dimension
latent_dim = 2
# hidden layer dimension
hl_dim = 128

In [None]:
# Symbolic input of the decoder: one vector of length latent_dim per sample
latent_inputs = Input(shape=(latent_dim,), name='Input_Z_Sampling')

In [None]:
# Symbolic input of the encoder: one vector of length original_dim per sample
# NOTE: this name shadows Python's built-in input(); it is used below as the default of run_all's `input` argument
input = Input(shape=(original_dim,), name='Encoder_Input_Layer')

### Functions

1. Basic Functionalities

In [None]:
def sampling(args: Tuple[tf.Tensor, tf.Tensor]) -> tf.Tensor:
    """
    Samples a latent vector from a diagonal Gaussian built from the encoder outputs.

    Args:
        args: A tuple (z_mean, z_log_sigma). z_mean is the mean of the approximate posterior.
            z_log_sigma is an unconstrained log-scale output of the encoder; it is interpreted
            as the log-variance, which is how the KL term in loss_function uses it
            (exp(z_log_sd) appears there in the place of the variance).
    Returns:
        tf.Tensor: A tensor representing a sample drawn from the multivariate normal distribution.
    """
    z_mean, z_log_sigma = args
    # scale_diag expects the standard deviation, not its logarithm. Converting the
    # log-variance with exp(0.5 * x) keeps the scale positive and makes the sampled
    # distribution the same one the KL term penalises.
    z_sigma = tf.exp(0.5 * z_log_sigma)
    distribution = tfp.distributions.MultivariateNormalDiag(loc=z_mean, scale_diag=z_sigma)
    return distribution.sample()

In [None]:
def create_encoder_model(input: KerasTensor, activation_mean: str = None, activation_sd: str = None,
                         hl_dim: int = 256, latent_dim: int = 2) -> Tuple[Model, KerasTensor, KerasTensor]:
    """
    Builds the encoder: two ReLU hidden layers, two latent heads and a sampling layer.

    Args:
        input (KerasTensor): Symbolic input tensor the encoder is built on.
        activation_mean (str, optional): Activation of the 'z_Mean' layer. Defaults to None.
        activation_sd (str, optional): Activation of the 'z_log_SD' layer. Defaults to None.
        hl_dim (int, optional): Number of units in each hidden layer. Defaults to 256.
        latent_dim (int, optional): Number of units in each latent head. Defaults to 2.

    Returns:
        Tuple[Model, KerasTensor, KerasTensor]: The encoder model (outputs: mean, log-scale head, sampled z),
        the output tensor of the 'z_Mean' layer and the output tensor of the 'z_log_SD' layer.
    """
    # Stack the two hidden layers on top of the input, in order
    hidden = input
    for layer_name in ('Encoder_First_HL', 'Encoder_Second_HL'):
        hidden = Dense(units=hl_dim, activation='relu', name=layer_name)(hidden)

    # Both latent heads read from the last hidden layer
    latent_mean = Dense(units=latent_dim, activation=activation_mean, name='z_Mean')(hidden)
    latent_log_sd = Dense(units=latent_dim, activation=activation_sd, name='z_log_SD')(hidden)

    # Draw z from the two heads via the module-level sampling function
    latent_sample = Lambda(sampling, name='z_Sampling_Layer')([latent_mean, latent_log_sd])

    encoder_outputs = [latent_mean, latent_log_sd, latent_sample]
    encoder_model = Model(input, encoder_outputs, name='Encoder_Model')
    return encoder_model, latent_mean, latent_log_sd

In [None]:
def create_decoder_model(latent_inputs: KerasTensor, activation_mean: str = None, hl_dim: int = 256,
                         original_dim: int = 784) -> Tuple[Model, KerasTensor]:
    """
    Builds the decoder: two ReLU hidden layers followed by an output layer of size original_dim.

    Args:
        latent_inputs (KerasTensor): Symbolic input tensor the decoder is built on.
        activation_mean (str, optional): Activation of the 'Output_Mean' layer. Defaults to None.
        hl_dim (int, optional): Number of units in each hidden layer. Defaults to 256.
        original_dim (int, optional): Number of units in the output layer. Defaults to 784.

    Returns:
        Tuple[Model, KerasTensor]: The decoder model and the output tensor of the 'Output_Mean' layer.
    """
    # Stack the two hidden layers on top of the latent input, in order
    hidden = latent_inputs
    for layer_name in ('Decoder_First_HL', 'Decoder_Second_HL'):
        hidden = Dense(units=hl_dim, activation='relu', name=layer_name)(hidden)

    # Map the last hidden layer back to the original dimensionality
    reconstruction = Dense(units=original_dim, activation=activation_mean, name='Output_Mean')(hidden)

    decoder_model = Model(latent_inputs, reconstruction, name='Decoder_Model')
    return decoder_model, reconstruction

In [None]:
def loss_function(original_dim: int, input: KerasTensor, en_decoder_merged: KerasTensor, z_log_sd: KerasTensor, z_mean: KerasTensor) -> KerasTensor:
    """
    Builds the VAE training loss: a reconstruction term plus a KL term, averaged together.

    Args:
        original_dim (int): Dimension of the original data; scales the MSE reconstruction term.
        input (Tensor): The input tensor that should be reconstructed.
        en_decoder_merged (Tensor): The reconstruction produced by passing the input through encoder and decoder.
        z_log_sd (Tensor): The log-scale output of the encoder.
        z_mean (Tensor): The mean output of the encoder.

    Returns:
        Tensor: The mean of (reconstruction term + KL term).

    Notes:
        NOTE(review): z_log_sd is used where the closed-form Gaussian KL divergence has the
        log-variance (its exponential takes the place of the variance) - confirm this matches
        how the encoder output is meant to be interpreted.
    """
    # Reconstruction term: Keras MSE scaled by the number of input dimensions
    reconstruction_term = original_dim * keras.losses.mse(input, en_decoder_merged)

    # KL term: summed over the latent axis (axis 1)
    kl_per_dimension = 1 + z_log_sd - K.square(z_mean) - K.exp(z_log_sd)
    kl_term = -0.5 * K.sum(kl_per_dimension, axis=1)

    return K.mean(reconstruction_term + kl_term)

In [None]:
def run_all(
        input: KerasTensor = input,
        enc_activation_mean: str = None,
        enc_activation_sd: str = None,
        hl_dim: int = hl_dim,
        latent_dim: int = latent_dim,
        original_dim: int = original_dim,
        latent_inputs: KerasTensor = latent_inputs,
        dec_activation_mean: str = None,
        epochs: int = 3,
        batch_size: int = 128,
        x_val: np.ndarray = x_val,
        x_train: np.ndarray = x_train,
        learning_rate: float = 0.001
    ) -> Tuple[Model, Model, History, Model]:
    """
    Runs the entire pipeline for training a Variational Autoencoder (VAE) model.

    Args:
        input (KerasTensor): The input tensor for the encoder.
        enc_activation_mean (str, optional): Activation function for the mean component of the encoder's latent space layer. Defaults to None = Linear.
        enc_activation_sd (str, optional): Activation function for the log-scale component of the encoder's latent space layer. Defaults to None = Linear.
        hl_dim (int, optional): Dimension of the hidden layers. Defaults to hl_dim.
        latent_dim (int, optional): Dimension of the latent space. Defaults to latent_dim.
        original_dim (int, optional): Dimension of the original data. Defaults to original_dim.
        latent_inputs (KerasTensor): The input tensor for the decoder's latent space.
        dec_activation_mean (str, optional): Activation function for the mean component of the decoder's output layer. Defaults to None = Linear.
        epochs (int, optional): Number of training epochs. Defaults to 3.
        batch_size (int, optional): Batch size for training. Defaults to 128.
        x_val (ndarray, optional): Validation data passed to fit. Defaults to x_val.
        x_train (ndarray, optional): Training data. Defaults to x_train.
        learning_rate (float, optional): Learning rate of the Adam optimiser. Defaults to 0.001.

    Returns:
        Tuple: A tuple containing the encoder model, decoder model, training history, and VAE model.

    Notes:
        The defaults that refer to module-level variables (input, hl_dim, latent_dim, original_dim,
        latent_inputs, x_val, x_train) are bound when this function is defined; re-run this
        definition after changing those variables, or pass the values explicitly.
        The loss is attached with add_loss, so the model is compiled without a loss argument.
    """

    encoder, z_mean, z_log_sd = create_encoder_model(input, activation_mean = enc_activation_mean, activation_sd = enc_activation_sd, hl_dim = hl_dim, latent_dim = latent_dim)
    # Only the decoder model is needed here; its output tensor is not used
    decoder, _ = create_decoder_model(latent_inputs, activation_mean = dec_activation_mean, hl_dim = hl_dim, original_dim = original_dim)

    # note: the encoder outputs [z_mean, z_log_sd, z]; index 2 selects the sampled z
    en_decoder_merged = decoder(encoder(input)[2])

    vae = Model(inputs=input, outputs=en_decoder_merged, name='VAE_Model')

    elbo_loss = loss_function(original_dim, input, en_decoder_merged, z_log_sd, z_mean)

    vae.add_loss(elbo_loss)
    vae.compile(optimizer=keras.optimizers.Adam(learning_rate = learning_rate))

    history = vae.fit(x_train, x_train, epochs = epochs, batch_size = batch_size, validation_data = (x_val, x_val), verbose = 0)

    return encoder, decoder, history, vae

### Running the experiments & Plotting

In [None]:
def plot_loss(history: History) -> None:
    """
    Plots the training and validation loss over epochs.

    Args:
        history (History): The training history object obtained from model training.
            Its `history` dict must contain the keys 'loss' and 'val_loss'.
    """
    train_loss = history.history['loss']
    val_loss = history.history['val_loss']
    n_epochs = len(train_loss)

    _, ax = plt.subplots(figsize = (16,9), dpi = 300)
    ax.set_title('Model Loss by Epoch', loc = 'center')

    ax.plot(train_loss, label = 'Training Data', color = 'black')
    # 'val_loss' is the loss on the validation data given to fit, so it is labelled as such
    ax.plot(val_loss, label = 'Validation Data', color = 'red')
    ax.set(xlabel = 'Epoch', ylabel = 'Loss')
    # Tick positions are 0-based indices; the labels show 1-based epoch numbers
    plt.xticks(ticks = np.arange(n_epochs, step = 1), labels = np.arange(1, n_epochs + 1, step = 1))
    plt.legend()
    plt.show()

In [None]:
# Train the VAE for 25 epochs with the default settings and plot the loss curves
encoder, decoder, history, vae = run_all(epochs = 25)
plot_loss(history)

In [None]:
# Exploring when the optimisation converges: train for 100 epochs and plot the loss curves
encoder, decoder, history, vae = run_all(epochs = 100)
# NOTE(review): the message below mentions the latent representation, but the call that follows plots the loss - confirm the intended text
print("Plotting the Latent Representation:")
plot_loss(history)
# after about epoch 60, the loss on the validation set does not decrease anymore.