Modelo CNN-LSTM com Otimização de Hiperparâmetros
funcionando com PyTorch e Optuna

In [7]:
import os
import logging
from sklearn.impute import SimpleImputer, KNNImputer
from datetime import datetime
import pandas as pd
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import ExtraTreesRegressor
import torch
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.metrics import MAE, SMAPE, PoissonLoss, QuantileLoss
from pytorch_forecasting.models.temporal_fusion_transformer.tuning import optimize_hyperparameters

data_hoje = datetime.now().strftime('%d-%m')
inicio_execucao = pd.Timestamp.now()

os.makedirs(f'../logs/{data_hoje}', exist_ok=True)
os.makedirs(f'../plots/{data_hoje}', exist_ok=True)
os.makedirs(f'../best_models/{data_hoje}', exist_ok=True)

logging.basicConfig(filename=f'../logs/{data_hoje}/bilstm_optuna.log', level=logging.INFO, format='- %(message)s')
logging.info('-' * 50)
logging.info(f'{inicio_execucao} - Iniciando o processo de otimização e treinamento do modelo BiLSTM')

df_original = pd.read_csv('../dados_tratados/combinado/Piratininga/Piratininga_tratado_combinado.csv',
                          usecols=['PM2.5', 'Data e Hora', 'PM10', 'Monóxido de Carbono'], low_memory=False)

df_original['Data e Hora'] = pd.to_datetime(df_original['Data e Hora'], format='%Y-%m-%d %H:%M:%S')
df_original.index = df_original['Data e Hora']
df_original.sort_index(inplace=True)

colunas_selecionadas = ['PM2.5', 'PM10', 'Monóxido de Carbono']
logging.info(f"Colunas selecionadas: {colunas_selecionadas}")

df = df_original[colunas_selecionadas]
df = df.loc['2019-01-01':'2022-01-01']

logging.info(f"Período de análise: 2019-01-01 a 2022-01-01")
df = df.apply(pd.to_numeric, errors='coerce')

ModuleNotFoundError: No module named 'numpy.lib.function_base'

In [None]:
def log_imputation(method_name, impute_function, df):
    df_imputed = impute_function(df)
    logging.info(f"Imputação realizada usando: {method_name}")
    return df_imputed


def Mice_imputer(df):
    mice_imputer = IterativeImputer(
        estimator=ExtraTreesRegressor(n_estimators=100, random_state=42),
        max_iter=10,
        random_state=42,
        n_nearest_features=None,
        initial_strategy='mean'
    )

    df_imputed = pd.DataFrame(
        mice_imputer.fit_transform(df),
        columns=df.columns,
        index=df.index
    )

    return df_imputed


def mean_imputer(df):
    imputer = SimpleImputer(strategy='mean')
    df_imputed = pd.DataFrame(
        imputer.fit_transform(df),
        columns=df.columns,
        index=df.index
    )
    return df_imputed


def median_imputer(df):
    imputer = SimpleImputer(strategy='median')
    df_imputed = pd.DataFrame(
        imputer.fit_transform(df),
        columns=df.columns,
        index=df.index
    )
    return df_imputed


def forward_fill_imputer(df):
    df_imputed = df.fillna(method='ffill')
    return df_imputed


def backward_fill_imputer(df):
    df_imputed = df.fillna(method='bfill')
    return df_imputed


def linear_interpolation_imputer(df):
    df_imputed = df.interpolate(method='linear')
    return df_imputed


def exponential_smoothing_imputer(df):
    df_imputed = df.ewm(alpha=0.5).mean()
    return df_imputed


def knn_imputer(df):
    imputer = KNNImputer(n_neighbors=5)
    df_imputed = pd.DataFrame(
        imputer.fit_transform(df),
        columns=df.columns,
        index=df.index
    )
    return df_imputed


# df_imputed = log_imputation('Mean', mean_imputer, df)
# df_imputed = log_imputation('Median', median_imputer, df)
# df_imputed = log_imputation('Forward Fill', forward_fill_imputer, df)
# df_imputed = log_imputation('Backward Fill', backward_fill_imputer, df)
# df_imputed = log_imputation('Linear Interpolation', linear_interpolation_imputer, df)
# df_imputed = log_imputation('Exponential Smoothing', exponential_smoothing_imputer, df)
# df_imputed = log_imputation('KNN', knn_imputer, df)
df_imputed = log_imputation('MICE', Mice_imputer, df)


In [None]:
# Preparando os dados para o TFT
max_prediction_length = 24  # Previsão para as próximas 24 horas
max_encoder_length = 168  # Usar uma semana de dados históricos

df_imputed = df_imputed.reset_index()
df_imputed['time_idx'] = df_imputed.index

# Adicionando variáveis de tempo
df_imputed['hour'] = df_imputed['Data e Hora'].dt.hour
df_imputed['dayofweek'] = df_imputed['Data e Hora'].dt.dayofweek
df_imputed['month'] = df_imputed['Data e Hora'].dt.month

# Definindo variáveis categóricas e contínuas
static_categoricals = []
static_reals = []
time_varying_known_reals = ['hour', 'dayofweek', 'month']
time_varying_unknown_reals = ['PM10', 'Monóxido de Carbono']

# Criando o conjunto de dados do TimeSeriesDataSet
training_cutoff = df_imputed['time_idx'].max() - max_prediction_length

training = TimeSeriesDataSet(
    df_imputed[lambda x: x.time_idx <= training_cutoff],
    time_idx='time_idx',
    target='PM2.5',
    group_ids=['Data e Hora'],
    static_categoricals=static_categoricals,
    static_reals=static_reals,
    time_varying_known_reals=time_varying_known_reals,
    time_varying_unknown_reals=time_varying_unknown_reals,
    max_encoder_length=max_encoder_length,
    max_prediction_length=max_prediction_length,
)

# Criando os conjuntos de validação e teste
validation = TimeSeriesDataSet.from_dataset(training, df_imputed, predict=True, stop_randomization=True)
batch_size = 128
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)

# Função para otimizar os hiperparâmetros
def optimize_tft(trial):
    params = {
        "hidden_size": trial.suggest_int("hidden_size", 16, 128),
        "lstm_layers": trial.suggest_int("lstm_layers", 1, 5),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True),
        "hidden_continuous_size": trial.suggest_int("hidden_continuous_size", 8, 64),
        "attention_head_size": trial.suggest_int("attention_head_size", 1, 4),
    }
    
    model = TemporalFusionTransformer.from_dataset(
        training,
        **params
    )
    
    # Treinamento e avaliação
    early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min")
    trainer = pl.Trainer(
        max_epochs=100,
        gpus=0,
        gradient_clip_val=0.1,
        callbacks=[early_stop_callback],
    )
    trainer.fit(
        model,
        train_dataloaders=train_dataloader,
        val_dataloaders=val_dataloader,
    )
    
    return trainer.callback_metrics["val_loss"].item()

# Otimização dos hiperparâmetros
study = optuna.create_study(direction="minimize")
study.optimize(optimize_tft, n_trials=50)

# Obtendo os melhores hiperparâmetros
best_params = study.best_params
print("Melhores hiperparâmetros:", best_params)

# Criando o modelo final com os melhores hiperparâmetros
final_model = TemporalFusionTransformer.from_dataset(
    training,
    **best_params
)

# Treinamento do modelo final
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=50, verbose=False, mode="min")
trainer = pl.Trainer(
    max_epochs=500,
    gpus=0,
    gradient_clip_val=0.1,
    callbacks=[early_stop_callback],
)
trainer.fit(
    final_model,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

# Salvando o modelo final
torch.save(final_model.state_dict(), f'../best_models/{data_hoje}/best_tft_model.pth')

# Avaliação do modelo
predictions = final_model.predict(val_dataloader)
actuals = torch.cat([y for x, y in iter(val_dataloader)])

mae = MAE()(predictions, actuals)
smape = SMAPE()(predictions, actuals)

print(f"MAE: {mae:.4f}")
print(f"SMAPE: {smape:.4f}")