# Codificador Automático

In [107]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scienceplots
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm

from keras.models import Sequential
from keras.layers import (
    Dense,
    LSTM,
    Input,
    RepeatVector,
    TimeDistributed,
    Flatten,
    Dropout,
)

# Global plotting look: SciencePlots style sheets plus a few overrides.
plt.style.use(["science", "ieee", "notebook"])

# Overrides are applied after the style sheets so they take precedence.
plt.rcParams.update(
    {
        "font.size": 12,
        "font.family": "Times New Roman",
        "figure.figsize": (9, 4),
    }
)

In [129]:
def folder_to_sequence(folder_path: str, window_size: int) -> np.ndarray:
    """
    Convert every ``.csv`` file of a data folder into LSTM-ready windows.

    Each ``.csv`` file found in ``data/<folder_path>`` is read with pandas,
    passed through the module-level ``calibrator`` and its ``PT105`` column
    is cut into windows by ``df_to_sequence``. The windows of all files are
    stacked along the first axis.

    Parameters
    ----------
    folder_path: str
        Folder, relative to ``data/``, where the files are located.

    window_size: int
        Size of the data window the model will receive.

    Returns
    -------
    np.ndarray
        Windows of all files concatenated along axis 0. If the folder
        yields no window at all (no ``.csv`` file, or only files too short
        for a single window), an empty array of shape
        ``(0, window_size, 1)`` is returned.
    """
    folder = "data/" + folder_path

    # Collect per-file windows and concatenate once at the end; growing the
    # array with np.concatenate inside the loop copies all previous data on
    # every iteration.
    chunks = []
    for filename in tqdm(os.listdir(folder)):
        if not filename.endswith(".csv"):
            continue
        raw = pd.read_csv(os.path.join(folder, filename))
        calibrated = calibrator.apply_calibration(raw)
        sequences = df_to_sequence(calibrated.PT105, window_size)
        # A file too short for one window contributes nothing; skipping it
        # avoids a shape mismatch when concatenating.
        if len(sequences):
            chunks.append(sequences)

    if not chunks:
        return np.empty((0, window_size, 1))
    return np.concatenate(chunks)


def df_to_sequence(data: pd.Series, window_size: int) -> np.ndarray:
    """
    Cut a 1-D signal into overlapping windows with stride 1.

    Window ``i`` holds ``data[i : i + window_size]``. As in the original
    implementation, ``len(data) - window_size`` windows are produced, so the
    window ending on the very last sample is not included.

    Parameters
    ----------
    data: pd.Series
        One-dimensional signal (any 1-D array-like is accepted).

    window_size: int
        Number of samples per window; must be at least 1.

    Returns
    -------
    np.ndarray
        Array of shape ``(len(data) - window_size, window_size, 1)``. When
        the signal is too short for a window, the result is empty but keeps
        the shape ``(0, window_size, 1)`` so it can still be concatenated
        with the windows of other signals.

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    values = np.asarray(data)
    n_windows = len(values) - window_size
    if n_windows <= 0:
        return np.empty((0, window_size, 1), dtype=values.dtype)

    # Row i of `positions` is [i, i + 1, ..., i + window_size - 1]; indexing
    # with it gathers all windows at once instead of looping in Python.
    positions = np.arange(n_windows)[:, np.newaxis] + np.arange(window_size)
    # Trailing axis of length 1 = a single feature per time step.
    return values[positions][..., np.newaxis]


def train_test_split_ae(
    sequence: np.ndarray, test_size: float = 0.25, shuffle: bool = True
) -> tuple[np.ndarray, np.ndarray]:
    """
    Split a dataset into training and validation parts.

    The first ``int(len(sequence) * test_size)`` positions of the (optionally
    shuffled) index order go to the validation set; the rest go to training.

    Parameters
    ----------
    sequence: np.ndarray
        Sequence to be split along its first axis.

    test_size: float
        Fraction of ``sequence`` assigned to the validation set.

    shuffle: bool
        Whether the samples are shuffled before splitting.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        The values in the order Xtrain, Xtest.
    """
    total = len(sequence)
    order = np.random.permutation(total) if shuffle else np.arange(total)

    # Position in `order` where the validation part ends.
    cut = int(total * test_size)
    held_out, kept = order[:cut], order[cut:]

    return sequence[kept], sequence[held_out]

In [137]:
class Calibrator:
    """
    Linear calibration of raw acquisition channels.

    For every sensor name listed in ``__init__``, the file
    ``data/calibracao/<name>.csv`` is read and a first-degree polynomial of
    its second column against its first column is fitted with ``np.polyfit``.
    The resulting ``(a, b)`` coefficients are stored in ``self.data`` keyed
    by sensor name.
    """

    def __init__(self):
        files = ["PT105"]
        self.data: dict = {}
        for filename in files:
            # NOTE(review): delimiter and decimal mark are both "," here —
            # confirm this matches how the calibration file is written.
            df = pd.read_csv(
                f"data/calibracao/{filename}.csv", delimiter=",", decimal=","
            )
            x, y = df.iloc[:, 0], df.iloc[:, 1]
            slope, intercept = np.polyfit(x, y, 1)
            self.data[filename] = (slope, intercept)
        # Kept as attributes for backward compatibility. They are bound to
        # PT105 explicitly so adding sensors to `files` cannot change them.
        self.a, self.b = self.data["PT105"]

    def __str__(self) -> str:
        """Return the string form of the coefficient dictionary."""
        return str(self.data)

    def apply_calibration(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add the calibrated ``PT105`` column to ``df``.

        The column is computed as ``a * df["cDAQ1Mod1/ai2"] + b`` using the
        PT105 coefficients stored in ``self.data``. ``df`` is modified in
        place and also returned.
        """
        a, b = self.data["PT105"]
        df["PT105"] = a * df["cDAQ1Mod1/ai2"] + b
        return df

class MinMaxScaler_AE:
    """
    Min-max scaler whose statistics are taken along the first axis.

    ``fit`` stores the minimum and maximum of the training data over axis 0;
    ``transform`` maps data to ``(X - min) / (max - min)`` and
    ``inverse_transform`` undoes that mapping.
    """

    def __init__(self):
        # Both are None until `fit` is called, then numpy arrays (or numpy
        # scalars for 1-D input) with the shape of one training sample.
        self.min_val = None
        self.max_val = None

    def __str__(self) -> str:
        return f"min: {self.min_val}\nmax: {self.max_val}\n"

    def fit(self, X_train):
        """
        Fit the scaler on the minimum and maximum values of the training set.

        Args:
            X_train (numpy array): Training data.
        """
        self.min_val = np.min(X_train, axis=0)
        self.max_val = np.max(X_train, axis=0)

    def transform(self, X):
        """
        Scale data using the minimum and maximum computed during ``fit``.

        Positions whose training minimum equals the training maximum are
        divided by 1 instead of 0, so a constant feature maps to 0 rather
        than NaN/inf.

        Args:
            X (numpy array): Data to be normalised.

        Returns:
            X_scaled (numpy array): Data scaled so that the training range
            maps onto [0, 1].
        """
        span = self.max_val - self.min_val
        # Guard against division by zero for constant features.
        safe_span = np.where(span == 0, 1, span)
        return (X - self.min_val) / safe_span

    def inverse_transform(self, X_scaled: np.ndarray) -> np.ndarray:
        """
        Map scaled data back to the original units.

        Args:
            X_scaled (numpy array): Data produced by ``transform``.

        Returns:
            numpy array: ``X_scaled * (max - min) + min``.
        """
        return X_scaled * (self.max_val - self.min_val) + self.min_val

In [140]:
# Shared instances used by the cells below. `folder_to_sequence` looks up
# `calibrator` as a global, so it must exist before that function is called.
calibrator = Calibrator()
# Fitted further down, on the training split only.
scaler = MinMaxScaler_AE()

In [None]:
# Number of samples in each window fed to the autoencoder.
window_size = 20
# Build the windowed dataset from the CSV files under data/VIDRO-B3.
bigX = folder_to_sequence("VIDRO-B3", window_size=window_size)
print("data shape: ", bigX.shape)

In [165]:
# Hold out part of the windows for validation (default split fraction).
Xtrain, Xval = train_test_split_ae(bigX)

# The scaler statistics come from the training split only; the same
# statistics are then applied to both splits.
scaler.fit(Xtrain)
Xtrain_N = scaler.transform(Xtrain)
Xval_N = scaler.transform(Xval)

In [166]:
# test 1
# LSTM autoencoder assembled layer by layer.
autoencoder = Sequential()
# The time series carry a single feature; the sequence length is left open.
autoencoder.add(Input((None, 1)))
# Encoder: only the last LSTM output is kept (one vector per window).
autoencoder.add(LSTM(window_size // 2, return_sequences=False))
autoencoder.add(Dropout(0.2))
# Decoder: the encoded vector is repeated once per output time step.
autoencoder.add(RepeatVector(window_size))
autoencoder.add(LSTM(window_size // 2, return_sequences=True))
autoencoder.add(Dropout(0.2))
# One output value per time step, then flattened to one row per window.
autoencoder.add(TimeDistributed(Dense(1)))
autoencoder.add(Flatten())

In [None]:
# Print the layer-by-layer summary of the model defined above.
autoencoder.summary()

In [None]:
# Training callbacks, all driven by the validation loss.
callbacks = [
    # Keep on disk the model with the lowest validation loss seen so far.
    keras.callbacks.ModelCheckpoint(
        "best_model.keras",
        save_best_only=True,
        monitor="val_loss",
    ),
    # Halve the learning rate after 20 epochs without improvement.
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=20, min_lr=1e-5
    ),
    # Stop after 80 epochs without improvement. Restoring the best weights
    # makes the in-memory model (used for `predict` below) the best one seen
    # rather than the one from the last, possibly much worse, epoch.
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=80,
        verbose=1,
        restore_best_weights=True,
    ),
]

autoencoder.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3), loss="mse")

# Autoencoder training: the input is also the target.
history = autoencoder.fit(
    Xtrain_N,
    Xtrain_N,
    validation_data=(Xval_N, Xval_N),
    epochs=500,
    batch_size=64,
    shuffle=True,
    callbacks=callbacks,
)

In [None]:
# Learning curve: training and validation loss per epoch (log scale).
loss = history.history["loss"]
val_loss = history.history["val_loss"]

# Epochs are numbered from 1 on the x axis.
epochs = np.arange(1, len(loss) + 1)
# Position (0-based) of the lowest validation loss.
best_pos = int(np.argmin(val_loss))

plt.yscale("log", base=10)
plt.plot(epochs, loss, c="k", label="Treino", lw=3)
plt.plot(epochs, val_loss, c="grey", label="Validação", lw=3)
# Look the x coordinate up in `epochs`: using the raw 0-based argmin would
# place the marker one epoch to the left of the actual minimum.
plt.scatter(
    epochs[best_pos], val_loss[best_pos], label=f"Mínimo = {np.min(val_loss):.2E}"
)
plt.ylabel("Erro Médio Quadrático")
plt.xlabel("Iteração de Treino")
plt.xlim((0, len(loss)))
plt.legend()

# savefig does not create missing directories.
os.makedirs("images", exist_ok=True)
plt.savefig("images/curva-de-aprendizdo.pdf", dpi=300, bbox_inches="tight")

In [None]:
# Reconstruct the normalised training windows with the trained autoencoder.
predictions = autoencoder.predict(Xtrain_N)

In [None]:
# Visual check: overlay one normalised training window and its reconstruction.
index: int = window_size*4  # sample chosen for inspection
plt.figure(figsize=(16, 9))
plt.plot(Xtrain_N[index])  # input window
plt.plot(predictions[index])  # model output for the same window