# Import packages

In [None]:
import numpy as np
import tensorflow as tf
import keras
import os, sys
import random

from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, BatchNormalization, Layer

sys.path.insert(1, str(os.path.abspath(os.path.join(os.getcwd(), os.pardir))) + "\\Timestamp2Vec\\")
from helper_functions import *

# Single seed shared by every random number generator used in this notebook.
SEED = 123
# Python's own generator.
random.seed(SEED)
# NumPy's global generator: the evolutionary algorithm draws all of its
# random numbers through np.random, so it has to be seeded as well.
np.random.seed(SEED)
# TensorFlow's global generator (weight initialisation, latent sampling,
# shuffling inside fit).
tf.random.set_seed(SEED)

# Data preparation

In [None]:
# Folder "<USERPROFILE>/Desktop/data_thesis" (USERPROFILE is read from the environment).
data_location = os.path.join(os.environ['USERPROFILE'], 'Desktop', 'data_thesis')
# NOTE(review): allow_pickle=True executes pickled objects while loading; only use on trusted files.
vectorized_dates_file = data_location + "/vectorized_dates.npy"
data = np.load(vectorized_dates_file, allow_pickle=True)

In [None]:
# cast every entry to float64 (Boolean entries, if any, become 0.0 / 1.0)
data = np.asarray(data, dtype=np.float64)
# show the resulting array shape as the cell output
data.shape

### Train and Test set

In [None]:
# create the 80/20 train and test split, using SEED
# The data is passed only once: the split indices depend solely on the number
# of samples and random_state, so the result is the same as splitting
# (data, data) while avoiding two extra copies that were discarded anyway.
train_data, test_data = train_test_split(
    data, test_size=0.2, random_state=SEED
)
train_data.shape

In [None]:
# free up space: drop the name bound to the full array; the visible code below
# only reads train_data and test_data
del data

In [None]:
# normalize data to [0, 1]
# obtain the min and max value of every column (axis=0) of the training split
# NOTE(review): min_val / max_val are not passed to normalize() below and no
# other visible code reads them — confirm whether they are still needed
min_val = np.amin(train_data, axis=0)
max_val = np.amax(train_data, axis=0)

# keep only the first fifth (len / 5) of the train and of the test set
# (presumably to shorten every training run of the search — TODO confirm)

train_data = train_data[:round(len(train_data) / 5)]
test_data = test_data[:round(len(test_data) / 5)]


# normalize the train and test data
# (normalize is not defined in this file; presumably it comes from the star
# import of helper_functions — TODO confirm which range it uses)
train_data = normalize(train_data)
test_data = normalize(test_data)

# store tensors on CPU to save enough space on GPU
with tf. device("cpu:0"):
    train_data = tf.cast(train_data, tf.float32)
    test_data = tf.cast(test_data, tf.float32)

# Variational Autoencoder model

### Model parameters

In [None]:
# The latent dimension is not fixed here: it is one of the genes that the
# evolutionary search selects from `latent_dimensions`.
# Number of input features, read from the static shape of the training tensor
# (no row has to be sliced, so this also works with fewer than three samples).
X_SHAPE = train_data.shape[1]
# Number of epochs each candidate model is trained for.
EPOCHS = 3
# Mini-batch size used during training.
BATCH_SIZE = 256

### VAE

#### Sampling layer

In [None]:
class Sampling(Layer):
    """Sampling layer of the VAE: draws the latent variable z.

    Uses the reparameterisation ``z = mean + exp(0.5 * log_var) * epsilon``
    with ``epsilon`` sampled from a normal distribution.
    """

    def call(self, inputs):
        """Return a sample z for the given ``[z_mean, z_log_var]`` pair."""
        mean, log_var = inputs
        # dynamic shape of the mean tensor: (batch size, latent dimension)
        mean_shape = tf.shape(mean)
        n_rows = mean_shape[0]
        n_latent = mean_shape[1]
        # one normal draw per latent coordinate of every sample
        noise = tf.keras.backend.random_normal(shape=(n_rows, n_latent))
        # exp(0.5 * log variance) is the standard deviation
        std_dev = tf.exp(0.5 * log_var)
        return mean + std_dev * noise

#### VAE Class

In [None]:
class VariationalAutoEncoder(Model):
    """Variational autoencoder built from an encoder and a decoder model.

    The encoder must return ``(z_mean, z_log_var, z)`` and the decoder maps
    ``z`` back to the input space. Training and evaluation are implemented by
    the custom ``train_step`` / ``test_step`` below, so ``compile`` only needs
    an optimizer.

    Args:
        encoder: model mapping an input batch to ``(z_mean, z_log_var, z)``.
        decoder: model mapping ``z`` to a reconstruction of the input.
        X_SHAPE: number of input features; used to scale the reconstruction
            loss.
    """

    def __init__(self, encoder, decoder, X_SHAPE):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.X_shape = X_SHAPE

    def _compute_losses(self, x, training):
        """Run encoder and decoder on ``x`` and return the loss dictionary.

        Args:
            x: batch of inputs.
            training: forwarded to both sub-models so that layers such as
                BatchNormalization use batch statistics (and update their
                moving averages) while training and the moving averages
                while evaluating.

        Returns:
            dict with the keys ``"loss"``, ``"reconstruction_loss"`` and
            ``"kl_loss"``.
        """
        # map to latent space and obtain z_mean, z_log_var, z
        z_mean, z_log_var, z = self.encoder(x, training=training)
        # decode z to obtain the reconstruction
        decoded = self.decoder(z, training=training)
        # mean squared error, scaled by the number of features stored on the
        # instance (not by a module-level global)
        reconstruction_loss = tf.reduce_mean(
            keras.losses.mean_squared_error(x, decoded)
        )
        reconstruction_loss *= self.X_shape
        # KL divergence between the approximate posterior and N(0, I)
        # NOTE(review): the KL term is averaged over batch AND latent
        # dimensions; the usual formulation sums over the latent dimensions
        # first. Kept as is so that fitness values stay comparable.
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(kl_loss)
        total_loss = reconstruction_loss + kl_loss
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

    def train_step(self, x):
        """Perform one optimisation step and return the loss dictionary."""
        # fit() passes (inputs, targets); only the inputs are needed
        if isinstance(x, tuple):
            x = x[0]
        with tf.GradientTape() as tape:
            losses = self._compute_losses(x, training=True)
        grads = tape.gradient(losses["loss"], self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return losses

    def test_step(self, x):
        """Evaluate one batch and return the loss dictionary."""
        # validation data arrives as (inputs, targets); only inputs are needed
        if isinstance(x, tuple):
            x = x[0]
        return self._compute_losses(x, training=False)

#### Train

In [None]:
def train(variational_autoencoder):
    """Fit the given model on the global train set and return the History.

    The train set serves as both input and target; the global test set is
    used as validation data. Training stops early when ``val_loss`` has not
    improved for two epochs.
    """
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=2,
        mode='min',
        restore_best_weights=True,
    )
    fit_options = dict(
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_data=(test_data, test_data),
        shuffle=True,
        verbose=0,
        callbacks=[early_stopping],
    )
    return variational_autoencoder.fit(train_data, train_data, **fit_options)

## Architecture

In [None]:
def create_encoder(model_params):
    """Build the encoder described by ``model_params``.

    ``model_params`` holds indices into the global option lists, in the order
    first layer, second layer, third layer, activation, latent dimension.
    The returned model outputs ``[z_mean, z_log_var, z]``.
    """
    # the input of the encoder
    input_encoder = keras.Input(shape=(X_SHAPE,))

    # translate the indices into actual settings
    layer_sizes = (
        first_layer[model_params[0]],
        second_layer[model_params[1]],
        third_layer[model_params[2]],
    )
    activation = activations[model_params[3]]
    latent_dim = latent_dimensions[model_params[4]]

    # three dense layers with batch normalisation in between (not after the last)
    hidden = input_encoder
    for position, units in enumerate(layer_sizes):
        if position > 0:
            hidden = BatchNormalization()(hidden)
        hidden = Dense(units, activation=activation)(hidden)

    # mean and log variance of the latent distribution
    z_mean = Dense(latent_dim, name="z_mean")(hidden)
    z_log_var = Dense(latent_dim, name="z_log_var")(hidden)
    # draw z from that distribution
    z = Sampling()([z_mean, z_log_var])
    return Model(input_encoder, [z_mean, z_log_var, z], name="encoder")

In [None]:
def create_decoder(model_params):
    """Build the decoder described by ``model_params``.

    The decoder mirrors the encoder: third, second and first layer size in
    that order, followed by a sigmoid output layer with ``X_SHAPE`` units.

    Args:
        model_params: indices into the global option lists, in the order
            first layer, second layer, third layer, activation, latent
            dimension.

    Returns:
        Keras model named ``"decoder"`` mapping a latent vector to a
        reconstruction.
    """
    latent_dim = latent_dimensions[model_params[4]]
    activation = activations[model_params[3]]
    # The shape has to be a tuple; the original trailing comma sat outside the
    # expression, so a bare int was passed instead of a 1-tuple.
    input_decoder = keras.Input(shape=(latent_dim,))
    # define the layers of the model
    x = Dense(third_layer[model_params[2]], activation=activation)(input_decoder)
    x = BatchNormalization()(x)
    x = Dense(second_layer[model_params[1]], activation=activation)(x)
    x = BatchNormalization()(x)
    x = Dense(first_layer[model_params[0]], activation=activation)(x)
    # sigmoid keeps every reconstructed feature in (0, 1)
    decoder_output = Dense(X_SHAPE, activation="sigmoid")(x)

    decoder = Model(input_decoder, decoder_output, name="decoder")
    return decoder

In [None]:
def create_vae(model_params):
    """Create and compile (Adam optimizer) the VAE described by ``model_params``."""
    # the encoder is built before the decoder (arguments evaluate left to right)
    vae = VariationalAutoEncoder(
        create_encoder(model_params),
        create_decoder(model_params),
        X_SHAPE,
    )
    vae.compile(optimizer="adam")
    return vae

# Evolutionary Algorithm

## EA Class

In [None]:
class EA(object):
    """Differential-evolution style search over VAE hyper-parameters.

    An individual is an array of five indices, in this order: first layer
    size, second layer size, third layer size, activation, latent dimension.
    Every index points into the matching option list.

    Args:
        pop_size: number of individuals processed in every step.
        first_layer: options for the size of the first layer.
        second_layer: options for the size of the second layer.
        third_layer: options for the size of the third layer.
        activations: options for the activation function.
        latent_dimensions: options for the size of the latent space.
        a: scale factor applied to the difference vector in ``mutate``.
        penalty: weight of the model-size term of the fitness.
        max_number_params: value the parameter count is divided by.
        vae_architecture: callable that builds a model from an individual.
    """

    def __init__(self, pop_size, first_layer, second_layer, third_layer, activations, latent_dimensions, a, penalty, max_number_params, vae_architecture):
        self.pop_size = pop_size
        self.first_layer = first_layer
        self.second_layer = second_layer
        self.third_layer = third_layer
        self.activations = activations
        self.latent_dimensions = latent_dimensions
        self.a = a
        self.penalty = penalty
        self.max_number_params = max_number_params
        self.vae_architecture = vae_architecture

    def _gene_options(self):
        """Return the option list of every gene, in individual order."""
        return (
            self.first_layer,
            self.second_layer,
            self.third_layer,
            self.activations,
            self.latent_dimensions,
        )

    def evaluate(self, x):
        """Train the model described by ``x`` and return its fitness (lower is better).

        The fitness is the validation loss of the last trained epoch plus
        ``penalty * number_of_parameters / max_number_params``.
        """
        # build the model of the individual
        model = self.vae_architecture(x)
        # obtain the history of training
        history = train(model)
        # validation loss of the last epoch that was run
        val_loss = history.history["val_loss"][-1]
        # number of parameters of encoder and decoder together
        number_param = model.encoder.count_params() + model.decoder.count_params()
        return val_loss + self.penalty * (number_param / self.max_number_params)

    def check_mutation(self, x, option_list):
        """Turn the mutated value ``x`` into a valid index of ``option_list``.

        ``x`` is rounded; if the result is not a valid index, a uniformly
        random valid index is returned instead.
        """
        rounded = np.around(x)
        if 0 <= rounded < len(option_list):
            return int(rounded)
        return np.random.randint(0, len(option_list))

    def mutate(self, x1, x2, x3):
        """Return the mutant ``x1 + a * (x3 - x2)`` with every gene made valid.

        Every gene is validated against its own option list (gene 0 against
        the first layer options, ..., gene 3 against the activations, gene 4
        against the latent dimensions), matching the layout used when
        individuals are created and decoded.

        Raises:
            ValueError: if the individuals do not have one gene per option list.
        """
        mutated = x1 + (self.a * (x3 - x2))
        gene_options = self._gene_options()
        if mutated.shape[0] != len(gene_options):
            raise ValueError(
                "expected individuals with %d genes, got %d"
                % (len(gene_options), mutated.shape[0])
            )
        for i, options in enumerate(gene_options):
            mutated[i] = self.check_mutation(mutated[i], options)
        return mutated

    def recombine(self, candidate, mutation):
        """Copy every gene of ``mutation`` into ``candidate`` with probability 0.5.

        ``candidate`` is modified in place and also returned.
        """
        for i in range(candidate.shape[0]):
            # randint(0, 2) returns 0 or 1 (the upper bound is exclusive)
            if np.random.randint(0, 2) == 1:
                candidate[i] = mutation[i]
        return candidate

    def select_triple(self, ind_x, population):
        """Return three distinct random individuals, none at index ``ind_x``.

        Raises:
            ValueError: if fewer than three other individuals are available
                (the original rejection loop would never terminate then).
        """
        others = [j for j in range(len(population)) if j != ind_x]
        if len(others) < 3:
            raise ValueError(
                "select_triple needs at least 3 individuals besides the "
                "candidate, got %d" % len(others)
            )
        # sampling without replacement yields three different indices directly
        x1, x2, x3 = np.random.choice(others, size=3, replace=False)
        return population[x1], population[x2], population[x3]

    def step(self, x_old, f_old):
        """Run one generation and return the new population and fitness values.

        Args:
            x_old: current population, one individual per row.
            f_old: fitness of every individual in ``x_old``.

        Returns:
            Tuple ``(x_new, f_new)``; the inputs are left untouched.
        """
        x_new, f_new = np.copy(x_old), np.copy(f_old)
        for i in range(self.pop_size):
            # work on a copy so that the individual only changes when the
            # candidate turns out to be better
            candidate = np.copy(x_new[i])
            # three different individuals, none of them the candidate itself
            x1, x2, x3 = self.select_triple(i, x_new)
            # mutate the triple
            mutated_triple = self.mutate(x1, x2, x3)
            # recombine the mutated triple with the candidate
            candidate = self.recombine(candidate, mutated_triple)
            # train and evaluate the candidate
            f_candidate = self.evaluate(candidate)
            if f_candidate < f_new[i]:
                # the candidate is better: it replaces the individual
                x_new[i] = candidate
                f_new[i] = f_candidate
        return x_new, f_new

## Helper functions

In [None]:
def init_population(pop_size, first_layer, second_layer, third_layer, activations, latent_dimensions):
    """Create ``pop_size`` random individuals.

    Every individual is an object array with one random index per option
    list, in the order first layer, second layer, third layer, activation,
    latent dimension. The individuals are stacked into one array.
    """
    option_lists = (first_layer, second_layer, third_layer, activations, latent_dimensions)
    individuals = []
    for _ in range(pop_size):
        # one uniformly random valid index per option list, drawn in order
        genes = [np.random.randint(0, len(options)) for options in option_lists]
        individuals.append(np.asarray(genes, dtype=object))
    return np.asarray(individuals)

def evaluate_init_pop(ea, population):
    """Evaluate every individual with ``ea.evaluate`` and return the fitness array.

    A progress line is printed before each individual is evaluated.
    """
    fitness = []
    for index, individual in enumerate(population[:population.shape[0]]):
        print("Individual %d" % index)
        fitness.append(ea.evaluate(individual))
    return np.asarray(fitness)

## Model variables

In [None]:
# settings of the evolutionary search
num_generations = 10  # if necessary, please increase the number of generations
pop_size = 10
# scale factor of the difference vector used for mutation
a = 1.0
# weight of the model-size term in the fitness
penalty = 0.01
# value the parameter count is divided by in the fitness
# NOTE(review): origin of 1134 is not visible here — confirm
max_number_params = 1134

# lists with different options
# layer sizes 10, 20, ..., 100; every list is a separate object
first_layer = list(range(10, 101, 10))
second_layer = list(range(10, 101, 10))
third_layer = list(range(10, 101, 10))
activations = ["relu", "sigmoid", "tanh", "elu"]
latent_dimensions = [2, 4, 6, 8]

## Run EA

In [None]:
ea = EA(pop_size, first_layer, second_layer, third_layer, activations, latent_dimensions, a, penalty, max_number_params, create_vae)
# init
pop = init_population(pop_size, first_layer, second_layer, third_layer, activations, latent_dimensions)
f = evaluate_init_pop(ea, pop)

# We want to gather populations and values of the best candidates to further
# analyze the algorithm.
populations = []
populations.append(pop)
f_best = [f.min()]

# Make sure the output folder exists, so that the first np.save below cannot
# fail after the (expensive) evaluation of the initial population.
os.makedirs("Data/important_variables", exist_ok=True)

# Run the EA.
for i in range(num_generations):
    print('Generation: {}, best fitness: {:.5f}'.format(i, f.min()))
    # index of the first individual with the lowest fitness
    current_best = np.where(f == f.min())[0][0]
    # checkpoint the population this generation starts from and its best individual
    np.save("Data/important_variables/population.npy", populations[-1])
    np.save("Data/important_variables/best.npy", pop[current_best])
    pop, f = ea.step(pop, f)
    populations.append(pop)
    # best fitness seen so far (never increases)
    if f.min() < f_best[-1]:
        f_best.append(f.min())
    else:
        f_best.append(f_best[-1])
print('FINISHED!')
# obtain the best model of the population of models (with the best fitness)
index_best_parameters = np.where(f == f.min())[0][0]
# The checkpoints in the loop are written before each step, so the result of
# the last generation has to be saved separately.
np.save("Data/important_variables/population.npy", pop)
np.save("Data/important_variables/best.npy", pop[index_best_parameters])
# NOTE: the model is rebuilt from its parameters, i.e. with fresh weights.
best_model = create_vae(pop[index_best_parameters])
# summary() prints the table itself and returns None, so it is not wrapped
# in print() (that would add a stray "None" line).
print("Best model, encoder: ")
best_model.encoder.summary()
print("Best model, decoder: ")
best_model.decoder.summary()

In [None]:
# index of the first individual with the lowest fitness
index_best_parameters = np.where(f == f.min())[0][0]
# rebuild the corresponding model from its parameters (fresh weights)
best_model = create_vae(pop[index_best_parameters])
# summary() prints the table itself and returns None, so it is not wrapped
# in print() (that would add a stray "None" line).
print("Best model, encoder: ")
best_model.encoder.summary()
print("Best model, decoder: ")
best_model.decoder.summary()