# BLS


In [1]:
# BLS_config.py

from dataclasses import dataclass
from typing import Optional   # <-- add this for Python 3.9


@dataclass
class BLSConfig:
    n_feature_groups: int = 10
    feature_group_size: int = 10
    n_enhancement_groups: int = 10
    enhancement_group_size: int = 10
    feature_activation: str = "relu"          # identity, tanh, sigmoid, relu
    enhancement_activation: str = "relu"      # identity, tanh, sigmoid, relu
    lambda_reg: float = 1e-2
    add_bias: bool = True
    standardize: bool = True
    random_state: Optional[int] = 42          # <-- changed

In [2]:
# BLS.py

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.utils.extmath import softmax


_ACTS = {
    "identity": lambda x: x,
    "tanh": np.tanh,
    "sigmoid": lambda x: 1.0 / (1.0 + np.exp(-x)),
    "relu": lambda x: np.maximum(0.0, x),
}


class BroadLearningSystem:
    """
    Broad Learning System that uses scikit-learn:
      - StandardScaler
      - OneHotEncoder
      - Ridge for closed-form output weights
      - train_test_split helper
    """

    def __init__(self, cfg: BLSConfig):
        """
        Initialize the Broad Learning System.

        Args:
            cfg (BLSConfig): Configuration object containing all hyperparameters
                for the BLS model including feature groups, enhancement groups,
                activation functions, and regularization parameters.
        """
        self.cfg = cfg
        self.rng = np.random.default_rng(cfg.random_state)
        self.scaler = StandardScaler(
            with_mean=True, with_std=True) if cfg.standardize else None
        self.enc = None
        self.is_classification = None
        self.classes_ = None

        self.Wf, self.bf = [], []
        self.We, self.be = [], []
        self.Wout = None
        self.ridge = None

        if cfg.feature_activation not in _ACTS or cfg.enhancement_activation not in _ACTS:
            raise ValueError("Unknown activation name.")
        self.act_f = _ACTS[cfg.feature_activation]
        self.act_e = _ACTS[cfg.enhancement_activation]

    @staticmethod
    def split(X, y, test_size=0.2, random_state=0, stratify=None):
        """
        Split arrays or matrices into random train and test subsets.

        This is a convenience wrapper around sklearn's train_test_split.

        Args:
            X (array-like): Features array of shape (n_samples, n_features).
            y (array-like): Target array of shape (n_samples,) or (n_samples, n_outputs).
            test_size (float, optional): Proportion of dataset to include in test split. 
                Defaults to 0.2.
            random_state (int, optional): Random seed for reproducible splits. 
                Defaults to 0.
            stratify (array-like, optional): If not None, data is split in a stratified 
                fashion using this as class labels. Defaults to None.

        Returns:
            tuple: X_train, X_test, y_train, y_test arrays.
        """
        return train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=stratify)

    # ---------- internal ----------
    def _init_groups(self, in_dim):
        """
        Initialize feature and enhancement groups with random weights and biases.

        Creates random weight matrices and bias vectors for both feature mapping
        groups and enhancement groups. Feature groups map input to feature space,
        while enhancement groups map feature outputs to enhancement space.

        Args:
            in_dim (int): Input dimension for feature groups.
        """
        self.Wf, self.bf = [], []
        for _ in range(self.cfg.n_feature_groups):
            W = self.rng.normal(0.0, 1.0, size=(
                in_dim, self.cfg.feature_group_size))
            b = self.rng.normal(0.0, 0.1, size=(self.cfg.feature_group_size,))
            self.Wf.append(W)
            self.bf.append(b)

        total_feature_nodes = self.cfg.n_feature_groups * self.cfg.feature_group_size
        self.We, self.be = [], []
        for _ in range(self.cfg.n_enhancement_groups):
            W = self.rng.normal(0.0, 1.0, size=(
                total_feature_nodes, self.cfg.enhancement_group_size))
            b = self.rng.normal(0.0, 0.1, size=(
                self.cfg.enhancement_group_size,))
            self.We.append(W)
            self.be.append(b)

    def _map_features(self, X):
        """
        Map input data through feature groups using random weights and activation functions.

        Args:
            X (np.ndarray): Input data of shape (n_samples, n_features).

        Returns:
            np.ndarray: Feature mappings of shape (n_samples, n_feature_groups * feature_group_size).
        """
        feats = [self.act_f(X @ W + b) for W, b in zip(self.Wf, self.bf)]
        return np.concatenate(feats, axis=1) if feats else np.empty((X.shape[0], 0))

    def _map_enhancements(self, F):
        """
        Map feature outputs through enhancement groups using random weights and activation functions.

        Args:
            F (np.ndarray): Feature mappings of shape (n_samples, n_feature_nodes).

        Returns:
            np.ndarray: Enhancement mappings of shape (n_samples, n_enhancement_groups * enhancement_group_size).
        """
        enh = [self.act_e(F @ W + b) for W, b in zip(self.We, self.be)]
        return np.concatenate(enh, axis=1) if enh else np.empty((F.shape[0], 0))

    def _design(self, X):
        """
        Create the design matrix by concatenating feature and enhancement mappings.

        Applies feature mapping, enhancement mapping, and optionally adds bias term
        to create the final design matrix used for output weight learning.

        Args:
            X (np.ndarray): Input data of shape (n_samples, n_features).

        Returns:
            np.ndarray: Design matrix of shape (n_samples, total_nodes + bias).
        """
        F = self._map_features(X)
        E = self._map_enhancements(F)
        H = np.concatenate([F, E], axis=1) if E.size else F
        if self.cfg.add_bias:
            H = np.concatenate([H, np.ones((H.shape[0], 1))], axis=1)
        return H

    def fit(self, X, y):
        """
        Train the Broad Learning System on the given data.

        Fits the model by standardizing input, initializing random groups,
        preparing target encoding (for classification), creating design matrix,
        and solving for optimal output weights using Ridge regression.

        Args:
            X (array-like): Training features of shape (n_samples, n_features).
            y (array-like): Training targets of shape (n_samples,) or (n_samples, n_outputs).

        Returns:
            BroadLearningSystem: Returns self for method chaining.
        """
        X = np.asarray(X, dtype=float)

        if self.scaler:
            Xs = self.scaler.fit_transform(X)
        else:
            Xs = X

        self._init_groups(Xs.shape[1])

        y = np.asarray(y)
        if y.ndim == 1 or (y.ndim == 2 and y.shape[1] == 1):
            self.is_classification = True
            y = y.reshape(-1, 1)
            self.enc = OneHotEncoder(
                sparse_output=False, handle_unknown="ignore")
            T = self.enc.fit_transform(y)
            self.classes_ = self.enc.categories_[0]
        else:
            self.is_classification = False
            T = y.astype(float)

        H = self._design(Xs)
        self.ridge = Ridge(alpha=self.cfg.lambda_reg,
                           fit_intercept=False, random_state=self.cfg.random_state)
        self.ridge.fit(H, T)
        self.Wout = self.ridge.coef_.T
        return self

    def _forward(self, X):
        """
        Perform forward pass through the trained BLS model.

        Args:
            X (np.ndarray): Input data of shape (n_samples, n_features).

        Returns:
            np.ndarray: Model outputs of shape (n_samples, n_outputs).
        """
        if self.scaler:
            X = self.scaler.transform(X)
        H = self._design(X)
        return H @ self.Wout

    def predict(self, X):
        """
        Make predictions on new data.

        For classification tasks, returns predicted class labels.
        For regression tasks, returns predicted continuous values.

        Args:
            X (array-like): Input features of shape (n_samples, n_features).

        Returns:
            np.ndarray: Predictions of shape (n_samples,) for classification or 
                       (n_samples, n_outputs) for regression.
        """
        X = np.asarray(X, dtype=float)
        Y = self._forward(X)
        if self.is_classification:
            idx = np.argmax(Y, axis=1)
            return self.classes_[idx]
        return Y

    def predict_proba(self, X):
        """
        Compute class probabilities for input samples.

        Only available for classification tasks. Uses softmax to convert
        logits to probability distributions.

        Args:
            X (array-like): Input features of shape (n_samples, n_features).

        Returns:
            np.ndarray: Class probabilities of shape (n_samples, n_classes).

        Raises:
            ValueError: If called on a regression model.
        """
        if not self.is_classification:
            raise ValueError("predict_proba only for classification.")
        X = np.asarray(X, dtype=float)
        logits = self._forward(X)
        return softmax(logits)

    def add_feature_groups(self, k):
        """
        Add additional feature groups to expand the network breadth.

        This method supports incremental learning by adding new feature mapping
        groups without retraining the entire model. The output weights will need
        to be refitted after expansion.

        Args:
            k (int): Number of feature groups to add.

        Raises:
            RuntimeError: If called before the model is fitted.
        """
        in_dim = self.scaler.mean_.shape[0] if self.scaler else None
        if in_dim is None:
            raise RuntimeError("Model must be fitted before adding groups.")
        for _ in range(k):
            W = self.rng.normal(0.0, 1.0, size=(
                in_dim, self.cfg.feature_group_size))
            b = self.rng.normal(0.0, 0.1, size=(self.cfg.feature_group_size,))
            self.Wf.append(W)
            self.bf.append(b)

    def add_enhancement_groups(self, k):
        """
        Add additional enhancement groups to expand the network breadth.

        This method supports incremental learning by adding new enhancement
        groups that operate on the current feature space. The output weights
        will need to be refitted after expansion.

        Args:
            k (int): Number of enhancement groups to add.
        """
        total_feature_nodes = len(self.Wf) * self.cfg.feature_group_size
        for _ in range(k):
            W = self.rng.normal(0.0, 1.0, size=(
                total_feature_nodes, self.cfg.enhancement_group_size))
            b = self.rng.normal(0.0, 0.1, size=(
                self.cfg.enhancement_group_size,))
            self.We.append(W)
            self.be.append(b)

    def refit_output(self, X, y):
        """
        Refit only the output weights using the current network architecture.

        This method is efficient for updating the model after adding new feature
        or enhancement groups, as it only recomputes the final Ridge regression
        without reinitializing the random groups.

        Args:
            X (array-like): Training features of shape (n_samples, n_features).
            y (array-like): Training targets of shape (n_samples,) or (n_samples, n_outputs).

        Returns:
            BroadLearningSystem: Returns self for method chaining.
        """
        X = np.asarray(X, dtype=float)
        if self.scaler:
            X = self.scaler.transform(X)

        y = np.asarray(y)
        if self.is_classification:
            T = self.enc.transform(y.reshape(-1, 1))
        else:
            T = y.astype(float)

        H = self._design(X)
        self.ridge.fit(H, T)
        self.Wout = self.ridge.coef_.T
        return self

    def extract_features(self, X):
        """
        Extract the intermediate feature representation from the BLS network.

        Returns the design matrix (concatenated feature and enhancement mappings)
        without applying the final output weights. Useful for feature extraction
        and as input to other models like LSTMs.

        Args:
            X (array-like): Input features of shape (n_samples, n_features).

        Returns:
            np.ndarray: Feature representation of shape (n_samples, total_nodes + bias).
        """
        X = np.asarray(X, dtype=float)
        if self.scaler:
            X = self.scaler.transform(X)
        return self._design(X)

# Dataset


In [9]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader


def collate_keep_meta(batch):
    # tensors
    x = torch.stack([b["x_hist"] for b in batch])      # [B, L, F]
    y = torch.stack([b["y_future"] for b in batch])    # [B, H]
    lat = torch.tensor([b["lat"] for b in batch], dtype=torch.float32)
    lon = torch.tensor([b["lon"] for b in batch], dtype=torch.float32)
    # keep metadata as simple Python lists/strings
    meta = {
        "tile_id":   [b["tile_id"] for b in batch],
        # stringify Timestamps
        "start_time": [str(b["start_time"]) for b in batch],
        "lat": lat, "lon": lon,
    }
    return {"x_hist": x, "y_future": y, "meta": meta}


# ---------- 1) Identify columns ----------
NON_FEATURE_COLS = {
    "lon", "lat", "time", "source_file", "PM25_MERRA2", "PM25_ug_m3", "class"
}


def get_feature_cols(df: pd.DataFrame):
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    feat_cols = [c for c in num_cols if c not in NON_FEATURE_COLS and c.lower() not in {
        "timestamp"}]
    return feat_cols

# ---------- 2) Parse & tidy ----------


def prepare_dataframe(df: pd.DataFrame, hourly=True, dayfirst=True, freq="H") -> pd.DataFrame:
    """
    - Parses timestamps from df['time'].
    - Builds tile_id from (lat, lon).
    - Optionally resamples per tile to an evenly spaced time grid (freq='H' or '30T').
    - Returns a tidy numeric frame where features are numeric and PM25_ug_m3 is present.
    """
    df = df.copy()

    # 1) timestamp + tile id
    df["timestamp"] = pd.to_datetime(
        df["time"], dayfirst=dayfirst, errors="coerce")
    df["tile_id"] = (df["lat"].round(4).astype(str) +
                     "_" + df["lon"].round(4).astype(str))

    keep = ["timestamp", "tile_id", "lat", "lon",
            "PM25_ug_m3"] + get_feature_cols(df)
    df = df[keep].dropna(subset=["timestamp"]).sort_values(
        ["tile_id", "timestamp"])

    if hourly:
        def _resample(g):
            tile = g["tile_id"].iloc[0]
            lat0 = float(g["lat"].iloc[0])
            lon0 = float(g["lon"].iloc[0])

            g = g.set_index("timestamp").sort_index()

            # numeric-only columns for resampling (avoid strings like tile_id)
            num_cols = g.select_dtypes(include=[np.number]).columns
            # numeric_only implicitly True on numeric subset
            g_num = g[num_cols].resample(freq).mean()

            # fill gaps
            g_num = g_num.interpolate("time").ffill().bfill()

            # add metadata back
            g_num["tile_id"] = tile
            g_num["lat"] = lat0
            g_num["lon"] = lon0

            return g_num.reset_index()

        df = df.groupby("tile_id", group_keys=False).apply(_resample)

    # ensure numeric dtypes and fill any leftovers
    feat_cols = get_feature_cols(df)
    df[feat_cols + ["PM25_ug_m3"]] = df[feat_cols +
                                        ["PM25_ug_m3"]].astype(float).fillna(0.0)
    return df

# ---------- 3) Compute normalization stats ----------


def compute_feature_stats(df: pd.DataFrame):
    feat_cols = get_feature_cols(df)
    mean = df[feat_cols].mean().astype("float32").values
    std = df[feat_cols].std(ddof=0).replace(0, 1.0).astype("float32").values
    return feat_cols, mean, std

# ---------- 4) Window index builder ----------


def build_indices(df: pd.DataFrame, L=168, H=72, stride=1):
    idx = []
    for tile, g in df.groupby("tile_id"):
        n = len(g)
        for t in range(L, n - H + 1, stride):  # note +1
            idx.append((tile, t))
    return idx

# ---------- 5) PyTorch Dataset ----------


class TSWindowDataset(Dataset):
    def __init__(self, df: pd.DataFrame, L=168, H=72, stride=1, stats=None):
        """
        df: output of prepare_dataframe()
        L: lookback length (hours)
        H: horizon length (72)
        stats: (feat_cols, mean, std) from compute_feature_stats(train_df)
        """
        self.df = df
        self.L, self.H = L, H
        self.feat_cols, self.mean, self.std = stats if stats is not None else compute_feature_stats(
            df)
        self.idx = build_indices(df, L, H, stride)

        # pre-slice groups to avoid repeated groupby in __getitem__
        self.groups = {tile: g.reset_index(drop=True)
                       for tile, g in df.groupby("tile_id")}

    def __len__(self): return len(self.idx)

    def __getitem__(self, i):
        tile, t = self.idx[i]
        g = self.groups[tile]

        # history window [t-L .. t-1]
        hist = g.loc[t-self.L:t-1,
                     self.feat_cols].values.astype(np.float32)   # [L, F]

        # <- add .astype(np.float32)
        hist = ((hist - self.mean) / self.std).astype(np.float32)

        # future targets [t .. t+H-1]
        fut = g.loc[t:t+self.H-1,
                    "PM25_ug_m3"].values.astype(np.float32)       # [H]

        # metadata
        start_ts = g.loc[t, "timestamp"]
        lat = float(g["lat"].iloc[0])
        lon = float(g["lon"].iloc[0])

        return {
            "x_hist": torch.from_numpy(hist),        # [L, F]
            "y_future": torch.from_numpy(fut),       # [H]
            "tile_id": tile,
            "start_time": pd.Timestamp(start_ts),
            "lat": lat, "lon": lon,
        }


# ---------- 6) Quick usage + rich debug ----------
if __name__ == "__main__":
    # Load a CSV just for this run (swap to parquet reader when ready)
    # expects: time, lat, lon, PM25_ug_m3 + numeric features
    df_raw = pd.read_csv("../first_100k_rows.csv")

    df_raw = df_raw.iloc[:10000, :]

    # Build tidy frame
    df = prepare_dataframe(df_raw, hourly=True, dayfirst=True, freq="H")

    # Config (small so you see windows immediately)
    L, H, stride = 8, 4, 1

    # Feature stats
    feat_cols, mean, std = compute_feature_stats(df)
    F = len(feat_cols)

    # Dataset
    ds = TSWindowDataset(df, L=L, H=H, stride=stride,
                         stats=(feat_cols, mean, std))

    # ----------------- High-level summary -----------------
    print("\n=== DATA SUMMARY ===")
    print("df shape:", df.shape)
    print("time range:", df["timestamp"].min(), "→", df["timestamp"].max())
    print("#tiles:", df["tile_id"].nunique())
    print("#features (F):", F)
    print("first 10 feature cols:", feat_cols[:10])
    print(f"window config: L={L}, H={H}, stride={stride}")
    print("#windows:", len(ds))

    # Per-tile coverage (top/bottom few)
    counts = df.groupby("tile_id")["timestamp"].count().sort_values()
    print("\nrows per tile (smallest 5):")
    print(counts.head(5))
    print("rows per tile (largest 5):")
    print(counts.tail(5))

    if len(ds) == 0:
        raise SystemExit(
            "\n[!] No windows available. Increase coverage or lower L/H.")

    # ----------------- Inspect first window -----------------
    print("\n=== FIRST WINDOW DEBUG ===")
    tile0, t0 = ds.idx[0]
    g0 = ds.groups[tile0]
    b0 = ds[0]
    x0, y0 = b0["x_hist"], b0["y_future"]

    print("tile_id:", tile0)
    print("history shape [L,F]:", tuple(x0.shape),
          "| future shape [H]:", tuple(y0.shape))

    # time ranges for history & future
    hist_ts = g0.loc[t0-L:t0-1, "timestamp"].to_list()
    fut_ts = g0.loc[t0:t0+H-1, "timestamp"].to_list()
    print("hist timestamps:", hist_ts[0], "→", hist_ts[-1])
    print("fut  timestamps:", fut_ts[0],  "→", fut_ts[-1])

    # check spacing is hourly
    hist_deltas = pd.Series(hist_ts).diff(
    ).dropna().dt.total_seconds().unique()
    fut_deltas = pd.Series(fut_ts).diff().dropna().dt.total_seconds().unique()
    print("hist Δt seconds (unique):", hist_deltas)
    print("fut  Δt seconds (unique):", fut_deltas)

    # normalization sanity: mean ~ 0, std ~ 1 over this window (rough check)
    print("x_hist window mean/std (overall):",
          float(x0.mean()), float(x0.std()))
    # show a single feature’s first few timesteps
    print("x_hist[0:5, 0] sample:", x0[:5, 0].tolist())
    # show a few future targets
    print("y_future sample:", y0[:min(5, H)].tolist())

    # ----------------- Batch check via DataLoader -----------------
    loader = DataLoader(ds, batch_size=32, shuffle=True, drop_last=True,
                        collate_fn=collate_keep_meta)

    batch = next(iter(loader))
    Xb, Yb = batch["x_hist"], batch["y_future"]
    meta = batch["meta"]

    print("\n=== BATCH CHECK (custom collate) ===")
    print("batch x_hist shape [B,L,F]:", tuple(Xb.shape))
    print("batch y_future shape [B,H]:", tuple(Yb.shape))
    print("meta keys:", list(meta.keys()))
    print("tile_id[0]:", meta["tile_id"][0])
    print("start_time[0]:", meta["start_time"][0])
    print("lat/lon tensors:",
          tuple(meta["lat"].shape), tuple(meta["lon"].shape))

    # ----------------- PatchTST token check (dev aid) -----------------
    # If you plan to patchify later, this shows expected token count.
    def patchify(x, P=16, S=8):  # x: [B,L,F] -> [B,N,P*F]
        B, L_, F_ = x.shape
        N = (L_ - P) // S + 1
        return torch.stack([x[:, s:s+P, :].reshape(B, P*F_) for s in range(0, L_-P+1, S)], dim=1)

    P, S = 4, 2  # small numbers just to visualize with L=8
    tokens = patchify(Xb, P=P, S=S)
    print("\n=== PATCHIFY SMOKE TEST ===")
    print(f"P={P}, S={S} -> N={(L-P)//S + 1}")
    print("tokens shape [B,N,P*F]:", tuple(tokens.shape))

    print("\nAll sanity checks passed.\n")

  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(freq).mean()
  g_num = g[num_cols].resample(f


=== DATA SUMMARY ===
df shape: (24400, 55)
time range: 2021-01-10 00:00:00 → 2021-02-10 19:00:00
#tiles: 400
#features (F): 50
first 10 feature cols: ['DUEXTTAU', 'BCFLUXU', 'OCFLUXV', 'BCANGSTR', 'SUFLUXV', 'SSSMASS25', 'SSSMASS', 'OCSMASS', 'BCCMASS', 'BCSMASS']
window config: L=8, H=4, stride=1
#windows: 20000

rows per tile (smallest 5):
tile_id
31.5_27.5      24
31.5_28.125    24
31.5_28.75     24
31.5_29.375    24
31.5_30.0      24
Name: timestamp, dtype: int64
rows per tile (largest 5):
tile_id
30.5_25.0    764
29.0_25.0    764
30.0_25.0    764
28.5_25.0    764
22.5_25.0    764
Name: timestamp, dtype: int64

=== FIRST WINDOW DEBUG ===
tile_id: 22.0_25.0
history shape [L,F]: (8, 50) | future shape [H]: (4,)
hist timestamps: 2021-01-10 00:00:00 → 2021-01-10 07:00:00
fut  timestamps: 2021-01-10 08:00:00 → 2021-01-10 11:00:00
hist Δt seconds (unique): [3600.]
fut  Δt seconds (unique): [3600.]
x_hist window mean/std (overall): -0.3407861292362213 0.7213065028190613
x_hist[0:5, 0] sa

In [10]:
# ---- time-based split (80/20 by timestamp) ----
cutoff = df["timestamp"].quantile(0.80)
train_df = df[df["timestamp"] <= cutoff].copy()
val_df = df[df["timestamp"] > cutoff].copy()

# ---- stats on train only ----
feat_cols, mean, std = compute_feature_stats(train_df)

In [None]:
# 1) Gather row-wise feature matrices (no windowing here)
Xtr_rows = train_df[feat_cols].values
Xva_rows = val_df[feat_cols].values

# 2) Configure BLS as a preprocessor
cfg = BLSConfig(
    n_feature_groups=2,
    feature_group_size=2,
    n_enhancement_groups=2,
    enhancement_group_size=2,
    feature_activation="relu",
    enhancement_activation="relu",
    lambda_reg=1e-2,
    add_bias=False,
    standardize=True,
    random_state=1,
)

# 3) Fit BLS on training rows (dummy y to stay on regression path)
bls_y = train_df.iloc[:, -2].values
bls = BroadLearningSystem(cfg).fit(Xtr_rows, bls_y)

# 4) Transform rows -> BLS features (vectorized, no loops)
# [n_train_rows, D_bls]
Ztr_rows = bls.extract_features(Xtr_rows)
# [n_val_rows,   D_bls]
Zva_rows = bls.extract_features(Xva_rows)
D_bls = Ztr_rows.shape[1]
bls_cols = [f"BLS_{i}" for i in range(D_bls)]
print("BLS feature dim:", D_bls)

BLS feature dim: 8


In [12]:
# 5) Build new frames: keep metadata/target + add all BLS cols at once
keep_cols = ["timestamp", "tile_id", "lat", "lon", "PM25_ug_m3"]
Ztr_df = pd.DataFrame(Ztr_rows, columns=bls_cols,
                      index=train_df.index).astype("float32")
Zva_df = pd.DataFrame(Zva_rows, columns=bls_cols,
                      index=val_df.index).astype("float32")
train_df_bls = pd.concat([train_df[keep_cols], Ztr_df],
                         axis=1).reset_index(drop=True)
val_df_bls = pd.concat([val_df[keep_cols],   Zva_df],
                       axis=1).reset_index(drop=True)

# 6) From here on, treat BLS features as the only inputs
#    (NON_FEATURE_COLS already excludes PM25_ug_m3, lat/lon, etc.)
feat_cols_bls, mean_bls, std_bls = compute_feature_stats(train_df_bls)
F = len(feat_cols_bls)
print("#BLS features (F):", F)

#BLS features (F): 8


In [13]:
# 7) Rebuild datasets/loaders on the BLS-augmented frames
from torch.utils.data import DataLoader
L, H, stride = 168, 72, 1
train_ds = TSWindowDataset(train_df_bls, L=L, H=H, stride=stride, stats=(
    feat_cols_bls, mean_bls, std_bls))
val_ds = TSWindowDataset(val_df_bls,   L=L, H=H, stride=stride, stats=(
    feat_cols_bls, mean_bls, std_bls))

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True,
                          drop_last=True,  collate_fn=collate_keep_meta)
val_loader = DataLoader(val_ds,   batch_size=32, shuffle=False,
                        drop_last=False, collate_fn=collate_keep_meta)

print("#train windows:", len(train_ds), "| #val windows:", len(val_ds))

#train windows: 5620 | #val windows: 100


In [14]:
import math
import torch.nn as nn
import torch


def sinusoidal_positional_encoding(n_pos: int, d_model: int, device=None):
    pe = torch.zeros(n_pos, d_model, device=device)
    pos = torch.arange(0, n_pos, device=device).unsqueeze(1).float()
    div = torch.exp(torch.arange(
        0, d_model, 2, device=device).float() * (-math.log(10000.0)/d_model))
    pe[:, 0::2] = torch.sin(pos * div)
    pe[:, 1::2] = torch.cos(pos * div)
    return pe  # [N, d]


class PatchPosEncoder(nn.Module):
    def __init__(self, in_features, patch_len=16, stride=8, d_model=128):
        super().__init__()
        self.P, self.S, self.F, self.d = patch_len, stride, in_features, d_model
        self.proj = nn.Linear(self.P * self.F, self.d)

    def forward(self, x):           # x: [B, L, F]
        B, L, F = x.shape
        starts = range(0, L - self.P + 1, self.S)
        patches = [
            x[:, s:s+self.P, :].reshape(B, self.P*self.F) for s in starts]
        T = torch.stack(patches, dim=1)              # [B, N, P*F]
        T = self.proj(T)                             # [B, N, d]
        pe = sinusoidal_positional_encoding(T.size(1), self.d, device=x.device)
        return T + pe                                # [B, N, d]


class SimplePatcherHead(nn.Module):
    def __init__(self, in_features, L, H, patch_len=16, stride=8, d_model=128):
        super().__init__()
        self.enc = PatchPosEncoder(in_features, patch_len, stride, d_model)
        self.head = nn.Linear(d_model, H)

    def forward(self, x_hist):      # [B, L, F]
        tokens = self.enc(x_hist)   # [B, N, d]
        pooled = tokens[:, -1]      # last-token pool (or tokens.mean(dim=1))

        return self.head(pooled)    # [B, H]

# TESTER


In [None]:
import math
import torch
import torch.nn as nn
import pennylane as qml


class QuantumEmbedOnly(nn.Module):
    """
    Data-only embedding: angles = Linear(d_model -> n_qubits)
    Circuit: RY(angle_i) on each qubit, measure <Z_i> for all wires.
    """

    def __init__(self, d_model: int, n_qubits: int = 8):
        super().__init__()
        self.n_qubits = n_qubits
        self.angle_proj = nn.Linear(d_model, n_qubits)

        self.dev = qml.device("default.qubit", wires=n_qubits)

        @qml.qnode(self.dev, interface="torch", diff_method="parameter-shift")
        def circuit(angles_1d):
            for i in range(n_qubits):
                qml.RY(angles_1d[i], wires=i)
            # per-qubit Z expectation values
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        self.circuit = circuit

    def forward(self, pooled):  # pooled: [B, d_model], float32
        angles = torch.tanh(self.angle_proj(pooled)) * \
            math.pi      # [B, n_qubits], float32

        outs = []
        for b in range(angles.shape[0]):
            # list/tuple OR tensor
            o = self.circuit(angles[b].double())
            if isinstance(o, (list, tuple)):
                o = torch.stack([oi if isinstance(oi, torch.Tensor)
                                 else torch.as_tensor(oi, dtype=torch.float64)
                                 for oi in o], dim=0)
            outs.append(o)

        # [B, n_qubits], float32
        qfeat = torch.stack(outs, dim=0).float()
        return qfeat


class PatchToQuantum72(nn.Module):
    """
    Patch+PosEnc (yours) -> last-token pool -> QuantumEmbedOnly -> Linear -> 72
    No changes to your earlier blocks.
    """

    def __init__(self, in_features: int, L: int, H: int,
                 patch_len=16, stride=8, d_model=128, n_qubits=8):
        super().__init__()
        self.enc = PatchPosEncoder(
            in_features, patch_len, stride, d_model)  # encoder
        # quantum embedding only
        self.qemb = QuantumEmbedOnly(d_model=d_model, n_qubits=n_qubits)
        self.head = nn.Linear(n_qubits, H)

    def forward(self, x_hist):            # x_hist: [B, L, F]
        tokens = self.enc(x_hist)         # [B, N, d_model]
        # pooled = tokens[:, -1]            # [B, d_model]  (or tokens.mean(dim=1)) (or

        q_each = [self.qemb(tokens[:, i, :]) for i in range(
            tokens.shape[1])]  # list of [B, n_qubits]
        q_stack = torch.stack(q_each, dim=1)  # [B, N, n_qubits]
        # [B, n_qubits]  (or attention here too)
        qfeat = q_stack.mean(dim=1)

        # qfeat  = self.qemb(pooled)        # [B, n_qubits]
        return self.head(qfeat)           # [B, 72]

In [None]:
F = len(feat_cols)
model = PatchToQuantum72(in_features=F, L=L, H=H, patch_len=16, stride=8,
                         d_model=128, n_qubits=8)

batch = next(iter(train_loader))
x, y = batch["x_hist"], batch["y_future"]         # x:[B,L,F], y:[B,72]
y_hat = model(x)                                  # -> [B,72]
print("y_hat shape:", y_hat.shape)

# quick gradient check
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
loss = nn.MSELoss()(y_hat, y)
opt.zero_grad()
loss.backward()
opt.step()
print("loss:", float(loss))

y_hat shape: torch.Size([32, 72])
loss: 2478.3671875


# QLSTM


In [15]:
# === QLSTM cell + wrapper that uses your PatchPosEncoder ===
import pennylane as qml
import torch
import torch.nn as nn


class zzfeatuermapQLSTM(nn.Module):
    """
    Quantum LSTM cell:
      concat([h_t, x_t]) -> Linear -> n_qubits
      -> IQPEmbedding + BasicEntanglerLayers -> Z expvals
      -> Linear -> hidden_size
      -> standard LSTM updates
    """

    def __init__(self, input_size, hidden_size, n_qubits=4, n_qlayers=1, backend="default.qubit"):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.concat_size = input_size + hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers

        # Separate devices per gate
        self.dev_forget = qml.device(backend, wires=n_qubits)
        self.dev_input = qml.device(backend, wires=n_qubits)
        self.dev_update = qml.device(backend, wires=n_qubits)
        self.dev_output = qml.device(backend, wires=n_qubits)

        # Gate circuits (same topology for each)  <-- NOTE the arg name: inputs
        def _circuit_forget(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=range(n_qubits))
            qml.templates.BasicEntanglerLayers(weights, wires=range(n_qubits))
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        def _circuit_input(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=range(n_qubits))
            qml.templates.BasicEntanglerLayers(weights, wires=range(n_qubits))
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        def _circuit_update(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=range(n_qubits))
            qml.templates.BasicEntanglerLayers(weights, wires=range(n_qubits))
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        def _circuit_output(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=range(n_qubits))
            qml.templates.BasicEntanglerLayers(weights, wires=range(n_qubits))
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        weight_shapes = {"weights": (n_qlayers, n_qubits)}
        # Older TorchLayer: just pass the QNode and weight_shapes
        self.qlayer_forget = qml.qnn.TorchLayer(
            qml.QNode(_circuit_forget, self.dev_forget, interface="torch"),
            weight_shapes
        )
        self.qlayer_input = qml.qnn.TorchLayer(
            qml.QNode(_circuit_input,  self.dev_input,  interface="torch"),
            weight_shapes
        )
        self.qlayer_update = qml.qnn.TorchLayer(
            qml.QNode(_circuit_update, self.dev_update, interface="torch"),
            weight_shapes
        )
        self.qlayer_output = qml.qnn.TorchLayer(
            qml.QNode(_circuit_output, self.dev_output, interface="torch"),
            weight_shapes
        )

        # Classical pre/post
        # [h_t, x_t] -> n_qubits
        self.clayer_in = nn.Linear(self.concat_size, n_qubits)
        # Z-expvals -> hidden_size
        self.clayer_out = nn.Linear(n_qubits, hidden_size)

    def forward(self, x, init_states=None):
        """
        x: [B, N, input_size]  (your Patch+PosEnc tokens)
        returns: hidden_seq [B, N, hidden], (h_T, c_T)
        """
        B, N, _ = x.size()
        if init_states is None:
            h_t = torch.zeros(B, self.hidden_size,
                              device=x.device, dtype=x.dtype)
            c_t = torch.zeros(B, self.hidden_size,
                              device=x.device, dtype=x.dtype)
        else:
            h_t, c_t = init_states

        h_list = []
        for t in range(N):
            x_t = x[:, t, :]                         # [B, input_size]
            v_t = torch.cat([h_t, x_t], dim=1)       # [B, hidden+input]
            y_t = self.clayer_in(v_t)                # [B, n_qubits]

            f_t = torch.sigmoid(self.clayer_out(
                self.qlayer_forget(y_t)))  # [B, hidden]
            i_t = torch.sigmoid(self.clayer_out(
                self.qlayer_input(y_t)))  # [B, hidden]
            g_t = torch.tanh(self.clayer_out(
                self.qlayer_update(y_t)))  # [B, hidden]
            o_t = torch.sigmoid(self.clayer_out(
                self.qlayer_output(y_t)))  # [B, hidden]

            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)
            h_list.append(h_t.unsqueeze(1))

        hidden_seq = torch.cat(h_list, dim=1)        # [B, N, hidden]
        return hidden_seq, (h_t, c_t)


class PatchToQLSTM72(nn.Module):
    """
    PatchPosEncoder (yours) -> QLSTM over tokens -> Linear -> H (e.g., 72).
    """

    def __init__(self, in_features, L, H, patch_len=16, stride=8,
                 d_model=128, hidden_size=128, n_qubits=4, n_qlayers=1, backend="default.qubit"):
        super().__init__()
        # uses your existing class
        self.enc = PatchPosEncoder(in_features, patch_len, stride, d_model)
        self.qlstm = zzfeatuermapQLSTM(input_size=d_model, hidden_size=hidden_size,
                                       n_qubits=n_qubits, n_qlayers=n_qlayers, backend=backend)
        self.head = nn.Linear(hidden_size, H)

    def forward(self, x_hist):               # x_hist: [B, L, F]
        tokens = self.enc(x_hist)            # [B, N, d_model]
        h_seq, (hT, cT) = self.qlstm(tokens)  # [B, N, hidden]
        return self.head(h_seq[:, -1, :])    # [B, H]

In [16]:
# --- Build the QLSTM model (must run before the training loop) ---
F_bls = len(feat_cols_bls)

# If you have lightning backends installed you can swap "default.qubit" -> "lightning.qubit" (CPU) or "lightning.gpu"
backend = "default.qubit"

qlstm_model = PatchToQLSTM72(
    in_features=F_bls, L=L, H=H,
    patch_len=16, stride=8,
    d_model=128, hidden_size=128,
    n_qubits=4, n_qlayers=1,
    backend="default.qubit"   # or "lightning.qubit" if available
)

# quick smoke test
with torch.no_grad():
    _ = qlstm_model(next(iter(train_loader))["x_hist"][:2])
print("QLSTM (with BLS features) forward pass OK.")

QLSTM (with BLS features) forward pass OK.


# QGRU


In [None]:
# === QGRU cell + wrapper that uses your PatchPosEncoder (drop-in alongside QLSTM) ===
import pennylane as qml
import torch
import torch.nn as nn


class zzfeatuermapQGRU(nn.Module):
    """
    Quantum GRU cell:
      Gates (reset/update/new) are VQCs.
      - For r,z gates: concat([h_t, x_t]) -> Linear -> n_qubits -> QNode -> Linear -> hidden_size
      - For candidate n_t: concat([r_t * h_t, x_t]) -> Linear -> n_qubits -> QNode -> Linear -> hidden_size
      - Standard GRU update: h_t = (1 - z_t) * n_t + z_t * h_{t-1}
    """

    def __init__(self, input_size, hidden_size, n_qubits=4, n_qlayers=1, backend="default.qubit"):
        super().__init__()
        self.n_inputs = input_size
        self.hidden_size = hidden_size
        self.concat_size = self.n_inputs + self.hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.backend = backend

        # unique wire names per gate (separate devices)
        self.wires_reset = [f"wire_reset_{i}" for i in range(self.n_qubits)]
        self.wires_update = [f"wire_update_{i}" for i in range(self.n_qubits)]
        self.wires_new = [f"wire_new_{i}" for i in range(self.n_qubits)]

        self.dev_reset = qml.device(self.backend, wires=self.wires_reset)
        self.dev_update = qml.device(self.backend, wires=self.wires_update)
        self.dev_new = qml.device(self.backend, wires=self.wires_new)

        # circuits: IQPEmbedding + BasicEntanglerLayers -> Z expvals
        def _circuit_reset(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=self.wires_reset)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_reset)
            return [qml.expval(qml.PauliZ(w)) for w in self.wires_reset]

        def _circuit_update(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=self.wires_update)
            qml.templates.BasicEntanglerLayers(
                weights, wires=self.wires_update)
            return [qml.expval(qml.PauliZ(w)) for w in self.wires_update]

        def _circuit_new(inputs, weights):
            qml.templates.IQPEmbedding(inputs, wires=self.wires_new)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_new)
            return [qml.expval(qml.PauliZ(w)) for w in self.wires_new]

        weight_shapes = {"weights": (n_qlayers, n_qubits)}
        self.qlayer_reset = qml.qnn.TorchLayer(
            qml.QNode(_circuit_reset,  self.dev_reset,  interface="torch"), weight_shapes)
        self.qlayer_update = qml.qnn.TorchLayer(
            qml.QNode(_circuit_update, self.dev_update, interface="torch"), weight_shapes)
        self.qlayer_new = qml.qnn.TorchLayer(
            qml.QNode(_circuit_new,    self.dev_new,    interface="torch"), weight_shapes)

        # classical pre/post
        self.clayer_in_gates = nn.Linear(self.concat_size, n_qubits)  # for r,z
        # for candidate n with (r ⊙ h)
        self.clayer_in_cand = nn.Linear(self.concat_size, n_qubits)
        self.clayer_out = nn.Linear(self.n_qubits, self.hidden_size)

    def forward(self, x, init_state=None):
        """
        x: [B, N, input_size]  tokens from Patch+PosEnc
        returns: hidden_seq [B, N, hidden], h_T
        """
        B, N, _ = x.size()
        hidden_seq = []

        if init_state is None:
            h_t = torch.zeros(B, self.hidden_size,
                              device=x.device, dtype=x.dtype)
        else:
            h_t = init_state

        for t in range(N):
            x_t = x[:, t, :]                              # [B, input_size]
            v_t = torch.cat((h_t, x_t), dim=1)            # [B, hidden+input]
            y_t = self.clayer_in_gates(v_t)               # [B, n_qubits]

            r_t = torch.sigmoid(self.clayer_out(
                self.qlayer_reset(y_t)))   # [B, hidden]
            z_t = torch.sigmoid(self.clayer_out(
                self.qlayer_update(y_t)))  # [B, hidden]

            v_cand = torch.cat((r_t * h_t, x_t), dim=1)   # [B, hidden+input]
            y_cand = self.clayer_in_cand(v_cand)          # [B, n_qubits]
            n_t = torch.tanh(self.clayer_out(
                self.qlayer_new(y_cand)))     # [B, hidden]

            h_t = (1.0 - z_t) * n_t + z_t * h_t
            hidden_seq.append(h_t.unsqueeze(1))

        hidden_seq = torch.cat(hidden_seq, dim=1)         # [B, N, hidden]
        return hidden_seq, h_t


class PatchToQGRU72(nn.Module):
    """
    PatchPosEncoder (yours) -> QGRU over tokens -> Linear -> H (e.g., 72).
    Same interface as PatchToQLSTM72 so you can swap easily.
    """

    def __init__(self, in_features, L, H, patch_len=16, stride=8,
                 d_model=128, hidden_size=128, n_qubits=4, n_qlayers=1, backend="default.qubit"):
        super().__init__()
        self.enc = PatchPosEncoder(in_features, patch_len, stride, d_model)
        self.qgru = zzfeatuermapQGRU(input_size=d_model, hidden_size=hidden_size,
                                     n_qubits=n_qubits, n_qlayers=n_qlayers, backend=backend)
        self.head = nn.Linear(hidden_size, H)

    def forward(self, x_hist):                # x_hist: [B, L, F]
        tokens = self.enc(x_hist)             # [B, N, d_model]
        h_seq, hT = self.qgru(tokens)         # [B, N, hidden], [B, hidden]
        return self.head(h_seq[:, -1, :])     # [B, H]

In [None]:
# reuse your existing values: F=len(feat_cols), L, H, backend, train_loader
F_bls = len(feat_cols_bls)

# If you have lightning backends installed you can swap "default.qubit" -> "lightning.qubit" (CPU) or "lightning.gpu"
backend = "default.qubit"

qgru_model = PatchToQGRU72(
    in_features=F_bls, L=L, H=H,
    patch_len=16, stride=8,
    d_model=128, hidden_size=128,
    n_qubits=4, n_qlayers=1,
    backend="default.qubit"
)

with torch.no_grad():
    _ = qgru_model(next(iter(train_loader))["x_hist"][:2])
print("QGRU model built and forward pass OK.")

# If you want to train QGRU **without touching your training loop** that references `qlstm_model`,
# simply alias it:
# qlstm_model = qgru_model

QGRU model built and forward pass OK.


# Training loop


In [17]:
# === Train QLSTM for multiple epochs and plot the loss ===
import time
import math
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

# --- config (tweak as you like) ---
EPOCHS = 30
LR = 1e-3
WEIGHT_DECAY = 0.0
GRAD_CLIP = 1.0  # helps stabilize PQC training
PRINT_EVERY = 1

# Pennylane's default.qubit + TorchLayer are CPU-friendly; keeping everything on CPU avoids device mismatches
device = "cuda" if torch.cuda.is_available() else "cpu"
qlstm_model.to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(
    qlstm_model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
# optional cosine decay
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)


def run_epoch(model, loader, train: bool):
    model.train(train)
    total = 0.0
    count = 0
    start = time.time()
    for batch in loader:
        x = batch["x_hist"].to(device)          # [B, L, F]
        y = batch["y_future"].to(device)        # [B, H]
        if train:
            optimizer.zero_grad()
        with torch.set_grad_enabled(train):
            y_hat = model(x)                    # [B, H]
            loss = criterion(y_hat, y)
            if train:
                loss.backward()
                if GRAD_CLIP is not None:
                    torch.nn.utils.clip_grad_norm_(
                        model.parameters(), GRAD_CLIP)
                optimizer.step()
        total += float(loss) * x.size(0)
        count += x.size(0)
    elapsed = time.time() - start
    return total / max(count, 1), elapsed


train_losses, val_losses = [], []
lrs = []

print(f"Training on {device} for {EPOCHS} epochs...")
best_val = math.inf
best_state = None

for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_time = run_epoch(qlstm_model, train_loader, train=True)
    va_loss, va_time = run_epoch(qlstm_model, val_loader,   train=False)
    train_losses.append(tr_loss)
    val_losses.append(va_loss)
    lrs.append(optimizer.param_groups[0]["lr"])

    if scheduler is not None:
        scheduler.step()

    if va_loss < best_val:
        best_val = va_loss
        best_state = {k: v.cpu().clone()
                      for k, v in qlstm_model.state_dict().items()}

    if epoch % PRINT_EVERY == 0:
        print(f"Epoch {epoch:03d} | "
              f"train {tr_loss:.4f} ({tr_time:.1f}s) | "
              f"val {va_loss:.4f} ({va_time:.1f}s) | "
              f"lr {lrs[-1]:.2e}")

# (optional) load best
if best_state is not None:
    qlstm_model.load_state_dict(best_state)
    print(f"\nLoaded best model (val MSE = {best_val:.4f}).")

# --- Plot ---
plt.figure(figsize=(7, 4.5))
plt.plot(train_losses, label="train")
plt.plot(val_losses, label="val")
plt.xlabel("epoch")
plt.ylabel("MSE loss")
plt.title("QLSTM: training and validation loss")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# (optional) plot learning rate
plt.figure(figsize=(7, 3))
plt.plot(lrs)
plt.xlabel("epoch")
plt.ylabel("learning rate")
plt.title("Learning rate schedule")
plt.grid(True)
plt.tight_layout()
plt.show()

Training on cpu for 30 epochs...


Consider using tensor.detach() first. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\pytorch\torch\csrc\autograd\generated\python_variable_methods.cpp:836.)
  total += float(loss) * x.size(0)


KeyboardInterrupt: 