In [1]:
%%writefile sim_tvp_5gvar.py
# sim_tvp_gvar.py  -- standalone simulation module (ASCII only)

import numpy as np
import pandas as pd

COUNTRIES = ("JP","US","EU")
VTYPES    = ("GDP_D","CPI_D","HUR_D")
COLS = [f"{c}_{v}" for c in COUNTRIES for v in VTYPES] + ["WTI_D"]
COL_IDX = {c:i for i,c in enumerate(COLS)}

def weight_equal_3():
    W = np.ones((3,3))/2.0
    np.fill_diagonal(W, 0.0)
    return W

def base_coef(vtype:str, P:int):
    # coeff order (no intercept): [ own(P), foreign_mean(P), wti(P) ] => 3P
    if vtype=="GDP_D":
        own1, for1, wti1 = 0.45, 0.15, -0.12
    elif vtype=="CPI_D":
        own1, for1, wti1 = 0.55, 0.20,  0.15
    else:  # HUR_D
        own1, for1, wti1 = 0.50, 0.10,  0.10
    own = [own1] + [0.05]*(P-1)
    frm = [for1] + [0.02]*(P-1)
    wti = [wti1] + [0.02]*(P-1)
    return np.array(own+frm+wti)

def gen_beta_paths(T:int, P:int, tvp:bool=True, break_at:int=None,
                   rho:float=0.97, q_scale:float=0.02, trend_strength:float=0.0,
                   boost_mult: float = 1.6, boost_block: str = "WTI"):

    betas = {}

    for c in COUNTRIES:
        for v in VTYPES:
            eq = f"{c}_{v}"
            b0 = np.r_[0.0, base_coef(v, P)]     # 截距 + [own(P), foreign(P), wti(P)]
            b  = np.zeros((T, b0.size)); b[0] = b0
            trend = np.linspace(0, trend_strength, T)
            for t in range(1, T):
                if tvp:
                    drift = (1.0 - rho) * (b0 - b[t-1])
                    noise = np.random.normal(0.0, q_scale, size=b0.size)
                    b[t]  = b[t-1] + drift + noise + trend[t]
                else:
                    b[t]  = b0 + trend[t]
            betas[eq] = b

    b0w = np.r_[0.0, np.array([0.60] + [0.10]*(P-1))]
    bw  = np.zeros((T, b0w.size)); bw[0] = b0w
    trend_w = np.linspace(0, trend_strength, T)
    for t in range(1, T):
        if tvp:
            drift = (1.0 - rho) * (b0w - bw[t-1])
            noise = np.random.normal(0.0, q_scale, size=b0w.size)
            bw[t] = bw[t-1] + drift + noise + trend_w[t]
        else:
            bw[t] = b0w + trend_w[t]
    betas["WTI_D"] = bw

    if (break_at is not None) and (0 <= int(break_at) < T):
        b_at = int(break_at)
        for c in COUNTRIES:
            for v in ("GDP_D", "CPI_D"):
                eq = f"{c}_{v}"
                b = betas[eq][b_at].copy()
                P3 = 3*P
                if boost_block == "WTI":
                    b[1+2*P : 1+P3] *= boost_mult
                elif boost_block == "OWN":
                    b[1 : 1+P] *= boost_mult
                elif boost_block == "FOREIGN":
                    b[1+P : 1+2*P] *= boost_mult
                else:  # "ALL"
                    b[1:1+P3] *= boost_mult
                betas[eq][b_at] = b
    return betas

def make_sigma():
    n = len(COLS)
    S = np.eye(n)
    # couple same vtype across countries
    for v in VTYPES:
        ids = [COL_IDX[f"{c}_{v}"] for c in COUNTRIES]
        for i in ids:
            for j in ids:
                if i!=j: S[i,j] = 0.20
    # WTI with CPI stronger
    for c in COUNTRIES:
        S[COL_IDX["WTI_D"], COL_IDX[f"{c}_CPI_D"]] = 0.25
        S[COL_IDX[f"{c}_CPI_D"], COL_IDX["WTI_D"]] = 0.25
    return S

def simulate_panel(T:int, P:int, betas:dict, start_date:str="2000-01-01",
                   hetero:bool=False, break_at:int=None, eps_scale:float=0.08):
    W = weight_equal_3()
    S = make_sigma()
    L = np.linalg.cholesky(S)
    Y = np.zeros((T, len(COLS)))

    def foreign_mean(prev_vec, vtype):
        ids = [COL_IDX[f"{c}_{vtype}"] for c in COUNTRIES]
        vec = prev_vec[ids]
        f = W @ vec
        return {"JP":f[0], "US":f[1], "EU":f[2]}

    for t in range(P, T):
        prev = {lag: Y[t-lag].copy() for lag in range(1, P+1)}

        for c in COUNTRIES:
            for v in VTYPES:
                eq = f"{c}_{v}"
                feats = []
                for lag in range(1, P+1):
                    own = prev[lag][COL_IDX[eq]]
                    frm = foreign_mean(prev[lag], v)[c]
                    wti = prev[lag][COL_IDX["WTI_D"]]
                    feats += [own, frm, wti]
                b = betas[eq][t]
                Y[t, COL_IDX[eq]] = b[0] + np.dot(b[1:], np.array(feats))

        feats_w = [prev[lag][COL_IDX["WTI_D"]] for lag in range(1, P+1)]
        bw = betas["WTI_D"][t]
        Y[t, COL_IDX["WTI_D"]] = bw[0] + np.dot(bw[1:], np.array(feats_w))

        scale = eps_scale
        if hetero and (break_at is not None) and (t >= break_at):
            scale *= 1.8
        eps = L @ np.random.normal(0.0, scale, size=len(COLS))
        Y[t] += eps

    idx = pd.date_range(start_date, periods=T, freq="M")
    df = pd.DataFrame(Y, index=idx, columns=COLS).iloc[P:]
    return df

def add_foreign_and_lags(df:pd.DataFrame, P:int):
    data = df.copy()
    # foreign mean (level columns)
    for c in COUNTRIES:
        for v in VTYPES:
            others = [oc for oc in COUNTRIES if oc!=c]
            data[f"{c}*FOR*{v}"] = 0.5*(data[f"{others[0]}_{v}"] + data[f"{others[1]}_{v}"])
    # add lags
    for lag in range(1, P+1):
        data = data.join(data.shift(lag).add_suffix(f"_lag{lag}"))
    data.dropna(inplace=True)
    return data

def simulate_and_preprocess(T:int=300, P:int=2, split_date:str="2022-06-01",
                            tvp:bool=True, break_at:int=None,
                            hetero:bool=False, q_scale:float=0.02, eps_scale:float=0.08,
                            trend_strength:float=0.0):  
    betas = gen_beta_paths(T=T, P=P, tvp=tvp, break_at=break_at, q_scale=q_scale,
                           trend_strength=trend_strength)
    df = simulate_panel(T=T, P=P, betas=betas, hetero=hetero, break_at=break_at, eps_scale=eps_scale)
    data = add_foreign_and_lags(df, P)
    split_ts = pd.Timestamp(split_date)
    test_h = 12
    train = data[data.index<=split_ts]
    tmp   = data[data.index>split_ts].head(P+test_h)
    test  = tmp.iloc[P:]
    return train, test, df

Overwriting sim_tvp_5gvar.py
