# Aula 3
Caso Pratico: Previsao do Consumo de Energia Eletrica na regiao Sudeste (SE)
- Fonte: http://ipeadata.gov.br

In [1]:
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.metrics import mean_absolute_percentage_error as mape

from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.api import SimpleExpSmoothing
from statsmodels.tools.sm_exceptions import ConvergenceWarning

import warnings

from scipy.stats import shapiro
from scipy.stats import kstest


import statsmodels.api as sm
from statsmodels.tsa.arima_process import ArmaProcess
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
# from scipy import stats



In [2]:
import pmdarima

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
from pmdarima import auto_arima


In [None]:
# caso nao tenha instalado
# !pip install ipeadatapy 
import ipeadatapy as ip

ip.list_series()

In [None]:
## preciso saber o codigo da serie temporal no site do IPEA
## para esta serie o codigo e dado a seguir
ip.describe('ELETRO12_CEESE12')

In [None]:
# In[124]: verificando os dados
cons_sudeste=ip.timeseries('ELETRO12_CEESE12')
cons_sudeste

In [None]:
# In[125]: colhendo os dados da serie do consumo de energia em GWh
consumo = pd.Series(cons_sudeste.iloc[:, 5].values, 
                    index=pd.date_range(start='1979-01-01', periods=len(cons_sudeste), 
                                        freq='ME'))

# In[126]: Plotando o grafico
plt.figure(figsize=(10, 6))
plt.plot(consumo)
plt.title("Energia elétrica referente ao consumo na região Sudeste (SE) - GWh")
plt.xlabel("Meses - jan/1979 a jun/2024")
plt.ylabel("GWh")
plt.grid(True)
plt.show()

In [None]:
# In[127]: separando a serie tem dados de treino e teste

# Definir as datas para separar treino e teste
# data de corte escolhida

data_corte="2022-06-01"

# Separar a série em treino e teste usando as datas
energia_treino = consumo['1979-01-01':'2022-06-01']
energia_teste = consumo['2022-07-01':'2024-07-01']
print(energia_treino)
print(energia_teste)

In [None]:
# Exibir os tamanhos dos períodos de treino e teste
print(f"Período de Treino: {energia_treino.index.min()} até {energia_treino.index.max()} - {len(energia_treino)} registros")
print(f"Período de Teste: {energia_teste.index.min()} até {energia_teste.index.max()} - {len(energia_teste)} registros")

# In[128]: Plotar os períodos de treino e teste
plt.figure(figsize=(10,6))
plt.plot(energia_treino, label='Treino', color='blue')
plt.plot(energia_teste, label='Teste', color='red')
plt.axvline(pd.to_datetime(data_corte), color='black', linestyle='--', label='Data de Corte')
plt.title('Separação da Série Temporal em Treino e Teste')
plt.xlabel('Data')
plt.ylabel('Valores')
plt.legend()
plt.show()

### Lendo os dados a partir de um arquivo

In [None]:
# Importando a base de dados
energia = pd.read_excel("/home/cairo/code/usp-ds-series-temporais/data/energia.xlsx")

# Lendo a base de dados
print(energia.head())

In [None]:
cons_energia = pd.Series(energia.iloc[:, 1].values, 
                    index=pd.date_range(start='1979-01-01', periods=len(energia), 
                                        freq='ME'))
cons_energia

In [None]:
# In[130]: Plotando o grafico
plt.figure(figsize=(10, 6))
plt.plot(cons_energia)
plt.title("Energia eletrica referente ao consumo na regiao Sudeste (SE) - GWh")
plt.xlabel("Meses - jan/1979 a jun/2024")
plt.ylabel("GWh")
plt.grid(True)
plt.show()

### Vamos rodar todos os modelos para a serie de energia do Sudeste

In [None]:
# Capturar avisos de convergência e tratá-los
warnings.filterwarnings("error", category=ConvergenceWarning)

In [None]:
# In[132]: Carregar os dados de energia e garantir que não há valores ausentes
energia = pd.read_excel("/home/cairo/code/usp-ds-series-temporais/data/energia.xlsx", usecols=[1]).dropna()

# In[133]: Criar a série temporal a partir de 1979 com frequência mensal
energia.index = pd.date_range(start='1979-01', periods=len(energia), freq='ME')
energia = energia.squeeze()  # Converter para uma Series

# In[134]: Separar a base de dados em treino e teste
benergia = energia[:'2022-06'].ffill()  # Preencher valores nulos com forward fill
reaisenergia = energia['2022-07':'2024-06']  # Teste de 2022-07 até 2024-06

# In[135]: Converter explicitamente para tipo numérico e garantir que são floats
benergia = pd.to_numeric(benergia, errors='coerce').astype(float)

# In[136]: Lista para armazenar os modelos, MAPE e previsões
modelos_energia = []
mapes_energia = []
previsoes_energia = {}

### Modelo Naive

In [None]:
naive_forecast = pd.Series([benergia.iloc[-1]] * len(reaisenergia), index=reaisenergia.index)
mape_naive = mape(reaisenergia, naive_forecast) * 100
modelos_energia.append("Naive")
mapes_energia.append(mape_naive)
previsoes_energia["Naive"] = naive_forecast

### Modelo Mean (média)

In [None]:
mean_forecast = pd.Series(benergia.mean(), index=reaisenergia.index)
mape_mean = mape(reaisenergia, mean_forecast) * 100
modelos_energia.append("Mean")
mapes_energia.append(mape_mean)
previsoes_energia["Mean"] = mean_forecast

### Modelo Drift

In [None]:
n = len(benergia)
drift_slope = (benergia.iloc[-1] - benergia.iloc[0]) / (n - 1)
drift_forecast = benergia.iloc[-1] + drift_slope * np.arange(1, len(reaisenergia) + 1)
drift_forecast = pd.Series(drift_forecast, index=reaisenergia.index)
mape_drift_result = mape(reaisenergia, drift_forecast) * 100
modelos_energia.append("Drift")
mapes_energia.append(mape_drift_result)
previsoes_energia["Drift"] = drift_forecast

### Modelo Naive Sazonal

In [None]:
naive_sazonal_forecast = pd.Series([benergia.iloc[-12 + (i % 12)]
                                    for i in range(len(reaisenergia))],
                                   index=reaisenergia.index)
mape_naive_sazonal = mape(reaisenergia, naive_sazonal_forecast) * 100
modelos_energia.append("Naive Sazonal")
mapes_energia.append(mape_naive_sazonal)
previsoes_energia["Naive Sazonal"] = naive_sazonal_forecast

### Suavização Exponencial Simples (SES)

In [None]:
ses_model = SimpleExpSmoothing(benergia).fit(optimized=True)
ses_forecast = ses_model.forecast(steps=len(reaisenergia))
mape_ses = mape(reaisenergia, ses_forecast) * 100
modelos_energia.append("SES")
mapes_energia.append(mape_ses)
previsoes_energia["SES"] = ses_forecast

### Holt-Winters Aditivo - Ajustar inicialização e Box-Cox para melhorar a convergência

In [None]:
try:
    hw_add_model = ExponentialSmoothing(
        benergia,
        seasonal_periods=12,
        trend='add',
        seasonal='add',
        initialization_method="estimated",  # Método robusto de inicialização
        use_boxcox=True  # Tentar estabilizar a variância com Box-Cox
    ).fit(optimized=True)
    
    hw_add_forecast = hw_add_model.forecast(steps=len(reaisenergia))
    mape_hw_add = mape(reaisenergia, hw_add_forecast) * 100
    modelos_energia.append("Holt-Winters Aditivo")
    mapes_energia.append(mape_hw_add)
    previsoes_energia["Holt-Winters Aditivo"] = hw_add_forecast
except Exception:
    modelos_energia.append("Holt-Winters Aditivo")
    mapes_energia.append(np.nan)


### Holt-Winters Multiplicativo - Ajustar inicialização e Box-Cox para melhorar a convergência

In [None]:
try:
    hw_mult_model = ExponentialSmoothing(
        benergia,
        seasonal_periods=12,
        trend='add',
        seasonal='mul',
        initialization_method="estimated",  # Método robusto de inicialização
        use_boxcox=True  # Tentar estabilizar a variância com Box-Cox
    ).fit(optimized=True)
    
    hw_mult_forecast = hw_mult_model.forecast(steps=len(reaisenergia))
    mape_hw_mult = mape(reaisenergia, hw_mult_forecast) * 100
    modelos_energia.append("Holt-Winters Multiplicativo")
    mapes_energia.append(mape_hw_mult)
    previsoes_energia["Holt-Winters Multiplicativo"] = hw_mult_forecast
except Exception:
    modelos_energia.append("Holt-Winters Multiplicativo")
    mapes_energia.append(np.nan)

### Comparação dos modelos com base no MAPE

In [None]:
mape_comparison = pd.DataFrame({'Modelo': modelos_energia, 'MAPE': mapes_energia})
mape_comparison = mape_comparison.sort_values(by='MAPE', ascending=True).reset_index(drop=True)
print(mape_comparison)

# In[145]: Gráfico dos MAPE dos modelos
plt.figure(figsize=(10, 6))
plt.barh(mape_comparison['Modelo'], mape_comparison['MAPE'], color='skyblue')
plt.xlabel("MAPE")
plt.title("MAPE Comparação de Modelos")
plt.grid(True)
plt.show()

In [None]:
# In[146]: Selecionar o modelo com o menor MAPE
melhor_modelo = mape_comparison.loc[0, 'Modelo']
melhores_previsoes = previsoes_energia[melhor_modelo]

# In[147]: Criar gráfico comparando os valores reais e previstos do melhor modelo
plt.figure(figsize=(10, 6))
plt.plot(reaisenergia.index, reaisenergia, label='Valores Reais', color='blue')
plt.plot(reaisenergia.index, melhores_previsoes, label=f'Previsão - {melhor_modelo}', color='red')
plt.title(f'Valores Reais vs Previsão ({melhor_modelo})')
plt.xlabel('Data')
plt.ylabel('Valores')
plt.legend()
plt.grid(True)
plt.show()

### Teste de normalidade e Ljung-Box para os resíduos do melhor modelo

In [None]:
residuos = reaisenergia - melhores_previsoes

# In[149]: Teste de normalidade Shapiro-Wilk
stat, p_value_shapiro = shapiro(residuos)
print(f"Teste de Normalidade Shapiro-Wilk: Estatística={stat:.4f}, p-valor={p_value_shapiro:.4f}")
if p_value_shapiro > 0.05:
    print("Os resíduos parecem seguir uma distribuição normal (não rejeitamos H0).")
else:
    print("Os resíduos não seguem uma distribuição normal (rejeitamos H0).")

In [None]:
# In[150]: Teste de Ljung-Box para autocorrelação dos resíduos
lb_test = acorr_ljungbox(residuos, lags=[10], return_df=True)
print(f"Teste Ljung-Box:\n{lb_test}")

In [None]:
# In[151]: Interpretação do teste de Ljung-Box
p_value_ljungbox = lb_test['lb_pvalue'].values[0]
if p_value_ljungbox > 0.05:
    print("Não há evidências de autocorrelação significativa nos resíduos (não rejeitamos H0).")
else:
    print("Há evidências de autocorrelação nos resíduos (rejeitamos H0).")

## Modelos ARIMA - (Box - Jenkins)

### MODELOS ARIMA - Simulados

In [None]:
# Simulação de um modelo AR(1)
# Definir o coeficiente do modelo AR(1)
ar = np.array([1, -0.8])

# AR(1) com coeficiente +0.8 (note o sinal negativo para simulação)
# a biblioteca ArmaProcess espera que o sinal seja inverso
ma = np.array([1])  # Não há parte MA, então é apenas [1]

# Criar o processo AR(1)
ar_process = ArmaProcess(ar, ma)

# Simular 500 pontos para a série temporal
np.random.seed(42)  # Para reprodutibilidade
serie_ar = ar_process.generate_sample(nsample=500)

# In[154]: Plotar a série temporal simulada
plt.figure(figsize=(10, 6))
plt.plot(serie_ar)
plt.title('Modelo AR(1) X(t)=0.8.X(t-1) + erro(t)')
plt.xlabel('Tempo')
plt.ylabel('Valores simulados')
plt.grid(True)
plt.show()

In [None]:
# In[155]: Definir o coeficiente do modelo MA(1)
ma = np.array([1, -0.3])  # MA(1) com coeficiente -0.3
ar = np.array([1])  # Não há parte AR, então é apenas [1]

# Criar o processo MA(1)
ma_process = ArmaProcess(ar, ma)

# Simular 500 pontos para a série temporal
np.random.seed(42)  # Para reprodutibilidade
serie_ma = ma_process.generate_sample(nsample=500)

# In[156]: Plotar a série temporal simulada
plt.figure(figsize=(10, 6))
plt.plot(serie_ma)
plt.title('Modelo MA(1) X(t)=-0.3erro(t-1) + erro(t)')
plt.xlabel('Tempo')
plt.ylabel('Valores simulados')
plt.grid(True)
plt.show()

In [None]:
# In[157]: Simulação de um modelo ARMA(1,1)
# Definir os coeficientes do modelo ARMA(1,1)
ar = np.array([1, -0.8])  # AR(1) com coeficiente +0.8
ma = np.array([1, -0.3])  # MA(1) com coeficiente -0.3

# Criar o processo ARMA(1,1)
arma_process = ArmaProcess(ar, ma)

# Simular 500 pontos para a série temporal
np.random.seed(42)  # Para reprodutibilidade
serie_arma = arma_process.generate_sample(nsample=500)

# In[158]: Plotar a série temporal simulada
plt.figure(figsize=(10, 6))
plt.plot(serie_arma)
plt.title('Simulação do Modelo ARMA(1,1) com AR=0.8 e MA=-0.3')
plt.xlabel('Tempo')
plt.ylabel('Valores simulados')
plt.grid(True)
plt.show()

In [None]:
# In[159]: Simulando um modelo ARIMA(1,1,1)
# from statsmodels.tsa.arima.model import ARIMA

# Definir o número de pontos para simulação
pontos = 500

# Definir os parâmetros ARIMA (1,1,1)
ar = np.array([1, -0.8])   
ma = np.array([1, -0.3])   
 
# Simular a série temporal ARIMA(1,1,1)
np.random.seed(42)  # Para reprodutibilidade
arma_process = ArmaProcess(ar, ma)
serie_arima = arma_process.generate_sample(nsample=pontos)

# Converter a série estacionária em uma série não estacionária aplicando a integração (d=1)
serie_arima_nao_estacionaria = np.cumsum(serie_arma)  # Diferenciação inversa (integração)

# Converter a série simulada em um DataFrame
serie_arima_nao_estacionaria = pd.Series(serie_arima_nao_estacionaria)

# In[160]: Visualizar a série simulada não estacionária
plt.figure(figsize=(10, 6))
plt.plot(serie_arima_nao_estacionaria)
plt.title("Série Não Estacionária ARIMA(1,1,1)")
plt.grid(True)
plt.show() 

## Analisando as séries autoregressivas

### Testes de Estacionariedade

- Teste de Dickey-Fuller
- H0: A série Não é Estacionária
- H1: A série é Estacionária

In [None]:
# Teste de Dickey-Fuller aumentado (ADF)
def dickey_fuller_test(series, title=''):
    result = adfuller(series)
    print(f'Teste de Dickey-Fuller para {title}')
    print(f'Estatística: {result[0]}')
    print(f'p-valor: {result[1]}')
    print('Critérios:')
    for key, value in result[4].items():
        print(f'{key}: {value}')
    print('Conclusão:', 'Estacionária' if result[1] < 0.01 else 'Não Estacionária')
    print()

In [None]:
# In[162]: Aplicando o teste de Dickey-Fuller
dickey_fuller_test(serie_ar, 'AR(1)')

In [None]:
dickey_fuller_test(serie_ma, 'MA(1)')

In [None]:
dickey_fuller_test(serie_arma, 'ARMA(1,1)')

In [None]:
# ATENÇÃO: vamos rodar para a série ARIMA, tem o I = 1
dickey_fuller_test(serie_arima_nao_estacionaria, 'ARIMA(1,1,1)')

### Estimação de um modelo ARIMA - Escolher, p, q e d
### Funções ACF e PACF

> continuar 2:01

In [None]:
def plot_acf_pacf(series, lags=20, title=''):
    fig, ax = plt.subplots(1, 2, figsize=(12, 5))
    plot_acf(series, lags=lags, ax=ax[0], title=f'ACF {title}')
    plot_pacf(series, lags=lags, ax=ax[1], title=f'PACF {title}', method='ywm')
    plt.show()

In [None]:
# In[164]: Plotando ACF e PACF das séries
plot_acf_pacf(serie_ar, title='AR(1)')

In [None]:
plot_acf_pacf(serie_ma, title='MA(1)')

In [None]:
plot_acf_pacf(serie_arma, title='ARMA(1,1)')

In [None]:
plot_acf_pacf(serie_arima_nao_estacionaria, title='ARIMA(1,1,1)')

### Estimando o modelo ARIMA usando auto-arima

In [None]:
# Estimar automaticamente o modelo ARIMA
# Lembrando: simulamos um AR(1) de coeficiente 0.8
auto_arima_model = auto_arima(serie_ar, trace=True, seasonal=False, stepwise=True)
print(auto_arima_model.summary())

In [None]:
# Lembrando: simulamos um MA(1) de coeficiente -0.3
auto_arima_model_ma = auto_arima(serie_ma, trace=True, seasonal=False, stepwise=True)
print(auto_arima_model_ma.summary())

In [None]:
# Lembrando: simulamos um ARMA(1,1) de coeficiente AR = 0.8 e MA= -0.3
auto_arima_model_arma = auto_arima(serie_arma, trace=True, seasonal=False, stepwise=True)
print(auto_arima_model_arma.summary())

In [None]:
# Lembrando: simulamos um ARIMA(1,1,1) de coeficiente AR = 0.8 e MA= -0.3
auto_arima_model_arima = auto_arima(serie_arima_nao_estacionaria, trace=True, seasonal=False, stepwise=True)

print(auto_arima_model_arima.summary())

### Função para identificar quantas diferenciações são necessárias 

In [None]:
def simular_arima(n, ar=[1, -0.8], ma=[1, -0.3], d=1, noise_std=5):
    """Simula uma série ARIMA com tendência, maior variabilidade e componente não sazonal."""
    
    # Criar a parte ARMA (ARIMA sem diferenciação)
    ar_params = np.r_[1, -np.array(ar[1:])]  # Parâmetro AR ajustado para ARMAProcess
    ma_params = np.r_[1, np.array(ma[1:])]   # Parâmetro MA ajustado para ARMAProcess
    arma_process = ArmaProcess(ar_params, ma_params)
    serie_arma = arma_process.generate_sample(nsample=n)
    
    # Adicionar um componente de tendência (para garantir que a série seja não estacionária)
    tendencia = np.linspace(0, n * 0.05, n)  # Componente de tendência linear
    
    # Adicionar variabilidade adicional
    variabilidade_adicional = np.random.normal(loc=0, scale=noise_std, size=n)  # Variabilidade adicional
    
    # Adicionar a tendência, variabilidade e aplicar a diferenciação (d=1)
    serie_arima = np.cumsum(serie_arma + tendencia + variabilidade_adicional)  # Diferenciação inversa (integração)
    
    return pd.Series(serie_arima)

In [None]:
# In[167]: Simular a série ARIMA(1,1,1) com tendência e variabilidade aumentada
np.random.seed(42)
serie_arima = simular_arima(500, ar=[1, -0.8], ma=[1, -0.3], d=1, noise_std=5)

# In[168]: Visualizar a série simulada ARIMA(1,1,1)
plt.figure(figsize=(10, 6))
plt.plot(serie_arima)
plt.title("Série Simulada ARIMA(1,1,1) com Tendência e Variabilidade Aumentada")
plt.grid(True)
plt.show()

In [None]:
# In[169]: Função para verificar quantas diferenciações são necessárias para tornar a série estacionária
import pmdarima as pm
def verificar_differenciacao(serie, nome):
    # Usar a função ndiffs do pmdarima
    d = pm.arima.ndiffs(serie, test='adf')  # Teste de Dickey-Fuller aumentado
    print(f"A série {nome} precisa de {d} diferenciação(ões) para ser estacionária.")
    return d

# Verificar quantas diferenciações são necessárias
verificar_differenciacao(serie_arima, "ARIMA(1,1,1)")

# Verificar quantas diferenciações são necessárias
verificar_differenciacao(serie_ar, "AR(1)")
verificar_differenciacao(serie_ma, "MA(1)")
verificar_differenciacao(serie_arma, "ARMA(1,1)")

### Simulação de séries AR de ordens maiores

In [None]:
# Função para simular séries AR, MA e ARIMA
def simular_arima(ar=None, ma=None, n=500, d=1, seed=42):
    np.random.seed(seed)
    ar = np.array([1] + [-coef for coef in (ar if ar else [])])  # Definir AR
    ma = np.array([1] + [coef for coef in (ma if ma else [])])  # Definir MA
    process = ArmaProcess(ar, ma)
    return process.generate_sample(nsample=n)

# In[171]: simulando outras series de ordem superior

serie_ar2 = simular_arima(ar=[0.8, 0.1], n=500)

# In[172]: Plotar a série simulada AR(2)
plt.figure(figsize=(10, 6))
plt.plot(serie_ar2)
plt.title('Simulação do Modelo AR(2)')
plt.xlabel('Tempo')
plt.ylabel('Valores simulados')
plt.grid(True)
plt.show()

In [None]:
# In[173]: Simulando agora um AR(3)
serie_ar3 = simular_arima(ar=[0.5, 0.1, 0.3], n=500)

# In[174]: Plotar a série simulada AR(3)
plt.figure(figsize=(10, 6))
plt.plot(serie_ar3)
plt.title('Simulação do Modelo AR(3)')
plt.xlabel('Tempo')
plt.ylabel('Valores simulados')
plt.grid(True)
plt.show()

In [None]:
# In[175]: Teste Dickey-Fuller (ADF) para estacionariedade
def teste_dickey_fuller(serie, nome):
    resultado = adfuller(serie)
    print(f"\nTeste Dickey-Fuller para {nome}:")
    print(f"Estatística: {resultado[0]}")
    print(f"P-valor: {resultado[1]}")
    for key, value in resultado[4].items():
        print(f"{key}: {value}")
    print('Conclusão:', 'Estacionária' if resultado[1] < 0.01 else 'Não Estacionária')

In [None]:
# In[176]: Verificar a estacionariedade das séries simuladas
teste_dickey_fuller(serie_ar2, "AR(2)")

teste_dickey_fuller(serie_ar3, "AR(3)")

# In[177]: Plotando ACF e PACF para as séries simuladas
plot_acf_pacf(serie_ar2, title='AR(2)')

plot_acf_pacf(serie_ar3, title='AR(3)')

### Identificar modelos ARIMA diretamente

In [None]:
# Estimar ARIMA manualmente com ordem (2,0,0) para série 2 e (3,0,0) para série 3
modelo_ar2 = ARIMA(serie_ar2, order=(2, 0, 0)).fit()
print(f'\nModelo ARIMA(2,0,0) ajustado para serie2:\n{modelo_ar2.summary()}')

In [None]:
modelo_ar3 = ARIMA(serie_ar3, order=(3, 0, 0)).fit()
print(f'\nModelo ARIMA(3,0,0) ajustado para serie3:\n{modelo_ar3.summary()}')

In [None]:
# In[179]: Simular séries AR(3) com diferentes coeficientes positivos e negativos
# AR(3) com coeficientes [0.5, -0.1, -0.3]
serie_ar31 = simular_arima(ar=[0.5, -0.1, -0.3], n=500)

# Plot ACF e PACF para serie2 (AR(3) com coeficientes [0.5, 0.1, 0.3])
plot_acf_pacf(serie_ar31, title='AR(3) coef positivos e negativos')

modelo_ar31 = ARIMA(serie_ar31, order=(3, 0, 0)).fit()
print(f'\nModelo ARIMA(3,0,0) ajustado para AR(3) com coef positivos e negativos:\n{modelo_ar31.summary()}')

In [None]:
# In[180]: 4. Simular uma série ARMA(2,2)
serie_arma221 = simular_arima(ar=[0.8, -0.1], ma=[0.4, -0.3], n=500)

# Testar a estacionariedade de série ARMA(2,2)
teste_dickey_fuller(serie_arma221, "ARMA(2,2) coef positivos e negativos")

# Plotar ACF e PACF para série ARMA(2,2)
plot_acf_pacf(serie_arma221, title='ARMA(2,2) coef positivos e negativos')

### Caso encontre ARIMA(0,0,0) - não foi possível encontrar memória
## autoregressiva significativa

- modelos ARIMA com Sazonalidade - SARIMA, possui os parâmetros P, D e Q Sazonais.
- Fica SARIMA(p,d,q)(P,D,Q)

## Buscando a série do Índice de Volume de Vendas de SP

In [None]:
# Pelo pacote python bcb - Baixar dados do Sistema Gerador de Séries Temporais
# do Banco Central

# https://www3.bcb.gov.br/sgspub/localizarseries/localizarSeries.do?method=prepararTelaLocalizarSeries

# Importa as bibliotecas

# pip install python-bcb
 
from bcb import sgs

In [None]:
# Obter os dados da série do Índice de Volume de Vendas de SP do BCB
varejo2 = sgs.get({'volume_vendas': 1475}, start='2000-01-01', end='2022-12-31')
print(varejo2)

In [None]:
# In[182]: Certificar-se de que a série temporal está no formato correto (frequência mensal)
varejo2.index = pd.to_datetime(varejo2.index)
varejo2 = varejo2.asfreq('MS')
print(varejo2)

In [None]:
# In[183]: Plot da série
plt.figure(figsize=(10, 6))
plt.plot(varejo2, label='Volume de Vendas - SP')
plt.title("Índice de Volume de Vendas de SP")
plt.xlabel('Data')
plt.ylabel('Índice')
plt.grid(True)
plt.show()

In [None]:
# In[184]: Divisão da série em treino e teste
varejotreino = varejo2[:'2020-12']
varejoteste = varejo2['2021-01':]

# Checagem do tamanho do conjunto de teste
print(f"Comprimento da série de teste: {len(varejoteste)}")

# In[185]:# Plotando as séries de treino e teste juntas
plt.figure(figsize=(10, 6))
plt.plot(varejo2, label='Varejo SP')
plt.plot(varejotreino, label='Treino')
plt.plot(varejoteste, label='Teste', color='blue')
plt.title("Série Treinada e Testada")
plt.xlabel('Data')
plt.ylabel('Índice')
plt.legend()
plt.grid(True)
plt.show()

### Análise da série

In [None]:
# Gráfico ACF e PACF
fig, axes = plt.subplots(1, 2, figsize=(16,4))
plot_acf(varejotreino, lags=24, ax=axes[0])
plot_pacf(varejotreino, lags=24, ax=axes[1], method='ywm')
plt.show()

# In[187]: Teste de Estacionariedade - ADF (Dickey-Fuller)
result = adfuller(varejotreino.dropna())
print(f'Resultado do Teste ADF: p-valor = {result[1]}')
if result[1] < 0.05:
    print("A série é estacionária.")
else:
    print("A série não é estacionária.")

In [None]:
# In[188]: Verificar quantas diferenciacoes sao necessarias
verificar_differenciacao(varejotreino, "Varejo - Treinamento")

# Diferenciação para estacionariedade
varejotreino_diff = varejotreino.diff().dropna()

# In[189]: Gráficos ACF e PACF da série diferenciada
fig, axes = plt.subplots(1, 2, figsize=(16,4))
plot_acf(varejotreino_diff, lags=24, ax=axes[0])
plot_pacf(varejotreino_diff, lags=24, ax=axes[1], method='ywm')
plt.show()

In [None]:
# In[190]: Ajuste do modelo ARIMA na série diferenciada (autoarima)
arimavarejo = auto_arima(varejotreino_diff,
                         seasonal=True,
                         m=12,  # Periodicidade da sazonalidade
                         trace=True,
                         stepwise=True)

# Exibir o resumo do modelo ajustado
print(arimavarejo.summary())

### Validação e Diagnóstico

In [None]:
# Resíduos do modelo
residuos_arima = arimavarejo.resid()
print(f"Resíduos do modelo: {residuos_arima}")

# In[192]: 1. Teste de Ljung-Box para verificar autocorrelação dos resíduos
ljung_box = sm.stats.acorr_ljungbox(residuos_arima, lags=[10], return_df=True)
print(f'Resultado do teste de Ljung-Box:\n{ljung_box}')
# Se p-value > 0.05, resíduos não são correlacionados

In [None]:
ks_stat, p_value = kstest(residuos_arima, 'norm', args=(np.mean(residuos_arima), np.std(residuos_arima)))
print(f'Teste de Kolmogorov-Smirnov para normalidade: p-valor = {p_value}')
if p_value > 0.01:
    print("Os resíduos seguem uma distribuição normal.")
else:
    print("Os resíduos não seguem uma distribuição normal.")

### Teste ARCH para verificar heterocedasticidade dos resíduos

In [None]:
# pip install arch

from arch import arch_model

In [None]:
am = arch_model(residuos_arima, vol='ARCH', p=1)
test_arch = am.fit(disp='off')
print(test_arch.summary())
#se p-value > 0.05 - nao ha efeitos ARCH

In [None]:
# In[195]: Prever 24 passos à frente na série diferenciada
n_periods = 24
previsoes_diff = arimavarejo.predict(n_periods=n_periods)
print(f"Previsões diferenciadas: {previsoes_diff}")

In [None]:
# In[196]: Índices das previsões (mesmo formato de data da série de treino e teste)
index_of_fc = pd.date_range(varejotreino.index[-1], periods=n_periods+1, freq='MS')[1:]

# In[197]: Para voltar ao nível original:
# Iterar para reverter a diferenciação das previsões
ultimo_valor_original = varejotreino.iloc[-1] # Último valor conhecido da série original (não diferenciada)
previsoes_nivel_original = [ultimo_valor_original]
print(ultimo_valor_original)
print(previsoes_nivel_original)

In [None]:
# In[198]: Somar as previsões diferenciadas ao último valor conhecido da série original
for previsao in previsoes_diff:
    novo_valor = previsoes_nivel_original[-1] + previsao
    previsoes_nivel_original.append(novo_valor)

# In[199]: Remover o primeiro valor, pois é o último valor conhecido da série original
previsoes_nivel_original = previsoes_nivel_original[1:]
print(previsoes_nivel_original)

# In[200]: Converter previsões de volta para uma Série Pandas com o índice correto
previsoes_nivel_original_series = pd.Series(previsoes_nivel_original, index=index_of_fc)
print(previsoes_nivel_original_series)

In [None]:
# In[201]: Plotando as previsões no nível original junto com a série de treino e teste
plt.figure(figsize=(10, 6))
plt.plot(varejotreino, label='Treino')
plt.plot(varejoteste, label='Teste', color='blue')
plt.plot(previsoes_nivel_original_series, label='Previsão ARIMA - Nível Original', color='orange')
plt.legend()
plt.title('Previsão ARIMA para Varejo SP (24 Passos à Frente - Nível Original)')
plt.grid(True)
plt.show()

In [None]:
# In[202]: Garantir que as previsões e os valores reais estejam alinhados para o MAPE
previsoes_series_alinhadas = previsoes_nivel_original_series[:len(varejoteste)].dropna()
varejoteste_alinhada = varejoteste.loc[previsoes_series_alinhadas.index]

In [None]:
# In[203]: Calcular o MAPE
from sklearn.metrics import mean_absolute_percentage_error

In [None]:
mape = mean_absolute_percentage_error(varejoteste_alinhada, previsoes_series_alinhadas)*100
print(f'MAPE: {mape}')

In [None]:

#pip install arch

# In[204]: Ajustar o modelo ETS (Holt-Winters Exponential Smoothing) - para serie varejotreino
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.stattools import adfuller
from pmdarima import auto_arima
from arch import arch_model
from scipy.stats import kstest
from sklearn.metrics import mean_absolute_percentage_error as mape
import statsmodels.api as sm

In [None]:
ets_model = ExponentialSmoothing(varejotreino, seasonal='add', trend='add', seasonal_periods=12).fit()

# In[205]: Previsões para os próximos 24 passos
ets_forecast = ets_model.forecast(steps=24)
print(f'Previsões ETS: {ets_forecast}')

In [None]:
# In[206]: Plotando os valores reais e as previsões
plt.figure(figsize=(10, 6))
plt.plot(varejotreino, label='Treino')
plt.plot(varejoteste, label='Teste', color='blue')
plt.plot(ets_forecast, label='Previsão ETS', color='orange')
plt.legend()
plt.title('Previsão ETS - 24 Passos à Frente')
plt.grid(True)
plt.show()

In [None]:
# In[207]: Avaliação do desempenho do modelo usando MAPE
mape_ets = mape(varejoteste, ets_forecast[:len(varejoteste)])*100
print(f'MAPE ETS: {mape_ets}')

### Prevendo a Inflação - IPCA - BACEN

In [None]:
# import pandas as pd
# import numpy as np
# import matplotlib.pyplot as plt
# from statsmodels.tsa.arima.model import ARIMA
# from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
#from statsmodels.tsa.stattools import acf, pacf
# pip install pmdarima
# from pmdarima import auto_arima
# from sklearn.metrics import mean_absolute_percentage_error
# from statsmodels.stats.diagnostic import acorr_ljungbox
# from scipy import stats
# pip install arch
# from arch import arch_model

In [None]:
ipca = sgs.get({'ipca': 433}, start='2000-01-01', end='2024-08-31')
print(ipca)

In [None]:
# In[209]: Dividindo em série de treino (sipca) e teste (teste)
sipca = ipca[:'2023-08']
teste = ipca['2023-09':'2024-08']

# In[210]: Plotando as séries de treino e teste juntas
plt.figure(figsize=(10, 6))
plt.plot(ipca, label='IPCA')
plt.plot(sipca, label='IPCA Treino')
plt.plot(teste, label='Teste', color='blue')
plt.title("Série Treinada e Testada")
plt.xlabel('Data')
plt.ylabel('Inflacao')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
ipca['Ano'] = ipca.index.year
ipca['Mês'] = ipca.index.month

# In[211]: Fazer o Gráfico com destaque para valores mensais
plt.figure(figsize=(10, 6))
sns.violinplot(x='Mês', y='ipca', data=ipca, palette='viridis')
plt.xticks(range(12), ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
plt.title('Distribuição Mensal do IPCA (2000-2024)')
plt.xlabel('Mês')
plt.ylabel('Valores do IPCA')
plt.show()

In [None]:
# Analisando a série com gráficos de ACF e PACF
plot_acf(sipca)
plot_pacf(sipca, method='ywm')
plt.show()

In [None]:
# In[213]: Estimando o modelo ARIMA(1,0,0)(0,0,1)[12] com sazonalidade
# Certifique-se de que seu índice é um DatetimeIndex com frequência
sipca.index = pd.to_datetime(sipca.index)

# Se necessário, definir explicitamente a frequência, por exemplo, mensal ('MS')
sipca = sipca.asfreq('MS')  # MS é o padrão para início de cada mês

# Ajuste do modelo ARIMA com ordem sazonal
mod = ARIMA(sipca, order=(1, 0, 0), seasonal_order=(0, 0, 1, 12)).fit()

# Exibir o resumo do modelo ajustado
print(mod.summary())

In [None]:
# In[214]: Usando auto_arima para encontrar o melhor modelo
modelo = auto_arima(sipca, seasonal=True, m=12, trace=True)

# Fazendo a previsão do modelo com sazonalidade
pipca = mod.get_forecast(steps=12)
pipca_mean = pipca.predicted_mean

# Fazendo a previsão do modelo auto_arima
psipca = modelo.predict(n_periods=12)

In [None]:
# In[215]: Plotando as previsões
plt.figure(figsize=(10, 6))
plt.plot(sipca, label='Treino')
plt.plot(teste, label='Teste', color='blue')
plt.plot(pd.Series(pipca_mean, index=teste.index), label='Previsão ARIMA Sazonal', color='orange')
plt.plot(pd.Series(psipca, index=teste.index), label='Previsão Auto ARIMA', color='green')
plt.legend()
plt.title('Previsões ARIMA Sazonal e Auto ARIMA')
plt.grid(True)
plt.show()

In [None]:
# In[216]: Fazendo o grafico somente com os valores previstos e reais
plt.figure(figsize=(10, 6))
plt.plot(teste, label='Valores Reais (Teste)', color='blue')
plt.plot(pd.Series(pipca_mean, index=teste.index), label='Previsão ARIMA Sazonal', color='orange')
plt.plot(pd.Series(psipca, index=teste.index), label='Previsão Auto ARIMA', color='green')
plt.legend()
plt.title('Comparação entre Valores Reais e Previsões (ARIMA Sazonal e Auto ARIMA)')
plt.grid(True)
plt.show()

In [None]:
# In[217]: Avaliação da acurácia das previsões
mape_pipca = mean_absolute_percentage_error(teste, pipca_mean)*100
mape_psipca = mean_absolute_percentage_error(teste, psipca)*100
print(f'MAPE Previsão ARIMA Sazonal: {mape_pipca}')
print(f'MAPE Previsão Auto ARIMA: {mape_psipca}')

In [None]:
# In[218]: Verificando os resíduos
residuals = mod.resid

Agora que temos um modelo definido precisamos saber se o modelo capturou toda a estrutura do processo.
Significa que devemos checar se os resíduos do modelo estão limpos quer dizer, devemos ter resíduos não autocorrelacionados e normalmente distribuídos.

### 1. Teste se os resíduos são não autocorrelacionados
- Teste de Ljung-Box
- H0: independência da ST, isto é, resíduos não correlacionados no tempo
- H1: dependência da ST, isto é, resíduos correlacionados, indicando que o modelo não capturou alguma estrutura que indica um erro sistemático
- Teste de Ljung-Box para independência dos resíduos

In [None]:
ljung_box = acorr_ljungbox(residuals, lags=[12], return_df=True)
print(ljung_box)

### 2. Teste de Normalidade dos Resíduos
- Teste de Kolmogorv-Smirnov
- H0: Resíduos com comportamento normal
- H1: Resíduos sem normalidade

In [None]:
# Teste de normalidade dos resíduos (Kolmogorov-Smirnov)
ks_stat, ks_p_value = stats.kstest(residuals, 'norm', args=(np.mean(residuals), np.std(residuals)))
print(f'Teste de Kolmogorov-Smirnov: p-value = {ks_p_value}')

### Testar a estacionariedade da variância
- testar se existe efeitos ARCH
- H0: Não Existe Efeitos ARCH
- H1: Existe Efeitos ARCH

In [None]:
# Teste de efeitos ARCH nos resíduos
arch_test = arch_model(residuals,rescale=False).fit()
print(arch_test.summary())

## Comparando agora as previsoes para a serie de energia com todos os modelos

In [None]:
from sklearn.metrics import mean_absolute_percentage_error as mape
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.api import SimpleExpSmoothing
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from statsmodels.stats.diagnostic import acorr_ljungbox
from scipy.stats import shapiro
from pmdarima import auto_arima
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings

# Capturar avisos de convergência e tratá-los
warnings.filterwarnings("error", category=ConvergenceWarning)

In [None]:
# Carregar os dados de energia e garantir que não há valores ausentes
energia = pd.read_excel("energia.xlsx", usecols=[1]).dropna()

# In[223]: Criar a série temporal a partir de 1979 com frequência mensal
energia.index = pd.date_range(start='1979-01', periods=len(energia), freq='M')
energia = energia.squeeze()  # Converter para uma Series

# In[224]: Separar a base de dados em treino e teste
benergia = energia[:'2022-06'].ffill()  # Preencher valores nulos com forward fill
reaisenergia = energia['2022-07':'2024-06']  # Teste de 2022-07 até 2024-06

# In[225]: Converter explicitamente para tipo numérico e garantir que são floats
benergia = pd.to_numeric(benergia, errors='coerce').astype(float)

# In[226]: Lista para armazenar os modelos, MAPE e previsões
modelos_energia = []
mapes_energia = []
previsoes_energia = {}

In [None]:
# Modelo Naive
naive_forecast = pd.Series([benergia.iloc[-1]] * len(reaisenergia), index=reaisenergia.index)
mape_naive = mape(reaisenergia, naive_forecast) * 100
modelos_energia.append("Naive")
mapes_energia.append(mape_naive)
previsoes_energia["Naive"] = naive_forecast

In [None]:
# Modelo Mean (média)
mean_forecast = pd.Series(benergia.mean(), index=reaisenergia.index)
mape_mean = mape(reaisenergia, mean_forecast) * 100
modelos_energia.append("Mean")
mapes_energia.append(mape_mean)
previsoes_energia["Mean"] = mean_forecast

In [None]:
# Modelo Drift
n = len(benergia)
drift_slope = (benergia.iloc[-1] - benergia.iloc[0]) / (n - 1)
drift_forecast = benergia.iloc[-1] + drift_slope * np.arange(1, len(reaisenergia) + 1)
drift_forecast = pd.Series(drift_forecast, index=reaisenergia.index)
mape_drift_result = mape(reaisenergia, drift_forecast) * 100
modelos_energia.append("Drift")
mapes_energia.append(mape_drift_result)
previsoes_energia["Drift"] = drift_forecast

In [None]:
# Modelo Naive Sazonal
naive_sazonal_forecast = pd.Series([benergia.iloc[-12 + (i % 12)]
                                    for i in range(len(reaisenergia))],
                                   index=reaisenergia.index)
mape_naive_sazonal = mape(reaisenergia, naive_sazonal_forecast) * 100
modelos_energia.append("Naive Sazonal")
mapes_energia.append(mape_naive_sazonal)
previsoes_energia["Naive Sazonal"] = naive_sazonal_forecast

In [None]:
# Suavização Exponencial Simples (SES)
ses_model = SimpleExpSmoothing(benergia).fit(optimized=True)
ses_forecast = ses_model.forecast(steps=len(reaisenergia))
mape_ses = mape(reaisenergia, ses_forecast) * 100
modelos_energia.append("SES")
mapes_energia.append(mape_ses)
previsoes_energia["SES"] = ses_forecast

In [None]:
# Holt-Winters Aditivo - Ajustar inicialização e Box-Cox para melhorar a convergência
try:
    hw_add_model = ExponentialSmoothing(
        benergia,
        seasonal_periods=12,
        trend='add',
        seasonal='add',
        initialization_method="estimated",  # Método robusto de inicialização
        use_boxcox=True  # Tentar estabilizar a variância com Box-Cox
    ).fit(optimized=True)

    hw_add_forecast = hw_add_model.forecast(steps=len(reaisenergia))
    mape_hw_add = mape(reaisenergia, hw_add_forecast) * 100
    modelos_energia.append("Holt-Winters Aditivo")
    mapes_energia.append(mape_hw_add)
    previsoes_energia["Holt-Winters Aditivo"] = hw_add_forecast
except Exception:
    modelos_energia.append("Holt-Winters Aditivo")
    mapes_energia.append(np.nan)

In [None]:
# Holt-Winters Multiplicativo - Ajustar inicialização e Box-Cox para melhorar a convergência
try:
    hw_mult_model = ExponentialSmoothing(
        benergia,
        seasonal_periods=12,
        trend='add',
        seasonal='mul',
        initialization_method="estimated",  # Método robusto de inicialização
        use_boxcox=True  # Tentar estabilizar a variância com Box-Cox
    ).fit(optimized=True)

    hw_mult_forecast = hw_mult_model.forecast(steps=len(reaisenergia))
    mape_hw_mult = mape(reaisenergia, hw_mult_forecast) * 100
    modelos_energia.append("Holt-Winters Multiplicativo")
    mapes_energia.append(mape_hw_mult)
    previsoes_energia["Holt-Winters Multiplicativo"] = hw_mult_forecast
except Exception:
    modelos_energia.append("Holt-Winters Multiplicativo")
    mapes_energia.append(np.nan)

In [None]:
# Modelo ARIMA/SARIMA - Identificação automática
try:
    arima_model = auto_arima(benergia, seasonal=True, m=12, stepwise=True, trace=False, suppress_warnings=True)

    # Exibir o melhor modelo ARIMA/SARIMA encontrado
    print(f"Melhor modelo ARIMA/SARIMA identificado: {arima_model}")

    arima_forecast = pd.Series(arima_model.predict(n_periods=len(reaisenergia)), index=reaisenergia.index)
    mape_arima = mape(reaisenergia, arima_forecast) * 100
    modelos_energia.append("ARIMA/SARIMA")
    mapes_energia.append(mape_arima)
    previsoes_energia["ARIMA/SARIMA"] = arima_forecast
except Exception as e:
    print(f"Erro no modelo ARIMA/SARIMA: {e}")
    modelos_energia.append("ARIMA/SARIMA")
    mapes_energia.append(np.nan)

### Comparação dos modelos com base no MAPE

In [None]:
mape_comparison = pd.DataFrame({'Modelo': modelos_energia, 'MAPE': mapes_energia})
mape_comparison = mape_comparison.sort_values(by='MAPE', ascending=True).reset_index(drop=True)
print(mape_comparison)

# In[236]: Gráfico dos MAPE dos modelos
plt.figure(figsize=(10, 6))
plt.barh(mape_comparison['Modelo'], mape_comparison['MAPE'], color='skyblue')
plt.xlabel("MAPE")
plt.title("MAPE Comparação de Modelos")
plt.grid(True)
plt.show()

In [None]:
# In[237]: Selecionar o modelo com o menor MAPE
melhor_modelo = mape_comparison.loc[0, 'Modelo']
melhores_previsoes = previsoes_energia[melhor_modelo]

# In[238]: Criar gráfico comparando os valores reais e previstos do melhor modelo
plt.figure(figsize=(10, 6))
plt.plot(reaisenergia.index, reaisenergia, label='Valores Reais', color='blue')
plt.plot(reaisenergia.index, melhores_previsoes, label=f'Previsão - {melhor_modelo}', color='red')
plt.title(f'Valores Reais vs Previsão ({melhor_modelo})')
plt.xlabel('Data')
plt.ylabel('Valores')
plt.legend()
plt.grid(True)
plt.show()

### Teste de normalidade e Ljung-Box para os resíduos do melhor modelo

In [None]:
residuos = reaisenergia - melhores_previsoes

# In[240]: Teste de normalidade Shapiro-Wilk
stat, p_value_shapiro = shapiro(residuos)
print(f"Teste de Normalidade Shapiro-Wilk: Estatística={stat:.4f}, p-valor={p_value_shapiro:.4f}")
if p_value_shapiro > 0.01:
    print("Os resíduos parecem seguir uma distribuição normal (não rejeitamos H0).")
else:
    print("Os resíduos não seguem uma distribuição normal (rejeitamos H0).")

# In[241]: Teste de Ljung-Box para autocorrelação dos resíduos
lb_test = acorr_ljungbox(residuos, lags=[10], return_df=True)
print(f"Teste Ljung-Box:\n{lb_test}")

# In[242]: Interpretação do teste de Ljung-Box
p_value_ljungbox = lb_test['lb_pvalue'].values[0]
if p_value_ljungbox > 0.01:
    print("Não há evidências de autocorrelação significativa nos resíduos (não rejeitamos H0).")
else:
    print("Há evidências de autocorrelação nos resíduos (rejeitamos H0).")