In [None]:
import pandas as pd
import numpy as np
from get_data import get_yield
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import grangercausalitytests

# Yield data
df = get_yield(term=10)
# to weekly and take diff
df = df.resample("W-FRI").last()
df = df.diff().dropna()
# Skip last two years of data
df = df.truncate(after = pd.to_datetime('2023-2-10'))

# corr = df_1.corr()

Y = df.copy()
Y = Y.sort_index()
Y = Y.apply(pd.to_numeric, errors="coerce")

df

In [None]:
# Granger causlity test (just for UK-CAN for now)
# Might need to make dict of dicts for all of the combinations
gc_test_result = grangercausalitytests(Y[['CAN', 'UK']], maxlag=5, addconst=True, verbose=True)

In [None]:
# VAR
model = VAR(Y)
order = model.select_order(maxlags=7)
print(order.summary())
p = order.selected_orders["aic"]
results = model.fit(p)
print(results.summary())

In [None]:
# Granger Causality of everything

# Statistical meaning, things with a low enough p value improve the prediction of of Y inside the VAR, eg: lags of CAN help
# predict the UK change in yield

for caused in results.names:
    for causing in results.names:
        if caused != causing:
            test = results.test_causality(caused, [causing], kind='f')
            print(f"{causing} -> {caused}: p-value = {test.pvalue}")

In [None]:
fevd = results.fevd(10)
fevd.summary()

In [6]:
def generalized_fevd(var_results, H=10, normalize=True):
    """
    Generalized FEVD (Pesaran-Shin) for a fitted statsmodels VARResults.
    
    Parameters
    ----------
    var_results : statsmodels.tsa.vector_ar.var_model.VARResults
        Fitted VAR results (e.g., `results = model.fit(p)`).
    H : int
        Forecast horizon in steps (e.g., weeks). Uses horizons 0..H-1 in sums.
    normalize : bool
        If True, row-normalize so contributions sum to 1 for each equation at each horizon.
        
    Returns
    -------
    fevd : np.ndarray
        Array of shape (H, n, n) where fevd[h, i, j] is contribution of shock j
        to variable i at horizon h (h=0..H-1). If normalize=True, each row sums to 1.
    names : list
        Variable names in order.
    """
    names = list(var_results.names)
    n = len(names)

    # Moving-average (MA) representation coefficients Psi_k
    # Psi has shape (H, n, n) with Psi[0] = I
    Psi = var_results.ma_rep(H)

    # Residual covariance matrix Σ (n x n)
    Sigma = np.asarray(var_results.sigma_u)

    # Precompute denominators: denom[h, i] = sum_{k=0}^{h} e_i' Psi_k Σ Psi_k' e_i
    # We'll build horizon-by-horizon contributions using cumulative sums.
    fevd = np.zeros((H, n, n), dtype=float)

    # For each horizon h, compute cumulative sums from k=0..h
    for h in range(H):
        denom = np.zeros(n, dtype=float)
        numer = np.zeros((n, n), dtype=float)

        for k in range(h + 1):
            A = Psi[k] @ Sigma  # (n x n)
            # denom_i adds (Psi_k Σ Psi_k')_ii
            denom += np.diag(A @ Psi[k].T)

            # numer_{i,j} adds (e_i' Psi_k Σ e_j)^2 / σ_jj
            # e_i' Psi_k Σ e_j is just A[i, j]
            numer += (A ** 2)

        # divide each column j by σ_jj (generalized shock scaling)
        sigma_diag = np.diag(Sigma).copy()
        # avoid division by 0 if any diag is 0 (shouldn't happen in sane VAR)
        sigma_diag[sigma_diag == 0] = np.nan

        numer = numer / sigma_diag  # broadcasts across rows

        # contribution at horizon h
        # θ_ij(h) = numer_ij / denom_i
        # (broadcast denom_i across columns)
        fevd[h] = numer / denom[:, None]

        if normalize:
            row_sums = fevd[h].sum(axis=1, keepdims=True)
            fevd[h] = fevd[h] / row_sums

    return fevd, names


def fevd_table(fevd, names, var, horizons=None):
    """
    Convenience: return a DataFrame for one dependent variable across horizons.
    var : str, dependent variable name.
    horizons : iterable of int or None -> use all.
    """
    idx = names.index(var)
    H = fevd.shape[0]
    if horizons is None:
        horizons = range(H)

    rows = []
    for h in horizons:
        rows.append(fevd[h, idx, :])

    df = pd.DataFrame(rows, columns=names, index=list(horizons))
    df.index.name = "horizon"
    return df


In [None]:
# This creates a table which shows how much of the yields is caused locally, versus impacted from others (I think)
gfevd, names = generalized_fevd(results, H=10, normalize=True)
uk_g = fevd_table(gfevd, names, "UK")
can_g = fevd_table(gfevd, names, "CAN")
us_g = fevd_table(gfevd, names, "US")

uk_g


In [None]:
# This creates the Diebold-Yilmaz spillover table which is found in their papers

theta = gfevd[9]
N = theta.shape[0]

total_spillover = (theta.sum() - np.trace(theta)) / N * 100

to_others = theta.sum(axis=0) - np.diag(theta)
from_others = theta.sum(axis=1) - np.diag(theta)
net = to_others - from_others

spill = pd.DataFrame({
    "TO_others": to_others,
    "FROM_others": from_others,
    "NET": net
}, index=names).sort_values("NET", ascending=False)

total_spillover, spill