In [1]:
"""
Two Seq2Seq Experiments on RSU Time-Series Parquet
-------------------------------------------------
Reads:  data/processed/rsu_net_timeseries.parquet

Task: multi-horizon RSU load forecasting (Tout steps ahead) from Tin history.

Exp-1 (Minimal): ONLY time + RSU aggregated load history
    - Inputs: rsu_id embedding + time features + past rsu_load_kbps
    - No other network/comm features.

Exp-2 (Full): time + RSU load history + ALL aggregated network/comm features
    - Inputs: rsu_id embedding + time features + past rsu_load_kbps + feature vector.

Training:
    - 400 epochs (no early stopping, but best checkpoint is saved)
    - Metrics per epoch: MAE, RMSE, R2 (averaged over horizons)
    - Plots saved for each experiment

Outputs:
    results/seq2seq_minimal_history.csv
    results/seq2seq_full_history.csv
    results/seq2seq_minimal_curves.png
    results/seq2seq_full_curves.png
    results/seq2seq_minimal_best.pt
    results/seq2seq_full_best.pt
"""

from __future__ import annotations
from pathlib import Path
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [2]:
# =========================
# Config
# =========================
PARQ_PATH = Path("data/processed/rsu_net_timeseries.parquet")
OUT_DIR = Path("results")
OUT_DIR.mkdir(parents=True, exist_ok=True)

Tin = 20
Tout = 10

BATCH_SIZE = 256
EPOCHS = 400
LR = 1e-3
WEIGHT_DECAY = 1e-4
HIDDEN = 128
LAYERS = 2
DROPOUT = 0.15
TEACHER_FORCING = 0.5

TRAIN_RATIO = 0.7
VAL_RATIO = 0.1

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

In [6]:
# =========================
# Utils: Metrics (NumPy)
# =========================
def multi_horizon_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """
    y_true, y_pred: [N, Tout, 1] in ORIGINAL scale
    Returns avg MAE, avg RMSE, avg R2 across horizons.
    R2 is computed per-horizon then averaged (stable & interpretable).
    """
    eps = 1e-12
    maes, rmses, r2s = [], [], []
    for k in range(y_true.shape[1]):
        yt = y_true[:, k, 0]
        yp = y_pred[:, k, 0]
        err = yt - yp
        mae = float(np.mean(np.abs(err)))
        rmse = float(np.sqrt(np.mean(err ** 2)))

        ss_res = float(np.sum((yt - yp) ** 2))
        ss_tot = float(np.sum((yt - np.mean(yt)) ** 2)) + eps
        r2 = 1.0 - ss_res / ss_tot

        maes.append(mae); rmses.append(rmse); r2s.append(r2)
    return {
        "mae": float(np.mean(maes)),
        "rmse": float(np.mean(rmses)),
        "r2": float(np.mean(r2s)),
        "mae_per_h": maes,
        "rmse_per_h": rmses,
        "r2_per_h": r2s,
    }


def add_time_features(df: pd.DataFrame, ts_col: str) -> pd.DataFrame:
    """
    Adds general time features without assuming timestamp is datetime.
    - If datetime: use hour/day-of-week.
    - Else numeric: use sin/cos of normalized index within each rsu_id.
    """
    if np.issubdtype(df[ts_col].dtype, np.datetime64):
        dt = pd.to_datetime(df[ts_col])
        df["hour"] = dt.dt.hour.astype(np.int16)
        df["dow"] = dt.dt.dayofweek.astype(np.int16)
        df["minute"] = dt.dt.minute.astype(np.int16)

        # cyclical encodings
        df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24.0)
        df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24.0)
        df["dow_sin"] = np.sin(2 * np.pi * df["dow"] / 7.0)
        df["dow_cos"] = np.cos(2 * np.pi * df["dow"] / 7.0)
        df["min_sin"] = np.sin(2 * np.pi * df["minute"] / 60.0)
        df["min_cos"] = np.cos(2 * np.pi * df["minute"] / 60.0)
        return df

    # numeric timestamps: create per-RSU normalized "progress" feature
    # (works even if timestamps are irregular)
    df["_t_rank"] = df.groupby("rsu_id")[ts_col].rank(method="dense").astype(np.float32)
    df["_t_max"] = df.groupby("rsu_id")["_t_rank"].transform("max").astype(np.float32)
    prog = (df["_t_rank"] - 1.0) / (df["_t_max"] - 1.0 + 1e-6)
    df["t_sin"] = np.sin(2 * np.pi * prog)
    df["t_cos"] = np.cos(2 * np.pi * prog)
    df = df.drop(columns=["_t_rank", "_t_max"])
    return df

In [7]:
# =========================
# Windowing
# =========================
def build_windows(
    df: pd.DataFrame,
    ts_col: str,
    rsu_id_col: str,
    y_col: str,
    x_cols: list[str],
    Tin: int,
    Tout: int,
):
    """
    Build windows per RSU:
      X: [N, Tin, F]
      y: [N, Tout, 1]
      rsu_idx: [N] integer RSU index for embedding
      anchor: [N] timestamp (end of input window)
    """
    Xs, Ys, Ridx, Anch = [], [], [], []

    # map rsu_id to index
    rsu_ids = df[rsu_id_col].astype("string").unique().tolist()
    rsu2i = {rid: i for i, rid in enumerate(rsu_ids)}

    for rid, dfr in df.groupby(rsu_id_col, sort=False):
        dfr = dfr.sort_values(ts_col).reset_index(drop=True)
        Xmat = dfr[x_cols].to_numpy(dtype=np.float32)
        yvec = dfr[y_col].to_numpy(dtype=np.float32)
        T = len(dfr)
        n = T - Tin - Tout + 1
        if n <= 0:
            continue

        X = np.zeros((n, Tin, len(x_cols)), dtype=np.float32)
        y = np.zeros((n, Tout, 1), dtype=np.float32)
        a = np.zeros((n,), dtype=np.float64)
        r = np.full((n,), rsu2i[str(rid)], dtype=np.int64)

        for i in range(n):
            in_end = i + Tin
            out_end = in_end + Tout
            X[i] = Xmat[i:in_end]
            y[i, :, 0] = yvec[in_end:out_end]
            a[i] = dfr.loc[in_end - 1, ts_col].value if np.issubdtype(dfr[ts_col].dtype, np.datetime64) else float(dfr.loc[in_end - 1, ts_col])

        Xs.append(X); Ys.append(y); Ridx.append(r); Anch.append(a)

    X_all = np.concatenate(Xs, axis=0)
    y_all = np.concatenate(Ys, axis=0)
    r_all = np.concatenate(Ridx, axis=0)
    a_all = np.concatenate(Anch, axis=0)

    # strict time split by anchor
    order = np.argsort(a_all)
    return X_all[order], y_all[order], r_all[order], rsu2i

# =========================
# Dataset / scaling
# =========================
class RSUSeqDataset(Dataset):
    def __init__(self, X, y, ridx):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).float()
        self.ridx = torch.from_numpy(ridx).long()
    def __len__(self): return len(self.X)
    def __getitem__(self, i): return self.X[i], self.y[i], self.ridx[i]


def standardize_fit(X_train: np.ndarray):
    # X: [N, Tin, F]
    mu = X_train.reshape(-1, X_train.shape[-1]).mean(axis=0)
    sig = X_train.reshape(-1, X_train.shape[-1]).std(axis=0) + 1e-6
    return mu, sig

def standardize_apply(X: np.ndarray, mu, sig):
    return (X - mu) / sig

def scale_y_fit(y_train: np.ndarray):
    mu = y_train.reshape(-1).mean()
    sig = y_train.reshape(-1).std() + 1e-6
    return mu, sig

def scale_y_apply(y: np.ndarray, mu, sig):
    return (y - mu) / sig

def unscale_y(y: np.ndarray, mu, sig):
    return y * sig + mu

In [8]:















# =========================
# Seq2Seq model with RSU embedding
# =========================
class Encoder(nn.Module):
    def __init__(self, in_dim, hidden, layers, dropout):
        super().__init__()
        self.gru = nn.GRU(in_dim, hidden, num_layers=layers, batch_first=True,
                          dropout=dropout if layers > 1 else 0.0)
    def forward(self, x):
        _, h = self.gru(x)
        return h

class Decoder(nn.Module):
    def __init__(self, hidden, layers, dropout):
        super().__init__()
        self.gru = nn.GRU(1, hidden, num_layers=layers, batch_first=True,
                          dropout=dropout if layers > 1 else 0.0)
        self.fc = nn.Linear(hidden, 1)
    def forward(self, y_prev, h):
        out, h = self.gru(y_prev, h)
        y_hat = self.fc(out)
        return y_hat, h

class Seq2SeqRSU(nn.Module):
    def __init__(self, in_dim, hidden, layers, dropout, n_rsu, emb_dim=16):
        super().__init__()
        self.emb = nn.Embedding(n_rsu, emb_dim)
        self.enc = Encoder(in_dim + emb_dim, hidden, layers, dropout)
        self.dec = Decoder(hidden, layers, dropout)

    def forward(self, x, ridx, y_true=None, teacher_forcing=0.5):
        # x: [B, Tin, F]
        # ridx: [B]
        B = x.size(0)
        e = self.emb(ridx).unsqueeze(1).expand(-1, x.size(1), -1)  # [B, Tin, emb]
        x_in = torch.cat([x, e], dim=-1)
        h = self.enc(x_in)

        # IMPORTANT: init decoder with last observed load (assumed feature 0 is load)
        # x[:, -1, 0] is scaled load at last input time-step
        y_prev = x[:, -1:, 0:1]  # [B, 1, 1]

        outs = []
        for t in range(Tout):
            y_hat, h = self.dec(y_prev, h)
            outs.append(y_hat)
            if (y_true is not None) and (torch.rand(1).item() < teacher_forcing):
                y_prev = y_true[:, t:t+1, :]
            else:
                y_prev = y_hat
        return torch.cat(outs, dim=1)  # [B, Tout, 1]


# =========================
# Training loop (400 epochs)
# =========================
def run_experiment(
    name: str,
    df: pd.DataFrame,
    ts_col: str,
    rsu_id_col: str,
    y_col: str,
    x_cols: list[str],
):
    print(f"\n====================\nExperiment: {name}\n====================")

    X, y, ridx, rsu2i = build_windows(df, ts_col, rsu_id_col, y_col, x_cols, Tin, Tout)
    N = len(X)
    n_train = int(N * TRAIN_RATIO)
    n_val = int(N * VAL_RATIO)

    X_train, y_train, r_train = X[:n_train], y[:n_train], ridx[:n_train]
    X_val, y_val, r_val = X[n_train:n_train+n_val], y[n_train:n_train+n_val], ridx[n_train:n_train+n_val]
    X_test, y_test, r_test = X[n_train+n_val:], y[n_train+n_val:], ridx[n_train+n_val:]

    print(f"Windows: X={X.shape}, y={y.shape}, features={X.shape[-1]}, RSUs={len(rsu2i)}")
    print(f"Split: train={len(X_train)} val={len(X_val)} test={len(X_test)}")

    # Fit scalers on train only
    x_mu, x_sig = standardize_fit(X_train)
    y_mu, y_sig = scale_y_fit(y_train)

    X_train_s = standardize_apply(X_train, x_mu, x_sig)
    X_val_s   = standardize_apply(X_val, x_mu, x_sig)
    X_test_s  = standardize_apply(X_test, x_mu, x_sig)

    y_train_s = scale_y_apply(y_train, y_mu, y_sig)
    y_val_s   = scale_y_apply(y_val, y_mu, y_sig)
    y_test_s  = scale_y_apply(y_test, y_mu, y_sig)

    train_loader = DataLoader(RSUSeqDataset(X_train_s, y_train_s, r_train), batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
    val_loader   = DataLoader(RSUSeqDataset(X_val_s, y_val_s, r_val), batch_size=BATCH_SIZE, shuffle=False)
    test_loader  = DataLoader(RSUSeqDataset(X_test_s, y_test_s, r_test), batch_size=BATCH_SIZE, shuffle=False)

    model = Seq2SeqRSU(in_dim=X.shape[-1], hidden=HIDDEN, layers=LAYERS, dropout=DROPOUT, n_rsu=len(rsu2i)).to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    loss_fn = nn.SmoothL1Loss(beta=1.0)  # robust to spikes

    history = []
    best_val = float("inf")

    def eval_loader(loader):
        model.eval()
        ys, yps = [], []
        with torch.no_grad():
            for Xb, yb, rb in loader:
                Xb = Xb.to(DEVICE)
                yb = yb.to(DEVICE)
                rb = rb.to(DEVICE)
                yp = model(Xb, rb, y_true=None, teacher_forcing=0.0)
                ys.append(yb.cpu().numpy())
                yps.append(yp.cpu().numpy())
        y_true_s = np.concatenate(ys, axis=0)
        y_pred_s = np.concatenate(yps, axis=0)
        # unscale to original
        y_true = unscale_y(y_true_s, y_mu, y_sig)
        y_pred = unscale_y(y_pred_s, y_mu, y_sig)
        return y_true, y_pred

    for epoch in range(1, EPOCHS + 1):
        model.train()
        losses = []
        for Xb, yb, rb in train_loader:
            Xb = Xb.to(DEVICE)
            yb = yb.to(DEVICE)
            rb = rb.to(DEVICE)

            opt.zero_grad()
            yp = model(Xb, rb, y_true=yb, teacher_forcing=TEACHER_FORCING)
            loss = loss_fn(yp, yb)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
            losses.append(loss.item())

        # Compute metrics on train/val (original scale)
        ytr_true, ytr_pred = eval_loader(train_loader)
        yv_true, yv_pred = eval_loader(val_loader)

        m_tr = multi_horizon_metrics(ytr_true, ytr_pred)
        m_va = multi_horizon_metrics(yv_true, yv_pred)

        # choose best checkpoint by val RMSE (or val MAE)
        if m_va["rmse"] < best_val:
            best_val = m_va["rmse"]
            torch.save(model.state_dict(), OUT_DIR / f"{name}_best.pt")

        row = {
            "epoch": epoch,
            "train_mae": m_tr["mae"],
            "train_rmse": m_tr["rmse"],
            "train_r2": m_tr["r2"],
            "val_mae": m_va["mae"],
            "val_rmse": m_va["rmse"],
            "val_r2": m_va["r2"],
        }
        history.append(row)

        if epoch == 1 or epoch % 10 == 0:
            print(
                f"Epoch {epoch:03d} | "
                f"train(MAE={row['train_mae']:.3f}, RMSE={row['train_rmse']:.3f}, R2={row['train_r2']:.3f}) | "
                f"val(MAE={row['val_mae']:.3f}, RMSE={row['val_rmse']:.3f}, R2={row['val_r2']:.3f})"
            )

    hist = pd.DataFrame(history)
    hist.to_csv(OUT_DIR / f"{name}_history.csv", index=False)

    # Load best and evaluate on test
    model.load_state_dict(torch.load(OUT_DIR / f"{name}_best.pt", map_location=DEVICE))
    yte_true, yte_pred = eval_loader(test_loader)
    m_te = multi_horizon_metrics(yte_true, yte_pred)
    print(f"\nBEST TEST ({name}): MAE={m_te['mae']:.4f} RMSE={m_te['rmse']:.4f} R2={m_te['r2']:.4f}")

    # Plot learning curves
    plt.figure(figsize=(10, 4))
    plt.plot(hist["epoch"], hist["train_mae"], label="train_MAE")
    plt.plot(hist["epoch"], hist["val_mae"], label="val_MAE")
    plt.plot(hist["epoch"], hist["train_rmse"], label="train_RMSE")
    plt.plot(hist["epoch"], hist["val_rmse"], label="val_RMSE")
    plt.xlabel("Epoch")
    plt.ylabel("Error")
    plt.title(f"{name}: MAE/RMSE over epochs (multi-horizon avg)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"{name}_curves.png", dpi=200)
    plt.close()

    # R2 curve
    plt.figure(figsize=(10, 3))
    plt.plot(hist["epoch"], hist["train_r2"], label="train_R2")
    plt.plot(hist["epoch"], hist["val_r2"], label="val_R2")
    plt.xlabel("Epoch")
    plt.ylabel("R2")
    plt.title(f"{name}: R2 over epochs (multi-horizon avg)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"{name}_r2.png", dpi=200)
    plt.close()

    return m_te


# =========================
# Main: Load parquet and run two experiments
# =========================
df = pd.read_parquet(PARQ_PATH)

# Identify timestamp column in your parquet (support common variants)
if "timestamp_dt" in df.columns:
    ts_col = "timestamp_dt"
    df[ts_col] = pd.to_datetime(df[ts_col])
elif "timestamp" in df.columns:
    ts_col = "timestamp"
    # try parse to datetime if it looks like datetime strings
    if df[ts_col].dtype == "object":
        try:
            df[ts_col] = pd.to_datetime(df[ts_col])
            ts_col = "timestamp"
        except Exception:
            pass
else:
    raise ValueError("No timestamp column found (expected timestamp or timestamp_dt).")

if "rsu_id" not in df.columns:
    raise ValueError("No rsu_id column found in parquet.")
if "rsu_load_kbps" not in df.columns:
    raise ValueError("No rsu_load_kbps column found in parquet (target).")

# Add time features
df = add_time_features(df, ts_col)

# Ensure rsu_id is string
df["rsu_id"] = df["rsu_id"].astype("string")

# Sort for safety
df = df.sort_values(["rsu_id", ts_col]).reset_index(drop=True)

# Define candidate feature columns (exclude keys + target)
key_cols = {"rsu_id", ts_col, "rsu_load_kbps"}
all_cols = set(df.columns)

# Always include past load as feature 0 (decoder init expects feature 0 = load)
# Minimal features:
time_feat_cols = [c for c in df.columns if c.endswith("_sin") or c.endswith("_cos") or c in ["hour", "dow", "minute", "t_sin", "t_cos"]]
time_feat_cols = [c for c in time_feat_cols if c in df.columns]

# Exp-1: only load + time features (NO network/comm aggregates)
x_cols_minimal = ["rsu_load_kbps"] + time_feat_cols

# Exp-2: load + time + all aggregated metrics (numeric only)
candidate = [c for c in df.columns if c not in key_cols and c != "rsu_id"]
# keep numeric columns only
numeric_candidate = [c for c in candidate if pd.api.types.is_numeric_dtype(df[c])]
# remove target if accidentally included
numeric_candidate = [c for c in numeric_candidate if c != "rsu_load_kbps"]
# ensure time features included too
x_cols_full = ["rsu_load_kbps"] + [c for c in numeric_candidate if c != "rsu_load_kbps"]

print(f"Timestamp column: {ts_col}")
print(f"Minimal X cols: {len(x_cols_minimal)} -> {x_cols_minimal[:10]}")
print(f"Full X cols:    {len(x_cols_full)} (includes all numeric aggregates)")

# Run experiments
m1 = run_experiment("seq2seq_minimal", df, ts_col, "rsu_id", "rsu_load_kbps", x_cols_minimal)
m2 = run_experiment("seq2seq_full", df, ts_col, "rsu_id", "rsu_load_kbps", x_cols_full)

print("\nSummary (TEST best checkpoints)")
print(f"seq2seq_minimal: MAE={m1['mae']:.4f} RMSE={m1['rmse']:.4f} R2={m1['r2']:.4f}")
print(f"seq2seq_full:    MAE={m2['mae']:.4f} RMSE={m2['rmse']:.4f} R2={m2['r2']:.4f}")
print(f"\nSaved outputs under: {OUT_DIR.resolve()}")


Timestamp column: timestamp
Minimal X cols: 3 -> ['rsu_load_kbps', 't_sin', 't_cos']
Full X cols:    32 (includes all numeric aggregates)

Experiment: seq2seq_minimal
Windows: X=(878401, 20, 3), y=(878401, 10, 1), features=3, RSUs=13
Split: train=614880 val=87840 test=175681
Epoch 001 | train(MAE=8.103, RMSE=20.072, R2=0.995) | val(MAE=4.327, RMSE=7.321, R2=0.993)


KeyboardInterrupt: 