In [9]:
import os
from typing import Tuple

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, CSVLogger
from tensorflow.keras.models import Model


### Normalization

In [10]:
def normalize(data: pd.DataFrame, features: list, feature_range: Tuple = None,
              scaler=None) -> Tuple[pd.DataFrame, object]:
    """
    Scales the data and returns it as a data frame together with the scaler that was used.
    @param data: The data which should be scaled
    @param features: The column labels of the returned data frame
    @param feature_range: The (min, max) range of a newly created MinMaxScaler.
                          Defaults to (-1, 1). Ignored if a scaler is passed in.
    @param scaler: A scaler which is applied as is (it is NOT fitted again). If None, a new
                   MinMaxScaler is created and fitted on data.
    @return: A tuple containing the scaled data frame and the scaler.
    """
    if scaler is None:
        # Only a scaler created here is fitted; a scaler handed in by the caller is reused
        # untouched so that val / test data are scaled with the statistics of the train data.
        if feature_range is None:
            feature_range = (-1, 1)
        scaler = MinMaxScaler(feature_range=feature_range)
        scaler.fit(data)

    scaled = scaler.transform(data)

    # Keep the row labels of the input (if it has any) instead of replacing them by 0..n-1.
    row_labels = getattr(data, "index", None)
    return pd.DataFrame(data=scaled, columns=features, index=row_labels), scaler

### Split handler

In [12]:
def create_splits(input_data: pd.DataFrame, without_val: bool = False, random_state: int = 1) -> Tuple:
    """
    Creates train val test split of the data provided
    @param input_data: The input data which should be split
    @param without_val: If true only a train and a test set will be created
    @param random_state: Seed handed to every train_test_split call, so splits are reproducible
    @return: (train, test) as returned by train_test_split if without_val is true,
             otherwise the tuple (train, val, test).
    """

    # No validation set will be created: 80% train / 20% test
    if without_val:
        return train_test_split(input_data, test_size=0.2, random_state=random_state, shuffle=True)

    # Hold out 5% of all rows for validation first ...
    X_dev, X_val = train_test_split(input_data, test_size=0.05, random_state=random_state, shuffle=True)
    # ... then split the remaining 95% into 75% train / 25% test
    X_train, X_test = train_test_split(X_dev, test_size=0.25, random_state=random_state, shuffle=True)
    return X_train, X_val, X_test

### Create custom sampling layer

In [4]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the latent vector encoding one input row."""

    def call(self, inputs):
        """
        Draws z with the reparameterization trick.
        @param inputs: A pair (z_mean, z_log_var) of tensors with the same shape
        @return: z_mean + exp(0.5 * z_log_var) * epsilon, with epsilon drawn from N(0, 1)
        """
        z_mean, z_log_var = inputs
        # Sample noise with the shape and dtype of the mean. tf.shape is evaluated at run
        # time, so the batch size does not have to be known when the model is built.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

### Create VAE 

In [5]:
class VAE(keras.Model):
    """
    Variational autoencoder built from an encoder and a decoder model.

    The encoder has to return the triple (z_mean, z_log_var, z); the decoder maps z back to
    the input space. The loss is the mean squared reconstruction error plus the KL term
    computed from z_mean and z_log_var.
    """

    def __init__(self, encoder, decoder, **kwargs):
        """
        @param encoder: Model returning (z_mean, z_log_var, z) for a batch of inputs
        @param decoder: Model reconstructing the inputs from z
        @param kwargs: Forwarded to keras.Model
        """
        super().__init__(**kwargs)
        self.encoder: Model = encoder
        self.decoder: Model = decoder
        # Created once instead of on every step
        self.reconstruction_loss_fn = keras.losses.MeanSquaredError()
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them automatically
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    @staticmethod
    def _unpack_inputs(data):
        """Returns the features of a batch, which is either x itself or a tuple (x, y, ...)."""
        if isinstance(data, (tuple, list)):
            return data[0]
        return data

    def _compute_losses(self, data):
        """Runs encoder and decoder on data and returns (total, reconstruction, kl) loss."""
        z_mean, z_log_var, z = self.encoder(data)
        reconstruction = self.decoder(z)
        reconstruction_loss = self.reconstruction_loss_fn(data, reconstruction)
        # KL term per latent dimension, summed over the latent axis, averaged over the batch
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        return reconstruction_loss + kl_loss, reconstruction_loss, kl_loss

    def _update_trackers(self, total_loss, reconstruction_loss, kl_loss):
        """Feeds the losses of one step into the trackers and returns the running means."""
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def train_step(self, data):
        """Performs one optimization step on a batch and returns the running losses."""
        data = self._unpack_inputs(data)
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self._compute_losses(data)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self._update_trackers(total_loss, reconstruction_loss, kl_loss)

    def test_step(self, data):
        """
        Computes the same losses as train_step without updating any weights, so that
        validation / evaluate() report loss, reconstruction_loss and kl_loss as well.
        """
        data = self._unpack_inputs(data)
        total_loss, reconstruction_loss, kl_loss = self._compute_losses(data)
        return self._update_trackers(total_loss, reconstruction_loss, kl_loss)

    def call(self, inputs):
        """Encodes the inputs, samples z and returns the reconstruction of the inputs."""
        _, _, z = self.encoder(inputs)
        return self.decoder(z)

### Adjust hyperparameters

In [14]:
latent_dim = 500  # Number of latent dimensions
data_path = ""  # Path to your data (tab separated file, first column is used as index)
base_path = "."  # Directory the training log (training.log) is written to

### Load data

In [15]:
data = pd.read_csv(data_path, sep='\t', index_col=0)
# Truncate (not round) everything after the 3rd decimal.
# NOTE(review): the comment here used to say "4th decimal" while the factor 1000 keeps
# 3 decimals; use 10000 instead if 4 decimals are intended.
data = np.trunc(1000 * data) / 1000

# create_splits is a plain function in this notebook, there is no SplitHandler class
train_data, val_data, test_data = create_splits(input_data=data, without_val=False)

# The scaler is fitted on the train data only and then reused for val and test data
train_data, scaler = normalize(train_data, features=train_data.columns)
val_data, _ = normalize(val_data, features=val_data.columns, scaler=scaler)
test_data, _ = normalize(data=test_data, features=test_data.columns, scaler=scaler)

FileNotFoundError: [Errno 2] No such file or directory: ''

### Build model

In [16]:
input_dimensions = train_data.shape[1]

# Layer widths have to be whole numbers, so use integer division. max() guards against a
# width of 0 for inputs with fewer than 4 features.
units_half = max(1, input_dimensions // 2)
units_third = max(1, input_dimensions // 3)
units_quarter = max(1, input_dimensions // 4)

# Encoder: input -> 1/2 -> 1/3 -> 1/4 -> (z_mean, z_log_var, z)
encoder_inputs = keras.Input(shape=(input_dimensions,))
x = layers.Dense(units=units_half, activation="relu")(encoder_inputs)
x = layers.Dense(units=units_third, activation="relu")(x)
x = layers.Dense(units=units_quarter, activation="relu")(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

# Decoder: mirrors the encoder, z -> 1/4 -> 1/3 -> 1/2 -> output
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(units=units_quarter, activation="relu")(latent_inputs)
x = layers.Dense(units=units_third, activation="relu")(x)
x = layers.Dense(units=units_half, activation="relu")(x)

# Linear output: normalize() scales the data to (-1, 1) by default and a ReLU output
# could never reproduce the negative part of that range.
decoder_outputs = layers.Dense(units=input_dimensions, activation=None)(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

vae: VAE = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())

# vae.summary()

callbacks = []

# Stop once the reconstruction loss has not improved for 5 epochs and restore the best weights
early_stop = EarlyStopping(monitor="reconstruction_loss",
                           mode="min", patience=5,
                           restore_best_weights=True)
callbacks.append(early_stop)

# Write the metrics of every epoch to a tab separated log file
csv_logger = CSVLogger(os.path.join(base_path, 'training.log'),
                       separator='\t')
callbacks.append(csv_logger)

history = vae.fit(train_data,
                  callbacks=callbacks,
                  validation_data=(val_data, val_data),
                  epochs=500, batch_size=128)


NameError: name 'train_data' is not defined