# Análise de Séries Temporais para Previsão

Este notebook apresenta uma análise detalhada de um sistema de previsão de séries temporais, implementando técnicas avançadas de machine learning e validação cruzada específica para dados temporais.

## Setup Inicial

Primeiro, vamos configurar o ambiente e importar as bibliotecas necessárias:

In [None]:
# Configuração para exibir gráficos no notebook
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('seaborn')

# Importações básicas
import pandas as pd
import numpy as np
from typing import Any, Tuple, List, Dict, Callable
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import STL
from scipy.stats import kruskal, linregress
from sktime.transformations.series.difference import Differencer
from sktime.transformations.series.boxcox import LogTransformer
from sktime.transformations.compose import TransformerPipeline
from sktime.transformations.base import BaseTransformer
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, Lasso, ElasticNet

## 1. Implementação das Classes e Funções Base

### 1.1 Transformadores Base

Primeiro, implementamos os transformadores base que serão usados para processar nossas séries temporais:

In [None]:
class PassThroughTransformer(BaseTransformer):
    """Identity transformer: hands the data back unchanged in both directions."""
    def fit(self, X, y=None):
        # Nothing is learned from X; returning self keeps calls chainable.
        return self

    def transform(self, X, y=None):
        # The very same object is returned (no copy is made).
        return X
        
    def inverse_transform(self, X):
        # The inverse of the identity is the identity.
        return X

class TotalGrowthTransformer(BaseTransformer):
    """Total growth over a fixed look-back window.

    For every time point ``t`` the transform yields ``X_t / X_{t-h} - 1``,
    where ``h`` is ``horizon``. The first ``h`` rows have no past value and
    therefore come out as NaN.

    The frame most recently given to ``fit`` or ``transform`` is cached in
    ``self._X``; ``inverse_transform`` uses that cache as the source of the
    base levels ``X_{t-h}``.
    """

    def __init__(self, horizon=3):
        super().__init__()
        self.horizon = horizon
        self._X = None

    def fit(self, X, y=None):
        """Cache a copy of ``X`` (as a DataFrame) and mark the instance fitted."""
        frame = X.to_frame() if isinstance(X, pd.Series) else X
        self._X = frame.copy()
        self._is_fitted = True
        return self

    def transform(self, X, y=None):
        """Return the growth of each column relative to ``horizon`` steps earlier.

        A Series input yields a Series; a DataFrame input yields a DataFrame.
        The cached levels are replaced by a copy of ``X``.
        """
        self.check_is_fitted()
        was_series = isinstance(X, pd.Series)
        frame = X.to_frame() if was_series else X

        # Remember the levels so that inverse_transform can rebuild them later.
        self._X = frame.copy()

        growth = pd.DataFrame()
        for name in frame.columns:
            levels = frame[name]
            growth[name] = levels / levels.shift(self.horizon) - 1

        return growth.iloc[:, 0] if was_series else growth

    def inverse_transform(self, X):
        """Turn growth rates back into levels using the cached base values.

        Each value is rebuilt as ``cached_{t-h} * (1 + growth_t)``; rows whose
        base value is unavailable come out as NaN.
        """
        self.check_is_fitted()
        was_series = isinstance(X, pd.Series)
        frame = X.to_frame() if was_series else X

        restored = pd.DataFrame(index=frame.index, columns=frame.columns)
        for name in frame.columns:
            anchor = self._X[name].shift(self.horizon)
            restored[name] = anchor * (1 + frame[name])

        return restored.iloc[:, 0] if was_series else restored

### 1.2 Funções de Transformação

Implementamos várias funções auxiliares para criar diferentes tipos de transformadores:

In [None]:
def delta_seasonal_transformer(seasonality=12):
    """Build a ``Differencer`` whose single lag is ``seasonality``."""
    seasonal_diff = Differencer(lags=seasonality)
    return seasonal_diff
    
def delta_seasonal_delta_transformer(seasonality=12):
    """Build a ``Differencer`` with lags ``[1, seasonality]``."""
    lag_sequence = [1, seasonality]
    return Differencer(lags=lag_sequence)
    
def ln_transformer():
    """Build a fresh ``LogTransformer`` with default settings."""
    log_step = LogTransformer()
    return log_step
    
def delta_ln_transformer():
    """Build a pipeline that takes the log and then a lag-1 difference."""
    log_step = ("log", LogTransformer())
    diff_step = ("diff", Differencer(lags=1))
    return TransformerPipeline(steps=[log_step, diff_step])

def total_growth_ln_transformer(horizon=3):
    """Build a ``TotalGrowthTransformer`` for the given horizon.

    Despite the ``ln`` in the name, no logarithm step is added here; the
    returned object is the plain growth transformer.
    """
    growth = TotalGrowthTransformer(horizon=horizon)
    return growth

def delta_delta_ln_transformer():
    """Build a pipeline: log, then two successive lag-1 differences."""
    pipeline_steps = [
        ("log", LogTransformer()),
        ("diff", Differencer(lags=1)),
        ("diff2", Differencer(lags=1)),
    ]
    return TransformerPipeline(steps=pipeline_steps)

def delta_transformer():
    """Build a ``Differencer`` with a single lag of 1."""
    first_diff = Differencer(lags=1)
    return first_diff

def delta_delta_transformer():
    """Build a ``Differencer`` with lags ``[1, 1]`` (lag 1 listed twice)."""
    lag_sequence = [1, 1]
    return Differencer(lags=lag_sequence)

def passthrough_transformer():
    """Build a new ``PassThroughTransformer`` (identity transformation)."""
    identity = PassThroughTransformer()
    return identity

### 1.3 Funções de Verificação

Implementamos funções para verificar características importantes das séries temporais:

In [None]:
def check_stationarity(series):
    """Report whether the ADF test's p-value for the series is below 0.05.

    Missing values are dropped before the test; the lag order is chosen by
    AIC.
    """
    observed = series.dropna()
    adf_output = adfuller(observed, autolag='AIC')
    p_value = adf_output[1]
    return p_value < 0.05

def check_seasonality(series, period=12):
    """Report whether the series shows seasonality of the given period.

    Both criteria below must hold:

    1. A Kruskal-Wallis test across the ``period`` phase groups of the STL
       seasonal component is significant at the 5 % level.
    2. The variance of the seasonal component exceeds 25 % of the variance
       of the (NaN-free) series.

    Parameters
    ----------
    series : pd.Series
        Series to inspect; missing values are dropped first.
    period : int, default 12
        Length of the seasonal cycle in observations (12 = yearly pattern
        in monthly data).

    Returns
    -------
    bool
        True when both criteria are met. A constant series returns False.
    """
    # Positional index so that phase slicing below is unambiguous.
    ts = series.dropna().reset_index(drop=True)

    # A constant series has no seasonal signal; returning early also avoids
    # dividing by a zero variance further down.
    total_var = np.var(ts.to_numpy())
    if total_var == 0:
        return False

    seasonal = STL(ts, period=period, robust=True).fit().seasonal

    # One group per position inside the cycle (e.g. all Januaries, ...).
    groups = [seasonal.iloc[phase::period] for phase in range(period)]
    _, p_value = kruskal(*groups)
    phases_differ = p_value < 0.05

    # Share of the series variance carried by the seasonal component,
    # measured on the same NaN-free observations that were decomposed.
    seasonal_strength = np.var(seasonal.to_numpy()) / total_var
    strong_enough = seasonal_strength > 0.25

    return bool(phases_differ and strong_enough)

def check_proportional_variance(series):
    """Report whether absolute changes grow with the level of the series.

    The absolute first differences are regressed on the current level. The
    result is True only when the slope is positive and significant at the
    5 % level; a significant negative slope (volatility shrinking as the
    level rises) does not count.

    Parameters
    ----------
    series : pd.Series
        Series to inspect; missing values are dropped first.

    Returns
    -------
    bool
        False as well when fewer than three (level, change) pairs exist or
        when all levels are identical, since no slope can be estimated.
    """
    ts = series.dropna()

    # Drop the first observation: it has no preceding value to difference.
    levels = ts.iloc[1:].to_numpy(dtype=float)
    abs_changes = ts.diff().abs().iloc[1:].to_numpy(dtype=float)

    # The regression needs spread in x and enough points for a p-value.
    if levels.size < 3 or levels.max() == levels.min():
        return False

    fit = linregress(levels, abs_changes)
    return bool(fit.slope > 0 and fit.pvalue < 0.05)

### 1.4 Seleção de Transformador

Implementamos a lógica para selecionar automaticamente o melhor transformador para cada série:

In [None]:
def transform_and_check(series, transformer):
    """Fit-transform the series and return the stationarity verdict of the result."""
    return check_stationarity(transformer.fit_transform(series))

def select_transformer(series, seasonality=12):
    """Pick a transformer intended to make the series stationary.

    Decision order:

    1. Already stationary: pass-through.
    2. Seasonal: seasonal difference, then lag-1 plus seasonal difference.
    3. Variance proportional to the level and all values strictly
       positive: ln(x), then Δln(x), then ΔΔln(x).
    4. Otherwise: Δx, then ΔΔx.

    Within the chosen family the first candidate whose output passes the
    stationarity check is returned; if none passes, a pass-through
    transformer is returned.

    Parameters
    ----------
    series : pd.Series
        Series to analyse.
    seasonality : int, default 12
        Lag used by the seasonal differencing candidates.

    Returns
    -------
    A transformer instance (already fitted on ``series`` when it comes from
    one of the candidate families).
    """
    if check_stationarity(series):
        return passthrough_transformer()

    if check_seasonality(series):
        candidates = [
            delta_seasonal_transformer(seasonality),
            delta_seasonal_delta_transformer(seasonality),
        ]
    elif check_proportional_variance(series) and bool((series.dropna() > 0).all()):
        # The logarithm is only defined for strictly positive values; zeros or
        # negatives would turn into -inf/NaN, so such series are sent to the
        # plain differencing family instead.
        candidates = [
            ln_transformer(),
            delta_ln_transformer(),
            delta_delta_ln_transformer(),
        ]
    else:
        candidates = [
            delta_transformer(),
            delta_delta_transformer(),
        ]

    for transformer in candidates:
        if transform_and_check(series, transformer):
            return transformer

    # No candidate achieved stationarity: leave the series untouched.
    return passthrough_transformer()

class StationarityTransformer(BaseTransformer):
    """Choose and apply a stationarity transformation independently per column.

    ``fit`` picks one transformer per column via ``select_transformer`` and
    stores it in ``columns_transformers_`` (column name -> fitted
    transformer). ``transform`` and ``inverse_transform`` then route each
    column through its own transformer.
    """

    def __init__(self):
        super().__init__()
        self.columns_transformers_ = {}

    def fit(self, X, y=None):
        """Select and fit one transformer for every column of ``X``."""
        fitted = {}
        for col in X.columns:
            transformer = select_transformer(X[col])
            transformer.fit(X[[col]])
            fitted[col] = transformer
        # Replace rather than update, so columns seen only in an earlier fit
        # do not linger after refitting on different data.
        self.columns_transformers_ = fitted
        self._is_fitted = True
        return self

    def transform(self, X, y=None):
        """Apply each column's transformer; raises KeyError for unseen columns."""
        self.check_is_fitted()
        return self._apply_per_column(X, "transform")

    def inverse_transform(self, X):
        """Undo each column's transformation; raises KeyError for unseen columns."""
        self.check_is_fitted()
        return self._apply_per_column(X, "inverse_transform")

    def _apply_per_column(self, X, method_name):
        """Call ``method_name`` on every column's transformer and reassemble a frame."""
        pieces = []
        for col in X.columns:
            method = getattr(self.columns_transformers_[col], method_name)
            result = method(X[[col]])
            # Re-wrap so every piece carries X's index and the original name.
            pieces.append(pd.DataFrame(result, index=X.index, columns=[col]))
        if not pieces:
            # pd.concat refuses an empty list; a frame without columns simply
            # maps to an empty frame on the same index.
            return pd.DataFrame(index=X.index)
        return pd.concat(pieces, axis=1)

### 1.5 Funções de Processamento de Dados

Implementamos as funções principais para processamento dos dados:

In [None]:
class TargetTransformer:
    """Bundle the target series with its total-growth transformation.

    Attributes are ``original_data`` (the target as given), ``transformer``
    (built by ``total_growth_ln_transformer``) and ``transformed_data`` (the
    transformed target, computed on construction).
    """

    def __init__(self, y, horizon=3):
        self.original_data = y
        self.transformer = total_growth_ln_transformer(horizon=horizon)
        self.transformed_data = None
        self._fit_transform()

    def _fit_transform(self):
        """Fit the transformer on the target and store the transformed series."""
        frame = pd.DataFrame(self.original_data)
        frame.columns = ['target']
        transformed = self.transformer.fit_transform(frame)
        self.transformed_data = transformed['target']

    def align_with_index(self, index):
        """Restrict both stored series to the entries selected by ``index``."""
        aligned_original = self.original_data[index]
        self.original_data = aligned_original
        aligned_transformed = self.transformed_data[index]
        self.transformed_data = aligned_transformed

    def get_data(self):
        """Return the pair ``(original_data, transformed_data)``."""
        return (self.original_data, self.transformed_data)

def create_lagged_features(X, lags=range(12)):
    """Expand every column of ``X`` into itself plus shifted copies.

    Each column ``c`` always contributes ``c_lag0`` (the unshifted values)
    and, for every strictly positive ``k`` in ``lags``, ``c_lag{k}`` holding
    the values shifted down by ``k`` rows. Non-positive entries of ``lags``
    are ignored. The result shares ``X``'s index.
    """
    features = {}

    for name in X.columns:
        source = X[name]
        features[f"{name}_lag0"] = source
        features.update(
            {f"{name}_lag{k}": source.shift(k) for k in lags if k > 0}
        )

    return pd.DataFrame(features, index=X.index)

def load_and_preprocess_data(path='all_data.csv'):
    """Load the dataset from CSV and prepare it for modelling.

    The first CSV column is used as the date index, rows containing any
    missing value are dropped, and the index is declared month-start
    (``'MS'``) frequency.

    Parameters
    ----------
    path : str or path-like, default 'all_data.csv'
        Location of the CSV file. It must contain a ``target`` column.

    Returns
    -------
    tuple of (pd.DataFrame, pd.Series)
        ``X`` is a copy of the full table (``target`` included) and ``y`` is
        a copy of the ``target`` column.

    Raises
    ------
    ValueError
        If the dates left after dropping rows are not consecutive month
        starts.
    KeyError
        If the file has no ``target`` column.
    """
    # parse_dates=True only acts on the index, so the date column has to be
    # declared as the index right at read time.
    data = pd.read_csv(path, index_col=0, parse_dates=True)
    data = data.dropna()
    # Attaching freq='MS' validates that no month is missing.
    data.index = pd.DatetimeIndex(data.index.values, freq='MS')

    y = data['target'].copy()
    X = data.copy()

    return X, y

def transform_features(X, y, horizon=3):
    """Build the model-ready feature matrix and the matching target wrapper.

    Steps: per-column stationarity transformation, target transformation,
    min-max scaling of the features, lag expansion, removal of rows with
    missing lags, and alignment of the target to the remaining rows.

    Returns the lagged feature frame and the ``TargetTransformer``.
    """
    stationary = StationarityTransformer().fit_transform(X)

    target_transformer = TargetTransformer(y, horizon=horizon)

    scaled_values = MinMaxScaler().fit_transform(stationary)
    scaled = pd.DataFrame(
        scaled_values,
        index=stationary.index,
        columns=stationary.columns,
    )

    # Rows at the start lack some lags; drop them and trim the target to match.
    lagged = create_lagged_features(scaled).dropna()
    target_transformer.align_with_index(lagged.index)

    return lagged, target_transformer

### 1.6 Funções de Modelagem

Implementamos as funções para treinar e fazer previsões com os modelos:

In [None]:
def train_model_with_lagged_features(
    X: pd.DataFrame,
    y: pd.Series,
    train_indices: List[int],
    fh: int,
    estimator: Any
) -> Tuple[Any, MinMaxScaler, MinMaxScaler]:
    """Fit ``estimator`` to predict ``y`` at position ``i`` from ``X`` at ``i - fh``.

    Positions smaller than ``fh`` are skipped because they have no feature
    row ``fh`` steps earlier. Features and target are each min-max scaled
    with scalers fitted on the training pairs only.

    Returns the fitted estimator (the same object that was passed in), the
    feature scaler and the target scaler.
    """
    usable = [idx for idx in train_indices if idx >= fh]

    feature_rows = [X.iloc[idx - fh].values for idx in usable]
    target_values = [y.iloc[idx] for idx in usable]

    X_train = np.vstack(feature_rows)
    y_train = np.array(target_values).reshape(-1, 1)

    X_scaler = MinMaxScaler()
    X_train_scaled = X_scaler.fit_transform(X_train)

    y_scaler = MinMaxScaler()
    y_train_scaled = y_scaler.fit_transform(y_train).ravel()

    estimator.fit(X_train_scaled, y_train_scaled)

    return estimator, X_scaler, y_scaler

def predict_next_steps(
    X: pd.DataFrame,
    y: pd.Series,
    validation_indices: List[int],
    model: Any,
    X_scaler: MinMaxScaler,
    y_scaler: MinMaxScaler,
    fh: int = 3,
    metric: Callable = mean_squared_error
) -> Tuple[Dict[int, float], float]:
    """Predict ``y`` at each validation position from ``X`` ``fh`` steps earlier.

    Parameters
    ----------
    X, y : feature frame and target series, addressed by position.
    validation_indices : positions to predict; those smaller than ``fh``
        are skipped since no feature row exists ``fh`` steps before them.
    model : fitted estimator exposing ``predict``.
    X_scaler, y_scaler : scalers fitted during training; predictions are
        mapped back to the target's scale with ``y_scaler``.
    fh : forecast horizon in rows.
    metric : callable ``metric(y_true, y_pred)`` used to score the
        predictions (mean squared error by default).

    Returns
    -------
    tuple of (dict, float)
        Predictions keyed by position, and the metric evaluated over the
        positions whose true value is not NaN (``inf`` when there is none).
    """
    predictions = {}
    y_true = []
    y_pred = []

    for idx in validation_indices:
        if idx < fh:
            continue

        features = X.iloc[idx - fh].values.reshape(1, -1)
        pred_scaled = model.predict(X_scaler.transform(features))[0]
        pred = y_scaler.inverse_transform([[pred_scaled]])[0][0]
        predictions[idx] = pred

        # Only positions with a known outcome can be scored.
        actual = y.iloc[idx]
        if not np.isnan(actual):
            y_true.append(actual)
            y_pred.append(pred)

    # Honour the caller-supplied metric instead of always computing MSE.
    score = metric(y_true, y_pred) if y_true else float('inf')

    return predictions, score

## 2. Validação Cruzada

Implementamos um sistema robusto de validação cruzada específico para séries temporais:

In [None]:
def calculate_error_statistics(errors: List[float]) -> Dict[str, float]:
    """Summarise a collection of errors.

    Returns a dict with the keys ``min``, ``max``, ``mean``, ``median``,
    ``std`` (population standard deviation), ``q1``, ``q3`` and ``p90``
    (25th, 75th and 90th percentiles).
    """
    values = np.array(errors)
    summary = {
        'min': values.min(),
        'max': values.max(),
        'mean': values.mean(),
        'median': np.median(values),
        'std': values.std(),
    }
    q1, q3, p90 = np.percentile(values, [25, 75, 90])
    summary.update(q1=q1, q3=q3, p90=p90)
    return summary

class CustomSlidingWindowSplitter:
    """Sliding-window train/validation splitter for ordered data.

    Every split is a training window of ``training_window`` consecutive
    positions immediately followed by ``validation_window`` positions. The
    first window starts at ``max(lag, differencer_lag)`` and successive
    windows start ``step`` positions later.
    """

    def __init__(self, training_window: int, validation_window: int, fh: List[int], 
                 lag: int, differencer_lag: int, step: int):
        self.training_window = training_window
        self.validation_window = validation_window
        self.fh = fh
        self.lag = lag
        self.differencer_lag = differencer_lag
        self.step = step

    def split(self, y):
        """Yield ``(train_indices, val_indices)`` lists of positions into ``y``.

        Window starts stop before
        ``len(y) - validation_window - max(fh)``, and generation ends as
        soon as a validation window would run past the end of ``y``.
        """
        total = len(y)
        first_start = max(self.lag, self.differencer_lag)
        last_start = total - self.validation_window - max(self.fh)

        for origin in range(first_start, last_start, self.step):
            cut = origin + self.training_window
            window_end = cut + self.validation_window

            if window_end > total:
                return

            yield list(range(origin, cut)), list(range(cut, window_end))

## 3. Exemplo de Uso

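Aqui está um exemplo de como usar o sistema implementado. Observação: a função `run_cross_validation` chamada abaixo não é definida nas seções anteriores deste notebook e precisa estar definida antes de executar esta célula: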

In [None]:
# Load and prepare the data
X, y = load_and_preprocess_data()

# Transform the features
X_with_lags, target_transformer = transform_features(X, y, horizon=3)

# Configure the cross-validation splitter
cv = CustomSlidingWindowSplitter(
    training_window=120,  # 10 years of monthly data
    validation_window=24,  # 2 years of monthly data
    fh=[3],  # Forecast 3 months ahead
    lag=12,  # 1 year of lag
    differencer_lag=12,
    step=5
)

# Define the models to test
models = [
    ('Ridge', Ridge(alpha=1.0)),
    ('Lasso', Lasso(alpha=0.1)),
    ('ElasticNet', ElasticNet(alpha=0.1, l1_ratio=0.5)),
    ('RandomForest', RandomForestRegressor(n_estimators=100, max_depth=5)),
    ('GradientBoosting', GradientBoostingRegressor(n_estimators=100, learning_rate=0.1))
]

# Evaluate each model
# NOTE(review): run_cross_validation is not defined anywhere in the visible
# part of this file; it must be defined elsewhere before this loop can run —
# TODO confirm where it lives and what its three return values contain.
for name, model in models:
    print(f"\nAvaliando {name}...")
    fold_errors, test_stats, fold_y_preds = run_cross_validation(
        X_with_lags=X_with_lags,
        target_transformer=target_transformer,
        cv=cv,
        estimator=model,
        target_column='target',
        fh=3
    )