In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from arch import arch_model
from scipy.optimize import minimize

# -----------------------------
# 1. Load SPY & TLT data, compute returns
# -----------------------------
def load_returns(start="2010-01-01", end=None, tickers=("SPY", "TLT")):
    """
    Download daily closing prices and convert them to log returns.

    Parameters
    ----------
    start : str
        First date forwarded to ``yf.download``.
    end : str or None
        Last date forwarded to ``yf.download`` (``None`` is passed through).
    tickers : sequence of str
        Symbols to download; defaults to SPY and TLT.

    Returns
    -------
    pandas.DataFrame
        Log returns with one column per ticker, in the order of ``tickers``.
        Rows with a missing price or return are dropped.
    """
    tickers = list(tickers)

    # auto_adjust is requested explicitly so that "Close" is the adjusted
    # price regardless of the installed yfinance default (an earlier run of
    # this cell failed with KeyError: 'Adj Close').
    close = yf.download(tickers, start=start, end=end, auto_adjust=True)["Close"]

    # Pick columns by name: the download's column order is not guaranteed to
    # match `tickers`, so relabelling by position could silently swap assets.
    close = close[tickers].dropna()

    rets = np.log(close).diff().dropna()
    rets.columns = tickers  # plain column labels, now guaranteed to match
    return rets

# -----------------------------
# 2. Fit univariate GARCH(1,1) to each series
# -----------------------------
def fit_garch_11(series):
    """
    Estimate a zero-mean GARCH(1,1) with normal errors on one return series.

    Returns a pair ``(res, sigma)``:
      res   : the result object returned by ``arch_model(...).fit``
      sigma : ``res.conditional_volatility`` as a numpy array
              (one value per observation, same scale as ``series``)
    """
    spec = dict(mean="Zero", vol="Garch", p=1, q=1, dist="normal")
    fitted = arch_model(series, **spec).fit(disp="off")
    return fitted, fitted.conditional_volatility.values

# -----------------------------
# 3. DCC(1,1) negative log-likelihood
# -----------------------------
def dcc_nll(theta, Z):
    """
    DCC(1,1) negative log-likelihood for standardized residuals Z (T x k).

    Parameters
    ----------
    theta : array-like of length 2
        Unconstrained parameters (u, v), mapped to (a, b) by a softmax:
            a = exp(u) / (1 + exp(u) + exp(v))
            b = exp(v) / (1 + exp(u) + exp(v))
        so a > 0, b > 0 and a + b < 1.
    Z : array-like, shape (T, k)
        Standardized residuals, one row per time step.

    Returns
    -------
    float
        sum_t 0.5 * (log|R_t| + z_t' R_t^{-1} z_t), or the penalty value
        1e10 as soon as some R_t has a non-positive determinant.

    Notes
    -----
    The recursion is Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1},
    started from Qbar. R_t therefore only depends on rows before t; the
    update with z_t happens *after* row t has been scored, so the likelihood
    never conditions on the observation it is evaluating.
    """
    Z = np.asarray(Z, dtype=float)
    T, k = Z.shape

    # Softmax over the logits (0, u, v), shifted by the largest one so that
    # np.exp cannot overflow to inf (which would turn a and b into NaN).
    u, v = float(theta[0]), float(theta[1])
    shift = max(0.0, u, v)
    e0 = np.exp(-shift)
    e1 = np.exp(u - shift)
    e2 = np.exp(v - shift)
    denom = e0 + e1 + e2
    a = e1 / denom
    b = e2 / denom
    w = e0 / denom  # weight on Qbar, equal to 1 - a - b

    # Unconditional covariance of Z, also used as the starting value of Q
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    nll = 0.0
    for t in range(T):
        z = Z[t]

        # Convert the current Q (built from rows < t) to a correlation matrix
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)

        # slogdet/solve avoid forming det(R) and inv(R) explicitly
        sign, logdet = np.linalg.slogdet(R)
        if sign <= 0:
            # Penalize non-PD (should not happen with good params)
            return 1e10

        nll += 0.5 * (logdet + z @ np.linalg.solve(R, z))

        # Only now fold z_t in, to produce the matrix used for row t+1
        Q = w * Qbar + a * np.outer(z, z) + b * Q

    return nll

# -----------------------------
# 4. Estimate DCC parameters and build H_t
# -----------------------------
def fit_dcc_2d(rets):
    """
    Fit 2D DCC-GARCH(1,1) to returns DataFrame with columns ["SPY","TLT"].

    Steps: fit a univariate GARCH(1,1) per asset via ``fit_garch_11``,
    standardize the returns by the fitted volatilities, estimate the DCC
    parameters by minimizing ``dcc_nll`` with L-BFGS-B, then rebuild the
    correlation and covariance paths.

    Returns:
      params_garch: dict with GARCH fit results for each asset
      params_dcc  : dict with 'a', 'b' and the raw optimizer result
      sigma       : (T x 2) cond. std devs
      R_t         : (T x 2 x 2) dynamic correlation matrices
      H_t         : (T x 2 x 2) dynamic covariance matrices

    R_t[t] and H_t[t] are built from residuals strictly before row t
    (R_t[0] is the normalized sample covariance of the residuals), i.e. the
    recursion Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1}.
    """
    # Ensure correct order
    rets = rets[["SPY", "TLT"]].dropna()
    T = len(rets)

    # 4.1 Fit univariate GARCH(1,1)
    res_spy, sigma_spy = fit_garch_11(rets["SPY"])
    res_tlt, sigma_tlt = fit_garch_11(rets["TLT"])

    sigma = np.column_stack([sigma_spy, sigma_tlt])  # T x 2

    # 4.2 Standardized residuals (the GARCH fits use a zero mean)
    Z = rets.values / sigma  # T x 2

    # 4.3 Estimate DCC parameters (a, b)
    # The start maps through the softmax to roughly a ~ 3e-4, b ~ 0.98:
    # small reaction to shocks, high persistence.
    theta0 = np.array([-4.0, 4.0])  # just a heuristic

    opt = minimize(dcc_nll, theta0, args=(Z,), method="L-BFGS-B")
    if not opt.success:
        print("Warning: DCC optimization did not fully converge:", opt.message)

    # Decode (a, b) from opt.x with a shifted softmax so exp cannot overflow
    u_hat, v_hat = float(opt.x[0]), float(opt.x[1])
    shift = max(0.0, u_hat, v_hat)
    e0 = np.exp(-shift)
    e1 = np.exp(u_hat - shift)
    e2 = np.exp(v_hat - shift)
    denom = e0 + e1 + e2
    a_hat = e1 / denom
    b_hat = e2 / denom
    w_hat = e0 / denom  # equals 1 - a_hat - b_hat

    # 4.4 Recompute Q_t, R_t, H_t using estimated (a_hat, b_hat)
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    R_t = np.zeros((T, 2, 2))
    H_t = np.zeros((T, 2, 2))

    for t in range(T):
        # Correlation for row t from the Q built out of rows < t
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)
        R_t[t] = R

        # Full covariance H_t = D_t R_t D_t with D_t = diag(sigma[t])
        H_t[t] = np.outer(sigma[t], sigma[t]) * R

        # Fold z_t in only afterwards, so R_t never uses its own observation
        z = Z[t]
        Q = w_hat * Qbar + a_hat * np.outer(z, z) + b_hat * Q

    params_garch = {
        "SPY": res_spy,
        "TLT": res_tlt,
    }
    params_dcc = {
        "a": a_hat,
        "b": b_hat,
        "opt_result": opt,
    }

    return params_garch, params_dcc, sigma, R_t, H_t

# -----------------------------
# 5. Run the whole thing
# -----------------------------
if __name__ == "__main__":
    # Entry point: load SPY/TLT log returns from 2010-01-01 onward, fit the
    # DCC-GARCH model, and print the estimated parameters.
    rets = load_returns(start="2010-01-01")
    params_garch, params_dcc, sigma, R_t, H_t = fit_dcc_2d(rets)

    # params_garch holds the fit result objects returned by fit_garch_11;
    # their `.params` attribute is printed as-is.
    print("\nUnivariate GARCH(1,1) summary:")
    print("SPY GARCH params:")
    print(params_garch["SPY"].params)
    print("\nTLT GARCH params:")
    print(params_garch["TLT"].params)

    # a and b are the decoded (constrained) DCC parameters.
    print("\nEstimated DCC parameters:")
    print("a (correlation ARCH term) :", params_dcc["a"])
    print("b (correlation GARCH term):", params_dcc["b"])

    # Example: last estimated correlation between SPY and TLT, i.e. the
    # off-diagonal entry (row 0 = SPY, column 1 = TLT) of the final R_t.
    last_rho = R_t[-1, 0, 1]
    print("\nLast estimated correlation ρ_T(SPY, TLT):", last_rho)


  data = yf.download(tickers, start=start, end=end)["Adj Close"].dropna()
[*********************100%***********************]  2 of 2 completed


KeyError: 'Adj Close'

In [2]:
import numpy as np

# ---------------------------------------------------------
# 0. Utility: simple gradient descent with finite differences
# ---------------------------------------------------------

def gradient_descent(func, theta0, lr=1e-3, max_iter=2000, tol=1e-6, verbose=False):
    """
    Very simple gradient descent minimizer using central finite differences.

    func    : function(theta) -> scalar
    theta0  : initial parameter vector (1D array-like; it is copied, never
              modified in place)
    lr      : learning rate (fixed step size)
    max_iter: maximum iterations
    tol     : stop if ||grad|| < tol
    verbose : if True, print the objective and gradient norm every 200
              iterations

    Returns (theta, func(theta)) for the last iterate.

    The loop also stops if the finite-difference gradient is not finite, so
    that a NaN/inf objective cannot overwrite the last usable iterate.
    """
    # np.array copies and also accepts plain lists/tuples
    theta = np.array(theta0, dtype=float)
    eps = 1e-5

    for it in range(max_iter):
        # central finite-difference gradient: two evaluations per coordinate
        grad = np.zeros_like(theta)
        for i in range(len(theta)):
            step = np.zeros_like(theta)
            step[i] = eps
            grad[i] = (func(theta + step) - func(theta - step)) / (2.0 * eps)

        grad_norm = np.linalg.norm(grad)
        if verbose and (it % 200 == 0):
            # The objective itself is only needed for this log line, so it
            # is evaluated here rather than on every iteration.
            print(f"[GD] iter={it}, f={func(theta):.6f}, ||grad||={grad_norm:.4e}")

        if not np.isfinite(grad_norm):
            break

        if grad_norm < tol:
            break

        theta -= lr * grad

    return theta, func(theta)


# ---------------------------------------------------------
# 1. Univariate GARCH(1,1) from scratch (Gaussian QML)
# ---------------------------------------------------------

def garch11_nll(theta, y):
    """
    Gaussian negative log-likelihood of a GARCH(1,1); the additive constant
    is omitted.

    theta = [a, b, c] is unconstrained and decoded as
      omega = exp(a)                       (> 0)
      u     = logistic(b), v = logistic(c) (both in (0, 1))
      alpha = u * (1 - v)
      beta  = u * v
    so that alpha + beta = u < 1.
    """
    obs = np.asarray(y, dtype=float)
    n_obs = len(obs)

    log_omega, logit_u, logit_v = theta
    omega = np.exp(log_omega)
    persistence = 1.0 / (1.0 + np.exp(-logit_u))  # u = alpha + beta
    share = 1.0 / (1.0 + np.exp(-logit_v))        # v = beta / (alpha + beta)
    alpha = persistence * (1.0 - share)
    beta = persistence * share

    var_path = np.empty(n_obs)
    # start from the unconditional variance; 1e-6 keeps the divisor positive
    var_path[0] = omega / (1.0 - alpha - beta + 1e-6)
    for t, prev in enumerate(obs[:-1], start=1):
        var_path[t] = omega + alpha * prev**2 + beta * var_path[t - 1]

    return 0.5 * np.sum(np.log(var_path) + (obs**2) / var_path)


def fit_garch11(y, lr=1e-3, max_iter=3000, verbose=False):
    """
    Estimate a univariate GARCH(1,1) on y by gradient descent on the NLL.

    The series is demeaned first. `lr`, `max_iter` and `verbose` are passed
    straight to `gradient_descent`.

    Returns:
      omega_hat, alpha_hat, beta_hat, sigma2_hat
    where sigma2_hat is the conditional variance path of the demeaned series.
    """
    demeaned = np.asarray(y, dtype=float)
    demeaned = demeaned - demeaned.mean()
    n_obs = len(demeaned)

    # Crude start: persistence 0.9 and omega = 10% of the sample variance
    alpha_start = 0.05
    beta_start = 0.9 - alpha_start
    omega_start = 0.1 * np.var(demeaned)

    # Unconstrained coordinates: u = alpha + beta, v = beta / (alpha + beta),
    # each sent through the logit; omega through the log.
    u_start = alpha_start + beta_start
    v_start = beta_start / u_start
    start = np.array(
        [
            np.log(omega_start),
            np.log(u_start / (1.0 - u_start)),
            np.log(v_start / (1.0 - v_start)),
        ],
        dtype=float,
    )

    def objective(th):
        return garch11_nll(th, demeaned)

    theta_hat, _ = gradient_descent(
        objective, start, lr=lr, max_iter=max_iter, verbose=verbose
    )

    # Map the optimum back to (omega, alpha, beta)
    log_omega_hat, logit_u_hat, logit_v_hat = theta_hat
    omega_hat = np.exp(log_omega_hat)
    u_hat = 1.0 / (1.0 + np.exp(-logit_u_hat))
    v_hat = 1.0 / (1.0 + np.exp(-logit_v_hat))
    alpha_hat = u_hat * (1.0 - v_hat)
    beta_hat = u_hat * v_hat

    # Conditional variance path implied by the estimates
    sigma2_hat = np.empty(n_obs)
    sigma2_hat[0] = omega_hat / (1.0 - alpha_hat - beta_hat + 1e-6)
    for t, prev in enumerate(demeaned[:-1], start=1):
        sigma2_hat[t] = omega_hat + alpha_hat * prev**2 + beta_hat * sigma2_hat[t - 1]

    return omega_hat, alpha_hat, beta_hat, sigma2_hat


# ---------------------------------------------------------
# 2. DCC(1,1) for 2D standardized residuals
# ---------------------------------------------------------

def dcc_nll(theta, Z):
    """
    DCC(1,1) negative log-likelihood for standardized residuals Z (T x 2).

    Parametrization:
      theta = [u, v] unconstrained.
      a, b are mapped via "softmax" so that:
         a = e1 / (1 + e1 + e2)
         b = e2 / (1 + e1 + e2)
      => a>0, b>0, a + b < 1.

    Returns sum_t 0.5 * (log|R_t| + z_t' R_t^{-1} z_t), or the penalty 1e10
    as soon as some R_t has a non-positive determinant.

    Timing: Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1}, started from
    Qbar. Row t is scored with a correlation matrix built from rows before t
    only; z_t is folded into Q afterwards, so the likelihood never conditions
    on the observation it is evaluating.
    """
    Z = np.asarray(Z, dtype=float)
    T, k = Z.shape
    assert k == 2, "This DCC implementation is for 2D only."

    # map to (a, b): softmax over the logits (0, u, v), shifted by the largest
    # one so np.exp cannot overflow to inf and produce NaN weights
    u, v = float(theta[0]), float(theta[1])
    shift = max(0.0, u, v)
    e0 = np.exp(-shift)
    e1 = np.exp(u - shift)
    e2 = np.exp(v - shift)
    denom = e0 + e1 + e2
    a = e1 / denom
    b = e2 / denom
    w = e0 / denom  # weight on Qbar, equal to 1 - a - b

    # unconditional covariance of Z, also the starting value of Q
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    nll = 0.0
    for t in range(T):
        z = Z[t]

        # normalize the current Q (rows < t) to a correlation matrix
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)

        sign, logdet = np.linalg.slogdet(R)
        if sign <= 0:
            return 1e10  # penalize non-PD

        nll += 0.5 * (logdet + z @ np.linalg.solve(R, z))

        # DCC recursion: produce the matrix used for row t+1
        Q = w * Qbar + a * np.outer(z, z) + b * Q

    return nll


def fit_dcc_2d(spy_ret, tlt_ret, lr_garch=1e-3, lr_dcc=1e-3,
               max_iter_garch=3000, max_iter_dcc=2000,
               verbose=False):
    """
    Fit 2D DCC-GARCH(1,1) to SPY and TLT return series.

    Inputs:
      spy_ret, tlt_ret : 1D array-likes of returns (same length)
      lr_garch, max_iter_garch : step size / iteration cap for the two
                                 univariate GARCH fits
      lr_dcc, max_iter_dcc     : step size / iteration cap for the DCC fit
      verbose                  : forwarded to the optimizers

    Returns:
      params_garch : dict with univariate GARCH params & sigma paths
      params_dcc   : dict with DCC params
      Z            : standardized residuals (T x 2)
      R_t          : dynamic correlation matrices (T x 2 x 2)
      H_t          : dynamic covariance matrices (T x 2 x 2)

    R_t[t] and H_t[t] are built from residuals strictly before row t
    (R_t[0] is the normalized sample covariance of Z), following
    Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1}.
    """
    spy_ret = np.asarray(spy_ret, dtype=float)
    tlt_ret = np.asarray(tlt_ret, dtype=float)
    assert len(spy_ret) == len(tlt_ret), "Series must have same length."
    T = len(spy_ret)

    # --- 1. Fit univariate GARCH(1,1) to each ---
    omega1, alpha1, beta1, sigma2_1 = fit_garch11(spy_ret, lr=lr_garch,
                                                  max_iter=max_iter_garch,
                                                  verbose=verbose)
    omega2, alpha2, beta2, sigma2_2 = fit_garch11(tlt_ret, lr=lr_garch,
                                                  max_iter=max_iter_garch,
                                                  verbose=verbose)

    sigma1 = np.sqrt(sigma2_1)
    sigma2 = np.sqrt(sigma2_2)

    # standardized residuals (fit_garch11 works on the demeaned series)
    z1 = (spy_ret - spy_ret.mean()) / sigma1
    z2 = (tlt_ret - tlt_ret.mean()) / sigma2
    Z = np.column_stack([z1, z2])  # T x 2

    # --- 2. Fit DCC(1,1) on Z ---
    theta0 = np.array([-4.0, 4.0])  # heuristic start (a small, b near 1)

    theta_hat, dcc_nll_min = gradient_descent(
        lambda th: dcc_nll(th, Z),
        theta0,
        lr=lr_dcc,
        max_iter=max_iter_dcc,
        verbose=verbose
    )

    # decode (a, b) with a shifted softmax so exp cannot overflow
    u_hat, v_hat = float(theta_hat[0]), float(theta_hat[1])
    shift = max(0.0, u_hat, v_hat)
    e0 = np.exp(-shift)
    e1 = np.exp(u_hat - shift)
    e2 = np.exp(v_hat - shift)
    denom = e0 + e1 + e2
    a_hat = e1 / denom
    b_hat = e2 / denom
    w_hat = e0 / denom  # equals 1 - a_hat - b_hat

    # --- 3. Reconstruct Q_t, R_t, H_t ---
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    R_t = np.zeros((T, 2, 2))
    H_t = np.zeros((T, 2, 2))
    sigma = np.column_stack([sigma1, sigma2])

    for t in range(T):
        # correlation for row t from the Q built out of rows < t
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)
        R_t[t] = R

        # H_t = D_t R_t D_t with D_t = diag(sigma1[t], sigma2[t])
        H_t[t] = np.outer(sigma[t], sigma[t]) * R

        # fold z_t in afterwards, so R_t never uses its own observation
        z = Z[t]
        Q = w_hat * Qbar + a_hat * np.outer(z, z) + b_hat * Q

    params_garch = {
        "SPY": {
            "omega": omega1,
            "alpha": alpha1,
            "beta": beta1,
            "sigma2": sigma2_1
        },
        "TLT": {
            "omega": omega2,
            "alpha": alpha2,
            "beta": beta2,
            "sigma2": sigma2_2
        }
    }

    params_dcc = {
        "a": a_hat,
        "b": b_hat,
        "nll": dcc_nll_min
    }

    return params_garch, params_dcc, Z, R_t, H_t


In [7]:
tickers = ["SPY", "TLT"]
# auto_adjust is passed explicitly so "Close" is the adjusted price whatever
# the installed yfinance default is. Columns are then selected by name before
# relabelling, so a different download column order cannot swap the assets.
data = yf.download(
    tickers, start="2021-01-01", end="2025-12-31", auto_adjust=True
)["Close"][tickers].dropna()
log_prices = np.log(data)
rets = log_prices.diff().dropna()
rets.columns = tickers

  data = yf.download(tickers, start="2021-01-01", end="2025-12-31")["Close"].dropna()
[*********************100%***********************]  2 of 2 completed


In [6]:
# Display the SPY/TLT log-return frame (`rets` is built in the download cell).
rets

Unnamed: 0_level_0,SPY,TLT
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-01-05,0.006864,-0.007454
2021-01-06,0.005961,-0.020742
2021-01-07,0.014748,-0.008854
2021-01-08,0.005682,-0.003233
2021-01-11,-0.006764,-0.001654
...,...,...
2024-12-24,0.011054,0.004220
2024-12-26,0.000067,-0.000569
2024-12-27,-0.010582,-0.008232
2024-12-30,-0.011477,0.008005


In [None]:
# spy_ret and tlt_ret: 1D numpy arrays with daily log returns.
# They are extracted from the `rets` DataFrame here so the cell does not
# depend on names that no other cell defines.
spy_ret = rets["SPY"].to_numpy()
tlt_ret = rets["TLT"].to_numpy()

params_garch, params_dcc, Z, R_t, H_t = fit_dcc_2d(spy_ret, tlt_ret,
                                                   lr_garch=1e-3,
                                                   lr_dcc=1e-3,
                                                   max_iter_garch=3000,
                                                   max_iter_dcc=2000,
                                                   verbose=True)

print("SPY GARCH params:", params_garch["SPY"]["omega"],
                          params_garch["SPY"]["alpha"],
                          params_garch["SPY"]["beta"])

print("TLT GARCH params:", params_garch["TLT"]["omega"],
                          params_garch["TLT"]["alpha"],
                          params_garch["TLT"]["beta"])

print("DCC params: a =", params_dcc["a"], ", b =", params_dcc["b"])

# Last estimated correlation between SPY and TLT:
last_rho = R_t[-1, 0, 1]
print("Last ρ(SPY, TLT):", last_rho)


In [8]:
import numpy as np

# --- helper: get sigma^2 path from ARCH(1) params and data ---

def arch1_sigma2(y, omega, alpha):
    """
    Conditional variance path of an ARCH(1) with parameters (omega, alpha).

    The first entry is the unconditional variance omega / (1 - alpha), with
    1e-6 added to the divisor; every later entry is
    omega + alpha * y[t-1]**2.
    """
    obs = np.asarray(y, dtype=float)
    path = np.empty(len(obs))
    path[0] = omega / (1.0 - alpha + 1e-6)
    # each variance depends only on the previous observation, so the whole
    # tail can be filled in one shot
    path[1:] = omega + alpha * obs[:-1] ** 2
    return path

# --- DCC(1,1) negative log-likelihood for 2D Z ---

def dcc_nll(theta, Z):
    """
    DCC(1,1) negative log-likelihood for standardized residuals Z (T x 2).

    theta = [u, v] unconstrained.
    We map to (a, b) via a softmax to ensure a>0, b>0, a+b<1:
        e1 = exp(u), e2 = exp(v)
        a  = e1 / (1 + e1 + e2)
        b  = e2 / (1 + e1 + e2)

    Returns sum_t 0.5 * (log|R_t| + z_t' R_t^{-1} z_t), or the penalty 1e10
    as soon as some R_t has a non-positive determinant.

    Timing: Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1}, started from
    Qbar. Row t is scored with a correlation matrix built from rows before t
    only; z_t is folded into Q afterwards, so the likelihood never conditions
    on the observation it is evaluating.
    """
    Z = np.asarray(Z, dtype=float)
    T, k = Z.shape
    assert k == 2, "This DCC implementation is for 2D only."

    # map (theta[0], theta[1]) -> (a, b): softmax over the logits (0, u, v),
    # shifted by the largest one so np.exp cannot overflow to inf
    u, v = float(theta[0]), float(theta[1])
    shift = max(0.0, u, v)
    e0 = np.exp(-shift)
    e1 = np.exp(u - shift)
    e2 = np.exp(v - shift)
    denom = e0 + e1 + e2
    a = e1 / denom
    b = e2 / denom
    w = e0 / denom  # weight on Qbar, equal to 1 - a - b

    # unconditional covariance of Z, also the starting value of Q
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    nll = 0.0
    for t in range(T):
        z = Z[t]

        # normalize the current Q (rows < t) to a correlation matrix
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)

        sign, logdet = np.linalg.slogdet(R)
        if sign <= 0:
            return 1e10  # penalize non-PD

        nll += 0.5 * (logdet + z @ np.linalg.solve(R, z))

        # DCC recursion: produce the matrix used for row t+1
        Q = w * Qbar + a * np.outer(z, z) + b * Q

    return nll

# --- main wrapper: DCC-ARCH(1) for your `rets` DataFrame ---

def fit_dcc_arch1_2d(rets, lr_dcc=1e-3, max_iter_dcc=2000, arch1_fitter=None):
    """
    Fit a 2D DCC(1,1) on top of univariate ARCH(1) models for SPY and TLT.

    rets: DataFrame with columns ["SPY","TLT"] as you defined:
        tickers = ["SPY", "TLT"]
        data = yf.download(tickers, start="2021-01-01", end="2025-12-31")["Close"].dropna()
        log_prices = np.log(data)
        rets = log_prices.diff().dropna()
        rets.columns = tickers
    lr_dcc, max_iter_dcc: step size / iteration cap passed to
        `gradient_descent` for the DCC stage.
    arch1_fitter: optional callable ``f(y)`` returning a mapping with keys
        "omega" and "alpha". When omitted, a global function named
        `fitarch1` is used; NameError is raised if none is defined.

    Returns:
      params_garch : dict with ARCH(1) params & sigma2 paths per asset
      params_dcc   : dict with 'a', 'b', 'nll'
      Z            : standardized residuals (T x 2)
      R_t          : dynamic correlation matrices (T x 2 x 2)
      H_t          : dynamic covariance matrices (T x 2 x 2)

    R_t[t] and H_t[t] are built from residuals strictly before row t,
    following Q_t = (1-a-b) Qbar + a z_{t-1} z_{t-1}' + b Q_{t-1}.
    """
    # Resolve the univariate fitter up front so a missing dependency fails
    # immediately with an actionable message.
    if arch1_fitter is None:
        arch1_fitter = globals().get("fitarch1")
        if arch1_fitter is None:
            raise NameError(
                "name 'fitarch1' is not defined: define fitarch1(y) returning "
                "{'omega': ..., 'alpha': ...} before calling fit_dcc_arch1_2d, "
                "or pass a fitter via arch1_fitter=..."
            )

    rets = rets[["SPY", "TLT"]].dropna()
    spy = rets["SPY"].to_numpy()
    tlt = rets["TLT"].to_numpy()
    T = len(spy)

    # --- 1. Fit univariate ARCH(1) to SPY and TLT ---
    res_spy = arch1_fitter(spy)
    res_tlt = arch1_fitter(tlt)

    omega1, alpha1 = res_spy["omega"], res_spy["alpha"]
    omega2, alpha2 = res_tlt["omega"], res_tlt["alpha"]

    # The residuals below use centered returns, so the variance paths are
    # built from the same centered series to keep numerator and denominator
    # consistent.
    # NOTE(review): assumes the fitter estimates on centered data, as the
    # original comment in this function stated — confirm against fitarch1.
    spy_centered = spy - spy.mean()
    tlt_centered = tlt - tlt.mean()

    # build sigma^2 paths
    sigma2_1 = arch1_sigma2(spy_centered, omega1, alpha1)
    sigma2_2 = arch1_sigma2(tlt_centered, omega2, alpha2)

    sigma1 = np.sqrt(sigma2_1)
    sigma2 = np.sqrt(sigma2_2)

    # standardized residuals
    z1 = spy_centered / sigma1
    z2 = tlt_centered / sigma2
    Z = np.column_stack([z1, z2])  # T x 2

    # --- 2. Fit DCC(1,1) over Z using gradient_descent ---
    theta0 = np.array([-4.0, 4.0])  # heuristic start (a small, b near 1)
    theta_hat, nll_dcc_min = gradient_descent(
        lambda th: dcc_nll(th, Z),
        theta0,
        lr=lr_dcc,
        max_iter=max_iter_dcc
    )

    # decode (a, b) with a shifted softmax so exp cannot overflow
    u_hat, v_hat = float(theta_hat[0]), float(theta_hat[1])
    shift = max(0.0, u_hat, v_hat)
    e0 = np.exp(-shift)
    e1 = np.exp(u_hat - shift)
    e2 = np.exp(v_hat - shift)
    denom = e0 + e1 + e2
    a_hat = e1 / denom
    b_hat = e2 / denom
    w_hat = e0 / denom  # equals 1 - a_hat - b_hat

    # --- 3. Reconstruct Q_t, R_t, H_t ---
    Qbar = (Z.T @ Z) / T
    Q = Qbar.copy()

    R_t = np.zeros((T, 2, 2))
    H_t = np.zeros((T, 2, 2))
    sigma = np.column_stack([sigma1, sigma2])

    for t in range(T):
        # correlation for row t from the Q built out of rows < t
        d = np.sqrt(np.diag(Q))
        R = Q / np.outer(d, d)
        R_t[t] = R

        # H_t = D_t R_t D_t with D_t = diag(sigma1[t], sigma2[t])
        H_t[t] = np.outer(sigma[t], sigma[t]) * R

        # fold z_t in afterwards, so R_t never uses its own observation
        z = Z[t]
        Q = w_hat * Qbar + a_hat * np.outer(z, z) + b_hat * Q

    params_garch = {
        "SPY": {"omega": omega1, "alpha": alpha1, "sigma2": sigma2_1},
        "TLT": {"omega": omega2, "alpha": alpha2, "sigma2": sigma2_2},
    }
    params_dcc = {"a": a_hat, "b": b_hat, "nll": nll_dcc_min}

    return params_garch, params_dcc, Z, R_t, H_t

In [9]:
# Fit the DCC-ARCH(1) model on the `rets` DataFrame. fit_dcc_arch1_2d relies
# on a univariate fitter named `fitarch1` being defined in the kernel; the
# recorded run of this cell failed with NameError because it was not.
params_garch, params_dcc, Z, R_t, H_t = fit_dcc_arch1_2d(rets)

# Per-asset ARCH(1) estimates (omega, alpha)
print("SPY ARCH(1):", params_garch["SPY"]["omega"], params_garch["SPY"]["alpha"])
print("TLT ARCH(1):", params_garch["TLT"]["omega"], params_garch["TLT"]["alpha"])

# Decoded (constrained) DCC parameters
print("DCC params: a =", params_dcc["a"], ", b =", params_dcc["b"])

# Last estimated correlation between SPY and TLT: the off-diagonal entry
# (row 0 = SPY, column 1 = TLT) of the final correlation matrix.
last_rho = R_t[-1, 0, 1]
print("Last correlation ρ_T(SPY, TLT):", last_rho)


NameError: name 'fitarch1' is not defined