# Teste para rede LSTM 1_1

## Importar bibliotecas

In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import pandas as pd
import numpy as np

import random
from itertools import product

import pickle
import os
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [2]:
subsistema = 'S'

## Importando dados

In [3]:
normalizacao = pd.read_csv(f'../../../Preprocessamento/Normalizacao_{subsistema}.txt', index_col=0)
normalizacao

Unnamed: 0,Carga,Exo_Temperatura
Min,5605.34445,3.1446
Max,24174.16568,39.708899


In [4]:
carga_max = normalizacao.loc['Max', 'Carga']
carga_min = normalizacao.loc['Min', 'Carga']
print(carga_max, carga_min)

24174.16568 5605.3444500000005


In [5]:
dados_teste = pd.read_csv(f'../../../Preprocessamento/{subsistema}_teste.csv', index_col='DataHora')
dados_teste.index = pd.to_datetime(dados_teste.index, format="%Y-%m-%d %H:%M:%S")
dados_teste

Unnamed: 0_level_0,Carga,Temperatura,seg,ter,qua,qui,sex,sab,dom,seno_dia_semana,...,mai,jun,jul,ago,set,out,nov,dez,seno_mes,cosseno_mes
DataHora,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-12-29 00:00:00,-0.217995,-0.051151,0,0,0,0,0,0,1,-0.781831,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2024-12-29 01:00:00,-0.305029,-0.084714,0,0,0,0,0,0,1,-0.781831,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2024-12-29 02:00:00,-0.380464,-0.082072,0,0,0,0,0,0,1,-0.781831,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2024-12-29 03:00:00,-0.432379,-0.116373,0,0,0,0,0,0,1,-0.781831,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2024-12-29 04:00:00,-0.467450,-0.121657,0,0,0,0,0,0,1,-0.781831,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-12-31 19:00:00,-0.062229,0.276786,0,0,1,0,0,0,0,0.974928,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2025-12-31 20:00:00,-0.028580,0.211477,0,0,1,0,0,0,0,0.974928,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2025-12-31 21:00:00,-0.072162,0.211477,0,0,1,0,0,0,0,0.974928,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0
2025-12-31 22:00:00,-0.190839,0.200865,0,0,1,0,0,0,0,0.974928,...,0,0,0,0,0,0,0,1,-2.449294e-16,1.0


In [6]:
dados_teste.loc[dados_teste[dados_teste['Temperatura'] < -1].index,'Temperatura'] = -1

## Criação dos Datasets

In [None]:
class Dados(Dataset):
    def __init__(self, dados, modelo, entrada_carga = 48, horas_de_previsao = 24, horizonte = 2):
        # Recebendo e tratando os dados
        self.dados = dados

        # Pega dados de carga
        self.carga = self.dados['Carga']

        # Pega dados de temperatura
        self.temperatura = self.dados['Temperatura']
        
        # Pega dados de dia da semana no formato sen cos
        self.lista_dias_semana_S = list(self.dados.columns[2:4])
        self.dias_semana_S = self.dados[self.lista_dias_semana_S]

        # Pega dados de mes no formato sen cos
        self.lista_mes_S = list(self.dados.columns[4:])
        self.meses_S = self.dados[self.lista_mes_S]

        # Configurando outras variaveis
        self.entrada_carga = 48
        self.horas_de_previsao = horas_de_previsao
        self.horizonte = horizonte

        # Separa os dados
        self.entrada = []
        self.previsao = []
        for idx in range((len(self.carga) // 24) - self.entrada_carga // 24 - self.horizonte + 1):
            # Pula dia a dia
            idx = idx * 24
            
            # Salva o historico de carga
            carga_hist = self.carga[idx: idx + self.entrada_carga]

            # Salva os dados de temperatura do dia da previsão
            temperatura_H = self.temperatura[idx + self.entrada_carga + (self.horizonte - 1) * 24 : idx + self.entrada_carga + (self.horizonte) * 24]

            # Salva os dados de temperatura mínima, media e máxima
            temperatura_P = pd.Series([temperatura_H.min(), temperatura_H.mean(), temperatura_H.max()])
            
            # Salva os dados de dia da semana no formato sen cos
            dia_S = self.dias_semana_S.iloc[idx + self.entrada_carga + (self.horizonte - 1) * 24]

            # Salva os dados de mes no formato sen cos
            mes_S = self.meses_S.iloc[idx + self.entrada_carga + (self.horizonte - 1) * 24]

            # Gera as entradas concatenando os dados
            if modelo == 'M1_1':
                previsor_concat = pd.concat([carga_hist]).values
            elif modelo == 'M2_1':
                previsor_concat = pd.concat([carga_hist, temperatura_H]).values
            elif modelo == 'M2_2':
                previsor_concat = pd.concat([carga_hist, temperatura_P]).values
            elif modelo == 'M2_3':
                previsor_concat = pd.concat([carga_hist, dia_S]).values
            elif modelo == 'M2_4':
                previsor_concat = pd.concat([carga_hist, mes_S]).values
            elif modelo == 'M3_1':
                previsor_concat = pd.concat([carga_hist, temperatura_H, dia_S]).values
            elif modelo == 'M3_2':
                previsor_concat = pd.concat([carga_hist, temperatura_H, mes_S]).values
            elif modelo == 'M3_3':
                previsor_concat = pd.concat([carga_hist, temperatura_P, dia_S]).values
            elif modelo == 'M3_4':
                previsor_concat = pd.concat([carga_hist, temperatura_P, mes_S]).values
            elif modelo == 'M3_5':
                previsor_concat = pd.concat([carga_hist, dia_S, mes_S]).values
            elif modelo == 'M4_1':
                previsor_concat = pd.concat([carga_hist, temperatura_H, dia_S, mes_S]).values
            elif modelo == 'M4_2':
                previsor_concat = pd.concat([carga_hist, temperatura_P, dia_S, mes_S]).values
                
            self.entrada.append(previsor_concat)

            # Gera os dados de saida
            saida_carga = self.carga[idx + self.entrada_carga + (self.horizonte - 1) * 24 : idx + self.entrada_carga + (self.horizonte) * 24]
            self.previsao.append(saida_carga)
            
        self.entrada = torch.tensor(np.array(self.entrada), dtype = torch.float32)
        self.previsao = torch.tensor(np.array(self.previsao), dtype = torch.float32)

    def __len__(self):
        return (len(self.carga) // 24) - self.entrada_carga // 24 - self.horizonte + 1

    def __getitem__(self, idx):
        return self.entrada[idx], self.previsao[idx]

## LSTM

In [8]:
class LSTM(nn.Module):
    def __init__(self, n_entrada, n_primeira_camada, n_saida):
        super().__init__()
        self.lstm1 = nn.LSTM(
            input_size=n_entrada,
            hidden_size=n_primeira_camada,
            batch_first=True
        )
        self.features = nn.Sequential(
            nn.Linear(n_primeira_camada, n_saida),
            nn.Tanh(),
        )
    
    def forward(self, X):
        if X.dim() == 2:
            X = X.unsqueeze(1)
        
        saida, _ = self.lstm1(X)
        saida = saida[:, -1, :]
        saida = self.features(saida)
        return saida

## Função treinamento e AG

In [9]:
def treinar(params):
    pass

In [10]:
class AG:
    def __init__(self, limites, tamanho_populacao, geracoes, taxa_mutacao, funcao_custo):
        self.limites = limites
        self.numero_variaveis = len(self.limites)
        self.tamanho_populacao = tamanho_populacao
        self.geracoes = geracoes
        self.taxa_mutacao = taxa_mutacao
        self.funcao_custo = funcao_custo
        self.geracao_atual = 0

        intervalos = [range(a, b+1) for a, b in limites]
        possibilidades = [comb for comb in product(*intervalos)]
        self.dic_historico = {(possibilidade): [] for possibilidade in possibilidades}

        self.populacao = []

    def criar_individuo(self):
        return [round(random.uniform(a - 0.49, b + 0.49)) for (a, b) in self.limites]

    def criar_populacao(self):
        return [self.criar_individuo() for _ in range(self.tamanho_populacao)]

    def avaliar_fitness(self):
        custos = []
        for ind in self.populacao:
            chave = tuple(ind)
            if self.dic_historico[chave]:
                print(f'Indivíduo já avaliado.')
                custos.append(self.dic_historico[chave][0])
            else:
                fit = self.funcao_custo(ind)
                custos.append(fit)
                self.dic_historico[chave].append(fit)
            print(f'Indivíduo: {ind}. Fit: {custos[-1]}')
        self.fitness = [1 / (1 + c) for c in custos]

    def selecao_roleta(self):
        soma = sum(self.fitness)
        pick = random.uniform(0, soma)
        atual = 0
        for ind, fit in zip(self.populacao, self.fitness):
            atual += fit
            if atual > pick:
                return ind
        return self.populacao[-1]

    def cruzamento(self, pai1, pai2):
        n = len(pai1)
        indices_pai1 = random.sample(range(n), k=n//2)
        filho = [pai1[i] if i in indices_pai1 else pai2[i] for i in range(n)]
        return filho

    def mutacao(self, ind):
        for i in range(self.numero_variaveis):
            if random.random() < self.taxa_mutacao:
                a, b = self.limites[i]
                ind[i] = round(random.uniform(a - 0.49, b + 0.49))
        return ind

    def salvar(self, nome_arquivo='ag_estado.pkl'):
        with open(nome_arquivo, 'wb') as f:
            pickle.dump(self, f)
        print(f"Estado salvo em '{nome_arquivo}'.")

    @staticmethod
    def carregar(nome_arquivo='ag_estado.pkl'):
        with open(nome_arquivo, 'rb') as f:
            ag = pickle.load(f)
        print(f"Estado carregado de '{nome_arquivo}'.")
        return ag

    def algoritmo_genetico(self, continuar=False):
        if not continuar or not self.populacao:
            self.populacao = self.criar_populacao()

        for geracao in range(self.geracao_atual, self.geracoes):
            print(f'\nGeração {geracao + 1}')
            self.avaliar_fitness()

            # Elitismo de 2 melhores
            indices_melhores = sorted(range(len(self.fitness)), key=lambda i: self.fitness[i], reverse=True)[:2]
            elite = [self.populacao[i] for i in indices_melhores]
            nova_pop = elite[:]

            while len(nova_pop) < self.tamanho_populacao:
                pai1 = self.selecao_roleta()
                pai2 = self.selecao_roleta()
                filho = self.cruzamento(pai1, pai2)
                filho = self.mutacao(filho)
                nova_pop.append(filho)

            self.populacao = nova_pop
            print(f"Elite: {elite}")

            # Salvar estado atual
            self.salvar()
            self.geracao_atual += 1

## Escolha da melhor configuração

In [11]:
with open("../../../Treinamento/LSTM/LSTM_1_DENSO_1/ag_estado.pkl", "rb") as f:
    ag = pickle.load(f)

In [12]:
configuracao_1 = []
valor_1 = np.inf
configuracao_2 = []
valor_2 = np.inf
for configuracao in ag.dic_historico:
    if len(ag.dic_historico[configuracao]) > 0:
        if len(configuracao_1) == 0:
            configuracao_1 = configuracao
            valor_1 = ag.dic_historico[configuracao][0]
        elif len(configuracao_2) == 0:
            configuracao_2 = configuracao
            valor_2 = ag.dic_historico[configuracao][0]
        elif ag.dic_historico[configuracao][0] < valor_1:
            configuracao_2 = configuracao_1
            valor_2 = valor_1
            configuracao_1 = configuracao
            valor_1 = ag.dic_historico[configuracao][0]
        elif ag.dic_historico[configuracao][0] < valor_2:
            configuracao_2 = configuracao
            valor_2 = ag.dic_historico[configuracao][0]
print(configuracao_1, valor_1, configuracao_2, valor_2)

(5, 3, 0, 1, 2) 0.0037512236197168626 (10, 3, 0, 1, 0) 0.0037634276901371776


In [13]:
melhor_configuracao = 1

## Função teste

In [None]:
def teste_finalistas(conf_id):
    base_dir = f"../../../Treinamento/LSTM/LSTM_1_DENSO_1/conf_{conf_id}"

    arquivos_modelos = sorted([
        f for f in os.listdir(base_dir)
        if f.startswith("modelo_seed_") and f.endswith(".pt")
    ])

    mse_list, mae_list, mape_list, r2_list = [], [], [], []
    mse_n_list, mae_n_list, mape_n_list, r2_n_list = [], [], [], []

    y_true_all = []
    y_pred_all = []

    for arquivo in arquivos_modelos:
        caminho = os.path.join(base_dir, arquivo)
        checkpoint = torch.load(caminho, map_location="cpu", weights_only=False)

        configuracao = checkpoint["configuracao"]
        modelo_entrada, n_neuronios, ano_entrada, taxa_aprendizado, otm = configuracao

        # HIPERPARÂMETROS
        lista_modelo_entrada = (
            [f"M1_{i}" for i in range(1, 2)] +
            [f"M2_{i}" for i in range(1, 5)] +
            [f"M3_{i}" for i in range(1, 6)] + 
            [f"M4_{i}" for i in range(1, 3)]
        )
        lista_modelo_n_entradas = [48, 48+24, 48+3, 48+2, 48+2, 48+24+2, 48+24+2, 48+3+2, 48+3+2, 48+2+2, 48+24+2+2, 48+3+2+2]
        n_entradas = lista_modelo_n_entradas[modelo_entrada]
        modelo_entrada = lista_modelo_entrada[modelo_entrada]

        lista_n_neuronios = [24, round(0.25*(2*n_entradas-24)+24), round(0.5*(2*n_entradas-24)+24), round(0.75*(2*n_entradas-24)+24), 2*n_entradas]
        n_neuronios = lista_n_neuronios[n_neuronios]

        lista_ano_entrada = [2018, 2020, 2022]
        ano_entrada = lista_ano_entrada[ano_entrada]

        lista_taxa_aprendizado = [0.01, 0.001, 0.0001]
        taxa_aprendizado = lista_taxa_aprendizado[taxa_aprendizado]

        lista_otm = ['Adam', 'Rprop', 'RMSprop']
        otm = lista_otm[otm]

        # DATASET
        teste_dataset = Dados(dados_teste, modelo_entrada)
        teste_dataloader = DataLoader(
            teste_dataset,
            batch_size=len(teste_dataset),
            shuffle=False
        )

        # MODELO
        rede = LSTM(n_entrada = n_entradas, n_primeira_camada = n_neuronios, n_saida = 24)
        rede.load_state_dict(checkpoint["state_dict"])
        rede.eval()

        with torch.no_grad():
            for entrada_b, saida_b in teste_dataloader:

                entrada_teste = entrada_b
                saida_teste = saida_b

                if torch.isnan(entrada_teste).any():
                    idx = torch.isnan(entrada_teste).any(dim=1)
                    print(f"NaN na entrada — {arquivo}")
                    print("Índices problemáticos:", idx.nonzero())
                    continue


                previsao = rede(entrada_teste)

                previsao = previsao.cpu().numpy()
                saida_teste = saida_teste.cpu().numpy()

                if np.isnan(previsao).any():
                    print(f"NaN detectado — {arquivo} ignorado")
                    continue

                # FLATTEN
                y_true_n = saida_teste.reshape(-1)
                y_pred_n = previsao.reshape(-1)

                # DESNORMALIZA
                y_true = (y_true_n + 1) * (carga_max - carga_min) / 2 + carga_min
                y_pred = (y_pred_n + 1) * (carga_max - carga_min) / 2 + carga_min

                # MÉTRICAS NORMALIZADAS
                mse_n = mean_squared_error(y_true_n, y_pred_n)
                mae_n = mean_absolute_error(y_true_n, y_pred_n)

                eps = 1e-6
                mape_n = np.mean(np.abs((y_true_n - y_pred_n) / (y_true_n + eps))) * 100
                r2_n = r2_score(y_true_n, y_pred_n)

                # MÉTRICAS DESNORMALIZADAS
                mse = mean_squared_error(y_true, y_pred)
                mae = mean_absolute_error(y_true, y_pred)
                mape = np.mean(np.abs((y_true - y_pred) / (y_true + eps))) * 100
                r2 = r2_score(y_true, y_pred)

                # ARMAZENAMENTO
                mse_n_list.append(mse_n)
                mae_n_list.append(mae_n)
                mape_n_list.append(mape_n)
                r2_n_list.append(r2_n)

                mse_list.append(mse)
                mae_list.append(mae)
                mape_list.append(mape)
                r2_list.append(r2)

                y_true_all.append(y_true)
                y_pred_all.append(y_pred)

    return {
        "normalizado": {
            "mse": np.array(mse_n_list),
            "mae": np.array(mae_n_list),
            "mape": np.array(mape_n_list),
            "r2": np.array(r2_n_list),
        },
        "desnormalizado": {
            "mse": np.array(mse_list),
            "mae": np.array(mae_list),
            "mape": np.array(mape_list),
            "r2": np.array(r2_list),
        },
        "y_true": np.concatenate(y_true_all),
        "y_pred": np.concatenate(y_pred_all)
    }


## Teste

In [15]:
if melhor_configuracao == 1:
    resultados = teste_finalistas('1')
elif melhor_configuracao == 2:
    resultados = teste_finalistas('2')
else:
    print("ERRO")

In [16]:
print("\nRESULTADOS NO TESTE")

print("\nNormalizado:")
for k, v in resultados["normalizado"].items():
    print(f"{k.upper():5s}: {v.mean():.6f} ± {v.std():.6f}")

print("\nDesnormalizado:")
for k, v in resultados["desnormalizado"].items():
    print(f"{k.upper():5s}: {v.mean():.6f} ± {v.std():.6f}")


RESULTADOS NO TESTE

Normalizado:
MSE  : 0.010210 ± 0.000894
MAE  : 0.071766 ± 0.003694
MAPE : 146654.656250 ± 55102.953125
R2   : 0.882974 ± 0.010248

Desnormalizado:
MSE  : 880132.623151 ± 77072.362572
MAE  : 666.302643 ± 34.301197
MAPE : 4.813325 ± 0.240200
R2   : 0.882974 ± 0.010248


In [17]:
with open("resultado.pkl", "wb") as arquivo:
    pickle.dump(resultados, arquivo)