# SARIMAX - Análise de Consumo Doméstico de Energia

## Objetivo
Modelar e prever o consumo doméstico de energia usando SARIMAX (Seasonal AutoRegressive Integrated Moving Average with eXogenous regressors)

## Dataset
- **Arquivo**: household_consumption.xlsx
- **Variável Target**: Consumption (consumo de energia)
- **Frequência**: Dados por minuto (2006-12-16)
- **Tipo**: Série temporal univariada com possível sazonalidade

In [2]:
# Importações necessárias
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Análise de séries temporais
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox

# Transformações
from scipy.stats import boxcox
from scipy.special import inv_boxcox

# Métricas de avaliação
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import os

# Configurações de visualização
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (12, 6)
sns.set_palette("husl")

# Criar diretório de saída
output_dir = '../../../out/household_consumption'
os.makedirs(output_dir, exist_ok=True)

## 1. Carregamento e Exploração Inicial dos Dados

In [None]:
# Carregamento dos dados
data_path = '../../data/household_consumption.xlsx'
df = pd.read_excel(data_path, sheet_name='Sheet1')

print("Informações básicas do dataset:")
print(f"Shape: {df.shape}")
print(f"\nTipos de dados:\n{df.dtypes}")
print(f"\nPrimeiras linhas:")
df.head(10)

FileNotFoundError: [Errno 2] No such file or directory: '../../../data/household_consumption.'

In [None]:
# Preparação da série temporal
df['Date and Hour'] = pd.to_datetime(df['Date and Hour'])
df = df.set_index('Date and Hour')
df = df.sort_index()

# Remover valores nulos se houver
print(f"Valores nulos: {df['Consumption'].isnull().sum()}")
df = df.dropna()

# Estatísticas descritivas
print("\nEstatísticas descritivas:")
print(df['Consumption'].describe())

# Verificar frequência dos dados
freq = pd.infer_freq(df.index)
print(f"\nFrequência inferida: {freq}")
print(f"Período total: {df.index.min()} a {df.index.max()}")
print(f"Duração: {df.index.max() - df.index.min()}")

## 2. Análise Exploratória e Visualização

In [None]:
# Visualização da série temporal
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Série original
axes[0,0].plot(df.index, df['Consumption'], alpha=0.7)
axes[0,0].set_title('Série Temporal Original - Consumo de Energia')
axes[0,0].set_ylabel('Consumo (kW)')

# Histograma
axes[0,1].hist(df['Consumption'], bins=50, alpha=0.7, edgecolor='black')
axes[0,1].set_title('Distribuição do Consumo')
axes[0,1].set_xlabel('Consumo (kW)')
axes[0,1].set_ylabel('Frequência')

# Box plot
axes[1,0].boxplot(df['Consumption'])
axes[1,0].set_title('Box Plot do Consumo')
axes[1,0].set_ylabel('Consumo (kW)')

# Q-Q plot
stats.probplot(df['Consumption'], dist="norm", plot=axes[1,1])
axes[1,1].set_title('Q-Q Plot (Normalidade)')

plt.tight_layout()
plt.savefig(f'{output_dir}/01_exploratory_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

# Teste de normalidade
from scipy.stats import jarque_bera, shapiro
jb_stat, jb_pvalue = jarque_bera(df['Consumption'])
print(f"\nTeste Jarque-Bera: estatística={jb_stat:.4f}, p-value={jb_pvalue:.4f}")
print(f"Série é normal? {jb_pvalue > 0.05}")

## 3. Análise de Estacionariedade

In [None]:
def test_stationarity(timeseries, title):
    """
    Testa estacionariedade usando ADF e KPSS
    """
    print(f'\n=== {title} ===')
    
    # Teste ADF
    adf_result = adfuller(timeseries, autolag='AIC')
    print(f'\nTeste Augmented Dickey-Fuller:')
    print(f'Estatística ADF: {adf_result[0]:.6f}')
    print(f'p-value: {adf_result[1]:.6f}')
    print(f'Lags usados: {adf_result[2]}')
    print(f'Valores críticos:')
    for key, value in adf_result[4].items():
        print(f'\t{key}: {value:.3f}')
    
    if adf_result[1] <= 0.05:
        print("=> Série é estacionária (rejeita H0)")
    else:
        print("=> Série é não-estacionária (não rejeita H0)")
    
    # Teste KPSS
    kpss_result = kpss(timeseries, regression='c', nlags='auto')
    print(f'\nTeste KPSS:')
    print(f'Estatística KPSS: {kpss_result[0]:.6f}')
    print(f'p-value: {kpss_result[1]:.6f}')
    print(f'Lags usados: {kpss_result[2]}')
    print(f'Valores críticos:')
    for key, value in kpss_result[3].items():
        print(f'\t{key}: {value:.3f}')
    
    if kpss_result[1] >= 0.05:
        print("=> Série é estacionária (não rejeita H0)")
    else:
        print("=> Série é não-estacionária (rejeita H0)")

# Teste na série original
test_stationarity(df['Consumption'], 'Série Original')

## 4. Decomposição da Série Temporal

In [None]:
# Reamostragem para análise sazonal (dados por hora para facilitar análise)
df_hourly = df['Consumption'].resample('h').mean()

# Decomposição sazonal
try:
    decomposition = seasonal_decompose(df_hourly[:24*7], model='additive', period=24)  # período diário
    
    fig, axes = plt.subplots(4, 1, figsize=(15, 12))
    
    decomposition.observed.plot(ax=axes[0], title='Série Original')
    decomposition.trend.plot(ax=axes[1], title='Tendência')
    decomposition.seasonal.plot(ax=axes[2], title='Sazonalidade')
    decomposition.resid.plot(ax=axes[3], title='Resíduos')
    
    plt.tight_layout()
    plt.savefig(f'{output_dir}/02_seasonal_decomposition.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # Análise dos componentes
    print("Análise dos componentes:")
    print(f"Variância da série original: {decomposition.observed.var():.4f}")
    print(f"Variância da tendência: {decomposition.trend.var():.4f}")
    print(f"Variância sazonal: {decomposition.seasonal.var():.4f}")
    print(f"Variância dos resíduos: {decomposition.resid.var():.4f}")
    
except Exception as e:
    print(f"Erro na decomposição: {e}")
    print("Continuando sem decomposição detalhada...")

## 5. Análise de Autocorrelação (ACF e PACF)

In [None]:
# Usar uma amostra menor para ACF/PACF se a série for muito longa
sample_size = min(1000, len(df))
ts_sample = df['Consumption'].iloc[:sample_size]

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# ACF e PACF da série original
plot_acf(ts_sample, ax=axes[0,0], lags=40, title='ACF - Série Original')
plot_pacf(ts_sample, ax=axes[0,1], lags=40, title='PACF - Série Original')

# ACF e PACF da primeira diferença
ts_diff = ts_sample.diff().dropna()
plot_acf(ts_diff, ax=axes[1,0], lags=40, title='ACF - Primeira Diferença')
plot_pacf(ts_diff, ax=axes[1,1], lags=40, title='PACF - Primeira Diferença')

plt.tight_layout()
plt.savefig(f'{output_dir}/03_acf_pacf_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

# Teste de estacionariedade na primeira diferença
test_stationarity(ts_diff, 'Primeira Diferença')

## 6. Transformação Box-Cox

In [None]:
# Aplicar transformação Box-Cox (apenas para valores positivos)
if (df['Consumption'] > 0).all():
    ts_boxcox, lambda_boxcox = boxcox(df['Consumption'])
    print(f"Parâmetro λ da transformação Box-Cox: {lambda_boxcox:.4f}")
    
    # Visualizar transformação
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    axes[0].plot(df.index[:1000], df['Consumption'][:1000])
    axes[0].set_title('Série Original')
    axes[0].set_ylabel('Consumo')
    
    axes[1].plot(df.index[:1000], ts_boxcox[:1000])
    axes[1].set_title(f'Série Transformada (Box-Cox λ={lambda_boxcox:.3f})')
    axes[1].set_ylabel('Consumo Transformado')
    
    plt.tight_layout()
    plt.savefig(f'{output_dir}/04_boxcox_transformation.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # Criar série transformada
    df_transformed = df.copy()
    df_transformed['Consumption'] = ts_boxcox
    
    # Teste de estacionariedade na série transformada
    test_stationarity(ts_boxcox, 'Série Box-Cox Transformada')
    
else:
    print("Série contém valores não-positivos. Box-Cox não aplicável.")
    df_transformed = df.copy()
    lambda_boxcox = None

## 7. Divisão dos Dados (Treino/Teste)

In [None]:
# Divisão temporal: 80% treino, 20% teste
train_size = int(len(df_transformed) * 0.8)
train_data = df_transformed[:train_size]
test_data = df_transformed[train_size:]

print(f"Tamanho do conjunto de treino: {len(train_data)}")
print(f"Tamanho do conjunto de teste: {len(test_data)}")
print(f"Período de treino: {train_data.index.min()} a {train_data.index.max()}")
print(f"Período de teste: {test_data.index.min()} a {test_data.index.max()}")

# Visualização da divisão
plt.figure(figsize=(15, 6))
plt.plot(train_data.index, train_data['Consumption'], label='Treino', alpha=0.7)
plt.plot(test_data.index, test_data['Consumption'], label='Teste', alpha=0.7)
plt.title('Divisão Treino/Teste')
plt.ylabel('Consumo (transformado)' if lambda_boxcox else 'Consumo')
plt.legend()
plt.savefig(f'{output_dir}/05_train_test_split.png', dpi=300, bbox_inches='tight')
plt.show()

## 8. Busca de Hiperparâmetros SARIMAX

In [None]:
def evaluate_sarimax_model(train_data, test_data, order, seasonal_order, lambda_param=None):
    """
    Avalia um modelo SARIMAX com parâmetros específicos
    """
    try:
        # Ajustar modelo
        model = SARIMAX(train_data['Consumption'], 
                       order=order, 
                       seasonal_order=seasonal_order,
                       enforce_stationarity=False,
                       enforce_invertibility=False)
        
        fitted_model = model.fit(disp=False, maxiter=100)
        
        # Previsões
        forecast = fitted_model.forecast(steps=len(test_data))
        
        # Reverter transformação Box-Cox se aplicada
        if lambda_param is not None:
            forecast_original = inv_boxcox(forecast, lambda_param)
            test_original = inv_boxcox(test_data['Consumption'], lambda_param)
        else:
            forecast_original = forecast
            test_original = test_data['Consumption']
        
        # Métricas
        mse = mean_squared_error(test_original, forecast_original)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(test_original, forecast_original)
        mape = np.mean(np.abs((test_original - forecast_original) / test_original)) * 100
        
        return {
            'model': fitted_model,
            'forecast': forecast_original,
            'mse': mse,
            'rmse': rmse,
            'mae': mae,
            'mape': mape,
            'aic': fitted_model.aic,
            'bic': fitted_model.bic
        }
    except Exception as e:
        return None

# Grid search para SARIMAX
print("Iniciando busca de hiperparâmetros...")

# Parâmetros para testar (reduzidos para eficiência)
p_values = range(0, 3)
d_values = range(0, 2)
q_values = range(0, 3)

# Parâmetros sazonais (assumindo sazonalidade diária se houver dados suficientes)
seasonal_periods = [24] if len(df_hourly) >= 24*7 else [0]  # 24 horas

best_model = None
best_aic = float('inf')
results = []

for p in p_values:
    for d in d_values:
        for q in q_values:
            for sp in seasonal_periods:
                if sp > 0:
                    seasonal_orders = [(0,1,1,sp), (1,1,1,sp), (1,1,0,sp)]  # Alguns padrões comuns
                else:
                    seasonal_orders = [(0,0,0,0)]  # Sem sazonalidade
                
                for seasonal_order in seasonal_orders:
                    order = (p, d, q)
                    
                    result = evaluate_sarimax_model(train_data, test_data, order, seasonal_order, lambda_boxcox)
                    
                    if result is not None:
                        results.append({
                            'order': order,
                            'seasonal_order': seasonal_order,
                            **result
                        })
                        
                        if result['aic'] < best_aic:
                            best_aic = result['aic']
                            best_model = result
                            best_order = order
                            best_seasonal = seasonal_order
                        
                        print(f"SARIMAX{order}x{seasonal_order} - AIC: {result['aic']:.2f}, RMSE: {result['rmse']:.4f}")

print(f"\nMelhor modelo: SARIMAX{best_order}x{best_seasonal}")
print(f"AIC: {best_model['aic']:.2f}")
print(f"RMSE: {best_model['rmse']:.4f}")
print(f"MAPE: {best_model['mape']:.2f}%")

## 9. Modelo Final e Diagnósticos

In [None]:
# Treinar modelo final com melhores parâmetros
final_model = SARIMAX(train_data['Consumption'], 
                     order=best_order, 
                     seasonal_order=best_seasonal,
                     enforce_stationarity=False,
                     enforce_invertibility=False)

fitted_final = final_model.fit(disp=False)

print("\n=== Resumo do Modelo Final ===")
print(fitted_final.summary())

# Diagnósticos dos resíduos
residuals = fitted_final.resid

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Resíduos ao longo do tempo
axes[0,0].plot(residuals)
axes[0,0].set_title('Resíduos ao Longo do Tempo')
axes[0,0].set_ylabel('Resíduos')

# Histograma dos resíduos
axes[0,1].hist(residuals, bins=30, alpha=0.7, edgecolor='black')
axes[0,1].set_title('Distribuição dos Resíduos')
axes[0,1].set_xlabel('Resíduos')

# Q-Q plot dos resíduos
stats.probplot(residuals, dist="norm", plot=axes[1,0])
axes[1,0].set_title('Q-Q Plot dos Resíduos')

# ACF dos resíduos
plot_acf(residuals, ax=axes[1,1], lags=30, title='ACF dos Resíduos')

plt.tight_layout()
plt.savefig(f'{output_dir}/06_residuals_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

# Testes nos resíduos
print("\n=== Testes de Diagnóstico ===")

# Teste de Ljung-Box (independência dos resíduos)
lb_result = acorr_ljungbox(residuals, lags=10, return_df=True)
print(f"\nTeste Ljung-Box:")
print(lb_result)

# Teste de normalidade dos resíduos
jb_stat, jb_pvalue = jarque_bera(residuals)
print(f"\nTeste Jarque-Bera (resíduos): estatística={jb_stat:.4f}, p-value={jb_pvalue:.4f}")
print(f"Resíduos são normais? {jb_pvalue > 0.05}")

## 10. Previsões e Avaliação

In [None]:
# Previsões no conjunto de teste
forecast = fitted_final.forecast(steps=len(test_data))
forecast_ci = fitted_final.get_forecast(steps=len(test_data)).conf_int()

# Reverter transformação Box-Cox se aplicada
if lambda_boxcox is not None:
    forecast_original = inv_boxcox(forecast, lambda_boxcox)
    test_original = inv_boxcox(test_data['Consumption'], lambda_boxcox)
    train_original = inv_boxcox(train_data['Consumption'], lambda_boxcox)
    
    # Intervalos de confiança (aproximação)
    forecast_lower = inv_boxcox(forecast_ci.iloc[:, 0], lambda_boxcox)
    forecast_upper = inv_boxcox(forecast_ci.iloc[:, 1], lambda_boxcox)
else:
    forecast_original = forecast
    test_original = test_data['Consumption']
    train_original = train_data['Consumption']
    forecast_lower = forecast_ci.iloc[:, 0]
    forecast_upper = forecast_ci.iloc[:, 1]

# Métricas de avaliação
mse = mean_squared_error(test_original, forecast_original)
rmse = np.sqrt(mse)
mae = mean_absolute_error(test_original, forecast_original)
mape = np.mean(np.abs((test_original - forecast_original) / test_original)) * 100
r2 = r2_score(test_original, forecast_original)

print("\n=== Métricas de Avaliação ===")
print(f"MSE: {mse:.6f}")
print(f"RMSE: {rmse:.6f}")
print(f"MAE: {mae:.6f}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.6f}")

# Visualização das previsões
plt.figure(figsize=(15, 8))

# Plotar apenas últimas observações para melhor visualização
n_plot = min(500, len(train_original))
plt.plot(train_original.index[-n_plot:], train_original.iloc[-n_plot:], 
         label='Treino', alpha=0.7)
plt.plot(test_original.index, test_original, 
         label='Teste (Real)', color='green', alpha=0.8)
plt.plot(test_data.index, forecast_original, 
         label='Previsões SARIMAX', color='red', alpha=0.8)
plt.fill_between(test_data.index, forecast_lower, forecast_upper, 
                color='red', alpha=0.2, label='Intervalo de Confiança 95%')

plt.title(f'Previsões SARIMAX{best_order}x{best_seasonal} - Consumo de Energia')
plt.ylabel('Consumo (kW)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.savefig(f'{output_dir}/07_sarimax_predictions.png', dpi=300, bbox_inches='tight')
plt.show()

## 11. Análise de Validação Cruzada Temporal

In [None]:
# Validação cruzada temporal (Time Series Split)
def time_series_cv(data, model_order, seasonal_order, n_splits=5, test_size=0.1):
    """
    Validação cruzada para séries temporais
    """
    results = []
    data_size = len(data)
    test_samples = int(data_size * test_size)
    
    for i in range(n_splits):
        # Calcular índices de divisão
        split_idx = int(data_size * (0.5 + i * 0.1))  # Começar com 50% dos dados
        
        if split_idx + test_samples >= data_size:
            break
            
        train_cv = data[:split_idx]
        test_cv = data[split_idx:split_idx + test_samples]
        
        try:
            # Ajustar modelo
            model = SARIMAX(train_cv['Consumption'], 
                           order=model_order, 
                           seasonal_order=seasonal_order,
                           enforce_stationarity=False,
                           enforce_invertibility=False)
            
            fitted = model.fit(disp=False, maxiter=100)
            forecast = fitted.forecast(steps=len(test_cv))
            
            # Reverter Box-Cox se necessário
            if lambda_boxcox is not None:
                forecast_orig = inv_boxcox(forecast, lambda_boxcox)
                test_orig = inv_boxcox(test_cv['Consumption'], lambda_boxcox)
            else:
                forecast_orig = forecast
                test_orig = test_cv['Consumption']
            
            # Métricas
            mse = mean_squared_error(test_orig, forecast_orig)
            mae = mean_absolute_error(test_orig, forecast_orig)
            mape = np.mean(np.abs((test_orig - forecast_orig) / test_orig)) * 100
            
            results.append({
                'split': i+1,
                'mse': mse,
                'rmse': np.sqrt(mse),
                'mae': mae,
                'mape': mape
            })
            
        except Exception as e:
            print(f"Erro no split {i+1}: {e}")
            
    return results

# Executar validação cruzada
print("\n=== Validação Cruzada Temporal ===")
cv_results = time_series_cv(df_transformed, best_order, best_seasonal, n_splits=3)

if cv_results:
    cv_df = pd.DataFrame(cv_results)
    print("\nResultados da Validação Cruzada:")
    print(cv_df)
    
    print("\nEstatísticas da Validação Cruzada:")
    print(f"RMSE médio: {cv_df['rmse'].mean():.6f} ± {cv_df['rmse'].std():.6f}")
    print(f"MAE médio: {cv_df['mae'].mean():.6f} ± {cv_df['mae'].std():.6f}")
    print(f"MAPE médio: {cv_df['mape'].mean():.2f}% ± {cv_df['mape'].std():.2f}%")
    
    # Visualização
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    cv_df['rmse'].plot(kind='bar', ax=axes[0], title='RMSE por Split')
    cv_df['mae'].plot(kind='bar', ax=axes[1], title='MAE por Split')
    cv_df['mape'].plot(kind='bar', ax=axes[2], title='MAPE por Split')
    
    plt.tight_layout()
    plt.savefig(f'{output_dir}/08_cross_validation.png', dpi=300, bbox_inches='tight')
    plt.show()
else:
    print("Não foi possível executar validação cruzada.")

## 12. Salvamento do Modelo e Resultados

In [None]:
# Salvar resultados em arquivo
results_dict = {
    'model_type': 'SARIMAX',
    'best_order': best_order,
    'best_seasonal_order': best_seasonal,
    'lambda_boxcox': lambda_boxcox,
    'metrics': {
        'mse': float(mse),
        'rmse': float(rmse),
        'mae': float(mae),
        'mape': float(mape),
        'r2': float(r2),
        'aic': float(fitted_final.aic),
        'bic': float(fitted_final.bic)
    },
    'cross_validation': cv_results if cv_results else None,
    'data_info': {
        'total_samples': len(df),
        'train_samples': len(train_data),
        'test_samples': len(test_data),
        'start_date': str(df.index.min()),
        'end_date': str(df.index.max())
    }
}

import json
with open(f'{output_dir}/sarimax_results.json', 'w') as f:
    json.dump(results_dict, f, indent=2)

# Salvar previsões
predictions_df = pd.DataFrame({
    'real': test_original,
    'predicted': forecast_original,
    'lower_ci': forecast_lower,
    'upper_ci': forecast_upper
}, index=test_data.index)

predictions_df.to_csv(f'{output_dir}/sarimax_predictions.csv')

print(f"\n=== Arquivos Salvos ===")
print(f"Resultados: {output_dir}/sarimax_results.json")
print(f"Previsões: {output_dir}/sarimax_predictions.csv")
print(f"Gráficos: {output_dir}/01-08_*.png")

print(f"\n=== Resumo Final - SARIMAX ===")
print(f"Modelo: SARIMAX{best_order}x{best_seasonal}")
print(f"Transformação Box-Cox: λ = {lambda_boxcox:.4f}" if lambda_boxcox else "Sem transformação")
print(f"RMSE: {rmse:.6f}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.4f}")
print(f"\nDiagnósticos:")
print(f"- Resíduos normais: {jb_pvalue > 0.05}")
print(f"- AIC: {fitted_final.aic:.2f}")
print(f"- BIC: {fitted_final.bic:.2f}")