In [25]:
# https://github.com/DiveshRKubal/GenerativeAI/blob/main/Variational%20Autoencoders/Variational_Autoencoders_Implementation.ipynb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import (
    layers,
    models,
    datasets,
    callbacks,
    losses,
    optimizers,
    metrics,
)

import keras

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
import keras_tuner
from tensorflow.keras.callbacks import TensorBoard
from tensorboard.plugins.hparams import api as hp

import numpy as np
from tensorflow import keras
from keras.layers import Input, Dense, Lambda
import tensorflow as tf
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from keras import backend as K
from sklearn.model_selection import train_test_split


# from tensorflow.python.framework.ops import disable_eager_execution

# disable_eager_execution()

In [26]:
# Desired number of samples for each class (adjust as needed).
class_samples = [2030, 3020]
total_samples = sum(class_samples)

# Class proportions handed to make_classification via `weights`.
class_weights = [num_samples / total_samples for num_samples in class_samples]

# Generate a synthetic 4-feature, 2-class dataset with the requested class balance.
X, y = make_classification(
    n_samples=total_samples,
    n_features=4,
    n_informative=4,
    n_redundant=0,
    n_classes=2,
    weights=class_weights,
    random_state=42,
)

columns = ["Feature_1", "Feature_2", "Feature_3", "Feature_4"]

# Choose the train/test rows *before* scaling so the scaling statistics can be
# computed on the training rows only (no information from the hold-out rows
# leaks into preprocessing). Splitting row indices with the same test_size and
# random_state selects the same rows as splitting the array itself.
train_idx, test_idx = train_test_split(
    np.arange(total_samples), test_size=0.2, random_state=42
)

# Per-feature mean and population standard deviation (ddof=0) of the training rows.
feature_mean = X[train_idx].mean(axis=0)
feature_std = X[train_idx].std(axis=0)

# Standardise every row with the training statistics and attach the label.
synthetic_df = pd.DataFrame(data=(X - feature_mean) / feature_std, columns=columns)
synthetic_df["target"] = y

# Rows are [Feature_1..Feature_4, target]; the model consumes all 5 columns.
synthetic_array = synthetic_df.values
train_data, test_data = synthetic_array[train_idx], synthetic_array[test_idx]

In [27]:
# Load TensorBoard's IPython extension. Re-running this cell in a live kernel only
# prints an "already loaded" notice; use %reload_ext tensorboard to reload it.
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


## Data Preprocessing

# Designing and Building Variational Autoencoder

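First, we define a custom `Sampling` layer that draws a latent vector `z` from the normal distribution parameterised by `z_mean` and `z_log_var`. In this notebook the layer is defined inside `build_model` below, after the hyperparameter grid is set up.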

In [28]:
from itertools import product

# https://www.tensorflow.org/tensorboard/hyperparameter_tuning_with_hparams

# Candidate values for each tunable hyperparameter.
list_of_layers = [8, 32, 128, 256, 512, 1024]  # dense-layer widths
list_of_latent = [2, 4, 16, 128, 256]  # latent-space sizes
list_of_boolean = [True, False]  # on/off switches
list_of_drop_out = [0.1, 0.3, 0.5]  # dropout rates

# Full Cartesian product of the candidate lists. It is materialised as a list
# (rather than left as a one-shot iterator) so that counting the combinations
# does not exhaust it and it can still be iterated afterwards.
# NOTE(review): this counts every combination of the lists above; the
# grid-search loop later in the notebook ties dropout to the batch-norm flag
# and therefore runs fewer trials than this number.
hyperparameters_product = list(
    product(
        list_of_layers,
        list_of_layers,
        list_of_latent,
        list_of_boolean,
        list_of_boolean,
        list_of_drop_out,
    )
)

# Total number of combinations in the full product.
number_of_combinations = len(hyperparameters_product)

print("Total number of combinations:", number_of_combinations)


Total number of combinations: 2160


In [29]:


# Tag under which each trial's validation loss is logged.
VAL_LOSS = "val_loss"

# Hyperparameter handles, declared in the order they are registered below.
# Each one is a discrete choice over one of the candidate lists.
HP_NUM_embedded = hp.HParam("num_embedded", hp.Discrete(list_of_latent))
HP_BOOL_BATCH = hp.HParam("bool_batch", hp.Discrete(list_of_boolean))
HP_NUM_UNITS = hp.HParam("num_units", hp.Discrete(list_of_layers))
HP_NUM_UNITS1 = hp.HParam("num_units1", hp.Discrete(list_of_layers))
HP_DROP_OUT_BOOL = hp.HParam("drop_out", hp.Discrete(list_of_boolean))
HP_DROP_OUT_RATE = hp.HParam("drop_out_rate", hp.Discrete(list_of_drop_out))

# Write the experiment-level HParams configuration (which hyperparameters and
# which metric exist) into the root directory of this tuning session.
with tf.summary.create_file_writer("logs/hparam_tuning_new2/").as_default():
    hp.hparams_config(
        hparams=[
            HP_NUM_embedded,
            HP_BOOL_BATCH,
            HP_NUM_UNITS,
            HP_NUM_UNITS1,
            HP_DROP_OUT_BOOL,
            HP_DROP_OUT_RATE,
        ],
        metrics=[hp.Metric(VAL_LOSS, display_name="val_loss")],
    )

In [30]:
def build_model(hparams, log_dir="logs/", epochs=1):
    """Build a VAE for the 5-column tabular data, train it and return its validation losses.

    The encoder and the decoder are stacks of three dense ReLU layers whose
    widths, latent size, batch-normalisation switch and dropout rate are taken
    from ``hparams``. The decoder has two heads: a linear head reconstructing
    the four continuous features and a sigmoid head reconstructing the binary
    target column.

    The function reads the module-level ``train_data`` and ``test_data`` arrays
    and the module-level ``HP_*`` hyperparameter keys.

    Args:
        hparams: Mapping from the ``HP_*`` keys to this trial's values. It must
            contain ``HP_NUM_embedded``, ``HP_NUM_UNITS``, ``HP_NUM_UNITS1`` and
            ``HP_BOOL_BATCH``; ``HP_DROP_OUT_RATE`` is only read when
            ``HP_BOOL_BATCH`` is true.
        log_dir: Directory handed to the TensorBoard and HParams callbacks.
        epochs: Number of training epochs.

    Returns:
        ``history["val_loss"]``: a list with one validation loss per epoch.
    """
    use_batch_norm = bool(hparams[HP_BOOL_BATCH])
    embed_dim = hparams[HP_NUM_embedded]
    n_units = hparams[HP_NUM_UNITS]
    n_units1 = hparams[HP_NUM_UNITS1]

    # Multiplier applied to both reconstruction terms before they are added to
    # the KL term.
    beta = 500

    class Sampling(layers.Layer):
        """Draw z = z_mean + exp(0.5 * z_log_var) * epsilon with epsilon ~ N(0, I)."""

        def call(self, inputs):
            z_mean, z_log_var = inputs
            batch = tf.shape(z_mean)[0]
            dim = tf.shape(z_mean)[1]
            epsilon = K.random_normal(shape=(batch, dim))
            return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    def dense_block(tensor, units, name, dropout=True):
        """Dense+ReLU, followed by BatchNorm (and optionally Dropout) when enabled.

        Dropout is only ever added together with batch normalisation, i.e. both
        are controlled by ``HP_BOOL_BATCH``.
        """
        tensor = layers.Dense(units, activation="relu", name=name)(tensor)
        if use_batch_norm:
            tensor = layers.BatchNormalization()(tensor)
            if dropout:
                tensor = layers.Dropout(hparams[HP_DROP_OUT_RATE])(tensor)
        return tensor

    # ------------------------------------------------------------------ encoder
    # Input rows are the 4 standardised features plus the target column.
    encoder_input = layers.Input(shape=(5,), name="encoder_input")
    x = dense_block(encoder_input, n_units, "h1")
    # The second hidden layer has batch normalisation but no dropout.
    x = dense_block(x, n_units1, "h2", dropout=False)
    x = dense_block(x, n_units1, "h3")

    # The first half of h3 feeds z_mean, the second half feeds z_log_var.
    half_size = n_units1 // 2
    mean_features = layers.Lambda(lambda t: t[:, :half_size], name="select_z_mean")(x)
    var_features = layers.Lambda(lambda t: t[:, half_size:], name="select_z_var")(x)

    z_mean = layers.Dense(embed_dim, name="z_mean")(mean_features)
    z_log_var = layers.Dense(embed_dim, name="z_log_var")(var_features)

    # Sample a latent point from the distribution described by z_mean / z_log_var.
    z = Sampling()([z_mean, z_log_var])

    # The encoder maps an input row to (z_mean, z_log_var, sampled z).
    encoder = models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")

    # ------------------------------------------------------------------ decoder
    decoder_input = layers.Input(shape=(embed_dim,), name="decoder_input")
    x = dense_block(decoder_input, n_units1, "h4")
    x = dense_block(x, n_units1, "h5")
    x = dense_block(x, n_units, "h6")

    # The first 80% of h6 feeds the continuous head, the remaining 20% the
    # classification head.
    split_at = int(4 / 5 * n_units)
    cont_decoder_input = layers.Lambda(
        lambda t: t[:, :split_at], name="select_x6_cont"
    )(x)
    class_decoder_input = layers.Lambda(
        lambda t: t[:, split_at:], name="select_x6_class"
    )(x)

    cont_decoder_outputs = layers.Dense(
        4, activation="linear", name="cont_decoder_output"
    )(cont_decoder_input)
    class_decoder_output = layers.Dense(
        1, activation="sigmoid", name="classification_output"
    )(class_decoder_input)

    decoder = models.Model(decoder_input, [cont_decoder_outputs, class_decoder_output])

    class VAE(models.Model):
        """Encoder/decoder pair trained on reconstruction + KL loss via custom steps."""

        def __init__(self, encoder, decoder, **kwargs):
            super().__init__(**kwargs)
            self.encoder = encoder
            self.decoder = decoder
            self.total_loss_tracker = metrics.Mean(name="total_loss")
            self.reconstruction_loss_tracker_cont = metrics.Mean(
                name="reconstruction_loss_cont"
            )
            self.reconstruction_loss_tracker_class = metrics.Mean(
                name="reconstruction_loss_class"
            )
            self.kl_loss_tracker = metrics.Mean(name="kl_loss")

        @property
        def metrics(self):
            # Listing the trackers here lets Keras reset them at the start of
            # every epoch and of every evaluation pass.
            return [
                self.total_loss_tracker,
                self.reconstruction_loss_tracker_cont,
                self.reconstruction_loss_tracker_class,
                self.kl_loss_tracker,
            ]

        def call(self, inputs, training=None):
            """Encode ``inputs``, sample z and decode it.

            ``training`` is forwarded to both sub-models so that
            BatchNormalization and Dropout switch between training and
            inference behaviour.
            """
            z_mean, z_log_var, z = self.encoder(inputs, training=training)
            reconstruction = self.decoder(z, training=training)
            return z_mean, z_log_var, reconstruction

        def _compute_losses(self, data, training):
            """Run a forward pass and return (total, cont, class, kl, reconstruction)."""
            z_mean, z_log_var, reconstruction = self(data, training=training)
            # Columns 0-3 are the continuous features, column 4 is the target.
            reconstruction_loss_cont = tf.reduce_mean(
                beta * losses.mean_squared_error(data[:, :4], reconstruction[0])
            )
            reconstruction_loss_class = tf.reduce_mean(
                beta * losses.binary_crossentropy(data[:, 4:], reconstruction[1])
            )
            kl_loss = tf.reduce_mean(
                tf.reduce_sum(
                    -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                    axis=1,
                )
            )
            total_loss = reconstruction_loss_cont + reconstruction_loss_class + kl_loss
            return (
                total_loss,
                reconstruction_loss_cont,
                reconstruction_loss_class,
                kl_loss,
                reconstruction,
            )

        def _update_trackers(self, total, cont, cls, kl):
            """Accumulate one batch's losses into the running means."""
            self.total_loss_tracker.update_state(total)
            self.reconstruction_loss_tracker_cont.update_state(cont)
            self.reconstruction_loss_tracker_class.update_state(cls)
            self.kl_loss_tracker.update_state(kl)

        def train_step(self, data):
            """Step run during training."""
            if isinstance(data, tuple):
                data = data[0]

            # Record the forward pass so the loss can be differentiated with
            # respect to the trainable weights.
            with tf.GradientTape() as tape:
                # training=True is required here: without it BatchNormalization
                # and Dropout would run in inference mode while fitting.
                total_loss, loss_cont, loss_class, kl_loss, _ = self._compute_losses(
                    data, training=True
                )

            grads = tape.gradient(total_loss, self.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

            self._update_trackers(total_loss, loss_cont, loss_class, kl_loss)
            return {m.name: m.result() for m in self.metrics}

        def test_step(self, data):
            """Step run during validation."""
            # validation_data is passed as (x, y); only x is needed.
            if isinstance(data, tuple):
                data = data[0]

            total_loss, loss_cont, loss_class, kl_loss, reconstruction = (
                self._compute_losses(data, training=False)
            )
            self._update_trackers(total_loss, loss_cont, loss_class, kl_loss)

            # Diagnostic: mean signed error on the continuous features for the
            # current batch only (it is not accumulated across batches).
            mean_diff = tf.reduce_mean(data[:, :4] - reconstruction[0])

            # Return the running means rather than the raw batch values so the
            # reported validation losses cover the whole validation set instead
            # of only its last batch. The "loss" key is what Keras exposes as
            # "val_loss" in the training history.
            return {
                "loss": self.total_loss_tracker.result(),
                "reconstruction_loss_cont": self.reconstruction_loss_tracker_cont.result(),
                "reconstruction_loss_class": self.reconstruction_loss_tracker_class.result(),
                "kl_loss": self.kl_loss_tracker.result(),
                "mean_diff": mean_diff,
            }

    vae = VAE(encoder, decoder)
    vae.compile(optimizer=optimizers.Adam(learning_rate=0.0005))
    hist = vae.fit(
        train_data,
        epochs=epochs,
        batch_size=16,
        shuffle=True,
        validation_data=(test_data, test_data),
        callbacks=[
            tf.keras.callbacks.TensorBoard(log_dir),  # log metrics
            hp.KerasCallback(log_dir, hparams),  # log hparams
        ],
    )
    return hist.history["val_loss"]

In [31]:
def run(run_dir, hparams):
    """Train one trial and log its hyperparameters and validation loss to ``run_dir``.

    Args:
        run_dir: Directory that receives this trial's summary files.
        hparams: Mapping from the ``HP_*`` keys to the values used in this trial;
            it is recorded as-is and forwarded to ``build_model``.
    """
    with tf.summary.create_file_writer(run_dir).as_default():
        hp.hparams(hparams)  # record the values used in this trial
        # build_model returns one validation loss per epoch. Report the final
        # epoch's value so the logged metric describes the trained model even
        # when more than one epoch is run (with a single epoch this is the same
        # element as index 0).
        val_loss = build_model(hparams)[-1]
        tf.summary.scalar(VAL_LOSS, val_loss, step=1)

In [32]:
import os
import re


def next_run_number(logs_dir):
    """Return the run number that follows the highest existing ``run-<number>`` entry.

    Only directory entries whose whole name is ``run-`` followed by digits are
    considered; anything else (event files, ``run-abc``, ``run-2-old``, ...) is
    ignored instead of raising.

    Args:
        logs_dir: Directory that holds the ``run-<number>`` entries.

    Returns:
        ``max(existing numbers) + 1``, or ``0`` when ``logs_dir`` is not an
        existing directory or contains no matching entry.
    """
    if not os.path.isdir(logs_dir):
        return 0

    run_name = re.compile(r"run-(\d+)")
    matches = (run_name.fullmatch(entry) for entry in os.listdir(logs_dir))
    run_numbers = [int(match.group(1)) for match in matches if match]
    return max(run_numbers, default=-1) + 1


# Root directory of this tuning session; each trial gets a run-<number> entry in it.
logs_dir = "logs/hparam_tuning_new2/"

# Continue numbering after the runs that are already on disk.
next_run_num = next_run_number(logs_dir)

print("Next run number:", next_run_num)

Next run number: 1


In [33]:
session_num = next_run_num

# Grid search over latent size, the two layer widths and the batch-norm flag.
# Every trial is logged under its own run-<session_num> directory.
for embedded in HP_NUM_embedded.domain.values:
    for num_units in HP_NUM_UNITS.domain.values:
        for num_units1 in HP_NUM_UNITS1.domain.values:
            for use_batch_norm in HP_BOOL_BATCH.domain.values:
                # build_model only adds dropout together with batch
                # normalisation, so the dropout rate is swept only when the flag
                # is on; otherwise a single trial is run with the rate recorded
                # as 0.
                if use_batch_norm:
                    drop_out_rates = HP_DROP_OUT_RATE.domain.values
                else:
                    drop_out_rates = [0]

                for drop_out_rate in drop_out_rates:
                    hparams = {
                        HP_NUM_embedded: embedded,
                        HP_BOOL_BATCH: use_batch_norm,
                        HP_NUM_UNITS: num_units,
                        HP_NUM_UNITS1: num_units1,
                        HP_DROP_OUT_RATE: drop_out_rate,
                    }
                    run_name = "run-%d" % session_num
                    print("--- Starting trial: %s" % run_name)
                    print({h.name: hparams[h] for h in hparams})
                    run("logs/hparam_tuning_new2/" + run_name, hparams)
                    session_num += 1

--- Starting trial: run-1
{'num_embedded': 2, 'num_units': 8, 'num_units1': 8, 'bool_batch': False, 'drop_out_rate': 0}


--- Starting trial: run-2
{'num_embedded': 2, 'bool_batch': True, 'num_units': 8, 'num_units1': 8, 'drop_out_rate': 0.1}
--- Starting trial: run-3
{'num_embedded': 2, 'bool_batch': True, 'num_units': 8, 'num_units1': 8, 'drop_out_rate': 0.3}
--- Starting trial: run-4
{'num_embedded': 2, 'bool_batch': True, 'num_units': 8, 'num_units1': 8, 'drop_out_rate': 0.5}
--- Starting trial: run-5
{'num_embedded': 2, 'num_units': 8, 'num_units1': 32, 'bool_batch': False, 'drop_out_rate': 0}
--- Starting trial: run-6
{'num_embedded': 2, 'bool_batch': True, 'num_units': 8, 'num_units1': 32, 'drop_out_rate': 0.1}


KeyboardInterrupt: 