This notebook fabricates compact synthetic datasets that stand in for the larger
benchmarks in the plan, then runs a focused suite of probes:

• Parity-locked comparisons (PSANN + tiny temporal spine vs TCN/LSTM/MLP)
• Cross-"station" generalization & missingness robustness
• Grouped permutation (information-usage) ablations
• Spectral/geometry diagnostics (Jacobian SVD & participation ratio)
• EAF-style per-heat resets & ΔTEMP targets

Results are saved to: /content/psann_synth_results/

Design draws from the experiment plan and PSANN docs:
- Plan & hypotheses (H1–H5), fairness constraints, and probes:  :contentReference[oaicite:3]{index=3}
- Estimator surface and usage patterns (sklearn-style, HISSO-ready):  :contentReference[oaicite:4]{index=4}
- Activation math, residual wrappers, and utilities (Jacobian/NTK):  :contentReference[oaicite:5]{index=5}


In [1]:
# @title Imports, device, tiny utils
import os, math, time, json, random, itertools, functools
from dataclasses import dataclass
import numpy as np
import pandas as pd
from typing import Tuple, Dict, List, Callable, Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.metrics import accuracy_score, f1_score

SEED = 1337
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
RESULTS_DIR = "/content/psann_synth_results"
os.makedirs(RESULTS_DIR, exist_ok=True)

def set_seed(s:int):
    random.seed(s); np.random.seed(s); torch.manual_seed(s)

def param_count(model: nn.Module) -> int:
    return sum(p.numel() for p in model.parameters())

def smape(y_true, y_pred, eps=1e-8):
    denom = (np.abs(y_true) + np.abs(y_pred) + eps)
    return (100.0 / len(y_true)) * np.sum(np.abs(y_pred - y_true) / denom)

def mase(y_true, y_pred, m=1):
    # naive seasonal m=1 by default
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()
    naive = np.mean(np.abs(y_true[m:] - y_true[:-m])) + 1e-8
    return np.mean(np.abs(y_true - y_pred)) / naive

def save_csv(df: pd.DataFrame, name: str):
    path = os.path.join(RESULTS_DIR, name)
    if os.path.exists(path):
        old = pd.read_csv(path)
        df = pd.concat([old, df], ignore_index=True)
    df.to_csv(path, index=False)
    print(f"Saved -> {path} ({len(df)} rows)")


In [2]:
!pip install psann

Collecting psann
  Downloading psann-0.10.3-py3-none-any.whl.metadata (10 kB)
Downloading psann-0.10.3-py3-none-any.whl (78 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/78.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.4/78.4 kB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psann
Successfully installed psann-0.10.3


In [3]:
# @title Try importing your PSANN; else define a compact fallback that mirrors PSANN
try:
    import psann  # type: ignore
    HAVE_PSANN = True
    from psann import PSANNRegressor  # sklearn-style
    print("Using installed psann package.")
except Exception as e:
    HAVE_PSANN = False
    print("psann not found; using a compact sine-activated residual fallback.")

import math

class SineParam(nn.Module):
    """SIREN-style learnable sine activation with optional decay as in TECHNICAL_DETAILS.md (simplified)."""
    def __init__(self, features, w0=30.0, use_decay=True):
        super().__init__()
        self.a = nn.Parameter(torch.zeros(features))  # amplitude pre-softplus
        self.b = nn.Parameter(torch.zeros(features))  # frequency pre-softplus
        self.c = nn.Parameter(torch.zeros(features))  # decay pre-softplus
        self.w0 = w0
        self.use_decay = use_decay
        self.softplus = nn.Softplus()

    def forward(self, z):
        A = self.softplus(self.a).view(1, -1)
        f = self.softplus(self.b).view(1, -1) + 1e-6
        d = self.softplus(self.c).view(1, -1)
        if z.dim() == 3:  # (B, T, F)
            A = A.unsqueeze(1)
            f = f.unsqueeze(1)
            d = d.unsqueeze(1)
        if self.use_decay:
            return A * torch.exp(-d * torch.abs(z)) * torch.sin(f * z)
        else:
            return A * torch.sin(f * z)

class ResBlock(nn.Module):
    def __init__(self, in_f, out_f, p_drop=0.0):
        super().__init__()
        self.fc = nn.Linear(in_f, out_f)
        self.act = SineParam(out_f)
        self.alpha = nn.Parameter(torch.tensor(1.0))
        self.do = nn.Dropout(p_drop)
        self.short = (in_f == out_f)

        # SIREN-ish init for fc
        wstd = math.sqrt(6.0 / in_f) / 30.0
        nn.init.uniform_(self.fc.weight, -wstd, wstd)
        nn.init.zeros_(self.fc.bias)

    def forward(self, x):
        h = self.do(self.act(self.fc(x)))
        out = h
        if self.short:
            out = x + self.alpha * h
        return out

class ResSineMLP(nn.Module):
    def __init__(self, in_f, hidden, layers=2, out_f=1, p_drop=0.0):
        super().__init__()
        net = []
        dim = in_f
        for _ in range(layers):
            net.append(ResBlock(dim, hidden, p_drop))
            dim = hidden
        self.net = nn.Sequential(*net)
        self.head = nn.Linear(dim, out_f)
        nn.init.zeros_(self.head.bias)

    def forward(self, x):  # x: (B, F)
        h = self.net(x)
        return self.head(h)

class ConvSpine1D(nn.Module):
    """Tiny strided Conv1d spine -> global average -> residual sine MLP head."""
    def __init__(self, in_ch, ch=32, k=5, stride=2, mlp_hidden=64, mlp_layers=2, out_f=1):
        super().__init__()
        pad = (k//2)
        self.conv1 = nn.Conv1d(in_ch, ch, kernel_size=k, padding=pad, stride=stride)
        self.conv2 = nn.Conv1d(ch, ch, kernel_size=k, padding=pad, stride=stride)
        self.mlp = ResSineMLP(in_f=ch, hidden=mlp_hidden, layers=mlp_layers, out_f=out_f)
        # SIREN-ish init
        for c in [self.conv1, self.conv2]:
            nn.init.kaiming_uniform_(c.weight, a=math.sqrt(5))
            if c.bias is not None: nn.init.zeros_(c.bias)

    def forward(self, x):  # x: (B, T, F) -> (B, F, T)
        x = x.transpose(1, 2)
        h = F.silu(self.conv1(x))
        h = F.silu(self.conv2(h))
        h = h.mean(-1)  # GAP over time -> (B, ch)
        return self.mlp(h)

class AttentionSpine1D(nn.Module):
    """Single-head self-attention spine -> mean pool -> residual sine MLP head."""
    def __init__(self, in_f, d=64, mlp_hidden=64, mlp_layers=2, out_f=1):
        super().__init__()
        self.q = nn.Linear(in_f, d); self.k = nn.Linear(in_f, d); self.v = nn.Linear(in_f, d)
        self.mlp = ResSineMLP(in_f=d, hidden=mlp_hidden, layers=mlp_layers, out_f=out_f)

    def forward(self, x):  # (B, T, F)
        Q, K, V = self.q(x), self.k(x), self.v(x)
        att = torch.softmax(Q @ K.transpose(1,2) / math.sqrt(Q.size(-1)), dim=-1)
        H = att @ V
        h = H.mean(1)  # pool over time
        return self.mlp(h)

class TCNBlock(nn.Module):
    """Simple 1D TCN block with causal padding."""
    def __init__(self, in_ch, out_ch, k=5, dilation=1):
        super().__init__()
        pad = (k-1)*dilation
        self.conv = nn.Conv1d(in_ch, out_ch, k, padding=pad, dilation=dilation)
        self.short = (in_ch == out_ch)
        self.proj = nn.Conv1d(in_ch, out_ch, 1) if not self.short else nn.Identity()

    def forward(self, x):
        h = self.conv(x)
        h = h[..., :x.size(-1)]  # causal trim
        h = F.relu(h)
        return F.relu(self.proj(x) + h)

class TinyTCN(nn.Module):
    def __init__(self, in_ch, ch=32, layers=2, k=5, out_f=1):
        super().__init__()
        blocks = []
        c = in_ch
        for i in range(layers):
            blocks.append(TCNBlock(c, ch, k=k, dilation=2**i))
            c = ch
        self.tcn = nn.Sequential(*blocks)
        self.head = nn.Linear(ch, out_f)

    def forward(self, x):  # (B, T, F) -> (B, F, T)
        x = x.transpose(1,2)
        h = self.tcn(x).transpose(1,2)  # back to (B, T, C)
        h_last = h[:, -1, :]
        return self.head(h_last)

class TinyLSTM(nn.Module):
    def __init__(self, in_f, hidden=64, out_f=1, layers=1):
        super().__init__()
        self.lstm = nn.LSTM(in_f, hidden, num_layers=layers, batch_first=True)
        self.head = nn.Linear(hidden, out_f)

    def forward(self, x):  # (B, T, F)
        h, (hn, cn) = self.lstm(x)
        return self.head(h[:, -1, :])

# --- Generic training helpers (regression) ---
def fit_regressor(model, X_train, y_train, X_val, y_val, epochs=60, lr=1e-3, bs=128, verbose=False):
    model = model.to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=lr)
    best = math.inf; best_state = None
    X_train = torch.tensor(X_train, dtype=torch.float32); y_train = torch.tensor(y_train, dtype=torch.float32)
    X_val = torch.tensor(X_val, dtype=torch.float32); y_val = torch.tensor(y_val, dtype=torch.float32)
    train_loader = DataLoader(torch.utils.data.TensorDataset(X_train, y_train), batch_size=bs, shuffle=True)
    for ep in range(epochs):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            pred = model(xb)
            loss = F.mse_loss(pred.view_as(yb), yb)
            opt.zero_grad(); loss.backward(); opt.step()
        model.eval()
        with torch.no_grad():
            val_pred = model(X_val.to(DEVICE)).cpu().numpy().ravel()
            vloss = np.mean((val_pred - y_val.numpy().ravel())**2)
        if vloss < best:
            best = vloss; best_state = {k: v.detach().cpu().clone() for k,v in model.state_dict().items()}
        if verbose and (ep+1)%10==0:
            print(f"ep {ep+1:3d} val_mse={vloss:.6f}")
    if best_state is not None:
        model.load_state_dict(best_state)
    return model

def predict_regressor(model, X):
    model.eval()
    with torch.no_grad():
        X = torch.tensor(X, dtype=torch.float32, device=DEVICE)
        y = model(X).cpu().numpy().ravel()
    return y


Using installed psann package.


In [4]:
# @title Generators: Seasonal (Jena proxy), Cross-Station Air (Beijing proxy), and EAF ΔTEMP
def gen_seasonal_series(n=20000, noise=0.15, drift=0.0002, w=(1/24, 1/168)):
    t = np.arange(n, dtype=np.float32)
    daily = np.sin(2*np.pi*w[0]*t)
    weekly = 0.5*np.sin(2*np.pi*w[1]*t + 0.3)
    trend = drift * t
    base = daily + weekly + trend
    exo = np.stack([
        np.cos(2*np.pi*w[0]*t), np.sin(2*np.pi*w[0]*t),
        np.cos(2*np.pi*w[1]*t), np.sin(2*np.pi*w[1]*t),
    ], axis=1)
    y = base + noise*np.random.randn(n).astype(np.float32)
    return y.astype(np.float32), exo.astype(np.float32)

def window_xy(y, exo, ctx=72, horizon=6):
    Xs, Ys = [], []
    for i in range(ctx, len(y)-horizon):
        hist = y[i-ctx:i].reshape(-1,1)
        feats = np.concatenate([hist, exo[i-ctx:i]], axis=1)
        Xs.append(feats)           # (ctx, 1+exo_dim)
        Ys.append(y[i+horizon])
    return np.array(Xs, np.float32), np.array(Ys, np.float32)

def gen_cross_station_air(stations=10, n=5000, ctx=24, horizon=3, missing=0.0, seed=0):
    rs = np.random.RandomState(seed)
    data = []
    for s in range(stations):
        amp = 0.8 + 0.4*rs.rand()
        phi = rs.rand()*2*np.pi
        t = np.arange(n, dtype=np.float32)
        base = amp*np.sin(2*np.pi*(1/24)*t + phi) + 0.2*np.sin(2*np.pi*(1/48)*t)
        met = np.stack([
            np.sin(2*np.pi*(1/24)*t + 0.1), np.cos(2*np.pi*(1/24)*t + 0.2),
            0.5*np.sin(2*np.pi*(1/168)*t + 0.3)
        ], axis=1).astype(np.float32)
        y = base + 0.1*rs.randn(n).astype(np.float32)
        X, Y = window_xy(y, met, ctx=ctx, horizon=horizon)
        # inject missingness on met channels only
        if missing > 0:
            mask = rs.rand(*X.shape) < (missing * (X.shape[-1]-1)/X.shape[-1])
            # keep history column (index 0) intact, drop some exo
            mask[..., 0] = False
            X[mask] = 0.0
            # optional: append masks as features
            X = np.concatenate([X, mask.astype(np.float32)], axis=-1)
        data.append((s, X, Y))
    return data  # list of (station_id, X, y)

def gen_eaf_heats(heats=120, min_len=80, max_len=200, seed=0):
    """ΔTEMP_t ≈ a1*O2_t + a2*MW_t + a3*sqrt(O2_t)*noise; counters reset per heat."""
    rs = np.random.RandomState(seed)
    X_rows, y_rows, heat_ids = [], [], []
    for h in range(heats):
        L = rs.randint(min_len, max_len+1)
        O2 = np.abs(rs.randn(L).astype(np.float32))*2.0
        MW = np.abs(rs.randn(L).astype(np.float32))*3.0
        # cumulative counters reset at each heat:
        c_O2 = np.cumsum(O2)
        c_MW = np.cumsum(MW)
        dtemp = 0.7*O2 + 0.4*MW + 0.15*np.sqrt(O2+1e-6)*rs.randn(L).astype(np.float32)
        TEMP = np.cumsum(dtemp) + 20*rs.rand()  # integral of ΔTEMP
        # target: next-step ΔTEMP
        y = np.roll(dtemp, -1); y[-1] = dtemp[-1]
        feats = np.stack([O2, MW, c_O2, c_MW, TEMP], axis=1).astype(np.float32)
        X_rows.append(feats); y_rows.append(y.astype(np.float32)); heat_ids += [h]*L
    X = np.concatenate(X_rows, axis=0)
    y = np.concatenate(y_rows, axis=0)
    heat_ids = np.array(heat_ids, dtype=np.int32)
    # build windows
    ctx=24
    Xs, Ys = [], []
    for i in range(ctx, len(y)-1):
        if heat_ids[i-ctx] != heat_ids[i]:  # ensure window does not cross heats
            continue
        hist = X[i-ctx:i, :]  # includes counters already reset per-heat
        Xs.append(hist); Ys.append(y[i])
    return np.array(Xs, np.float32), np.array(Ys, np.float32)


In [5]:
# @title Parity matchers: adjust hidden sizes to hit a target parameter budget
def build_psann_conv(in_f, target_params=200_000, tol=0.15, out_f=1):
    # search small grid over (conv_ch, mlp_hidden, layers)
    for ch in [16, 24, 32, 40, 48]:
        for mh in [32, 48, 64, 96]:
            for layers in [1, 2, 3]:
                m = ConvSpine1D(in_ch=in_f, ch=ch, k=5, stride=2, mlp_hidden=mh, mlp_layers=layers, out_f=out_f)
                p = param_count(m)
                if abs(p - target_params)/target_params <= tol:
                    return m, p
    # fallback: pick closest
    best, bestp = None, 1e18
    for ch in [16, 24, 32, 40, 48]:
        for mh in [32, 48, 64, 96]:
            for layers in [1, 2, 3]:
                m = ConvSpine1D(in_ch=in_f, ch=ch, k=5, stride=2, mlp_hidden=mh, mlp_layers=layers, out_f=out_f)
                p = param_count(m)
                if abs(p - target_params) < abs(bestp - target_params):
                    best, bestp = m, p
    return best, bestp

def build_tcn(in_f, target_params=200_000, tol=0.15, out_f=1):
    for ch in [16, 24, 32, 40, 48, 64]:
        for layers in [1,2,3,4]:
            m = TinyTCN(in_ch=in_f, ch=ch, layers=layers, k=5, out_f=out_f)
            p = param_count(m)
            if abs(p - target_params)/target_params <= tol:
                return m, p
    # fallback
    best, bestp = None, 1e18
    for ch in [16, 24, 32, 40, 48, 64]:
        for layers in [1,2,3,4]:
            m = TinyTCN(in_ch=in_f, ch=ch, layers=layers, k=5, out_f=out_f)
            p = param_count(m)
            if abs(p - target_params) < abs(bestp - target_params):
                best, bestp = m, p
    return best, bestp

def build_lstm(in_f, target_params=200_000, tol=0.15, out_f=1):
    for h in [32, 48, 64, 80, 96, 128, 160]:
        m = TinyLSTM(in_f=in_f, hidden=h, out_f=out_f, layers=1)
        p = param_count(m)
        if abs(p - target_params)/target_params <= tol:
            return m, p
    # fallback
    best, bestp = None, 1e18
    for h in [32, 48, 64, 80, 96, 128, 160]:
        m = TinyLSTM(in_f=in_f, hidden=h, out_f=out_f, layers=1)
        p = param_count(m)
        if abs(p - target_params) < abs(bestp - target_params):
            best, bestp = m, p
    return best, bestp

def build_psann_tabular(in_f, target_params=200_000, tol=0.15, out_f=1):
    for mh in [32, 48, 64, 96, 128]:
        for layers in [1, 2, 3]:
            m = ResSineMLP(in_f, hidden=mh, layers=layers, out_f=out_f)
            p = param_count(m)
            if abs(p - target_params)/target_params <= tol:
                return m, p
    # fallback
    best, bestp = None, 1e18
    for mh in [32, 48, 64, 96, 128]:
        for layers in [1, 2, 3]:
            m = ResSineMLP(in_f, hidden=mh, layers=layers, out_f=out_f)
            p = param_count(m)
            if abs(p - target_params) < abs(bestp - target_params):
                best, bestp = m, p
    return best, bestp

class SimpleScaler:
    def __init__(self):
        self.mean = None; self.std = None
    def fit(self, X):
        self.mean = X.mean(axis=0, keepdims=True)
        self.std = X.std(axis=0, keepdims=True) + 1e-8
    def transform(self, X):
        return (X - self.mean)/self.std

def eval_regression(y_true, y_pred):
    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = float(np.sqrt(np.mean((y_true - y_pred)**2)))
    sm = smape(y_true, y_pred)
    ms = mase(y_true, y_pred, m=1)
    return dict(r2=r2, mae=mae, rmse=rmse, smape=sm, mase=ms)


In [6]:
# @title Seasonal probe: PSANN+Conv vs TCN vs LSTM (parity, 3 seeds)
set_seed(SEED)
y, exo = gen_seasonal_series(n=22000, noise=0.12, drift=0.00015)
X, Y = window_xy(y, exo, ctx=72, horizon=6)   # X: (N, 72, 1+4) -> F=5
# Train/val/test split
N = len(Y); n_train = int(0.7*N); n_val = int(0.15*N)
X_train, y_train = X[:n_train], Y[:n_train]
X_val,   y_val   = X[n_train:n_train+n_val], Y[n_train:n_train+n_val]
X_test,  y_test  = X[n_train+n_val:], Y[n_train+n_val:]
sc = SimpleScaler();  # scale per-feature across flattened dims
flat = X_train.reshape(-1, X_train.shape[-1]); sc.fit(flat)
def scale_seq(Xseq):
    shp = Xseq.shape
    Xs = sc.transform(Xseq.reshape(-1, shp[-1])).reshape(shp)
    return Xs

X_train_s = scale_seq(X_train); X_val_s = scale_seq(X_val); X_test_s = scale_seq(X_test)

target_params = 220_000
in_f = X.shape[-1]
records = []
for seed in [1, 2, 3]:
    set_seed(seed)
    # Build parity-matched models
    psann_m, p_ps = build_psann_conv(in_f, target_params, out_f=1)
    tcn_m,   p_tc = build_tcn(in_f, target_params, out_f=1)
    lstm_m,  p_ls = build_lstm(in_f, target_params, out_f=1)
    for name, model, pcount in [
        ("ResPSANN_conv_spine", psann_m, p_ps),
        ("TCN_baseline",        tcn_m,   p_tc),
        ("LSTM_baseline",       lstm_m,  p_ls),
    ]:
        t0 = time.time()
        model = fit_regressor(model, X_train_s, y_train[:,None], X_val_s, y_val[:,None],
                              epochs=60, lr=3e-3, bs=128, verbose=False)
        t1 = time.time()
        yhat = predict_regressor(model, X_test_s)
        mets = eval_regression(y_test, yhat)
        rec = dict(
            dataset="SEASONAL_JENA_PROXY", probe="PARITY", split="test", seed=seed,
            model=name, params=param_count(model), train_wall_seconds=t1-t0, **mets
        )
        records.append(rec)
df = pd.DataFrame(records)
save_csv(df, "synthetic_experiment_metrics.csv")
df.pivot_table(index=["dataset","probe","model"], values=["r2","mae","rmse","smape","mase","params","train_wall_seconds"]).round(4)


Saved -> /content/psann_synth_results/synthetic_experiment_metrics.csv (9 rows)


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,mae,mase,params,r2,rmse,smape,train_wall_seconds
dataset,probe,model,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
SEASONAL_JENA_PROXY,PARITY,LSTM_baseline,0.1742,0.8531,107041.0,0.9279,0.2176,2.8717,39.9298
SEASONAL_JENA_PROXY,PARITY,ResPSANN_conv_spine,0.1172,0.5742,37108.0,0.9656,0.1504,1.9844,41.4863
SEASONAL_JENA_PROXY,PARITY,TCN_baseline,0.1113,0.5452,63745.0,0.9704,0.1395,1.9404,29.8154


In [8]:
# @title Cross-station held-out & missingness robustness; parity-matched PSANN+Conv vs TCN vs LSTM
set_seed(SEED)

def prep_dataset(miss=0.0, seed=42, ctx=24, horizon=3):
    """
    Generate a fresh cross-station dataset with a given missingness rate,
    split into train (stations != 0,1), val (station 0), test (station 1),
    and fit a scaler on the training split only.
    """
    data = gen_cross_station_air(stations=10, n=6000, ctx=ctx, horizon=horizon, missing=miss, seed=seed)
    # Split by station id
    X_train = np.concatenate([x for (sid, x, y) in data if sid not in (0, 1)], axis=0)
    y_train = np.concatenate([y for (sid, x, y) in data if sid not in (0, 1)], axis=0)
    X_val   = next(x for (sid, x, y) in data if sid == 0)
    y_val   = next(y for (sid, x, y) in data if sid == 0)
    X_test  = next(x for (sid, x, y) in data if sid == 1)
    y_test  = next(y for (sid, x, y) in data if sid == 1)

    # Scale using only TRAIN statistics
    scaler = SimpleScaler()
    scaler.fit(X_train.reshape(-1, X_train.shape[-1]))
    def scale_seq(X):
        shp = X.shape
        return scaler.transform(X.reshape(-1, shp[-1])).reshape(shp)

    return scale_seq(X_train), y_train, scale_seq(X_val), y_val, scale_seq(X_test), y_test

target_params = 200_000
records = []
for miss in [0.0, 0.1, 0.3]:
    # Fresh dataset (and scaler) per missingness level for clean robustness curves
    Xtr, ytr, Xv, yv, Xte, yte = prep_dataset(miss=miss, seed=42, ctx=24, horizon=3)
    in_f = Xtr.shape[-1]

    for seed in [7, 8, 9]:
        set_seed(seed)
        # Build parity-matched models (≈ same param count)
        psann_m, p_ps = build_psann_conv(in_f, target_params, out_f=1)
        tcn_m,   p_tc = build_tcn(in_f, target_params, out_f=1)
        lstm_m,  p_ls = build_lstm(in_f, target_params, out_f=1)

        for name, model in [
            ("ResPSANN_conv_spine", psann_m),
            ("TCN_baseline",        tcn_m),
            ("LSTM_baseline",       lstm_m),
        ]:
            t0 = time.time()
            model = fit_regressor(model, Xtr, ytr[:, None], Xv, yv[:, None],
                                  epochs=50, lr=3e-3, bs=256)
            t1 = time.time()
            yhat = predict_regressor(model, Xte)
            mets = eval_regression(yte, yhat)
            rec = dict(
                dataset="AIR_BEIJING_PROXY",
                probe=f"HELDOUT+MISS_{int(miss*100)}",
                split="test",
                seed=seed,
                model=name,
                params=param_count(model),
                train_wall_seconds=t1 - t0,
                **mets
            )
            records.append(rec)

df = pd.DataFrame(records)
save_csv(df, "synthetic_experiment_metrics.csv")
df[df["dataset"] == "AIR_BEIJING_PROXY"].pivot_table(
    index=["probe", "model"], values=["r2", "mae", "rmse"]
).round(4)


Saved -> /content/psann_synth_results/synthetic_experiment_metrics.csv (36 rows)


Unnamed: 0_level_0,Unnamed: 1_level_0,mae,r2,rmse
probe,model,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
HELDOUT+MISS_0,LSTM_baseline,0.1025,0.9635,0.1292
HELDOUT+MISS_0,ResPSANN_conv_spine,0.0948,0.9688,0.1195
HELDOUT+MISS_0,TCN_baseline,0.0856,0.9748,0.1075
HELDOUT+MISS_10,LSTM_baseline,0.1096,0.9545,0.1374
HELDOUT+MISS_10,ResPSANN_conv_spine,0.0973,0.9645,0.1218
HELDOUT+MISS_10,TCN_baseline,0.0887,0.9703,0.1114
HELDOUT+MISS_30,LSTM_baseline,0.1117,0.9528,0.1405
HELDOUT+MISS_30,ResPSANN_conv_spine,0.0985,0.9634,0.1237
HELDOUT+MISS_30,TCN_baseline,0.0899,0.9692,0.1134


In [10]:
# @title Permute grouped features on the AIR proxy (history vs meteorology vs calendar)
# FIX: make sure F is the PyTorch functional module, not an int
import torch.nn.functional as F  # restore F after any accidental shadowing

def grouped_permute(X, groups: Dict[str, List[int]], which: str, rs: np.random.RandomState):
    Xp = X.copy()
    idxs = groups[which]
    # permute across samples independently for each feature channel
    for j in idxs:
        rs.shuffle(Xp[:, :, j])
    return Xp

# Rebuild a dataset with explicit groups: [0]=history, [1..m]=met; append calendar(sin/cos hour)
def air_with_calendar(seed=2024):
    data = gen_cross_station_air(stations=10, n=6000, ctx=24, horizon=3, missing=0.0, seed=seed)
    def add_calendar(X):
        N, T, Fd = X.shape
        hours = np.arange(T, dtype=np.float32)[None, :, None].repeat(N, axis=0)
        sin = np.sin(2*np.pi*hours/24.0).astype(np.float32)
        cos = np.cos(2*np.pi*hours/24.0).astype(np.float32)
        return np.concatenate([X, sin, cos], axis=2)
    return [(s, add_calendar(X), y) for (s, X, y) in data]

set_seed(101)
data = air_with_calendar(seed=101)
# Train on all but station 1; test on station 1
Xtr = np.concatenate([x for (s, x, y) in data if s != 1], axis=0)
ytr = np.concatenate([y for (s, x, y) in data if s != 1], axis=0)
Xte = [x for (s, x, y) in data if s == 1][0]
yte = [y for (s, x, y) in data if s == 1][0]

# Scale on training only
sc = SimpleScaler(); sc.fit(Xtr.reshape(-1, Xtr.shape[-1]))
def scale_seq(X):
    shp = X.shape
    return sc.transform(X.reshape(-1, shp[-1])).reshape(shp)

Xtr_s, Xte_s = scale_seq(Xtr), scale_seq(Xte)

# Group indices:
feat_dim = Xtr_s.shape[-1]   # FIX: don't shadow F
# history at index 0; meteorology next 3; calendar at the end (2 dims)
groups = {"history": [0], "meteorology": [1, 2, 3], "calendar": [feat_dim - 2, feat_dim - 1]}

# pick a parity-matched winner config (PSANN+Conv) and compare
psann_m, _ = build_psann_conv(feat_dim, target_params=180_000, out_f=1)
psann_m = fit_regressor(psann_m, Xtr_s, ytr[:, None], Xtr_s[-5000:], ytr[-5000:, None],
                        epochs=40, lr=3e-3, bs=256)
base = eval_regression(yte, predict_regressor(psann_m, Xte_s))["r2"]

abl_records = []
rs = np.random.RandomState(7)
for gname in ["history", "meteorology", "calendar"]:
    Xp = grouped_permute(Xte_s, groups, gname, rs)
    r2 = eval_regression(yte, predict_regressor(psann_m, Xp))["r2"]
    abl_records.append(dict(dataset="AIR_BEIJING_PROXY", probe="ABLATE_GROUPS",
                            model="ResPSANN_conv_spine",
                            group=gname, base_r2=base, ablated_r2=r2, delta=r2 - base))

abl_df = pd.DataFrame(abl_records)
save_csv(abl_df, "synthetic_ablation_results.csv")
abl_df


Saved -> /content/psann_synth_results/synthetic_ablation_results.csv (3 rows)


Unnamed: 0,dataset,probe,model,group,base_r2,ablated_r2,delta
0,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,history,0.984804,-0.823856,-1.80866
1,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,meteorology,0.984804,0.917602,-0.067202
2,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,calendar,0.984804,0.984804,0.0


In [12]:
# @title Jacobian spectrum & participation ratio (PSANN vs MLP) on a small batch — FIXED
def jacobian_matrix(model: nn.Module, Xb: np.ndarray):
    """
    Compute d y / d x for a batch: for each sample i (out_i is scalar y[i,0]),
    we compute grad(out_i, Xb)[i, :, :] and vectorize into one row.
    Returns J of shape (B, T*F).
    """
    model.eval()
    xb = torch.tensor(Xb, dtype=torch.float32, device=DEVICE, requires_grad=True)
    y = model(xb)  # (B, 1)
    assert y.ndim == 2 and y.shape[1] == 1, f"Expected (B,1) output, got {tuple(y.shape)}"

    B, T, Fdim = xb.shape
    rows = []
    for i in range(B):
        # grad_outputs must be same shape as y: (B,1)
        go = torch.zeros_like(y)
        go[i, 0] = 1.0
        g = torch.autograd.grad(outputs=y, inputs=xb,
                                grad_outputs=go,
                                retain_graph=True, create_graph=False,
                                allow_unused=False)[0]  # (B,T,F)
        gi = g[i].reshape(-1).detach().cpu().numpy()       # (T*F,)
        rows.append(gi)
    J = np.stack(rows, axis=0)  # (B, T*F)
    return J

def participation_ratio(M: np.ndarray):
    # effective dimensionality of rows of M
    C = (M - M.mean(0, keepdims=True))
    C = C.T @ C / max(C.shape[0]-1, 1)
    evals = np.maximum(np.linalg.eigvalsh(C), 1e-12)
    s1 = evals.sum(); s2 = (evals**2).sum()
    return float((s1**2) / s2)

# build small dataset
y, exo = gen_seasonal_series(n=6000)
X, Y = window_xy(y, exo, ctx=72, horizon=6)
sc = SimpleScaler(); sc.fit(X.reshape(-1, X.shape[-1]))
Xs = sc.transform(X.reshape(-1, X.shape[-1])).reshape(X.shape)

# models: PSANN+Conv (parity-targeted) vs MLP baseline on pooled features
psann_m, _ = build_psann_conv(Xs.shape[-1], target_params=160_000, out_f=1)
psann_m = fit_regressor(psann_m, Xs[:3000], Y[:3000,None], Xs[3000:4000], Y[3000:4000,None], epochs=30)

# Simple MLP baseline on last-step features only
class TinyMLP(nn.Module):
    def __init__(self, in_f, hidden=64):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(in_f, hidden), nn.ReLU(), nn.Linear(hidden, 1))
    def forward(self, x):  # x: (B,T,F)
        x_last = x[:, -1, :]
        return self.net(x_last)

mlp = TinyMLP(Xs.shape[-1])
mlp = fit_regressor(mlp, Xs[:3000], Y[:3000,None], Xs[3000:4000], Y[3000:4000,None], epochs=30)

# Jacobian on a tiny batch for each
batch = Xs[4000:4024]
J_ps = jacobian_matrix(psann_m, batch)
J_ml = jacobian_matrix(mlp, batch)

# SVD & metrics
_, s_ps, _ = np.linalg.svd(J_ps, full_matrices=False)
_, s_ml, _ = np.linalg.svd(J_ml, full_matrices=False)

spec_records = [{
    "dataset":"SEASONAL_JENA_PROXY","probe":"SPECTRAL","model":"ResPSANN_conv_spine",
    "top_sv": float(s_ps[0]), "sum_sv": float(s_ps.sum()), "pr": participation_ratio(J_ps)
},{
    "dataset":"SEASONAL_JENA_PROXY","probe":"SPECTRAL","model":"MLP_laststep",
    "top_sv": float(s_ml[0]), "sum_sv": float(s_ml.sum()), "pr": participation_ratio(J_ml)
}]
with open(os.path.join(RESULTS_DIR,"synthetic_spectral_results.json"),"w") as f:
    json.dump(spec_records, f, indent=2)
spec_records

[{'dataset': 'SEASONAL_JENA_PROXY',
  'probe': 'SPECTRAL',
  'model': 'ResPSANN_conv_spine',
  'top_sv': 4.814483165740967,
  'sum_sv': 6.343472957611084,
  'pr': 1.770914912223816},
 {'dataset': 'SEASONAL_JENA_PROXY',
  'probe': 'SPECTRAL',
  'model': 'MLP_laststep',
  'top_sv': 4.128472328186035,
  'sum_sv': 5.6605544090271,
  'pr': 2.7963361740112305}]

In [13]:
# @title Fairness check: single-head attention spine vs conv spine under matched params
set_seed(2025)
data = gen_cross_station_air(stations=8, n=5000, ctx=24, horizon=3, missing=0.0, seed=77)
X = np.concatenate([x for (s,x,y) in data], axis=0)
y = np.concatenate([y for (s,x,y) in data], axis=0)
sc = SimpleScaler(); sc.fit(X.reshape(-1, X.shape[-1]))
Xs = sc.transform(X.reshape(-1, X.shape[-1])).reshape(X.shape)

N = len(y); ntr=int(0.7*N); nv=int(0.15*N)
Xtr, ytr = Xs[:ntr], y[:ntr]; Xv, yv = Xs[ntr:ntr+nv], y[ntr:ntr+nv]; Xte, yte = Xs[ntr+nv:], y[ntr+nv:]

target_params = 180_000
# match conv
conv_m, p_conv = build_psann_conv(Xs.shape[-1], target_params, out_f=1)
# for attention, sweep d & mlp until within tol
best_att = None; bestp = 1e18
for d in [48, 64, 80]:
    for mh in [48, 64, 80]:
        att = AttentionSpine1D(in_f=Xs.shape[-1], d=d, mlp_hidden=mh, mlp_layers=2, out_f=1)
        p = param_count(att)
        if abs(p - target_params) < abs(bestp - target_params):
            bestp = p; best_att = att
att_m = best_att

conv_m = fit_regressor(conv_m, Xtr, ytr[:,None], Xv, yv[:,None], epochs=40, lr=3e-3)
att_m  = fit_regressor(att_m,  Xtr, ytr[:,None], Xv, yv[:,None], epochs=40, lr=3e-3)

yhat_c = predict_regressor(conv_m, Xte)
yhat_a = predict_regressor(att_m,  Xte)
rec = pd.DataFrame([
    dict(dataset="AIR_BEIJING_PROXY", probe="SPINE_FAIRNESS", split="test", model="ResPSANN_conv_spine",
         params=param_count(conv_m), **eval_regression(yte, yhat_c)),
    dict(dataset="AIR_BEIJING_PROXY", probe="SPINE_FAIRNESS", split="test", model="ResPSANN_attention_spine",
         params=param_count(att_m), **eval_regression(yte, yhat_a)),
])
save_csv(rec, "synthetic_experiment_metrics.csv")
rec[["model","r2","mae","rmse","params"]].round(5)


Saved -> /content/psann_synth_results/synthetic_experiment_metrics.csv (38 rows)


Unnamed: 0,model,r2,mae,rmse,params
0,ResPSANN_conv_spine,0.97023,0.0943,0.11857,36868
1,ResPSANN_attention_spine,0.67226,0.30052,0.39342,14723


In [14]:
# @title EAF synthetic: ΔTEMP regression with per-heat resets; PSANN-tabular vs MLP baseline
set_seed(606)
X, y = gen_eaf_heats(heats=140, min_len=80, max_len=180, seed=606)  # X: (N, ctx, F_tabular_per_step)
# Collapse per-step features to summary stats over last k steps (tabular proxy)
def collapse_tail(Xseq, k=6):
    # concat mean and last values for each channel to form a fixed-length tabular vector
    tail = Xseq[:, -k:, :]  # (N,k,F)
    mean = tail.mean(axis=1)
    last = tail[:, -1, :]
    return np.concatenate([mean, last], axis=1)

X_tab = collapse_tail(X, k=8)
# scale
mu = X_tab.mean(0, keepdims=True); sd = X_tab.std(0, keepdims=True) + 1e-8
Xn = (X_tab - mu)/sd

N = len(y); ntr=int(0.7*N); nv=int(0.15*N)
Xtr, ytr = Xn[:ntr], y[:ntr]; Xv,yv = Xn[ntr:ntr+nv], y[ntr:ntr+nv]; Xte,yte = Xn[ntr+nv:], y[ntr+nv:]

# Baseline MLP
class MLP_Reg(nn.Module):
    def __init__(self, in_f, hidden=64, layers=2):
        super().__init__()
        net=[nn.Linear(in_f, hidden), nn.ReLU()]
        for _ in range(layers-1):
            net += [nn.Linear(hidden, hidden), nn.ReLU()]
        self.net = nn.Sequential(*net)
        self.head = nn.Linear(hidden, 1)
    def forward(self, x): return self.head(self.net(x))

def fit_reg_tab(model, Xtr, ytr, Xv, yv, epochs=80, lr=2e-3, bs=128):
    model = model.to(DEVICE); opt = torch.optim.AdamW(model.parameters(), lr=lr)
    Xtr = torch.tensor(Xtr, dtype=torch.float32); ytr = torch.tensor(ytr, dtype=torch.float32)[:,None]
    Xv = torch.tensor(Xv, dtype=torch.float32); yv = torch.tensor(yv, dtype=torch.float32)[:,None]
    dl = DataLoader(torch.utils.data.TensorDataset(Xtr, ytr), batch_size=bs, shuffle=True)
    best=1e9; best_state=None
    for ep in range(epochs):
        model.train()
        for xb,yb in dl:
            xb,yb = xb.to(DEVICE), yb.to(DEVICE)
            pred = model(xb)
            loss = F.mse_loss(pred, yb)
            opt.zero_grad(); loss.backward(); opt.step()
        model.eval()
        with torch.no_grad():
            vpred = model(Xv.to(DEVICE)).cpu().numpy().ravel()
        v = np.mean((vpred - yv.numpy().ravel())**2)
        if v < best: best=v; best_state={k:v.detach().cpu().clone() for k,v in model.state_dict().items()}
    if best_state: model.load_state_dict(best_state)
    return model

in_f = Xn.shape[1]
target_params = 160_000
psann_tab, p_ps = build_psann_tabular(in_f, target_params, out_f=1)
mlp = MLP_Reg(in_f, hidden=128, layers=3)

t0 = time.time(); psann_tab = fit_reg_tab(psann_tab, Xtr, ytr, Xv, yv, epochs=80, lr=2e-3); t1 = time.time()
tp0 = time.time(); mlp       = fit_reg_tab(mlp,       Xtr, ytr, Xv, yv, epochs=80, lr=2e-3); tp1 = time.time()
yhat_ps = predict_regressor(psann_tab, Xte)
yhat_ml = predict_regressor(mlp, Xte)

rec = pd.DataFrame([
    dict(dataset="EAF_PROXY", probe="DELTA_TEMP", split="test", model="ResPSANN_tabular",
         params=param_count(psann_tab), train_wall_seconds=t1-t0, **eval_regression(yte, yhat_ps)),
    dict(dataset="EAF_PROXY", probe="DELTA_TEMP", split="test", model="MLP_baseline",
         params=param_count(mlp), train_wall_seconds=tp1-tp0, **eval_regression(yte, yhat_ml)),
])
save_csv(rec, "synthetic_experiment_metrics.csv")
rec[["model","r2","mae","rmse","params","train_wall_seconds"]].round(5)


Saved -> /content/psann_synth_results/synthetic_experiment_metrics.csv (40 rows)


Unnamed: 0,model,r2,mae,rmse,params,train_wall_seconds
0,ResPSANN_tabular,-0.00184,0.90183,1.12802,35716,29.38711
1,MLP_baseline,-0.00844,0.90144,1.13173,34561,15.71341


In [15]:
# @title Load and preview all CSV outputs
em_path = os.path.join(RESULTS_DIR, "synthetic_experiment_metrics.csv")
ab_path = os.path.join(RESULTS_DIR, "synthetic_ablation_results.csv")

em = pd.read_csv(em_path) if os.path.exists(em_path) else pd.DataFrame()
ab = pd.read_csv(ab_path) if os.path.exists(ab_path) else pd.DataFrame()

print("Experiment metrics rows:", len(em))
display(em.sort_values(["dataset","probe","model"]).head(20))
print("\nAblation results rows:", len(ab))
display(ab)
print("\nSpectral results:", os.path.join(RESULTS_DIR,"synthetic_spectral_results.json"))
if os.path.exists(os.path.join(RESULTS_DIR,"synthetic_spectral_results.json")):
    print(open(os.path.join(RESULTS_DIR,"synthetic_spectral_results.json")).read())


Experiment metrics rows: 40


Unnamed: 0,dataset,probe,split,seed,model,params,train_wall_seconds,r2,mae,rmse,smape,mase
11,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,7.0,LSTM_baseline,106401,41.524874,0.966884,0.098049,0.123167,14.319148,0.531465
14,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,8.0,LSTM_baseline,106401,41.716242,0.963445,0.102644,0.129404,14.929571,0.556373
17,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,9.0,LSTM_baseline,106401,41.80374,0.960139,0.106705,0.135129,16.25956,0.578387
9,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,7.0,ResPSANN_conv_spine,36868,58.247009,0.968087,0.095834,0.120908,14.547373,0.519461
12,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,8.0,ResPSANN_conv_spine,36868,57.968974,0.970053,0.092577,0.117125,14.569326,0.501808
15,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,9.0,ResPSANN_conv_spine,36868,57.654462,0.96829,0.096014,0.120524,14.819717,0.520436
10,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,7.0,TCN_baseline,63361,45.60039,0.976168,0.08335,0.104485,13.160618,0.451792
13,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,8.0,TCN_baseline,63361,45.590491,0.974812,0.085392,0.107418,13.489844,0.46286
16,AIR_BEIJING_PROXY,HELDOUT+MISS_0,test,9.0,TCN_baseline,63361,45.708496,0.973368,0.087999,0.110452,14.183676,0.476995
20,AIR_BEIJING_PROXY,HELDOUT+MISS_10,test,7.0,LSTM_baseline,108961,42.478026,0.957508,0.106708,0.133331,15.894076,0.605897



Ablation results rows: 3


Unnamed: 0,dataset,probe,model,group,base_r2,ablated_r2,delta
0,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,history,0.984804,-0.823856,-1.80866
1,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,meteorology,0.984804,0.917602,-0.067202
2,AIR_BEIJING_PROXY,ABLATE_GROUPS,ResPSANN_conv_spine,calendar,0.984804,0.984804,0.0



Spectral results: /content/psann_synth_results/synthetic_spectral_results.json
[
  {
    "dataset": "SEASONAL_JENA_PROXY",
    "probe": "SPECTRAL",
    "model": "ResPSANN_conv_spine",
    "top_sv": 4.814483165740967,
    "sum_sv": 6.343472957611084,
    "pr": 1.770914912223816
  },
  {
    "dataset": "SEASONAL_JENA_PROXY",
    "probe": "SPECTRAL",
    "model": "MLP_laststep",
    "top_sv": 4.128472328186035,
    "sum_sv": 5.6605544090271,
    "pr": 2.7963361740112305
  }
]


In [16]:
import os
import zipfile
from pathlib import Path

def zip_folder(folder_path: str | Path, output_path: str | Path | None = None, *, include_hidden: bool = True) -> Path:
    """
    Compresses an entire folder (recursively) into a .zip archive.

    Parameters
    ----------
    folder_path : str or Path
        Path to the folder to zip.
    output_path : str or Path or None, optional
        Output .zip file path. Defaults to "<folder_name>.zip" in the same directory.
    include_hidden : bool, optional
        Whether to include hidden files (those starting with '.').

    Returns
    -------
    Path
        Path to the created .zip file.
    """
    folder_path = Path(folder_path).resolve()
    if not folder_path.is_dir():
        raise ValueError(f"{folder_path} is not a valid directory")

    if output_path is None:
        output_path = folder_path.with_suffix(".zip")
    else:
        output_path = Path(output_path).resolve()

    with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(folder_path):
            # skip hidden dirs/files if requested
            if not include_hidden:
                dirs[:] = [d for d in dirs if not d.startswith(".")]
                files = [f for f in files if not f.startswith(".")]

            for file in files:
                abs_path = Path(root) / file
                # relative path inside the zip
                rel_path = abs_path.relative_to(folder_path)
                zf.write(abs_path, arcname=rel_path)

    print(f"Zipped {folder_path} → {output_path}")
    return output_path



In [17]:
zip_folder(folder_path = '/content/psann_synth_results', output_path = '/content/psann_synth_results.zip')

Zipped /content/psann_synth_results → /content/psann_synth_results.zip


PosixPath('/content/psann_synth_results.zip')