In [25]:
import numpy as np
import plotly.graph_objects as go
from scipy.optimize import minimize
from tqdm import tqdm
from scipy.stats import t, multivariate_normal, multivariate_t
from scipy.special import logsumexp, gamma, gammaln
from numpy.linalg import inv, det, slogdet, LinAlgError
from plotly.subplots import make_subplots

from simulation import * 

In [None]:
# def gas_general_filter(
#     params,
#     Y,
#     X,
#     W1=None,
#     W2=None,
#     cL=None,
#     cK=None,
#     fix_nu=None
# ):
#     """
#     General multivariate GAS filter with optional fixed nu and intercept omega.

#     Parameters:
#     -----------
#     params : ndarray
#         Flattened parameter vector.
#     Y : ndarray (T, R)
#         Response matrix.
#     X : ndarray (T, R, P)
#         Regressor array.
#     W1, W2 : ignored (only included for compatibility).
#     cL, cK : int
#         Lengths of lambda and kappa vectors (usually = R).
#     fix_nu : float or None
#         If provided, fixes the degrees of freedom nu.
#     phi_type : str
#         "scalar" or "matrix" for phi.

#     Returns:
#     --------
#     Negative log-likelihood.
#     """
#     T, R = Y.shape
#     P = X.shape[2]

#     idx = 0
#     Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
#     omega = params[idx:idx + R]; idx += R
#     beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
#     lambda_vec = params[idx:idx + cL]; idx += cL
#     kappa_vec = params[idx:idx + cK]; idx += cK
#     if fix_nu is not None:
#             nu = fix_nu
#     else:
#             nu = params[-1] #; idx += 1
#     Lambda = np.diag(np.exp(2 * lambda_vec))
#     Kappa = np.diag(kappa_vec)

#     mu = np.zeros((T + 1, R))
#     mu[0] = omega.copy()
#     u = np.zeros((T, R))
#     loglik = 0.0
#     Lambda_inv = np.linalg.inv(Lambda)

#     for t in range(T):
#         dev_t = Y[t] - X[t] @ beta - mu[t]
#         alpha_t = 1 + (1 / nu) * (dev_t.T @ Lambda_inv @ dev_t)
#         u[t] = dev_t / alpha_t
#         mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u[t]

#         term1 = gammaln((nu + R) / 2) - gammaln(nu / 2)
#         term2 = -0.5 * R * np.log(nu) - 0.5 * R * np.log(np.pi)
#         term3 = -0.5 * np.log(det(Lambda))
#         term4 = -0.5 * (nu + R) * np.log(alpha_t)
#         loglik += term1 + term2 + term3 + term4
    
#     return -loglik

def gas_general_filter(params, Y, X, cL, cK, fix_nu=None):
    """
    Multivariate GAS filter with optional fixed nu and intercept omega.

    Parameters:
    -----------
    params : ndarray
        Parameter vector (flattened): [Phi, omega, beta, lambda, kappa, (nu)]
    Y : ndarray (T, R)
        Observed data
    X : ndarray (T, P, R)
        Regressor array with intercept included (broadcasted over R)
    cL : int
        Length of lambda vector (typically R)
    cK : int
        Length of kappa vector (typically R)
    fix_nu : float or None
        If provided, uses this fixed value for nu

    Returns:
    --------
    neg_loglik : float
        Negative log-likelihood (to minimize)
    """
    T, R = Y.shape
    P = X.shape[1]

    idx = 0
    Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
    omega = params[idx:idx + R]; idx += R
    beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
    lambda_vec = params[idx:idx + cL]; idx += cL
    kappa_vec = params[idx:idx + cK]; idx += cK

    if fix_nu is not None:
        nu = fix_nu
    else:
        nu = params[idx]
        if nu <= 2.01 or nu > 300:
            return 1e10  # Penalize invalid nu

    Lambda = np.diag(np.exp(2 * lambda_vec))
    Kappa = np.diag(kappa_vec)
    Lambda_inv = inv(Lambda)

    mu = np.zeros((T + 1, R))
    mu[0] = omega.copy()
    loglik = 0.0

    for t in range(T):
        # Ensure all vectors are 1D
        x_beta = X[t] @ beta  # (R,)
        resid = (Y[t] - x_beta - mu[t]).flatten()

        alpha_t = 1 + (1 / nu) * (resid.T @ Lambda_inv @ resid)
        u_t = (Lambda_inv @ resid) / alpha_t

        mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u_t

        # Log-likelihood contribution
        term1 = gammaln((nu + R) / 2) - gammaln(nu / 2)
        term2 = -0.5 * R * np.log(nu) - 0.5 * R * np.log(np.pi)
        term3 = -0.5 * np.log(det(Lambda))
        term4 = -0.5 * (nu + R) * np.log(alpha_t)
        loglik += term1 + term2 + term3 + term4

    return -loglik  # for minimization


In [None]:
# import numpy as np
# from scipy.special import gammaln
# from numpy.linalg import det


# def gas_general_filter(
#     params,
#     Y,
#     X,
#     cL,
#     cK,
#     fix_nu=None,
#     kappa_type="diagonal"
# ):
#     """
#     General multivariate GAS filter log-likelihood.

#     Parameters:
#     -----------
#     params : ndarray
#         Flattened parameter vector.
#     Y : ndarray (T, R)
#         Response matrix.
#     X : ndarray (T, P)
#         Regressor matrix (including intercept if used).
#     cL : int
#         Length of lambda vector (usually = R).
#     cK : int
#         Length of kappa vector or matrix (depends on type).
#     fix_nu : float or None
#         If provided, fixes the degrees of freedom nu.
#     kappa_type : str
#         'diagonal' or 'full' for Kappa structure.

#     Returns:
#     --------
#     Negative log-likelihood.
#     """
#     T, R = Y.shape
#     P = X.shape[1]  # num regressors (with intercept)

#     idx = 0
#     Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
#     omega = params[idx:idx + R]; idx += R
#     beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
#     lambda_vec = params[idx:idx + cL]; idx += cL

#     if kappa_type == "diagonal":
#         kappa_vec = params[idx:idx + R]; idx += R
#         Kappa = np.diag(kappa_vec)
#     elif kappa_type == "full":
#         kappa_vec = params[idx:idx + R * R]; idx += R * R
#         Kappa = kappa_vec.reshape(R, R)
#     else:
#         raise ValueError("kappa_type must be 'diagonal' or 'full'")

#     if fix_nu is not None:
#         nu = fix_nu
#     else:
#         nu = params[idx]

#     Lambda = np.diag(np.exp(2 * lambda_vec))
#     Lambda_inv = np.linalg.inv(Lambda)

#     mu = np.zeros((T + 1, R))
#     mu[0] = omega.copy()

#     loglik = 0.0

#     for t in range(T):
#         x_beta = X[t] @ beta  # shape (R,)
#         resid = Y[t] - x_beta - mu[t]  # shape (R,)
#         alpha_t = 1 + (1 / nu) * (resid.T @ Lambda_inv @ resid)
#         u_t = (Lambda_inv @ resid) / alpha_t

#         mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u_t

#         term1 = gammaln((nu + R) / 2) - gammaln(nu / 2)
#         term2 = -0.5 * R * np.log(nu) - 0.5 * R * np.log(np.pi)
#         term3 = -0.5 * np.log(det(Lambda))
#         term4 = -0.5 * (nu + R) * np.log(alpha_t)

#         loglik += term1 + term2 + term3 + term4

#     return -loglik


In [3]:
def gas_general_filter(
    params,
    Y,
    X,
    cL,
    cK,
    fix_nu=None,
    kappa_type="diagonal"
):
    """
    General multivariate GAS filter log-likelihood.

    Parameters:
    -----------
    params : ndarray
        Flattened parameter vector.
    Y : ndarray (T, R)
        Response matrix.
    X : ndarray (T, P)
        Regressor matrix (including intercept if used).
    cL : int
        Length of lambda vector (usually = R).
    cK : int
        Length of kappa vector or matrix (depends on type).
    fix_nu : float or None
        If provided, fixes the degrees of freedom nu.
    kappa_type : str
        'diagonal' or 'full' for Kappa structure.

    Returns:
    --------
    Negative log-likelihood.
    """
    T, R = Y.shape
    P = X.shape[1]  # num regressors (with intercept)

    idx = 0
    Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
    omega = params[idx:idx + R]; idx += R
    beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
    lambda_vec = params[idx:idx + cL]; idx += cL

    if kappa_type == "diagonal":
        kappa_vec = params[idx:idx + R]; idx += R
        Kappa = np.diag(kappa_vec)
    elif kappa_type == "full":
        kappa_vec = params[idx:idx + R * R]; idx += R * R
        Kappa = kappa_vec.reshape(R, R)
    else:
        raise ValueError("kappa_type must be 'diagonal' or 'full'")

    if fix_nu is not None:
        nu = fix_nu
    else:
        nu = params[idx]

    Lambda = np.diag(np.exp(2 * lambda_vec))
    Lambda_inv = np.linalg.inv(Lambda)

    mu = np.zeros((T + 1, R))
    mu[0] = omega.copy()

    loglik = 0.0

    for t in range(T):
        x_beta = X[t] @ beta  # shape (R,)
        resid = Y[t] - x_beta - mu[t]  # shape (R,)
        alpha_t = 1 + (1 / nu) * (resid.T @ Lambda_inv @ resid)
        u_t = (Lambda_inv @ resid) / alpha_t

        mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u_t

        term1 = gammaln((nu + R) / 2) - gammaln(nu / 2)
        term2 = -0.5 * R * np.log(nu) - 0.5 * R * np.log(np.pi)
        term3 = -0.5 * np.log(det(Lambda))
        term4 = -0.5 * (nu + R) * np.log(alpha_t)

        loglik += term1 + term2 + term3 + term4

    return -loglik
    

## Build own student_t_logpdf 
need some safeguards for numerical stability

In [27]:
# def student_t_logpdf(y, mu, Omega, nu, log_output = True, min_det = 1e-50):
#     """
#     Log-density of multivariate Student-t distribution.

#     Parameters:
#     -----------
#     y : ndarray (R,)
#         Observation
#     mu : ndarray (R,)
#         Mean vector
#     Omega : ndarray (R, R)
#         Scale matrix (positive definite)
#     nu : float
#         Degrees of freedom
#     log_output : bool
#         Whether to return log-density
#     min_det : float
#         Minimum determinant value to avoid numerical instability

#     Returns:
#     --------
#     log_pdf : float
#     """
#     R = Omega.shape[0]
#     diff = y - mu
#     try:
#         sign, logdet_Omega = slogdet(Omega)
#         if sign <= 0 or logdet_Omega < np.log(min_det):     # Avoid negative or zero determinant
#             logdet_Omega = np.log(min_det)
#     except:
#         logdet_Omega = np.log(min_det)
    
#     inv_sqrt_Omega = inv(np.linalg.cholesky(Omega).T)
#     eps = inv_sqrt_Omega @ diff
#     w = 1 + (1 / nu) * (eps.T @ eps)

#     log_pdf = (
#         gammaln((nu + R) / 2)
#         - gammaln(nu / 2)
#         - (R / 2) * np.log(np.pi)
#         - (R / 2) * np.log(nu)
#         - 0.5 * logdet_Omega
#         - ((nu + R) / 2) * np.log(w)
#     )
#     return log_pdf if log_output else np.exp(log_pdf)

def student_t_logpdf(y, mu, Omega, nu, log_output=True, min_det=1e-50):
    R = Omega.shape[0]
    diff = y - mu

    try:
        sign, logdet_Omega = slogdet(Omega)
        if sign <= 0 or logdet_Omega < np.log(min_det):
            logdet_Omega = np.log(min_det)

        L = np.linalg.cholesky(Omega)
        inv_sqrt_Omega = inv(L.T)
    except np.linalg.LinAlgError:
        return -1e12

    eps = inv_sqrt_Omega @ diff
    w = 1 + (1 / nu) * (eps.T @ eps)

    log_pdf = (
        gammaln((nu + R) / 2)
        - gammaln(nu / 2)
        - (R / 2) * np.log(np.pi)
        - (R / 2) * np.log(nu)
        - 0.5 * logdet_Omega
        - ((nu + R) / 2) * np.log(w)
    )

    if not np.isfinite(log_pdf):
        return -1e12

    return log_pdf if log_output else np.exp(log_pdf)

In [28]:
def gas_general_filter(
    params,
    Y,
    X,
    cL,
    cK,
    fix_nu=None,
    kappa_type="diagonal"
):
    """
    General multivariate GAS filter log-likelihood.

    Parameters:
    -----------
    params : ndarray
        Flattened parameter vector.
    Y : ndarray (T, R)
        Response matrix.
    X : ndarray (T, P)
        Regressor matrix (including intercept if used).
    cL : int
        Length of lambda vector (usually = R).
    cK : int
        Length of kappa vector or matrix (depends on type).
    fix_nu : float or None
        If provided, fixes the degrees of freedom nu.
    kappa_type : str
        'diagonal' or 'full' for Kappa structure.

    Returns:
    --------
    Negative log-likelihood.
    """
    T, R = Y.shape
    P = X.shape[1]  # num regressors (with intercept)

    idx = 0
    Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
    omega = params[idx:idx + R]; idx += R
    beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
    lambda_vec = params[idx:idx + cL]; idx += cL

    if kappa_type == "diagonal":
        kappa_vec = params[idx:idx + R]; idx += R
        Kappa = np.diag(kappa_vec)
    elif kappa_type == "full":
        kappa_vec = params[idx:idx + R * R]; idx += R * R
        Kappa = kappa_vec.reshape(R, R)
    else:
        raise ValueError("kappa_type must be 'diagonal' or 'full'")

    if fix_nu is not None:
        nu = fix_nu
    else:
        nu = params[idx]

    Lambda = np.diag(np.exp(2 * lambda_vec))
    Lambda_inv = np.linalg.inv(Lambda)

    mu = np.zeros((T + 1, R))
    mu[0] = omega.copy()

    loglik = 0.0

    for t in range(T):
        x_beta = X[t] @ beta  # shape (R,)
        resid = Y[t] - x_beta - mu[t]  # shape (R,)
        alpha_t = 1 + (1 / nu) * (resid.T @ Lambda_inv @ resid)
        u_t = (Lambda_inv @ resid) / alpha_t

        mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u_t

        # term1 = gammaln((nu + R) / 2) - gammaln(nu / 2)
        # term2 = -0.5 * R * np.log(nu) - 0.5 * R * np.log(np.pi)
        # term3 = -0.5 * np.log(det(Lambda))
        # term4 = -0.5 * (nu + R) * np.log(alpha_t)
        # loglik += term1 + term2 + term3 + term4
        loglik += student_t_logpdf(Y[t], x_beta + mu[t], Lambda, nu)


    return -loglik

In [5]:
def gas_filter_eval(params, Y, X, cL, cK, fix_nu=None, kappa_type="diagonal"):
    """
    Run the GAS filter and return filtered mu, scaled scores u, and residuals.
    
    Parameters:
    -----------
    params : ndarray
        Flattened parameter vector.
    Y : ndarray (T, R)
        Response matrix.
    X : ndarray (T, P)
        Regressor matrix.
    cL : int
        Length of lambda vector.
    cK : int
        Length of kappa vector or matrix.
    fix_nu : float or None
        If provided, fixes the degrees of freedom.
    kappa_type : str
        'diagonal' or 'full'.
    
    Returns:
    --------
    mu : ndarray (T, R)
        Filtered latent states.
    u : ndarray (T, R)
        Scaled score vectors.
    resid : ndarray (T, R)
        Residuals.
    """
    T, R = Y.shape
    P = X.shape[1]  # Number of regressors (including intercept)

    idx = 0
    Phi = params[idx:idx + R * R].reshape(R, R); idx += R * R
    omega = params[idx:idx + R]; idx += R
    beta = params[idx:idx + P * R].reshape(P, R); idx += P * R
    lambda_vec = params[idx:idx + cL]; idx += cL

    if kappa_type == "diagonal":
        kappa_vec = params[idx:idx + R]; idx += R
        Kappa = np.diag(kappa_vec)
    elif kappa_type == "full":
        kappa_vec = params[idx:idx + R * R]; idx += R * R
        Kappa = kappa_vec.reshape(R, R)
    else:
        raise ValueError("kappa_type must be 'diagonal' or 'full'")

    if fix_nu is not None:
        nu = fix_nu
    else:
        nu = params[idx]

    Lambda = np.diag(np.exp(2 * lambda_vec))
    Lambda_inv = np.linalg.inv(Lambda)

    mu = np.zeros((T + 1, R))
    mu[0] = omega.copy()

    u = np.zeros((T, R))
    resid = np.zeros((T, R))

    for t in range(T):
        x_beta = X[t] @ beta
        resid[t] = Y[t] - x_beta - mu[t]
        alpha_t = 1 + (1 / nu) * (resid[t].T @ Lambda_inv @ resid[t])
        u[t] = (Lambda_inv @ resid[t]) / alpha_t
        mu[t + 1] = omega + Phi @ (mu[t] - omega) + Kappa @ u[t]

    return mu[1:], u, resid


In [29]:
def simulate_multivariate_t_gas(T, N, omega, Phi, K_mat, Omega, nu, beta=None, seed=None):
    """
    Simulate multivariate GAS model data, optionally with regressors.

    Model:
        y_t = mu_t + X_t @ beta + eps_t, eps_t ~ t_nu(0, Omega)
        mu_{t+1} = omega + Phi(mu_t - omega) + K_mat * u_t

    Scaled score:
        u_t = ((nu + N) * inv_Omega @ residual) / (nu + residual' @ inv_Omega @ residual)

    Parameters:
    -----------
    T : int
        Number of observations.
    N : int
        Dimension of the response vector.
    omega : ndarray (N,)
        GAS intercept.
    Phi : ndarray (N, N)
        GAS transition matrix.
    K_mat : ndarray (N, N)
        GAS updating matrix.
    Omega : ndarray (N, N)
        Covariance matrix (positive definite).
    nu : float
        Degrees of freedom for Student's t-distribution.
    beta : ndarray ((K+1), N) or None, optional
        Regression coefficient matrix including intercept. If None, no regressors included.
    seed : int or None, optional
        Random seed for reproducibility.

    Returns:
    --------
    y : ndarray (T, N)
        Simulated observations.
    mu : ndarray (T, N)
        Simulated latent states.
    X : ndarray (T, K+1) or None
        Simulated regressors including intercept (first column), if beta provided.
    errors : ndarray (T, N)
        Simulated residuals (eps_t).
    """
    if seed is not None:
        np.random.seed(seed)

    omega = np.asarray(omega).reshape(N,)
    Phi = np.asarray(Phi).reshape(N, N)
    K_mat = np.asarray(K_mat).reshape(N, N)
    Omega = np.asarray(Omega).reshape(N, N)
    inv_Omega = np.linalg.inv(Omega)

    # Generate regressors with intercept if beta is provided
    if beta is not None:
        beta = np.asarray(beta)
        K_plus_1 = beta.shape[0]  # includes intercept
        X = np.random.normal(size=(T, K_plus_1 - 1))
        X = np.hstack((np.ones((T, 1)), X))  # prepend intercept column
    else:
        X = None

    mu = np.zeros((T, N))
    y = np.zeros((T, N))
    errors = np.zeros((T, N))
    mu[0] = omega.copy()

    # First observation
    if beta is not None:
        y[0] = mu[0] + X[0] @ beta + multivariate_t.rvs(df=nu, loc=np.zeros(N), shape=Omega)
    else:
        y[0] = mu[0] + multivariate_t.rvs(df=nu, loc=np.zeros(N), shape=Omega)

    errors[0] = y[0] - (X[0] @ beta + mu[0] if beta is not None else mu[0])

    for i in tqdm(range(1, T)):
        pred = mu[i - 1] + (X[i] @ beta if beta is not None else 0)
        eps_t = multivariate_t.rvs(df=nu, loc=np.zeros(N), shape=Omega)
        y[i] = pred + eps_t

        resid = y[i] - pred
        mahalanobis = resid @ inv_Omega @ resid
        weight = (nu + N) / (nu + mahalanobis)
        u_t = weight * (inv_Omega @ resid)

        mu[i] = omega + Phi @ (mu[i - 1] - omega) + K_mat @ u_t
        errors[i] = eps_t

    return y, mu, X, errors

In [30]:
import numpy as np

def simulate_multivariate_state_space(T, N, c, Phi, Q, R, beta=None, seed=None):
    """
    Simulate a multivariate linear Gaussian state-space model with optional regressors.

    Observation equation:
        y_t = mu_t + X_t @ beta + eps_t, eps_t ~ N(0, R)

    State equation:
        mu_{t+1} = c + Phi @ mu_t + eta_t, eta_t ~ N(0, Q)

    Parameters:
    -----------
    T : int
        Number of time steps.
    N : int
        Number of response series (dimensionality).
    c : ndarray (N,)
        State intercept vector.
    Phi : ndarray (N, N)
        State transition matrix.
    Q : ndarray (N, N)
        Covariance of state noise.
    R : ndarray (N, N)
        Covariance of observation noise.
    beta : ndarray (K+1, N) or None
        Regression coefficients (first row is intercept).
    seed : int or None
        Seed for reproducibility.

    Returns:
    --------
    y : ndarray (T, N)
        Simulated observations.
    mu : ndarray (T, N)
        Simulated latent states.
    X : ndarray (T, K+1) or None
        Regressor matrix (first column is intercept).
    """
    if seed is not None:
        np.random.seed(seed)

    # Validate shapes
    c = np.asarray(c).reshape(N,)
    Phi = np.asarray(Phi).reshape(N, N)
    Q = np.asarray(Q).reshape(N, N)
    R = np.asarray(R).reshape(N, N)

    # Regressor matrix with intercept
    if beta is not None:
        beta = np.asarray(beta)
        K_plus1 = beta.shape[0]
        K = K_plus1 - 1
        X_raw = np.random.normal(size=(T, K))
        X = np.ones((T, K_plus1))  # first column = intercept
        X[:, 1:] = X_raw
    else:
        X = None

    mu = np.zeros((T, N))
    y = np.zeros((T, N))

    # Initial state (stationary mean of AR(1) process)
    mu[0] = np.linalg.solve(np.eye(N) - Phi, c)

    for t in range(T):
        # Observation
        if beta is not None:
            y[t] = mu[t] + X[t] @ beta + np.random.multivariate_normal(mean=np.zeros(N), cov=R)
        else:
            y[t] = mu[t] + np.random.multivariate_normal(mean=np.zeros(N), cov=R)

        # State transition
        if t < T - 1:
            mu[t + 1] = c + Phi @ mu[t] + np.random.multivariate_normal(mean=np.zeros(N), cov=Q)

    return y, mu, X


In [31]:
# Simulate the system GAS
T, N = 300, 2
omega_true = np.array([0.5, -0.3])
Phi_true = np.array([[0.7, 0.1], [0.05, 0.8]])
K_true = np.array([[0.2, 0.05], [0.01, 0.25]])
Omega_true = np.array([[0.6, 0.1], [0.1, 0.4]])
nu_true = 10
beta_true = np.array([[1.0, -0.5], [0.3, 0.2]])  # intercept + 1 regressor

y_sim, mu_sim, X_sim, _ = simulate_multivariate_t_gas(
    T=T,
    N=N,
    omega=omega_true,
    Phi=Phi_true,
    K_mat=K_true,
    Omega=Omega_true,
    nu=nu_true,
    beta=beta_true,
    seed=123
)

# T, N = 500, 2
# K = 1  # one regressor + intercept

# Phi = np.array([[0.6, 0.2], 
#                 [0.1, 0.8]])
# Q = np.eye(N) * 0.2
# R = np.eye(N) * 0.3
# c = np.array([0.2, -0.1])
# beta = np.array([[1.0, -0.5], 
#                  [0.3, 0.2]])  # (K+1, N) shape: (2, 2)

# y_sim, mu_sim, X_sim = simulate_multivariate_state_space(T, N, c, Phi, Q, R, beta, seed=8888)



# Plot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, subplot_titles=["Observed y", "Filtered mu"])
for i in range(N):
    fig.add_trace(go.Scatter(y=y_sim[:, i], mode='lines', name=f'y_{i+1}'), row=1, col=1)
    fig.add_trace(go.Scatter(y=mu_sim[:, i], mode='lines', name=f'mu_{i+1}'), row=2, col=1)
fig.update_layout(height=600, title='Multivariate t-GAS Simulation with 1 Regressor + Intercept')
fig.show()

100%|██████████| 299/299 [00:00<00:00, 20433.68it/s]




In [33]:
from xbbg import blp
import pandas as pd

# list_to_pull = ["USGG2YR Index", "USGG5YR Index", "USGG10YR Index",]
# colnames = ['US2Y', 'US5Y', 'US10Y']
# today = pd.Timestamp.today().strftime('%Y-%m-%d') 
# # yesterday = (pd.Timestamp.today() - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
# data = blp.bdh(
#     tickers=list_to_pull, 
#     flds=['PX_LAST'], 
#     start_date='2000-01-01',
#     end_date=today)

# data.columns = data.columns.get_level_values(0)
# data.index = pd.to_datetime(data.index)
# data.columns = colnames
list_to_pull = ["FWISUS55 Index", "FWISEU55 Index", "USGG10YR Index", "CO1 Comdty"]
colnames = ['ILS5Y5Y US','ILS5Y5Y EU', 'US10Y', 'Brent']
today = pd.Timestamp.today().strftime('%Y-%m-%d') 
# yesterday = (pd.Timestamp.today() - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
data = blp.bdh(
    tickers=list_to_pull, 
    flds=['PX_LAST'], 
    start_date='2000-01-01',
    end_date=today)

data.columns = data.columns.get_level_values(0)
data.index = pd.to_datetime(data.index)
data.columns = colnames
data1 = data['2021-01-01':].ffill().bfill()
# y_sim.shape[0]
y_sim = data1[['ILS5Y5Y US', 'ILS5Y5Y EU']].values  # Reshape to (T, 1) for single response variable
X_sim = np.column_stack((
    np.ones(shape=(y_sim.shape[0],)),   # Shape: (T,)
    data1[['US10Y', 'Brent']].values       # Shape: (T, 2)
))
T, N = y_sim.shape[0], 2  # N=1 since we have one response variable (US10Y)

In [34]:
# GAS estimation settings
K_type = "diagonal"

P = X_sim.shape[1]
cL =  N  # Lambda and Kappa are diagonal

cK = N  if K_type == "diagonal" else N * N  # Kappa is diagonal or full matrix
fix_nu = None  # fix degrees of freedom


# Initial parameter vector
init_phi = 0.5 * np.eye(N).flatten()          # (N*N,)
init_omega = np.zeros(N)                      # (N,)
init_beta = np.zeros((P, N)).flatten()        # (P*N,)
init_lambda = np.zeros(N)                     # (N,)
init_kappa = np.ones(N) if K_type == "diagonal" else np.eye(N).flatten()       # (N,) for diagonal, (N*N,) for full
# init_kappa = np.eye(N).flatten()  # for full

init_nu = [5.0]                                  # degrees of freedom
init_params = np.concatenate([init_phi, init_omega, init_beta, init_lambda, init_kappa, init_nu])

# Estimation
# res = minimize(
#     gas_general_filter,
#     init_params,
#     args=(y_sim, X_sim, cL, cK, fix_nu),
#     method='L-BFGS-B'
# )

res = minimize(
    gas_general_filter,
    init_params,
    args=(y_sim, X_sim, cL, cK, fix_nu, K_type),  # or "diagonal"
    method='L-BFGS-B'
)

# Extract estimated parameters
est = res.x
idx = 0
Phi_est = est[idx:idx + N * N].reshape(N, N); idx += N * N
omega_est = est[idx:idx + N]; idx += N
beta_est = est[idx:idx + P * N].reshape(P, N); idx += P * N
lambda_est = est[idx:idx + cL]; idx += cL
# kappa_est = est[idx:idx + cK]; idx += cK
# After extracting lambda
kappa_est = est[idx:idx + cK]; idx += cK

Kappa_est = (np.diag(kappa_est) if K_type == "diagonal" else kappa_est.reshape(N, N))

Omega_est = np.diag(np.exp(2 * lambda_est))
# Kappa_est = np.diag(kappa_est)
nu_est = est[idx]


# Print estimates
print("Estimated Phi:\n", Phi_est)
print("Estimated omega:\n", omega_est)
print("Estimated beta:\n", beta_est)
print("Estimated Omega (from lambda):\n", Omega_est)
print("Estimated Kappa:\n", Kappa_est)
print("Estimated nu:\n", nu_est)



overflow encountered in matmul


overflow encountered in matmul


overflow encountered in matmul


invalid value encountered in divide



KeyboardInterrupt: 

In [11]:
mu_filtered, u_filtered, resid_filtered = gas_filter_eval(
    params=res.x,
    Y=y_sim,
    X=X_sim,
    cL=N,
    cK=N,
    fix_nu=None,
    kappa_type= "diagonal"  # or "diagonal"
)

In [None]:
from Functions.filters_old import multivariate_KF_with_estimation

In [13]:
kf_res = multivariate_KF_with_estimation(
    y = y_sim, 
    X=  X_sim, 
    initial_params=None, verbose=True, max_iter=1000
)

✅ Explicit Kalman estimation completed successfully.
Estimated parameters explicitly:
c: [ 0.41589294 -0.21416674]
Phi: 
[[0.61650351 0.10059712]
 [0.15068527 0.83357426]]
beta: 
[[ 0.54899371 -0.34285282]
 [ 0.32542271  0.18781649]]
Q: 
[[ 0.24369622 -0.0269515 ]
 [-0.0269515   0.18341019]]
R: 
[[0.50807087 0.15581062]
 [0.15581062 0.21382193]]


In [14]:
mu_kf = kf_res['mu_filtered']

In [15]:
kf_res['kalman_gain'][-1]

array([[ 0.41107322, -0.10348979],
       [-0.10180801,  0.6010301 ]])

In [16]:
kappa_est

array([0.28200405, 0.30956314])

In [17]:
# T, N = 300, 2
# omega_true = np.array([0.5, -0.3])
# Phi_true = np.array([[0.7, 0.1], [0.05, 0.8]])
# K_true = np.array([[0.2, 0.05], [0.01, 0.25]])
# Omega_true = np.array([[0.6, 0.1], [0.1, 0.4]])
# nu_true = 8
# beta_true = np.array([[1.0, -0.5], [0.3, 0.2]])  # intercept + 1 regressor

In [18]:
mu_sim.shape, mu_filtered.shape, mu_kf.shape

((300, 2), (300, 2), (300, 2))

In [22]:
# Plot
fig = make_subplots(
    rows=N, cols=1,
    subplot_titles=[f"mu_{i+1}: true vs filtered" for i in range(N)],
    shared_xaxes=True
)

for i in range(N):
    fig.add_trace(
        go.Scatter(y=y_sim[:, i], mode="lines", name=f"Observed", line=dict(dash='solid')),
        row=i+1, col=1
    )
    fig.add_trace(
        go.Scatter(y=mu_sim[:, i], mode="lines", name=f"mu_{i+1} (true)", line=dict(dash='solid')),
        row=i+1, col=1
    )
    fig.add_trace(
        go.Scatter(y=mu_filtered[:, i], mode="lines", name=f"mu_{i+1} (filtered)", line=dict(dash='dot')),
        row=i+1, col=1
    )
    fig.add_trace(
        go.Scatter(y=mu_kf[:, i], mode="lines", name=f"mu_{i+1} (KF)", line=dict(dash='dash')),
        row=i+1, col=1
    )

fig.update_layout(
    height=300 * N,
    title="Comparison of True vs Filtered mu_t (GAS Filter)",
    showlegend=True
)

fig.show()