In [11]:
# xLSTM para previsão de tráfego com dataset PEMS03

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import sklearn

In [12]:
# xLSTM aplicado ao dataset PEMS03

# Este script usa o repositório [NX-AI/xlstm](https://github.com/NX-AI/xlstm) como base para aplicar um modelo xLSTM na tarefa de previsão de tráfego com o dataset PEMS03.

## 🧩 Pré-requisitos
%pip install xlstm mlstm_kernels  # Install specialized LSTM packages


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [13]:
## 📥 1. Carregar e preparar o dataset PEMS03

import numpy as np
from sklearn.preprocessing import StandardScaler
import torch
from torch.utils.data import Dataset, DataLoader

path = 'PEMS03.npz'
# The context manager closes the .npz archive once the array has been read.
with np.load(path) as npz:
    data = npz['data']  # (num_samples, num_nodes, num_features)
data = data[:, :, 0]  # keep only the first feature (volume) -> (num_samples, num_nodes)

# Fit the scaler on the leading (training) portion of the series only, so that
# validation/test statistics do not leak into the normalisation. The 0.6
# fraction mirrors the chronological train split applied to the windows below.
train_fraction = 0.6
n_train_rows = int(len(data) * train_fraction)

scaler = StandardScaler()
scaler.fit(data[:n_train_rows])
data = scaler.transform(data)

def create_sequences(data, input_len, output_len):
    """Slice a time-major array into overlapping (input, target) windows.

    Args:
        data: Array indexed by time step along its first axis.
        input_len: Number of consecutive steps in each input window.
        output_len: Number of steps, immediately after the input window,
            that form the target.

    Returns:
        Tuple ``(X, Y)`` of stacked arrays whose leading dimensions are
        ``(num_windows, input_len)`` and ``(num_windows, output_len)``.

    Raises:
        ValueError: If ``data`` is too short to yield a single window.
    """
    # A window starting at i consumes rows i .. i + input_len + output_len - 1,
    # so the last valid start index is len(data) - input_len - output_len
    # (inclusive) -- hence the "+ 1" in the window count.
    num_windows = len(data) - input_len - output_len + 1
    if num_windows <= 0:
        raise ValueError(
            f"need at least {input_len + output_len} time steps to build one "
            f"window, got {len(data)}"
        )
    X = [data[i:i + input_len] for i in range(num_windows)]
    Y = [data[i + input_len:i + input_len + output_len] for i in range(num_windows)]
    return np.stack(X), np.stack(Y)

# Window sizes: 12 past steps are used to predict the following 3.
input_len = 12
output_len = 3

# X: (N, input_len, num_nodes), Y: (N, output_len, num_nodes)
X, Y = create_sequences(data, input_len, output_len)

# Chronological split boundaries: first 60 % train, next 20 % validation,
# remaining 20 % test.
n_windows = len(X)
train_end = int(n_windows * 0.6)
val_end = int(n_windows * 0.8)

X_train, X_val, X_test = np.split(X, [train_end, val_end])
Y_train, Y_val, Y_test = np.split(Y, [train_end, val_end])

class TrafficDataset(Dataset):
    """Map-style dataset pairing each input window with its target window."""

    def __init__(self, X, Y):
        # Both arrays are copied into float32 tensors once, up front.
        self.X, self.Y = (
            torch.tensor(array, dtype=torch.float32) for array in (X, Y)
        )

    def __len__(self):
        """Number of samples, taken from the first dimension of ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair stored at position ``idx``."""
        sample = (self.X[idx], self.Y[idx])
        return sample

batch_size = 64

# Wrap each split in a dataset first, then in a loader. Only the training
# loader reshuffles its samples; validation and test keep their stored order.
train_dataset = TrafficDataset(X_train, Y_train)
val_dataset = TrafficDataset(X_val, Y_val)
test_dataset = TrafficDataset(X_test, Y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
## 🧠 2. Construir o modelo xLSTM

import torch.nn as nn
# The package installed by `%pip install xlstm` is imported as lowercase
# `xlstm`; Python module names are case-sensitive.
from xlstm import (
    xLSTMBlockStack,
    xLSTMBlockStackConfig,
    sLSTMBlockConfig,
    mLSTMBlockConfig,
    sLSTMLayerConfig,
    mLSTMLayerConfig,
    FeedForwardConfig,
)

num_nodes = X.shape[2]
print(num_nodes)
hidden_dim = 128

# Requesting backend="cuda" on a machine without a usable CUDA setup fails
# while the model is being built (KeyError: 'CUDA_LIB'), so fall back to the
# "vanilla" backend whenever no GPU is available.
slstm_backend = "cuda" if torch.cuda.is_available() else "vanilla"

cfg = xLSTMBlockStackConfig(
    mlstm_block=mLSTMBlockConfig(
        mlstm=mLSTMLayerConfig(conv1d_kernel_size=3, num_heads=4)
    ),
    slstm_block=sLSTMBlockConfig(
        slstm=sLSTMLayerConfig(backend=slstm_backend, num_heads=2, conv1d_kernel_size=3),
        feedforward=FeedForwardConfig(proj_factor=1.0, act_fn="gelu")
    ),
    context_length=input_len,
    num_blocks=3,
    embedding_dim=hidden_dim,
    slstm_at=[1]  # block index 1 is the sLSTM block
)

class xLSTMForecast(nn.Module):
    """Forecaster: linear embedding -> xLSTM block stack -> linear read-out.

    Dimensions and block configuration come from the module-level
    ``num_nodes``, ``hidden_dim``, ``output_len`` and ``cfg``.
    """

    def __init__(self):
        super().__init__()
        self.input_proj = nn.Linear(in_features=num_nodes, out_features=hidden_dim)
        self.xlstm = xLSTMBlockStack(cfg)
        self.output_proj = nn.Linear(
            in_features=hidden_dim,
            out_features=num_nodes * output_len,
        )

    def forward(self, x):
        """Map input windows (B, T, N) to forecasts (B, output_len, N)."""
        embedded = self.input_proj(x)                # (B, T, H)
        encoded = self.xlstm(embedded)               # (B, T, H)
        last_step = encoded[:, -1, :]                # keep only the final time step
        flat_forecast = self.output_proj(last_step)  # (B, N * output_len)
        return flat_forecast.view(-1, output_len, num_nodes)

# Place the model on the GPU when one is available (CPU otherwise); parameters
# and input batches must live on the same device for the forward pass to work.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = xLSTMForecast().to(device)


358


KeyError: 'CUDA_LIB'

In [None]:

## 🏋️ 3. Treinamento

import torch.optim as optim
import matplotlib.pyplot as plt

# Optimisation settings: 50 passes over the training set, MSE objective,
# Adam with a learning rate of 1e-3.
epochs = 50
learning_rate = 1e-3
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

def train_epoch():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``optimizer``, ``criterion`` and
    ``train_loader``.

    Returns:
        Mean of the per-batch training losses for this epoch.
    """
    model.train()
    # Send batches to wherever the model's parameters live instead of assuming
    # CUDA, so the loop also runs on CPU-only machines and never mixes devices.
    device = next(model.parameters()).device
    total_loss = 0.0
    for X_batch, Y_batch in train_loader:
        X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
        optimizer.zero_grad()
        pred = model(X_batch)
        loss = criterion(pred, Y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)

def val_epoch():
    """Compute the validation loss over ``val_loader`` without gradient tracking.

    Uses the module-level ``model``, ``criterion`` and ``val_loader``.

    Returns:
        Mean of the per-batch validation losses.
    """
    model.eval()
    # Follow the model's device rather than hard-coding CUDA, so validation
    # works on CPU-only machines and never mixes devices.
    device = next(model.parameters()).device
    total_loss = 0.0
    with torch.no_grad():
        for X_batch, Y_batch in val_loader:
            X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
            pred = model(X_batch)
            loss = criterion(pred, Y_batch)
            total_loss += loss.item()
    return total_loss / len(val_loader)

# Per-epoch loss history, filled in by the loop below.
train_losses, val_losses = [], []

for epoch in range(epochs):
    # Train first, then validate with the freshly updated weights.
    train_loss, val_loss = train_epoch(), val_epoch()
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

# Learning curves for both splits on a single set of axes.
fig, ax = plt.subplots()
ax.plot(train_losses, label='Train')
ax.plot(val_losses, label='Validation')
ax.legend()
ax.set_title("Loss por época")
ax.set_xlabel("Épocas")
ax.set_ylabel("MSE")
plt.show()


In [None]:

## 🧪 4. Avaliação Final
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np

def evaluate(loader):
    """Compute MAE and RMSE of the module-level ``model`` over ``loader``.

    Metrics are computed directly on the values yielded by ``loader``; no
    inverse scaling is applied here.

    Args:
        loader: Iterable of ``(inputs, targets)`` tensor batches.

    Returns:
        Tuple ``(mae, rmse)``.
    """
    model.eval()
    # Follow the model's device rather than hard-coding CUDA.
    device = next(model.parameters()).device
    preds, trues = [], []
    with torch.no_grad():
        for X_batch, Y_batch in loader:
            pred = model(X_batch.to(device)).cpu().numpy()
            preds.append(pred)
            trues.append(Y_batch.numpy())
    preds = np.concatenate(preds).ravel()
    trues = np.concatenate(trues).ravel()
    mae = mean_absolute_error(trues, preds)
    # mean_squared_error yields the MSE; the square root turns it into the
    # RMSE that this function reports.
    rmse = np.sqrt(mean_squared_error(trues, preds))
    return mae, rmse

# Final evaluation on the test split.
test_metrics = evaluate(test_loader)
mae, rmse = test_metrics
print(f"Test MAE: {mae:.4f}, RMSE: {rmse:.4f}")