### Decay regress single

In [None]:
import numpy as np
from numbers import Real

_DT = np.float64   # dtype alias


def _iterate_beta_single(
    beta, sw, swy2, swxy, swx2, prior_mean, prior_precision
):
    """
    Single-series analogue of the vectorised `iterate_beta`.
    beta            : K            (current provisional β)
    sw              : scalar
    swy2            : scalar
    swxy            : K
    swx2            : K × K
    prior_mean      : K
    prior_precision : K × K   (Σ₀⁻¹)
    """
    xy  = swxy        # K
    xx  = swx2        # K × K
    bt  = beta        # K

    # ---- residual variance ----------------------------------------------
    res_var = (swy2 - 2 * xy @ bt + bt @ xx @ bt) / sw
    if not (res_var > 0):
        res_var = 1e-5          # floor to keep Λ non-singular

    # ---- posterior update -----------------------------------------------
    λ = res_var * prior_precision               # K × K
    A = xx + λ                                  # K × K
    b = xy + λ @ prior_mean                     # K
    return np.linalg.solve(A, b)                # K


def decay_regress_featurewise(
    x: np.ndarray,          # shape K × T
    y: np.ndarray,          # shape T
    *,
    prior_mean: np.ndarray, # shape K
    prior_covar: np.ndarray,# shape K × K
    decay_scales: np.ndarray|list,   # shape K (one τ_k per feature)
) -> np.ndarray:            # returns K × T
    """
    Exponentially-weighted Bayesian regression with *feature-specific*
    decay factors and **one** time-series.

    Each feature k uses ρ_k = exp(-1 / τ_k).
    """
    # ------------- sanity checks -----------------------------------------
    x   = np.asarray(x, dtype=_DT)
    y   = np.asarray(y, dtype=_DT)
    τ   = np.asarray(decay_scales, dtype=_DT)

    assert x.ndim == 2, "x must be K×T for a single series"
    K, T = x.shape
    assert y.shape == (T,)
    assert τ.shape == (K,)
    assert np.all(τ > 0)

    assert prior_mean.shape  == (K,)
    assert prior_covar.shape == (K, K)
    Λ0 = np.linalg.inv(prior_covar)

    ρ       = np.exp(-1.0 / τ)                 # K vector   (ρ_k)
    ρ_outer = np.outer(ρ, ρ)                   # K × K       (ρ_i ρ_j)

    # ------------- running moments ---------------------------------------
    sw    = 0.0
    swy2  = 0.0
    swxy  = np.zeros(K,     _DT)
    swx2  = np.zeros((K,K), _DT)

    beta_t    = prior_mean.copy()              # provisional β
    betas_out = np.empty((K, T), _DT)

    # ------------- main loop ---------------------------------------------
    for t in range(T):
        x_t = np.nan_to_num(x[:, t], nan=0.0)
        y_t = 0.0 if np.isnan(y[t]) else y[t]
        w_t = float(np.all(np.isfinite((x_t, y[t]))))  # 1 if both valid else 0

        # (a) decay previous sufficient stats
        swxy *= ρ
        swx2 *= ρ_outer

        # (b) add current contribution
        sw    += w_t
        swy2  += w_t * y_t**2
        swxy  += w_t * y_t * x_t
        swx2  += w_t * np.outer(x_t, x_t)

        # (c) two-pass β update
        beta0     = _iterate_beta_single(beta_t, sw, swy2, swxy, swx2,
                                         prior_mean, Λ0)
        beta_t    = _iterate_beta_single(beta0,  sw, swy2, swxy, swx2,
                                         prior_mean, Λ0)

        betas_out[:, t] = beta_t

    return betas_out


In [None]:
K, T = 5, 1_000
x     = np.random.randn(K, T)
y     = 0.7 * x[0] - 0.3 * x[3] + 0.1 * np.random.randn(T)

tau   = np.array([20, 50, 50, 10, 100])   # faster decay for feat-0 & 3
mu0   = np.zeros(K)
Sigma = np.eye(K) * 10

beta_path = decay_regress_featurewise(x, y,
                                      prior_mean=mu0,
                                      prior_covar=Sigma,
                                      decay_scales=tau)


### Decay regress multiple 

In [None]:
import numpy as np
from numbers import Real

DT_DOUBLE = np.float64                      # dtype alias


def decay_regress_featurewise(
    x: np.ndarray,          # K × T × N   predictors
    y: np.ndarray,          # T × N       target
    *,
    prior_mean: np.ndarray, # (K,)
    prior_covar: np.ndarray,# K × K
    decay_scales: np.ndarray | list,   # (K,)  one τ_k per feature
    target_decay_scale: Real = np.inf, # scalar (optional) decay for y
) -> np.ndarray:           # returns K × T × N (posterior betas)

    # --------- validation -------------------------------------------------
    x = np.asarray(x, dtype=DT_DOUBLE)
    y = np.asarray(y, dtype=DT_DOUBLE)
    τ = np.asarray(decay_scales, dtype=DT_DOUBLE)

    assert x.ndim == 3, "`x` must be K×T×N"
    K, T, N = x.shape
    assert y.shape == (T, N)
    assert τ.shape == (K,)
    assert np.all(τ > 0), "all decay_scales must be positive"

    assert prior_mean.shape == (K,)
    assert prior_covar.shape == (K, K)

    Λ0 = np.linalg.inv(prior_covar)         # prior precision  K×K
    ρ  = np.exp(-1.0 / τ)                   # (K,)    per-feature decay
    ρ_outer = np.outer(ρ, ρ)                # K × K   pairwise product

    # optional decay for the y-side moments
    ρ_y = (1.0 if np.isinf(target_decay_scale)
           else np.exp(-1.0 / target_decay_scale))

    # --------- running moments (vectorised over *series* N) ---------------
    sw    = np.zeros(N,                         DT_DOUBLE)   # scalar per series
    swy2  = np.zeros(N,                         DT_DOUBLE)
    swxy  = np.zeros((K,     N),                DT_DOUBLE)
    swx2  = np.zeros((K, K,  N),                DT_DOUBLE)

    betas = np.empty((K, T, N),                 DT_DOUBLE)   # output
    beta_init = np.broadcast_to(prior_mean[:, None], (K, N)).T  # N×K

    # ------------ helper --------------------------------------------------
    def iterate_beta(beta_a: np.ndarray) -> np.ndarray:
        """
        Vectorised posterior update (N series in parallel).
        beta_a : N × K  (provisional)
        returns: N × K  (updated)
        """
        xy  = swxy.T[:, None, :]            # N × 1 × K
        xx  = swx2.T                        # N × K × K
        bt  = beta_a[:, :, None]            # N × K × 1
        btt = beta_a[:, None, :]            # N × 1 × K

        with np.errstate(invalid='ignore'):
            res_var0 = (
                swy2 + (-2 * (xy @ bt) + btt @ xx @ bt)[:, 0, 0]
            ) / sw

        # validity / flooring
        good = np.nonzero(res_var0 > 0)[0]
        res_var = np.where(res_var0 > 0, res_var0, 1e-5)

        λ = res_var[:, None, None] * Λ0                 # N × K × K
        A = xx + λ
        b = swxy.T + λ @ prior_mean                    # N × K

        beta_new = beta_a.copy()
        beta_new[good] = np.linalg.solve(A[good], b[good])
        return beta_new

    # ------------- main time loop ----------------------------------------
    for t in range(T):
        x_t = x[:, t, :]                      # K × N
        y_t = y[t, :]                        # N
        bad_x = np.isnan(x_t)
        bad   = bad_x.any(axis=0) | np.isnan(y_t)
        w_t   = (~bad).astype(DT_DOUBLE)      # N, 0/1

        x_t = np.where(bad_x, 0.0, x_t)       # zero-fill NaNs
        y_t = np.where(bad,   0.0, y_t)

        # -- (a) decay previous moments -----------------------------------
        swxy *= ρ[:, None]                    # K × N
        swx2 *= ρ_outer[:, :, None]           # K × K × N
        sw    *= ρ_y
        swy2  *= ρ_y

        # -- (b) accumulate current obs -----------------------------------
        sw    += w_t
        swy2  += w_t * y_t**2
        swxy  += w_t * y_t    * x_t
        swx2  += w_t * (x_t[:, None, :] * x_t[None, :, :])

        # -- (c) two-pass β update ----------------------------------------
        beta0      = iterate_beta(beta_init)
        beta1      = iterate_beta(beta0)
        betas[:, t, :] = beta1.T

    return betas


In [None]:
K, T, N = 4, 500, 20
x   = np.random.randn(K, T, N)
true_b = np.array([ 1.5, -0.8, 0.0, 0.3 ])[:, None, None]
y   = (true_b * x).sum(0) + 0.2 * np.random.randn(T, N)

taus = np.array([ 10, 50, 50, 5 ])     # fast decay for feature-0 & 3
mu0  = np.zeros(K)
Sigma0 = np.eye(K) * 5

beta_path = decay_regress_featurewise(
    x, y,
    prior_mean=mu0,
    prior_covar=Sigma0,
    decay_scales=taus,
    target_decay_scale=np.inf,         # no forgetting on y
)
# beta_path has shape K×T×N
