# Import Libraries

In [None]:
import os
import time
import json

import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras import layers, models, backend as K
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras import layers, models, regularizers

# Data Acquisition

In [2]:
# Read the train / validation / test CSV files into DataFrames, in that order.
df_train, df_val, df_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "../data/normalized_train.csv",
        "../data/normalized_val.csv",
        "../data/normalized_test.csv",
    )
)

# Auxiliary Code

In [None]:
class SparseAutoencoder(models.Model):
    """Dense autoencoder whose bottleneck is pushed towards sparse activations.

    The encoder and decoder are ``Sequential`` stacks of ``Dense(relu)`` +
    ``BatchNormalization`` pairs. The bottleneck is a sigmoid ``Dense`` layer
    carrying an L1 activity regularizer; in addition, every forward pass adds a
    KL-divergence penalty (see ``sparsity_loss``) to the model losses.

    Args:
        input_shape: Number of input features (an int; used as ``shape=(input_shape,)``
            and as the width of the final decoder layer).
        bottleneck_dim: Number of units in the bottleneck layer.
        encoder_layers: Iterable of hidden-layer widths for the encoder, in order.
        decoder_layers: Iterable of hidden-layer widths for the decoder, in order.
        sparsity_param: Target mean activation (rho) of each bottleneck unit.
            Must lie strictly between 0 and 1.
        beta: Multiplier applied to the KL sparsity penalty.

    Raises:
        ValueError: If ``sparsity_param`` is not in the open interval (0, 1).
    """

    def __init__(self, input_shape, bottleneck_dim, encoder_layers, decoder_layers, sparsity_param=0.05, beta=1e-3):
        # sparsity_loss evaluates log(rho / ...) and log((1 - rho) / ...); a rho
        # outside (0, 1) makes those terms NaN/inf, so reject it up front.
        if not 0.0 < sparsity_param < 1.0:
            raise ValueError(
                f"sparsity_param must be strictly between 0 and 1, got {sparsity_param!r}"
            )

        super().__init__()
        self.sparsity_param = sparsity_param
        self.beta = beta

        # Encoder: input -> [Dense(relu) + BatchNorm] * len(encoder_layers) -> bottleneck
        self.encoder = models.Sequential(name="encoder")
        self.encoder.add(layers.Input(shape=(input_shape,)))

        for units in encoder_layers:
            self.encoder.add(layers.Dense(units, activation="relu", dtype="float32"))
            self.encoder.add(layers.BatchNormalization(dtype="float32"))

        # Sigmoid keeps each activation in (0, 1), so its batch mean can be
        # compared with sparsity_param inside the KL penalty. The layer is named
        # so get_encoder() can look it up.
        self.bottleneck = layers.Dense(bottleneck_dim, activation="sigmoid", activity_regularizer=regularizers.l1(1e-5), dtype="float32", name="bottleneck")
        self.encoder.add(self.bottleneck)

        # Decoder: bottleneck -> [Dense(relu) + BatchNorm] * len(decoder_layers) -> linear output
        self.decoder = models.Sequential(name="decoder")
        self.decoder.add(layers.Input(shape=(bottleneck_dim,)))

        for units in decoder_layers:
            self.decoder.add(layers.Dense(units, activation="relu", dtype="float32"))
            self.decoder.add(layers.BatchNormalization(dtype="float32"))

        self.decoder.add(layers.Dense(input_shape, activation="linear", dtype="float32"))

    def call(self, x, training=False):
        """Encode then decode ``x`` and register the sparsity penalty.

        The penalty is added on every call, regardless of ``training``.

        Returns:
            The decoder output for ``x``.
        """
        encoded = self.encoder(x, training=training)
        decoded = self.decoder(encoded, training=training)

        self.add_loss(self.sparsity_loss(encoded))

        return decoded

    def get_encoder(self):
        """Return a functional ``Model`` from the encoder inputs to the bottleneck output."""
        return tf.keras.models.Model(inputs=self.encoder.inputs, outputs=self.encoder.get_layer("bottleneck").output)

    def get_decoder(self):
        """Return the decoder ``Sequential`` model."""
        return self.decoder

    def sparsity_loss(self, encoded):
        """Compute ``beta`` times the summed KL penalty over bottleneck units.

        ``rho_hat`` is the mean of ``encoded`` over axis 0. For each unit the
        penalty is ``rho*log(rho/rho_hat) + (1-rho)*log((1-rho)/(1-rho_hat))``
        with ``rho = sparsity_param``; a 1e-10 epsilon is added to both
        denominators to avoid division by zero.
        """
        rho_hat = tf.reduce_mean(encoded, axis=0)
        kl_div = tf.reduce_sum(
            self.sparsity_param * tf.math.log(self.sparsity_param / (rho_hat + 1e-10)) +
            (1 - self.sparsity_param) * tf.math.log((1 - self.sparsity_param) / (1 - rho_hat + 1e-10))
        )
        return self.beta * kl_div


In [None]:
def train_models_for_config(config, df_train, df_val, df_test, input_dim=15):
    """Train one SparseAutoencoder per bottleneck size and persist the results.

    For every latent dimension from ``input_dim`` down to 1, a fresh
    ``SparseAutoencoder`` is built, trained to reconstruct its own input, and
    evaluated on the test split. The bottleneck encoder is saved as
    ``results/<name>_latent_<k>_encoder.keras``, a one-row metrics CSV is written
    next to it, and a summary CSV with all rows is written at the end.

    Args:
        config: Mapping with the keys ``"name"``, ``"encoder_layers"`` and
            ``"decoder_layers"``.
        df_train: Training DataFrame. Its ``"FuelType"`` column is dropped and the
            remaining columns are cast to float32.
        df_val: Validation DataFrame, processed like ``df_train``.
        df_test: Test DataFrame, processed like ``df_train``.
        input_dim: Number of feature columns expected after dropping
            ``"FuelType"``; also the largest bottleneck size tried.

    Returns:
        None. Results are written to disk and progress is printed.

    Raises:
        ValueError: If the training features do not have ``input_dim`` columns.
    """
    print(f"Treinando configuração: {config['name']}\n")

    # The feature matrices do not depend on the latent size: build them once
    # instead of re-dropping and re-casting on every loop iteration.
    x_train = df_train.drop(columns=["FuelType"]).values.astype("float32")
    x_val = df_val.drop(columns=["FuelType"]).values.astype("float32")
    x_test = df_test.drop(columns=["FuelType"]).values.astype("float32")

    # Fail before any model is built if the data does not match the declared width.
    if x_train.shape[1] != input_dim:
        raise ValueError(
            f"Expected {input_dim} feature columns after dropping 'FuelType', "
            f"got {x_train.shape[1]}"
        )

    # encoder.save() and to_csv() do not create missing parent directories.
    os.makedirs("results", exist_ok=True)

    total_results = []

    for latent_dim in range(input_dim, 0, -1):
        print(f"Treinando modelo com espaço latente = {latent_dim}\n")

        # Many models are created in this loop; drop the Keras global state left
        # by the previous one so memory use does not keep growing.
        tf.keras.backend.clear_session()

        autoencoder = SparseAutoencoder(
            input_shape=input_dim,
            bottleneck_dim=latent_dim,
            encoder_layers=config["encoder_layers"],
            decoder_layers=config["decoder_layers"],
            sparsity_param=0.05,
            beta=1e-3
        )

        autoencoder.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
            loss="mse"
        )

        # Fresh callbacks per model: halve the LR after 10 stagnant epochs, stop
        # after 20 and roll back to the best weights seen.
        lr_schedule = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=10, min_lr=1e-5)
        early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=20, restore_best_weights=True)

        # Autoencoder objective: the input is also the target.
        history = autoencoder.fit(
            x_train,
            x_train,
            epochs=1000,
            batch_size=32,
            shuffle=True,
            validation_data=(x_val, x_val),
            callbacks=[lr_schedule, early_stopping],
            verbose=1
        )

        # Number of epochs actually run (early stopping may end training before 1000).
        num_epochs = len(history.history["loss"])

        test_reconstructed = autoencoder.predict(x_test)
        mse = np.mean((x_test - test_reconstructed) ** 2)

        encoder = autoencoder.get_encoder()
        model_path = f"results/{config['name']}_latent_{latent_dim}"
        encoder.save(f"{model_path}_encoder.keras")

        # Size on disk of the saved encoder file, in kilobytes.
        encoder_size = os.path.getsize(f"{model_path}_encoder.keras") / 1024

        result = {
            "Configuração": config['name'],
            "Espaço Latente": latent_dim,
            "Tamanho Encoder (KB)": encoder_size,
            "Número de Épocas": num_epochs,
            "Erro MSE": mse
        }

        # Per-model CSV so partial results survive an interrupted run.
        pd.DataFrame(result, index=[0]).to_csv(f"{model_path}.csv", index=False)

        total_results.append(result)

        print(f"Modelo com latente={latent_dim} salvo!\n")

    pd.DataFrame(total_results).to_csv(f"results/{config['name']}_summary.csv", index=False)

    print("\nTreinamento concluído para todas as configurações!\n")

# Symmetrical

In [None]:
# Symmetric layout c1: the decoder widths mirror the encoder widths (128-64-32 / 32-64-128).
config = dict(
    name="sparse_autoencoder_symmetric_c1",
    encoder_layers=[128, 64, 32],
    decoder_layers=[32, 64, 128],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)

In [None]:
# Symmetric layout c2: the decoder widths mirror the encoder widths (256-128-64 / 64-128-256).
config = dict(
    name="sparse_autoencoder_symmetric_c2",
    encoder_layers=[256, 128, 64],
    decoder_layers=[64, 128, 256],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)

# Asymmetrical

In [None]:
# Asymmetric layout c1: two encoder layers (128-64) against three decoder layers (64-128-256).
config = dict(
    name="sparse_autoencoder_asymmetric_c1",
    encoder_layers=[128, 64],
    decoder_layers=[64, 128, 256],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)

In [None]:
# Asymmetric layout c2: two encoder layers (128-64) against four decoder layers (64-128-256-512).
config = dict(
    name="sparse_autoencoder_asymmetric_c2",
    encoder_layers=[128, 64],
    decoder_layers=[64, 128, 256, 512],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)

In [None]:
# Asymmetric layout c3: two encoder layers (256-128) against three decoder layers (128-256-512).
config = dict(
    name="sparse_autoencoder_asymmetric_c3",
    encoder_layers=[256, 128],
    decoder_layers=[128, 256, 512],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)

In [None]:
# Asymmetric layout c4: two encoder layers (256-128) against four decoder layers (128-256-512-1024).
config = dict(
    name="sparse_autoencoder_asymmetric_c4",
    encoder_layers=[256, 128],
    decoder_layers=[128, 256, 512, 1024],
)

train_models_for_config(config=config, df_train=df_train, df_val=df_val, df_test=df_test)