In [6]:
# =========================================================
# SEÇÃO 1: IMPORTAÇÕES E SETUP GERAL
# =========================================================

In [7]:
import os
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
import warnings
import itertools

# Libs de Modelagem e Estatística
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.datasets import get_rdataset
import pmdarima as pm
from neuralforecast import NeuralForecast
from neuralforecast.models import NBEATS, MLP, LSTM, Autoformer
from dieboldmariano import dm_test

# Libs de Avaliação
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tqdm import tqdm
from IPython.display import display

warnings.filterwarnings("ignore")

In [8]:
# =========================================================
# SEÇÃO 2: FUNÇÕES AUXILIARES (SETUP E PROCESSAMENTO)
# =========================================================

In [9]:
def definir_seed(seed_value=42):
    np.random.seed(seed_value)
    random.seed(seed_value)
    os.environ['PYTHONHASHSEED'] = str(seed_value)

In [10]:
def salvar_dataset(serie, dataset_name):
    dir_path = "./datasets/bronze"
    os.makedirs(dir_path, exist_ok=True)
    file_path = os.path.join(dir_path, f"{dataset_name.lower()}.csv")
    df = pd.DataFrame({"date": serie.index, "value": serie.values})
    df.to_csv(file_path, index=False)
    print(f"-> Cópia do dataset '{dataset_name}' salva em: {file_path}")

def carregar_serie(nome):
    print(f"Buscando dados de '{nome}' via statsmodels...")
    nome_base = nome.lower()

    if nome_base == "airpassengers":
        df = get_rdataset("AirPassengers", package="datasets").data
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1949-01-01", periods=len(df), freq="MS"),
                          name="AirPassengers")
    elif nome_base == "lynx":
        df = get_rdataset("lynx", package="datasets").data
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1821", periods=len(df), freq="A"), name="Lynx")
    elif nome_base == "co2":
        df = get_rdataset("CO2", package="datasets").data
        df = df.ffill()
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1958-03-29", periods=len(df), freq="MS"),
                          name="CO2")
    elif nome_base == "sunspots":
        df = get_rdataset("sunspots", package="datasets").data
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1749-01-01", periods=len(df), freq="MS"),
                          name="Sunspots")
    elif nome_base == "austres":
        df = get_rdataset("austres", package="datasets").data
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1971-03-01", periods=len(df), freq="QS-MAR"),
                          name="AustralianResidents")
    elif nome_base == "nottem":
        df = get_rdataset("nottem", package="datasets").data
        serie = pd.Series(df['value'].values, index=pd.date_range(start="1920-01-01", periods=len(df), freq="MS"),
                          name="Nottingham")
    else:
        raise ValueError(f"Lógica de download para a série '{nome}' não implementada.")

    salvar_dataset(serie, nome)
    return serie

In [11]:
def dividir_serie_temporal(serie, percentual_treino=0.7, percentual_validacao=0.15):
    """Divide a série em treino, validação e teste."""
    tamanho_total = len(serie)
    ponto_corte_treino = int(tamanho_total * percentual_treino)
    ponto_corte_validacao = int(tamanho_total * (percentual_treino + percentual_validacao))
    treino = serie.iloc[:ponto_corte_treino]
    validacao = serie.iloc[ponto_corte_treino:ponto_corte_validacao]
    teste = serie.iloc[ponto_corte_validacao:]
    return treino, validacao, teste

def preparar_dados_para_neuralforecast(serie, nome_serie):
    df = serie.reset_index()
    df.columns = ['ds', 'y']
    df['unique_id'] = nome_serie
    return df

In [12]:
# =========================================================
# SEÇÃO 3: FUNÇÕES PARA CÁLCULO DE MÉTRICAS E MODELAGEM
# =========================================================

In [13]:
def calcular_metricas(y_true, y_pred, y_train):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100 if np.all(y_true != 0) else np.inf
    n = len(y_train)
    d = np.sum(np.abs(y_train[1:] - y_train[:-1])) / (n - 1) if n > 1 else np.nan
    mase = np.mean(np.abs(y_true - y_pred)) / d if d is not np.nan and d > 0 else np.inf
    return {'RMSE': rmse, 'MAPE(%)': mape, 'MASE': mase}

In [14]:
# =========================================================
# SEÇÃO 4: PIPELINE AVANÇADO PARA O ARIMA
# =========================================================


In [15]:
def encontrar_melhor_arima_auto(treino_log, freq):
    """Usa auto_arima para encontrar a melhor ordem ARIMA, incluindo sazonalidade."""
    print("Buscando melhor ordem ARIMA com auto_arima...")
    m = 1
    if freq and freq.startswith('M'): m = 12
    elif freq and freq.startswith('Q'): m = 4
    
    auto_arima_model = pm.auto_arima(
        treino_log,
        start_p=1, start_q=1,
        max_p=3, max_q=3,
        m=m,              # Frequência sazonal
        seasonal=True,    # Habilita a busca por parâmetros sazonais (P,D,Q)
        d=None,           # Deixa o auto_arima encontrar o melhor 'd'
        D=None,           # Deixa o auto_arima encontrar o melhor 'D' sazonal
        trace=False,      # Não imprime os passos da busca
        error_action='ignore',
        suppress_warnings=True,
        stepwise=True     # Usa uma busca mais rápida e eficiente
    )
    print(f"Melhor ordem encontrada: {auto_arima_model.order} Sazonal: {auto_arima_model.seasonal_order}")
    return auto_arima_model.order, auto_arima_model.seasonal_order

In [16]:
# =========================================================
# SEÇÃO 5: PIPELINE DE EXPERIMENTO COMPLETO E AVANÇADO
# =========================================================

In [17]:
def executar_experimento(nome_da_serie):
    """Executa o pipeline completo com metodologia Treino-Validação-Teste para todos os 7 modelos."""
    try:
        SEED = 42; definir_seed(SEED)
        MAX_INPUT_SIZE = 24; MAX_STEPS_NEURAL = 100

        serie_original = carregar_serie(nome_da_serie)
        serie_log = np.log(serie_original)
        
        treino_log, validacao_log, teste_log = dividir_serie_temporal(serie_log)
        treino_orig, validacao_orig, teste_orig = dividir_serie_temporal(serie_original)
        
        if len(teste_log) < 2 or len(validacao_log) < 2: return None, None
        freq = serie_original.index.freqstr or pd.infer_freq(serie_original.index)
        if freq is None: return None, None

        resultados_validacao, resultados_teste = [], []
        
        horizonte_total = len(validacao_log) + len(teste_log)
        input_size = min(2 * horizonte_total, MAX_INPUT_SIZE)

        # --- 1. Encontrar a melhor ordem ARIMA e treinar o modelo base ---
        # A chamada à função foi reintroduzida aqui
        melhor_ordem, melhor_ordem_sazonal = encontrar_melhor_arima_auto(treino_log, freq)
        modelo_arima, preds_log_valid_arima, preds_log_teste_arima = None, None, None
        try:
            print(f"Processando: ARIMA")
            modelo_arima = ARIMA(treino_log.asfreq(freq), order=melhor_ordem, seasonal_order=melhor_ordem_sazonal).fit()
            preds_log_futuro = modelo_arima.predict(start=validacao_log.index[0], end=teste_log.index[-1])
            preds_log_valid_arima, preds_log_teste_arima = preds_log_futuro[:len(validacao_log)], preds_log_futuro[-len(teste_log):]
            
            metricas_val = calcular_metricas(validacao_orig.values, np.exp(preds_log_valid_arima), treino_orig.values); metricas_val['modelo'] = 'ARIMA'; resultados_validacao.append(metricas_val)
            metricas_test = calcular_metricas(teste_orig.values, np.exp(preds_log_teste_arima), treino_orig.values); metricas_test['modelo'] = 'ARIMA'; resultados_teste.append(metricas_test)
        except Exception as e:
            print(f"AVISO: Modelo ARIMA falhou: {e}")

        # --- 2. Modelos Neurais Puros ---
        df_treino_log_nf = preparar_dados_para_neuralforecast(treino_log, nome_da_serie)
        for nome_modelo, classe_modelo in {'N-BEATS': NBEATS, 'MLP': MLP, 'LSTM': LSTM, 'Autoformer': Autoformer}.items():
            try:
                print(f"Processando: {nome_modelo} (MIMO)")
                modelo_neural = [classe_modelo(input_size=input_size, h=horizonte_total, max_steps=MAX_STEPS_NEURAL, scaler_type='standard', random_seed=SEED)]
                nf = NeuralForecast(models=modelo_neural, freq=freq)
                nf.fit(df=df_treino_log_nf, verbose=False)
                preds_log_futuro_n = nf.predict()[nome_modelo].values
                preds_log_valid_n, preds_log_teste_n = preds_log_futuro_n[:len(validacao_log)], preds_log_futuro_n[-len(teste_log):]
                metricas_val = calcular_metricas(validacao_orig.values, np.exp(preds_log_valid_n), treino_orig.values); metricas_val['modelo'] = f'{nome_modelo} (MIMO)'; resultados_validacao.append(metricas_val)
                metricas_test = calcular_metricas(teste_orig.values, np.exp(preds_log_teste_n), treino_orig.values); metricas_test['modelo'] = f'{nome_modelo} (MIMO)'; resultados_teste.append(metricas_test)
            except Exception as e: print(f"AVISO: Modelo {nome_modelo} falhou: {e}")
        
        # --- 3. Modelos Híbridos ---
        if modelo_arima is not None:
            residuos_treino_log = modelo_arima.resid
            df_residuos_nf = preparar_dados_para_neuralforecast(residuos_treino_log, "residuos")

            # Híbrido ARIMA+N-BEATS(MIMO)
            try:
                print("Processando: Híbrido ARIMA+N-BEATS(MIMO)")
                modelo_residuos = [NBEATS(input_size=input_size, h=horizonte_total, max_steps=MAX_STEPS_NEURAL, scaler_type='standard', random_seed=SEED)]
                nf_residuos = NeuralForecast(models=modelo_residuos, freq=freq)
                nf_residuos.fit(df=df_residuos_nf, verbose=False)
                preds_residuos_log = nf_residuos.predict()['NBEATS'].values
                preds_hibrido_log_valid = preds_log_valid_arima.values + preds_residuos_log[:len(validacao_log)]
                preds_hibrido_log_teste = preds_log_teste_arima.values + preds_residuos_log[-len(teste_log):]
                metricas_val = calcular_metricas(validacao_orig.values, np.exp(preds_hibrido_log_valid), treino_orig.values); metricas_val['modelo'] = 'Híbrido (MIMO)'; resultados_validacao.append(metricas_val)
                metricas_test = calcular_metricas(teste_orig.values, np.exp(preds_hibrido_log_teste), treino_orig.values); metricas_test['modelo'] = 'Híbrido (MIMO)'; resultados_teste.append(metricas_test)
            except Exception as e: print(f"AVISO: Modelo Híbrido (MIMO) falhou: {e}")

            # Híbrido HyS-MF (Direto)
            try:
                print("Processando: Híbrido HyS-MF (Direto) - Esta etapa é MUITO LENTA.")
                preds_residuos_direto_log = []
                for h_step in tqdm(range(1, horizonte_total + 1), desc="Treinando modelos 'Direto'"):
                    residuos_target = residuos_treino_log.shift(-(h_step-1))
                    df_direto_treino = preparar_dados_para_neuralforecast(residuos_target.dropna(), f"residuos_h{h_step}")
                    modelo_h = [NBEATS(input_size=input_size, h=1, max_steps=MAX_STEPS_NEURAL, scaler_type='standard', random_seed=SEED)]
                    nf_h = NeuralForecast(models=modelo_h, freq=freq)
                    nf_h.fit(df=df_direto_treino, verbose=False)
                    preds_residuos_log.append(nf_h.predict()['NBEATS'].values[0])

                preds_hibrido_log_valid_direto = preds_log_valid_arima.values + preds_residuos_log[:len(validacao_log)]
                preds_hibrido_log_teste_direto = preds_log_teste_arima.values + preds_residuos_log[-len(teste_log):]
                metricas_val = calcular_metricas(validacao_orig.values, np.exp(preds_hibrido_log_valid_direto), treino_orig.values); metricas_val['modelo'] = 'HyS-MF (Direto)'; resultados_validacao.append(metricas_val)
                metricas_test = calcular_metricas(teste_orig.values, np.exp(preds_hibrido_log_teste_direto), treino_orig.values); metricas_test['modelo'] = 'HyS-MF (Direto)'; resultados_teste.append(metricas_test)
            except Exception as e: print(f"AVISO: Modelo Híbrido (Direto) falhou: {e}")

        return pd.DataFrame(resultados_validacao), pd.DataFrame(resultados_teste)

    except Exception as e:
        print(f"ERRO GERAL no processamento do dataset '{nome_da_serie}': {e}")
        return None, None

In [18]:
# =========================================================
# SEÇÃO 6: ORQUESTRADOR
# =========================================================

In [None]:
LISTA_DE_DATASETS = ['AirPassengers'] 
resultados_validacao_geral, resultados_teste_geral = [], []

for dataset in tqdm(LISTA_DE_DATASETS, desc="Processando todos os datasets"):
    df_validacao, df_teste = executar_experimento(dataset)
    
    if df_validacao is not None and not df_validacao.empty:
        df_validacao['dataset'] = dataset
        resultados_validacao_geral.append(df_validacao)
    if df_teste is not None and not df_teste.empty:
        df_teste['dataset'] = dataset
        resultados_teste_geral.append(df_teste)

Processando todos os datasets:   0%|          | 0/1 [00:00<?, ?it/s]

Buscando dados de 'AirPassengers' via statsmodels...
-> Cópia do dataset 'AirPassengers' salva em: ./datasets/bronze\airpassengers.csv
Buscando melhor ordem ARIMA com auto_arima...
Melhor ordem encontrada: (2, 0, 0) Sazonal: (2, 1, 0, 12)
Processando: ARIMA


In [None]:
output_dir = "./datasets/silver"
os.makedirs(output_dir, exist_ok=True)
if resultados_validacao_geral:
    pd.concat(resultados_validacao_geral).to_csv(os.path.join(output_dir, "resultados_validacao.csv"), index=False)
    print(f"\nArquivo 'resultados_validacao.csv' salvo em: {output_dir}")
if resultados_teste_geral:
    pd.concat(resultados_teste_geral).to_csv(os.path.join(output_dir, "resultados_teste.csv"), index=False)
    print(f"Arquivo 'resultados_teste.csv' salvo em: {output_dir}")


In [None]:
# =========================================================
# SEÇÃO 7: GERAÇÃO DE RELATÓRIOS A PARTIR DOS ARQUIVOS SALVOS
# =========================================================

In [None]:
print("\n\n" + "="*60)
print("     GERANDO RELATÓRIOS A PARTIR DOS DADOS SALVOS")
print("="*60)

try:
    # Carrega os resultados do arquivo de teste
    df_teste_final = pd.read_csv("./datasets/silver/resultados_teste.csv").rename(columns={'modelo': 'Modelo'})
    
    # --- Relatório 1: Desempenho Geral (Agrupado por Modelo) ---
    print("\n--- RELATÓRIO 1: DESEMPENHO GERAL (MÉDIA NOS DATASETS) ---")
    df_agrupado_por_modelo = df_teste_final.groupby('Modelo')[['RMSE', 'MAPE(%)', 'MASE']].mean()
    display(df_agrupado_por_modelo.style.format('{:.3f}').highlight_min(axis=0, props='background-color: #4285F4; color: white;'))

    # --- Relatório 2: Desempenho Detalhado por Dataset ---
    print("\n--- RELATÓRIO 2: DESEMPENHO DETALHADO POR DATASET (NO TESTE) ---")
    df_reporte_detalhado = df_teste_final.set_index(['dataset', 'Modelo'])
    
    def destacar_melhor_por_grupo(df):
        df_style = pd.DataFrame('', index=df.index, columns=df.columns)
        estilo_melhor = 'background-color: #4285F4; color: white; font-weight: bold;'
        for metrica in ['RMSE', 'MAPE(%)', 'MASE']:
            if metrica in df.columns:
                idx_minimos = df.groupby('dataset')[metrica].idxmin()
                df_style.loc[idx_minimos, metrica] = estilo_melhor
        return df_style
        
    display(df_reporte_detalhado.style.format('{:.3f}').apply(destacar_melhor_por_grupo, axis=None))
    
except FileNotFoundError:
    print("\nERRO: Arquivo 'resultados_teste.csv' não encontrado.")
    print("Por favor, execute a Seção 4 (Etapa de Execução e Salvamento) primeiro para gerar os resultados.")
except Exception as e:
    print(f"Ocorreu um erro ao gerar os relatórios: {e}")