In [None]:
# requeriment
#%capture
!pip cache purge
!mkdir -p ~/tmp_pip
!TMPDIR=~/tmp_pip pip install --no-cache-dir neuralforecast statsforecast optuna plotly scikit-learn lightning ipywidgets openpyxl nbformat

# Funções auxiliares

In [12]:
from sklearn.metrics import root_mean_squared_error
from neuralforecast.losses.pytorch import HuberLoss
from pytorch_lightning.loggers import CSVLogger
from neuralforecast import NeuralForecast
from IPython.display import clear_output
from neuralforecast.models import LSTM
from datetime import datetime
from torch.optim import AdamW
from pathlib import Path
import pandas as pd
import numpy as np
import optuna
import shutil
import yaml
import os

In [13]:
# Classe para facilitar o uso de cores no terminal
class CoresTerminal:
    """Contém códigos ANSI para colorir o output no terminal."""
    VERMELHO = '\033[91m'
    VERDE = '\033[92m'
    FIM = '\033[0m'


In [None]:
from torch.optim.lr_scheduler import StepLR
import torch
import time

torch.set_float32_matmul_precision('high')

# VARIÁVEIS GLOBAIS PARA LOGGING DE MÉTRICAS DE TREINAMENTO
score_media_simples = 0.0
score_rmse = 0.0
training_duration = 0.0
tempo_total = 0.0
avg_training_time = 0.0


# ===================================================================
# FUNÇÃO OBJECTIVE (RESPONSABILIDADE: TREINAR E AVALIAR)
# ===================================================================
def objective(trial: optuna.trial.Trial) -> float:
    """
    Treina e avalia um modelo para um conjunto de hiperparâmetros.
    Salva os artefatos do trial em uma pasta temporária.
    """
    global score_media_simples, score_rmse, training_duration, avg_training_time, tempo_total

    string_saidas = f"\nTeste do Modelo: {treino_id}_({ano_teste})\n"
    clear_output()
    print(string_saidas)
    # 1) Hiperparâmetros a serem otimizados
    #steps_options = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
    #encoder_n_layers = trial.suggest_int("encoder_n_layers", 3, 8)
    #encoder_hidden_size = trial.suggest_categorical("encoder_hidden_size", [64, 128, 192, 256])
    #decoder_layers = trial.suggest_int("decoder_layers", 1, 4)
    #decoder_hidden_size = trial.suggest_categorical("decoder_hidden_size", [64, 128, 192, 256])
    #weight_decay = trial.suggest_categorical("weight_decay", [1e-5, 1e-4, 1e-2])
    #steps = trial.suggest_categorical("steps", steps_options)
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)

    # Parâmetros fixos
    batch_size = 32
    dropout = 0.3
    steps = 1000
    encoder_n_layers= 4
    encoder_hidden_size= 192
    decoder_layers= 1
    decoder_hidden_size= 64
    weight_decay= 0.0001
    # --- Gerenciamento de Logs por Trial ---
    trial_log_dir = log_dir_base / f"trial_{trial.number}"
    if trial_log_dir.exists():
        shutil.rmtree(trial_log_dir)
    trial_log_dir.mkdir(parents=True, exist_ok=True)
    
    csv_logger = CSVLogger(save_dir=str(trial_log_dir.parent), name=trial_log_dir.name)

    # 2) Instanciação do modelo LSTM
    model = LSTM(
        h=h,
        input_size=input_size,
        batch_size=batch_size,
        scaler_type="revin",
        encoder_dropout=dropout,
        encoder_n_layers=encoder_n_layers,
        encoder_hidden_size=encoder_hidden_size,
        decoder_layers=decoder_layers,
        decoder_hidden_size=decoder_hidden_size,
        futr_exog_list=exog_list,
        learning_rate=learning_rate,
        max_steps=steps,
        loss=HuberLoss(delta=1.0),
        optimizer=AdamW,
        optimizer_kwargs={"weight_decay": weight_decay},
        lr_scheduler=StepLR,
        lr_scheduler_kwargs={"step_size": int(steps * 0.5), "gamma": 0.1},
        logger=csv_logger,
        random_seed=42
    )

    # 3) Treinamento
    nf = NeuralForecast(models=[model], freq="YE")
    
    if trial.number != 0:
        print(f"Resultados da Trial anterior:")
        print(f"  - WMAPE: {score_media_simples:.4f}")
        print(f"  - RMSE:  {score_rmse:.4f}")
        print(f"  - Tempo Total Treino: {training_duration:.2f}s")
        print(f"  - Tempo médio de treino até agora: {avg_training_time:.2f}s")
        print(f"  - Tempo estimado para conclusão (baseado na média): {(num_trials - trial.number) * avg_training_time / 60:.2f} minutos\n")

    print(f"Iniciando treinamento do Trial {trial.number}...")
    start_time = time.time()
    nf.fit(df=train_ds)
    end_time = time.time()

    # Cálculo do tempo de treinamento
    training_duration = end_time - start_time
    tempo_total += training_duration
    avg_training_time = tempo_total / (trial.number + 1)

    # 4) Avaliação
    combined_df = evaluate_simple_forecast(
        model=nf,
        train_df=train_ds,
        test_df=val_ds,
        string_saidas = string_saidas
    )
    

    # 5) Cálculo da métrica
    actual = combined_df["y"]
    predicted = combined_df["LSTM"]

    # 5) Cálculo das métricas
    if np.isfinite(combined_df["LSTM"]).all():
        score_media_simples = calcular_media_wmape_simples(combined_df)
        score_rmse = root_mean_squared_error(actual, predicted)
        score = score_media_simples
    else:
        # Penalidade para previsões inválidas (NaN, inf)
        score = 1e10 
        print("Previsão inválida encontrada. Atribuindo score de penalidade.")


    # --- SALVAMENTO TEMPORÁRIO DOS ARTEFATOS DO TRIAL ---
    trial_artifact_path = temp_model_dir / f"trial_{trial.number}"
    nf.save(path=str(trial_artifact_path), overwrite=True)
    
    # --- LÓGICA DE ESPERA ATIVA (POLLING) ---
    # CORREÇÃO: Construímos o caminho para version_0 explicitamente, pois sabemos que será sempre este.
    log_version_dir = trial_log_dir / "version_0"
    metrics_path = log_version_dir / "metrics.csv"
    hparams_path = log_version_dir / "hparams.yaml"
    
    file_found = False
    # Tenta encontrar o arquivo por até 10 segundos
    for _ in range(100):
        if metrics_path.exists():
            file_found = True
            break
        time.sleep(0.1)

    # Copia os arquivos de log se eles foram encontrados
    if file_found:
        shutil.copy(metrics_path, trial_artifact_path / "metrics.csv")
        shutil.copy(hparams_path, trial_artifact_path / "hparams.yaml")
    else:
        print(f"AVISO: Arquivo metrics.csv não encontrado em {log_version_dir} para o trial {trial.number}")

    # Anexa informações ao trial para o callback poder acessá-las
    trial.set_user_attr("artifact_path", str(trial_artifact_path))
    
    # Converte a coluna de data para string para garantir que seja serializável em JSON.
    combined_df_serializable = combined_df.copy()
    if 'ds' in combined_df_serializable.columns:
        combined_df_serializable['ds'] = combined_df_serializable['ds'].astype(str)
    trial.set_user_attr("prediction_df", combined_df_serializable.to_dict())
    return score

In [15]:
def teste_modelo(local, csv_dir, dataset, train_ds, val_ds, test_ds, comentario, nome_dataset, string_saidas = """""", h = 1):
    string_saidas += f"\nTeste do Modelo: {local}"
    clear_output()
    print(string_saidas)
    # carregamento dos hiperparâmetros do melhor modelo salvo
    path_para_hparams = f"{local}/hparams.yaml" 
    params_extraidos = get_hyperparameters_from_yaml(path_para_hparams)

    # carregamento do modelo salvo
    model = NeuralForecast.load(path=f"{local}")
    
    # --- Execução da avaliação do dados de teste --- 
    predictions = evaluate_simple_forecast(
        model=model,
        train_df=dataset,
        test_df=test_ds
    )

    dados_teste = predictions[['unique_id', 'ds', 'y', 'LSTM', 'diferença_%']].copy()
    dados_teste.columns = ['unique_id', 'ds', 'y', 'y_pred', 'diferença_%'] # Renomeando as colunas para o formato esperado
    dados_teste['flag'] = 'teste'
    dados_teste['ds'] = pd.to_datetime(dados_teste['ds']).fillna(dados_teste['ds'])
    dados_teste['ds'] = (pd.to_datetime(dados_teste['ds'], errors='coerce').fillna(pd.Timestamp.now()).dt.strftime('%Y-%m-%dT%H:%M:%S'))


    # --- Execução da avaliação do dados de validação --- 
    if val_ds is not None:
        dados_validacao = evaluate_simple_forecast(
            model=model,
            train_df=dataset,
            test_df=val_ds
        )
        dados_validacao = dados_validacao[['unique_id', 'ds', 'y', 'LSTM', 'diferença_%']].copy()
        dados_validacao.columns = ['unique_id', 'ds', 'y', 'y_pred', 'diferença_%'] # Renomeando as colunas para o formato esperado
        dados_validacao['flag'] = 'validacao'
        dados_validacao['ds'] = pd.to_datetime(dados_validacao['ds']).fillna(dados_validacao['ds'])
        dados_validacao['ds'] = (pd.to_datetime(dados_validacao['ds'], errors='coerce').fillna(pd.Timestamp.now()).dt.strftime('%Y-%m-%dT%H:%M:%S'))
    
    # --- Execução da avaliação do dados de treino --- 
    dados_treino = dataset[['unique_id', 'ds', 'y']].copy()
    dados_treino['y'] = round(np.expm1(dados_treino['y']),2)
    # identifica o ano de termino do treino (pode ser o inicio da validação ou dos testes)
    if val_ds is not None:
        data_inicio = val_ds['ds'].min() # identifica o periodo de inicio da validação
        print(f"Periodo de inicio da Validação: {data_inicio}\n")
    else:
        data_inicio = dados_teste['ds'].min() # identifica o periodo de inicio dos teste
        print(f"Periodo de inicio dos Testes: {data_inicio}\n")
    dados_treino = dados_treino[dados_treino['ds'] < data_inicio] # pega apenas as datas anteriores a data de inicio
    dados_treino['y_pred'] = "-"  # "- " para previsões no treino, nesse ponto elas não exitem. (preenche com valor padão que sera substituido)


    # Extrai as previsões in-sample do modelo para o conjunto de treino
    insample_df = model.predict_insample(step_size=h)
    # Inversão da transformação log1p
    insample_df['y'] = np.expm1(insample_df['y'])
    insample_df['LSTM'] = np.expm1(insample_df['LSTM'])
    # Filtrar para garantir contexto suficiente
    contexto =  params_extraidos['input_size'] # verifica o tanho da janela de contexto.
    # remove os primeiros registros que não tem contexto suficiente para ter uma previsão válida.
    min_cutoff_valido = pd.to_datetime(insample_df['cutoff'].min()) + pd.DateOffset(years=contexto) 
    insample_df = insample_df[insample_df['cutoff'] >= min_cutoff_valido].copy()

    insample_preds = insample_df[['unique_id', 'ds', 'LSTM']].copy()
    insample_preds = insample_preds.rename(columns={'LSTM': 'y_pred_temp'})
    # Faz o merge dos dados de treino com as previsões disponíveis no insample_df
    dados_treino = dados_treino.merge(insample_preds, on=['unique_id', 'ds'], how='left')
    # Atualiza a coluna y_pred: onde tiver previsão do insample_df usa ela, senão mantém o "-"
    dados_treino['y_pred'] = dados_treino['y_pred_temp'].combine_first(dados_treino['y_pred'])
    # Remove a coluna temporária
    dados_treino = dados_treino.drop(columns=['y_pred_temp'])

    dados_treino['diferença_%'] = "-"  # "-" para a diferença percentual entre predito e real, nesse ponto elas não exitem.
    dados_treino['flag'] = 'treino'
    dados_treino['ds'] = (pd.to_datetime(dados_treino['ds'], errors='coerce').fillna(pd.Timestamp.now()).dt.strftime('%Y-%m-%dT%H:%M:%S'))

    dados_treino['diferença_%'] = dados_treino.apply(calcula_diferenca_pct, axis=1)
    dados_treino['flag'] = 'treino'
    dados_treino['ds'] = (pd.to_datetime(dados_treino['ds'], errors='coerce').fillna(pd.Timestamp.now()).dt.strftime('%Y-%m-%dT%H:%M:%S'))

    # Prepara o dataframe final unindo treino, validação e teste
    colunas = [
        'treino_id',    # Identificador do treino
        'unique_id',    # Identificador de cada série
        'ds',           # Data de cada registro
        'y',            # Valor real
        'y_pred',       # Valor predito pelo modelo
        'diferença_%',  # Valor da diferença percentual entre o valor predito e o real
        'flag',         # Indica se foi usado em treino, validação ou teste
        'dataset',      # Nome do dataset usado
        'modelo',       # Nome do algoritmo (LSTM, Random Forest, etc)
        'comentario',   # Anotações adicionais (pode ser vazio)
        'data_treino'   # Data que o modelo foi treinado
    ]

    # Verificar existência do arquivo
    # Se o dataframe com os dados de previsão não existir, ele será criado
    if not os.path.exists(f"{csv_dir}"):
        # Criar DataFrame vazio com as colunas especificadas
        df = pd.DataFrame(columns=colunas)

        # Salvar o DataFrame vazio como CSV
        df.to_csv(f"{csv_dir}", index=False)
        print(f"Arquivo {csv_dir} criado com DataFrame vazio.")
    else:
        df = pd.read_csv(f"{csv_dir}")
        df['ds'] = (pd.to_datetime(df['ds'], errors='coerce').fillna(pd.Timestamp.now()).dt.strftime('%Y-%m-%dT%H:%M:%S'))
        print(f"Arquivo {csv_dir} já existe. Nenhuma ação necessária.")

    
    # Concatena os dados de treino, validação e teste em um único DataFrame. Considerando a posibilidade de val_ds ser None
    if val_ds is not None:
        # Concatena os dados de treino, validação e teste
        dados_completos = pd.concat([dados_treino, dados_validacao, dados_teste], ignore_index=True)
    else:
        # Concatena apenas os dados de treino e teste
        dados_completos = pd.concat([dados_treino, dados_teste], ignore_index=True)
    

    dados_completos['treino_id'] = local  # Identificador do treino
    dados_completos['dataset'] = nome_dataset  # Nome do dataset usado
    dados_completos['modelo'] = "LSTM"
    dados_completos['data_treino'] = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    dados_completos['comentario'] =f"""{local}
    Modelo LSTM treinado com dados de {train_ds['ds'].dt.year.min()} a {train_ds['ds'].dt.year.max()}, validado com os dados de {val_ds['ds'].dt.year.min()} e testado com os dados de {test_ds['ds'].dt.year.min()}.
    Para esse treinamento foi utilizado o dataset {nome_dataset}.
    {comentario}
    O modelo foi treinado com as seguintes configurações:
    input_size: {params_extraidos['input_size']}

    encoder_n_layers = {params_extraidos['encoder_n_layers']}
    learning_rate: {params_extraidos['learning_rate']}
    encoder_hidden_size: {params_extraidos['encoder_hidden_size']}
    decoder_layers: {params_extraidos['decoder_layers']}
    decoder_hidden_size: {params_extraidos['decoder_hidden_size']}
    batch_size: {params_extraidos['batch_size']}
    dropout: {params_extraidos['dropout']}
    weight_decay: {params_extraidos['weight_decay']}
    steps: {params_extraidos['steps']}
    """

    # Reordenar as colunas
    nova_ordem_colunas = ['treino_id', 'unique_id', 'ds', 'y', 'y_pred', 'diferença_%', 'flag', 'dataset', 'modelo', 'comentario', 'data_treino']
    dados_completos = dados_completos[nova_ordem_colunas]
    # Inclui as novas previsões no arquivo existente
    df_final = pd.concat([df, dados_completos], ignore_index=True)
    # Salva o DataFrame atualizado de volta ao CSV
    df_final.to_csv(f"{csv_dir}", index=False)
    clear_output()
    print(string_saidas)
    print(f"Previsões salvas no arquivo {csv_dir}")

In [16]:
# Calcula a diferença percentual absoluta entre previsão e valor real, ela é usada durante para gerar o dataframe com as previsões.
# E como o dataframe de previsões inclui os dados de teste e validação, ela precisa lidar com o periodo de treino que não tem previsão,
# os anos referentes a janela de contexto e nesse caso retorna "-".
def calcula_diferenca_pct(row):
    if row['y_pred'] == "-" or row['y_pred'] is None:
        return "-"
    try:
        y_pred_float = float(row['y_pred'])
        y_real = float(row['y'])
        # Evita divisão por zero
        if y_real == 0:
            return "-"
        # Calcula diferença percentual absoluta
        diff_pct = abs((y_pred_float - y_real) / y_real) * 100
        return round(diff_pct, 2)
    except:
        return "-"

In [17]:
def get_hyperparameters_from_yaml(yaml_path: str) -> dict:
    """
    Lê um arquivo hparams.yaml, extrai um conjunto específico de hiperparâmetros
    e os retorna em um dicionário.

    Esta função lida com os arquivos hparams.yaml gerados pelo PyTorch Lightning,
    que podem conter tags de objetos Python e requerem o uso de 'UnsafeLoader'.

    Args:
        yaml_path (str): O caminho para o arquivo hparams.yaml.

    Returns:
        dict: Um dicionário contendo os hiperparâmetros extraídos.
              Ex: {'learning_rate': 0.001, 'batch_size': 32, ...}
        
        None: Retorna None se o arquivo não for encontrado, se houver um erro de
              leitura do YAML ou se uma chave esperada não for encontrada.
    """
    # Tenta ler o arquivo YAML
    try:
        with open(yaml_path, 'r', encoding='utf-8') as f:
            config_data = yaml.load(f, Loader=yaml.UnsafeLoader)
    except FileNotFoundError:
        print(f"ERRO: Arquivo não encontrado no caminho: {yaml_path}")
        return None
    except yaml.YAMLError as e:
        print(f"ERRO: Ocorreu um problema ao processar o arquivo YAML: {e}")
        return None

    # Tenta extrair os parâmetros específicos
    try:
        hyperparameters = {
            'encoder_n_layers': config_data['encoder_n_layers'],
            'learning_rate': config_data['learning_rate'],
            'input_size': config_data['input_size'],
            'encoder_hidden_size': config_data['encoder_hidden_size'],
            'decoder_layers': config_data['decoder_layers'],
            'decoder_hidden_size': config_data['decoder_hidden_size'],
            'batch_size': config_data['batch_size'],
            'dropout': config_data['encoder_dropout'],  # Mapeado de 'encoder_dropout'
            'weight_decay': config_data['optimizer_kwargs']['weight_decay'], # Mapeado de um dicionário aninhado
            'steps': config_data['max_steps'] # Mapeado de 'max_steps'
        }
        return hyperparameters
    except KeyError as e:
        print(f"ERRO: A chave de hiperparâmetro esperada {e} não foi encontrada no arquivo.")
        return None

### Resumo das Métricas WMAPE

O objetivo de cada função é avaliar o desempenho do seu modelo de previsão sob uma ótativa diferente.

#### 1. Média Simples dos WMAPEs (`calcular_media_wmape_simples`)

* **Como Funciona:**
    1.  O modelo olha para cada município de forma isolada.
    2.  Calcula o erro percentual (WMAPE) apenas para aquele município.
    3.  Depois de fazer isso para todos os municípios, ele simplesmente tira a **média aritmética** de todos esses erros individuais.

* **Principal Objetivo:** Responder à pergunta: **"Em média, qual é o desempenho do meu modelo em cada localidade, tratando todas como igualmente importantes?"**
    * É uma visão "democrática". Um erro de 20% em um município pequeno tem o mesmo peso na nota final que um erro de 20% em um município gigante. Use esta métrica para garantir que seu modelo seja consistente e não tenha um desempenho muito ruim em localidades específicas, independentemente do tamanho delas.

#### 2. WMAPE da Soma Total ou Agregado (`calcular_wmape_agregado`)

* **Como Funciona:**
    1.  Primeiro, ele ignora as divisões por município.
    2.  Soma **toda a produção real** de todos os municípios para obter a safra total real do estado.
    3.  Soma **toda a produção prevista** de todos os municípios para obter a safra total prevista.
    4.  Calcula um único WMAPE comparando esses dois totais (Total Real vs. Total Previsto).

* **Principal Objetivo:** Responder à pergunta: **"No final, o quão perto eu cheguei de acertar a safra TOTAL do estado de Minas Gerais?"**
    * Esta é a visão do "resultado final". Ela foca no número mais importante para o negócio. Esta métrica permite que erros se cancelem (prever a mais em um município pode compensar prever a menos em outro), refletindo a precisão da previsão no nível mais alto.

#### 3. Média Ponderada dos WMAPEs (`calcular_wmape_ponderado`)

* **Como Funciona:**
    1.  Começa como a primeira métrica: calcula o erro (WMAPE) individualmente para cada município.
    2.  Porém, antes de calcular a média final, ele atribui um **"peso de importância"** a cada município, baseado no seu volume de produção.
    3.  A nota final é uma **média ponderada**, onde os erros dos municípios que produzem mais têm um impacto muito maior no resultado.

* **Principal Objetivo:** Responder à pergunta: **"Qual é o desempenho médio do meu modelo, considerando que um erro nos municípios grandes é muito mais prejudicial que um erro nos pequenos?"**
    * É uma visão de "impacto no negócio". Ela mede a performance média, mas de uma forma que reflete a importância econômica de cada localidade. É um excelente meio-termo entre a visão puramente local (Métrica 1) и a visão puramente agregada (Métrica 2).

In [18]:
# -----------------------------------------------------------------------------
# Função base (helper) que calcula o WMAPE para um par de vetores
# -----------------------------------------------------------------------------
def wmape_base(actual: np.ndarray, predicted: np.ndarray) -> float:
    """Calcula o WMAPE para um conjunto de valores reais e previstos."""
    # Garante que não haverá divisão por zero. Se a soma real for 0, o erro é 0.
    sum_actual = np.sum(np.abs(actual))
    if sum_actual == 0:
        return 0.0
    return np.sum(np.abs(predicted - actual)) / sum_actual

# -----------------------------------------------------------------------------
# Métrica 1: Média Simples dos WMAPEs por Município
# -----------------------------------------------------------------------------
def calcular_media_wmape_simples(df: pd.DataFrame) -> float:
    """
    Calcula o WMAPE para cada município (unique_id) individualmente e
    retorna a média simples desses WMAPEs.
    Trata todos os municípios como igualmente importantes.
    """
    # Agrupa por município e calcula o WMAPE para cada um
    wmapes_individuais = df.groupby('unique_id')[['y', 'LSTM']].apply(
        lambda g: wmape_base(g['y'].values, g['LSTM'].values)
    )
    
    # Retorna a média dos WMAPEs calculados
    return wmapes_individuais.mean()

# -----------------------------------------------------------------------------
# Métrica 2: WMAPE da Soma Total (Agregado) - O que você já usa
# -----------------------------------------------------------------------------
def calcular_wmape_agregado(df: pd.DataFrame) -> float:
    """
    Soma todas as previsões e todos os valores reais do dataframe inteiro
    e calcula um único WMAPE sobre esses totais.
    Foca na acurácia da previsão agregada (total do estado).
    """
    actual_total = df['y'].values
    predicted_total = df['LSTM'].values
    
    return wmape_base(actual_total, predicted_total)

# -----------------------------------------------------------------------------
# Métrica 3: Média Ponderada dos WMAPEs por Município
# -----------------------------------------------------------------------------
def calcular_wmape_ponderado(df: pd.DataFrame) -> float:
    """
    Calcula o WMAPE para cada município e depois faz uma média ponderada,
    onde o peso de cada município é sua participação na produção total.
    """
    # 1. Calcula o WMAPE para cada município
    wmapes_individuais = df.groupby('unique_id')[['y', 'LSTM']].apply(
        lambda g: wmape_base(g['y'].values, g['LSTM'].values)
    )
    
    # 2. Calcula o peso de cada município (sua produção total / produção geral)
    producao_por_municipio = df.groupby('unique_id')['y'].sum()
    producao_total = df['y'].sum()
    
    # Evita divisão por zero se a produção total for 0
    if producao_total == 0:
        return 0.0
        
    pesos = producao_por_municipio / producao_total
    
    # 3. Multiplica os WMAPEs pelos pesos e soma o resultado
    wmape_final_ponderado = np.sum(wmapes_individuais * pesos)
    
    return wmape_final_ponderado

In [19]:
def evaluate_simple_forecast(
    model,
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    model_name: str = 'LSTM',  
    string_saidas: str = "",
    inteiro: bool = False,
    log_transformed: bool = True
) -> pd.DataFrame:
    """
    Executa avaliação de previsão considerando o nome do modelo dinâmico.
    """
    
    # Limpeza e Logs (conforme seu original)
    if string_saidas:
        clear_output()
        print(string_saidas)

    print(f"Iniciando a previsão com o modelo: {model_name}...")

    # --- 1. Preparação dos Dados ---
    history_df = train_df.copy()
    future_data_to_evaluate = test_df.copy()

    # Conversão de datas
    history_df['ds'] = pd.to_datetime(history_df['ds'])
    future_data_to_evaluate['ds'] = pd.to_datetime(future_data_to_evaluate['ds'])

    # Filtra histórico para evitar vazamento de dados
    cutoff = future_data_to_evaluate['ds'].min()
    history_df = history_df[history_df['ds'] < cutoff]
  
    # Prepara df futuro para predict (sem y)
    futr_df = future_data_to_evaluate.drop(columns=["y"], errors='ignore')

    # --- 2. Geração da Previsão ---
    forecasts_df = model.predict(
        df=history_df,
        futr_df=futr_df
    )
    
    # Verificação de segurança: A coluna do modelo existe?
    if model_name not in forecasts_df.columns:
        # Tenta fallback inteligente ou erro
        cols_disponiveis = [c for c in forecasts_df.columns if c not in ['unique_id', 'ds']]
        raise ValueError(f"A coluna '{model_name}' não foi encontrada na previsão. Colunas disponíveis: {cols_disponiveis}")

    print("Previsão concluída. Combinando com dados reais...")

    # --- 3. Pós-processamento e Avaliação ---
    evaluation_df = forecasts_df.merge(
        future_data_to_evaluate[["unique_id", "ds", "y"]],
        on=["unique_id", "ds"],
        how="inner"
    )

    # Tratamento de Log (Reversão)
    if log_transformed:
        evaluation_df['y'] = np.expm1(evaluation_df['y'])
        evaluation_df[model_name] = np.expm1(evaluation_df[model_name]) # Usa model_name

    # Arredondamento
    if inteiro:
        evaluation_df[model_name] = evaluation_df[model_name].round().astype(int)
        evaluation_df['y'] = evaluation_df['y'].round().astype(int)

    # Cálculo da Diferença Percentual (passando o nome da coluna)
    evaluation_df = calcular_diferenca_percentual(evaluation_df, col_pred=model_name)

    print("Avaliação finalizada.")
    return evaluation_df


def calcular_diferenca_percentual(df: pd.DataFrame, col_pred: str = 'LSTM') -> pd.DataFrame:
    """
    Calcula a diferença percentual dinamicamente baseada na coluna predita.
    """
    df = df.copy()
    
    # Fórmula: ((Predito - Real) / Real) * 100
    # Adicionei tratamento para divisão por zero com np.where
    df['diferença_%'] = np.where(
        df['y'] == 0, 
        0,  # Se real for 0, define erro como 0 para evitar inf (ajuste conforme regra de negócio)
        ((df[col_pred] - df['y']) / df['y']) * 100
    )
    
    return df

In [20]:
# ===================================================================
# FÁBRICA DE CALLBACKS
# Responsabilidade: Criar uma função de callback configurada com o caminho correto de onde salvar o modelo.
# ===================================================================
def create_save_best_model_callback(destination_path: Path):
    """
    Esta função é uma "fábrica": ela recebe o caminho de destino e retorna
    a função de callback que o Optuna vai usar.
    """
    
    # Esta é a função que o Optuna realmente vai chamar.
    # Note que ela tem acesso à variável 'destination_path' da função externa.
    # ===================================================================
    # FUNÇÃO CALLBACK (RESPONSABILIDADE: SALVAR O MELHOR MODELO)
    # ===================================================================
    def callback(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
        """
        Callback que é acionado após cada trial para salvar o melhor modelo.
        """
        if study.best_trial.number == trial.number:
            print(f"\n✨ Novo melhor trial encontrado: #{trial.number} com Score: {trial.value:.4f}")
            
            source_path_str = trial.user_attrs.get("artifact_path")
            prediction_df_dict = trial.user_attrs.get("prediction_df")

            if not source_path_str or not prediction_df_dict:
                print("   -> AVISO: Não foi possível encontrar os artefatos para salvar.")
                return

            source_path = Path(source_path_str)
            # *** A MUDANÇA CRÍTICA ESTÁ AQUI ***
            # Usa o caminho que foi "injetado" quando a função foi criada
            final_destination = destination_path 

            print(f"   -> Salvando artefatos de '{source_path}' para '{final_destination}'...")

            if final_destination.exists():
                shutil.rmtree(final_destination)
            final_destination.mkdir(parents=True, exist_ok=True) # Adicionado exist_ok=True por segurança

            shutil.copytree(source_path, final_destination, dirs_exist_ok=True)
            
            prediction_df = pd.DataFrame(prediction_df_dict)
            prediction_df.to_excel(final_destination / "valores_predicao.xlsx", index=False)
            
            print("   -> Modelo e artefatos salvos com sucesso!")

    # A fábrica retorna a função de callback configurada
    return callback

In [21]:
from typing import Tuple
def filtrar_datasets_por_integridade(
    train_df: pd.DataFrame, 
    val_df: pd.DataFrame, 
    test_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Filtra os datasets de treino, validação e teste para manter apenas os
    municípios ('unique_id') que possuem dados completos no período de validação.

    Args:
        train_df (pd.DataFrame): DataFrame de treino.
        val_df (pd.DataFrame): DataFrame de validação. Usado como referência para a verificação.
        test_df (pd.DataFrame): DataFrame de teste.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Uma tupla contendo os 
        DataFrames de treino, validação e teste devidamente filtrados.
    """
    print("--- Iniciando verificação de integridade dos dados ---")
    
    # Usa o DataFrame de validação como referência para a integridade.
    # 1. Conta o número de timestamps únicos que cada município DEVERIA ter.
    timestamps_esperados = len(val_df['ds'].unique())
    if timestamps_esperados == 0:
        print(f"{CoresTerminal.VERMELHO}ALERTA: O conjunto de validação está vazio. Nenhum filtro será aplicado.{CoresTerminal.FIM}")
        return train_df, val_df, test_df

    # 2. Conta quantos timestamps únicos cada município realmente POSSUI.
    contagem_por_id = val_df.groupby('unique_id')['ds'].nunique()
    
    # 3. Identifica os municípios com dados completos.
    ids_completos = contagem_por_id[contagem_por_id == timestamps_esperados].index
    
    # 4. Compara com o total de municípios para ver se a filtragem é necessária.
    ids_originais = val_df['unique_id'].nunique()
    
    if len(ids_completos) < ids_originais:
        total_removido = ids_originais - len(ids_completos)
        
        print(f"{CoresTerminal.VERMELHO}"
              "----------------------------------------------------------------------\n"
              "ALERTA: Inconsistência de dados encontrada.\n"
              f"Foram encontrados {total_removido} de {ids_originais} municípios com dados INCOMPLETOS no período de validação.\n"
              "Todos os datasets serão filtrados para manter apenas os municípios com dados completos.\n"
              f"----------------------------------------------------------------------{CoresTerminal.FIM}")
        
        # Filtra TODOS os dataframes para manter apenas os municípios com dados completos.
        # O uso de .copy() evita o SettingWithCopyWarning do pandas.
        train_filtrado = train_df[train_df['unique_id'].isin(ids_completos)].copy()
        val_filtrado = val_df[val_df['unique_id'].isin(ids_completos)].copy()
        test_filtrado = test_df[test_df['unique_id'].isin(ids_completos)].copy()
        
        # Se após a filtragem não sobrar nenhum dado, interrompe a execução.
        if val_filtrado.empty:
            print(f"{CoresTerminal.VERMELHO}ALERTA: Após a filtragem, não restaram dados válidos. Retornando DataFrames vazios.{CoresTerminal.FIM}")
            return train_filtrado, val_filtrado, test_filtrado
        
        print(f"Filtragem concluída. {len(ids_completos)} municípios mantidos.")
        return train_filtrado, val_filtrado, test_filtrado
        
    else:
        print(f"{CoresTerminal.VERDE}Verificação concluída. Todos os {ids_originais} municípios possuem dados completos no período de validação.{CoresTerminal.FIM}")
        return train_df, val_df, test_df

In [22]:
def get_dataset(dataset_file):
    """
    Carrega, processa e limpa o dataset, garantindo que todas as séries temporais
    para cada 'unique_id' estejam completas no intervalo de datas do dataset.
    """
    dataset = pd.read_csv(dataset_file)

    # --- Aplica transformação logarítmica ---
    log_cols = [
        'Área colhida (Hectares)',
        'target',
        'precomediocafe', # a partir de novembro 2025
        'Área destinada à colheita (Hectares)'
    ]
    existing_log_cols = [col for col in log_cols if col in dataset.columns]
    if existing_log_cols:
        dataset[existing_log_cols] = dataset[existing_log_cols].apply(lambda x: np.log1p(x))

    # --- Renomeação e formatação para o padrão NeuralForecast ---
    dataset = dataset.rename(columns={
        "municipio": "unique_id",
        "ano": "ds",
        "target": "y"
    })
    # Ordenar por unique_id e ds (ano)
    dataset = dataset.sort_values(by=["unique_id", "ds"]).reset_index(drop=True)
    dataset['ds'] = pd.to_datetime(dataset['ds'].astype(str) + '-12-31')
    return dataset

# Finetuning

In [23]:
dataset_path = "Dataset/V38/2020"

Local_treino =  "Altitude_Dataset_Unico_Ajuste" # Nome do arquivo final com todas as predições
nome_dataset = "V38"
intervalo_validacao = [
    [2019, 2019] # ano de inicio e fim da validação
]
num_trials = 150  # Número de trials para cada execução do Optuna
h = 1 # horizonte de previsão

janela_contexto = None # Tamanho da janela de contexto (input_size) / Use None para usar o máximo possível

comentario = """Estou usando o dataset V38 para o ano de 2020 para fazer o ajuste de hiperparametros.
"""

In [24]:
# Lista de configurações iniciais (cada dicionário é um conjunto de parâmetros)
trials_iniciais = [
    {
        'learning_rate': 0.00014552082561940832,
    }
]

In [25]:
for dataset_file in os.listdir(dataset_path):
    treino_id = dataset_file[:-4]
    print(treino_id)
    dataset = get_dataset(f"{dataset_path}/{dataset_file}")

    # coleta a lista variaveis exogenas
    exog_list = [col for col in dataset.columns.tolist() if col not in ["ds", "y", "unique_id"]]
    
    for ano_val in intervalo_validacao: # Para cada ano que sera usado como validação
        # --- Preparação do dataset de treino, validação e teste ---
        ano_teste = ano_val[1] + h # Define o ano de teste com base no horizonte. 

        train_ds = dataset[dataset['ds'].dt.year < ano_val[0]].copy()
        val_ds = dataset[(dataset['ds'].dt.year >= ano_val[0]) & (dataset['ds'].dt.year < ano_teste)].copy()
        if h == 1:
            test_ds = dataset[dataset['ds'].dt.year == ano_teste].copy()
        else:
            # O calculo é ano_teste+h-1, pois se ano_teste= 2024, h=2, daria 2024+2=2026, mas o correto é 2024 e 2025, ou seja, 2024+2-1=2025.
            test_ds = dataset[(dataset['ds'].dt.year >= ano_teste) & (dataset['ds'].dt.year <= ano_teste+h-1)].copy()
        # Aplica a função de filtragem para garantir a consistência
        train_ds, val_ds, test_ds = filtrar_datasets_por_integridade(
            train_df=train_ds,
            val_df=val_ds,
            test_df=test_ds
        )
        print(f"Dados de Treino entre os anos de {train_ds['ds'].dt.year.min()} e {train_ds['ds'].dt.year.max()}: {len(train_ds)} registros")
        print(f"Dados de Validação entre os anos de {val_ds['ds'].dt.year.min() if val_ds is not None else 'N/A'} e {val_ds['ds'].dt.year.max() if val_ds is not None else 'N/A'}: {len(val_ds) if val_ds is not None else 0} registros")
        print(f"Dados de Teste entre os anos de {test_ds['ds'].dt.year.min()} e {test_ds['ds'].dt.year.max()}: {len(test_ds)} registros") 
        # --- Configuração das Pastas ---
        # Pasta final para o melhor modelo
        base_dir = Path(f"./Treinos/{Local_treino}/Modelos/{treino_id}_({ano_teste})") 
        csv_dir = Path(f"./Treinos/{Local_treino}/{Local_treino}.csv")
        base_dir.mkdir(parents=True, exist_ok=True)
        best_model_callback = create_save_best_model_callback(base_dir) # define a pasta correta para a função de callback

        # Pasta para logs de cada trial (será gerenciada dentro da objective)
        log_dir_base = Path("./Logs")

        # Pasta temporária para salvar o modelo de CADA trial
        temp_model_dir = Path("./temp_models")
        temp_model_dir.mkdir(exist_ok=True)

        # --- Configuração do treinamento ---
        # define a janela de contexto (input_size)
        if janela_contexto is not None:
            input_size = janela_contexto # Usa valor fixo definido no início
        else:
            input_size = train_ds['ds'].nunique() - h  # tamanho da janela de entrada (número de períodos anteriores usados para prever o próximo)

        # --- Otimização de Hiperparâmetros com Optuna ---
        optuna_db_dir = Path(f"./Treinos/{Local_treino}/optuna_db")
        optuna_db_dir.mkdir(parents=True, exist_ok=True)

        # Usa o ID do treino e o ano de teste para garantir que cada loop tenha seu próprio DB exclusivo
        study_name = f"{treino_id}_{ano_teste}"

        # Define o caminho do arquivo .db
        db_file = optuna_db_dir / f"{study_name}.db"
        
        # Cria a URL de conexão para o SQLite (necessário converter Path para string)
        storage_url = f"sqlite:///{db_file}"

        print(f"Armazenando histórico do Optuna em: {storage_url}")

        study = optuna.create_study(
            study_name=study_name,
            storage=storage_url,
            direction="minimize",
            load_if_exists=True
        )
        print(f"Treinamento do Modelo: {treino_id}_({ano_teste})")
        # Enfileirar trials iniciais para serem avaliados primeiro
        for params in trials_iniciais:
            study.enqueue_trial(params)
            
        try:
            study.optimize(
                objective,
                n_trials=num_trials, 
                n_jobs=1,
                callbacks=[best_model_callback] 
            )

            # --- Teste do melhor modelo salvo ---
            teste_modelo(base_dir, csv_dir, dataset, train_ds, val_ds, test_ds, comentario, nome_dataset, h=h)
        finally:
            # --- LIMPEZA ---
            # Garante que a pasta temporária dentro de Logs seja apagada ao final, mesmo que ocorra algum erro durante a otimização.
            if temp_model_dir.exists():
                print(f"Limpando arquivos temporários em: {log_dir_base} e {temp_model_dir}")
                shutil.rmtree(temp_model_dir, ignore_errors=True)
                shutil.rmtree(log_dir_base, ignore_errors=True)  

            treino_path = Path(f"./Treinos/{Local_treino}/Modelos")
            for modelo in os.listdir(treino_path):
                print(f"{modelo}\n\n)")
                path_para_hparams = f"{treino_path}/{modelo}/hparams.yaml" 
                params_extraidos = get_hyperparameters_from_yaml(path_para_hparams)
                print("Hiperparâmetros do melhor modelo salvo:")
                for chave, valor in params_extraidos.items():
                    print(f" - {chave}: {valor}")


Teste do Modelo: Treinos/Altitude_Dataset_Unico_Ajuste/Modelos/V38_(2020)
Previsões salvas no arquivo Treinos/Altitude_Dataset_Unico_Ajuste/Altitude_Dataset_Unico_Ajuste.csv
Limpando arquivos temporários em: Logs e temp_models
V38_(2020)

)
Hiperparâmetros do melhor modelo salvo:
 - encoder_n_layers: 4
 - learning_rate: 0.0010330837083315822
 - input_size: 6
 - encoder_hidden_size: 192
 - decoder_layers: 1
 - decoder_hidden_size: 64
 - batch_size: 32
 - dropout: 0.3
 - weight_decay: 0.0001
 - steps: 1000
