In [3]:
# ==========================================================
# BULLETPROOF TRANSFORMER TIME SERIES FORECASTING
# ==========================================================

import numpy as np
import math
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import statsmodels.api as sm
import warnings
warnings.filterwarnings("ignore")

torch.manual_seed(42)
np.random.seed(42)

# -----------------------------
# 1. Load Dataset
# -----------------------------
data = sm.datasets.get_rdataset("AirPassengers").data
series = data['value'].values.astype(float)

# -----------------------------
# 2. Parameters (SAFE for small dataset)
# -----------------------------
SEQ_LEN = 12
FORECAST_HORIZON = 6

# -----------------------------
# 3. Train-Test Split (Guaranteed Valid)
# -----------------------------
split_index = int(len(series) * 0.8)

train_series = series[:split_index]
test_series = series[split_index-SEQ_LEN:]   # overlap included

# -----------------------------
# 4. Scaling
# -----------------------------
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_series.reshape(-1,1))
test_scaled = scaler.transform(test_series.reshape(-1,1))

# -----------------------------
# 5. Create Sequences
# -----------------------------
def create_sequences(data, seq_length, horizon):
    X, y = [], []
    for i in range(len(data) - seq_length - horizon + 1):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length:i+seq_length+horizon])
    return np.array(X), np.array(y)

X_train, y_train = create_sequences(train_scaled, SEQ_LEN, FORECAST_HORIZON)
X_test, y_test = create_sequences(test_scaled, SEQ_LEN, FORECAST_HORIZON)

# Safety check
if len(X_test) == 0:
    raise ValueError("Test set too small. Reduce SEQ_LEN or FORECAST_HORIZON.")

X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train.squeeze(), dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test.squeeze(), dtype=torch.float32)

print("Train shape:", X_train.shape)
print("Test shape :", X_test.shape)

# -----------------------------
# 6. Positional Encoding
# -----------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# -----------------------------
# 7. Transformer Model
# -----------------------------
class TimeSeriesTransformer(nn.Module):
    def __init__(self):
        super().__init__()
        d_model = 64
        self.input_proj = nn.Linear(1, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=4,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)
        self.fc_out = nn.Linear(d_model, FORECAST_HORIZON)

    def forward(self, x):
        x = self.input_proj(x)
        x = self.pos_encoder(x)
        x = self.transformer(x)
        x = x[:, -1, :]
        return self.fc_out(x)

model = TimeSeriesTransformer()

# -----------------------------
# 8. Training
# -----------------------------
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

EPOCHS = 120

for epoch in range(EPOCHS):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, y_train)
    loss.backward()
    optimizer.step()

# -----------------------------
# 9. Evaluation
# -----------------------------
model.eval()
with torch.no_grad():
    predictions = model(X_test).cpu().numpy()

# Inverse scaling
pred_inv = scaler.inverse_transform(predictions.reshape(-1,1)).reshape(predictions.shape)
y_test_inv = scaler.inverse_transform(y_test.numpy().reshape(-1,1)).reshape(y_test.shape)

rmse = math.sqrt(mean_squared_error(y_test_inv.flatten(), pred_inv.flatten()))
mae = mean_absolute_error(y_test_inv.flatten(), pred_inv.flatten())

mase_denom = np.mean(np.abs(train_series[1:] - train_series[:-1]))
mase = mae / mase_denom

print("\n===== Transformer Performance =====")
print("RMSE :", round(rmse,4))
print("MAE  :", round(mae,4))
print("MASE :", round(mase,4))

# -----------------------------
# 10. SARIMA Baseline
# -----------------------------
sarima = sm.tsa.SARIMAX(train_series,
                        order=(1,1,1),
                        seasonal_order=(1,1,1,12))
sarima_fit = sarima.fit(disp=False)
sarima_forecast = sarima_fit.forecast(steps=len(series)-split_index)

rmse_s = math.sqrt(mean_squared_error(series[split_index:], sarima_forecast))
mae_s = mean_absolute_error(series[split_index:], sarima_forecast)
mase_s = mae_s / mase_denom

print("\n===== SARIMA Performance =====")
print("RMSE :", round(rmse_s,4))
print("MAE  :", round(mae_s,4))
print("MASE :", round(mase_s,4))


Train shape: torch.Size([98, 12, 1])
Test shape : torch.Size([24, 12, 1])

===== Transformer Performance =====
RMSE : 78.3183
MAE  : 61.2243
MASE : 2.9069

===== SARIMA Performance =====
RMSE : 30.142
MAE  : 23.5557
MASE : 1.1184
