In [1]:
import numpy as np, pandas as pd, matplotlib.pyplot as plt
from scipy.optimize import minimize
import statsmodels.api as sm

plt.rcParams["figure.dpi"] = 120

def _safelog(x, eps=1e-12):
    return np.log(np.clip(np.asarray(x, float), eps, None))

def qlike(f, y, eps=1e-12):
    f = np.clip(np.asarray(f, float), eps, None)
    y = np.asarray(y, float)
    return np.mean(np.log(f) + y/f)

def mse(yhat, y):
    yhat = np.asarray(yhat, float); y = np.asarray(y, float)
    return np.mean((yhat - y)**2)


In [10]:
rv = pd.read_csv('SPY.csv')
rv = rv[["Date", "Volatility", "Type"]]
rv.rename(columns={"Volatility": "RV_daily"},inplace=True)
rv = rv[rv['Type'] == 'QMLE-Trade']
rv.drop(columns=['Type'], inplace=True)
rv = rv.set_index("Date")
rv.index = pd.to_datetime(rv.index)
rv.index.name = "date"

rv.head()

Unnamed: 0_level_0,RV_daily
date,Unnamed: 1_level_1
1996-01-02,0.140261
1996-01-03,0.082399
1996-01-04,0.211454
1996-01-05,0.022647
1996-01-09,0.228727


In [12]:
# returns (CRSP) → ret  (PERMNO = 84398, date is DD/MM/YYYY)
ret = pd.read_csv("returns (crsp).csv")
ret.columns = ret.columns.str.strip()

# pick columns that really exist in this file
date_col = "date" if "date" in ret.columns else "Date"
ret_col  = "RET" if "RET" in ret.columns else "RETX"  # prefer RET, fallback to RETX

ret = ret[[date_col, "PERMNO", ret_col]].copy()
ret = ret[ret["PERMNO"] == 84398].drop(columns=["PERMNO"])

# parse dates BEFORE setting the index (DD/MM/YYYY)
ret[date_col] = pd.to_datetime(ret[date_col], format="%d/%m/%Y", errors="raise")
ret = ret.sort_values(date_col).set_index(date_col)
ret.index.name = "date"

# make numeric returns (handles 'C','B','' etc.), create r
ret["r"] = pd.to_numeric(ret[ret_col], errors="coerce")
ret.drop(columns=[ret_col], inplace=True)
ret.dropna(subset=["r"], inplace=True)

# cap to last available date you mentioned
ret = ret.loc[:"2024-12-29"]

ret.head()


Unnamed: 0_level_0,r
date,Unnamed: 1_level_1
1996-01-02,0.010673
1996-01-03,0.002766
1996-01-04,-0.009529
1996-01-05,-0.002025
1996-01-08,0.003805


In [14]:
df = rv.join(ret, how="inner").dropna()
df.head()


Unnamed: 0_level_0,RV_daily,r
date,Unnamed: 1_level_1,Unnamed: 2_level_1
1996-01-02,0.140261,0.010673
1996-01-03,0.082399,0.002766
1996-01-04,0.211454,-0.009529
1996-01-05,0.022647,-0.002025
1996-01-09,0.228727,-0.017185


In [15]:
class RealizedGARCH11:
    def __init__(self):
        self.theta_ = None
        self.success_ = None
        self.info_ = {}

    @staticmethod
    def _negloglik(theta, r, x):
        omega, beta, gamma, xi, phi, tau1, tau2, log_sigma_u2 = theta
        sigma_u2 = np.exp(log_sigma_u2)

        r = np.asarray(r, float); x = np.asarray(x, float)
        T = len(r)
        if T < 20: return 1e12

        logx = _safelog(x)
        logh = np.empty(T)
        init_var = max(np.var(r), 1e-8)
        logh[0] = np.log(init_var)

        ll = 0.0
        two_pi = 2*np.pi

        for t in range(1, T):
            logh[t] = omega + beta*logh[t-1] + gamma*logx[t-1]
            h_t = np.exp(logh[t])

            # r_t | h_t
            ll_r = -0.5*(np.log(two_pi) + np.log(h_t) + (r[t]**2)/h_t)

            # z_t
            z_t = r[t]/np.sqrt(h_t)

            # log x_t | h_t, z_t
            mu_x = xi + phi*logh[t] + tau1*z_t + tau2*(z_t**2 - 1.0)
            e_t = logx[t] - mu_x
            ll_x = -0.5*(np.log(two_pi) + np.log(sigma_u2) + (e_t**2)/sigma_u2)

            ll += (ll_r + ll_x)

        return -ll if np.isfinite(ll) else 1e12

    def fit(self, r, x, start=None, bounds=None, options=None):
        r = np.asarray(r, float); x = np.asarray(x, float)

        if start is None:
            start = np.array([
                -0.1,   # omega
                 0.95,  # beta
                 0.05,  # gamma
                 0.0,   # xi
                 1.0,   # phi
                 0.0,   # tau1
                 0.0,   # tau2
                -2.0    # log_sigma_u2
            ], float)

        if bounds is None:
            bounds = [
                (-5.0,   5.0),   # omega
                ( 0.00,  0.999), # beta
                ( 0.00,  2.0),   # gamma
                (-5.0,   5.0),   # xi
                ( 0.5,   1.5),   # phi
                (-2.0,   2.0),   # tau1
                (-2.0,   2.0),   # tau2
                (-12.0,  5.0)    # log_sigma_u2
            ]

        res = minimize(self._negloglik, start, args=(r,x),
                       method="L-BFGS-B", bounds=bounds,
                       options={"maxiter": 2000, "ftol": 1e-10, **(options or {})})
        self.theta_ = res.x
        self.success_ = res.success
        self.info_ = {"opt": res}
        return self

    def filter_in_sample(self, r, x, theta=None):
        if theta is None: theta = self.theta_
        omega, beta, gamma, xi, phi, tau1, tau2, log_sigma_u2 = theta

        r = np.asarray(r, float); x = np.asarray(x, float)
        T = len(r)
        logx = _safelog(x)

        logh = np.empty(T)
        init_var = max(np.var(r), 1e-8)
        logh[0] = np.log(init_var)

        z = np.zeros(T); u = np.zeros(T)
        for t in range(1, T):
            logh[t] = omega + beta*logh[t-1] + gamma*logx[t-1]
            h_t = np.exp(logh[t])
            z[t] = r[t]/np.sqrt(h_t)
            mu_x = xi + phi*logh[t] + tau1*z[t] + tau2*(z[t]**2 - 1.0)
            u[t] = logx[t] - mu_x

        return logh, np.exp(logh), z, u, np.exp(log_sigma_u2)

    def forecast_oos(self, r_train, x_train, x_test):
        theta = self.theta_
        omega, beta, gamma, xi, phi, tau1, tau2, log_sigma_u2 = theta

        r_train = np.asarray(r_train, float); x_train = np.asarray(x_train, float); x_test = np.asarray(x_test, float)

        logh_tr, _, _, _, _ = self.filter_in_sample(r_train, x_train, theta=theta)
        last_logh = logh_tr[-1]
        last_logx = _safelog(x_train[-1])

        logh_fore = np.empty(len(x_test))
        x_pred = np.empty(len(x_test))

        for i in range(len(x_test)):
            logh_t = omega + beta*last_logh + gamma*last_logx
            h_t = np.exp(logh_t)
            logh_fore[i] = logh_t

            # E[log x_t | F_{t-1}] uses E[z_t]=0, E[z_t^2-1]=0
            logx_pred = xi + phi*logh_t
            x_pred[i] = np.exp(logx_pred)

            # update recursion with realized x_t
            last_logh = logh_t
            last_logx = _safelog(x_test[i])

        h_pred = np.exp(logh_fore)
        return h_pred, x_pred


In [None]:
print(df.index.min(), df.index.max(), df.shape)
print(series_x.isna().sum(), series_r.isna().sum())

In [None]:
# --- PRECONDITIONS ---
assert "RV_daily" in df.columns, "df must have column 'RV_daily' from SPY.csv"
assert "r" in df.columns,        "df must have column 'r' from returns (crsp).csv"
assert isinstance(df.index, pd.DatetimeIndex), "df.index must be DatetimeIndex"

df = df.sort_index()

# If RV_daily is *volatility* (sigma), square it to variance for Realized GARCH.
# If it's already variance, leave the next line as-is.
series_x = df["RV_daily"].astype(float)      # <- use as-is; square here if needed: series_x = (df["RV_daily"].astype(float))**2
series_r = df["r"].astype(float)

# Convert dates to monthly periods
months = df.index.to_period("M")
all_months = pd.PeriodIndex(months.unique()).sort_values()

lookback_months  = 42     # 3.5y train
test_span_months = 1      # 1m test
step_months      = 1      # slide monthly

fold_results, param_store = [], []

for i in range(lookback_months, len(all_months), step_months):
    test_months  = all_months[i : i + test_span_months]
    if len(test_months) == 0:
        break
    train_months = all_months[i - lookback_months : i]

    tr_mask = months.isin(train_months)
    te_mask = months.isin(test_months)

    # Guard against empty folds
    if tr_mask.sum() < 50 or te_mask.sum() == 0:
        # not enough data in this fold—skip
        continue

    x_tr = series_x.loc[tr_mask].values
    r_tr = series_r.loc[tr_mask].values
    x_te = series_x.loc[te_mask].values
    y_te = series_x.loc[te_mask]  # truth as Series for the index

    # Fit per fold with basic retry on optimizer failure
    try:
        rg = RealizedGARCH11().fit(r_tr, x_tr)
        if not getattr(rg, "success_", True):
            # simple retry with slightly different start values (nudged beta/gamma)
            start = np.array([-0.1, 0.92, 0.08, 0.0, 1.0, 0.0, 0.0, -2.0])
            rg = RealizedGARCH11().fit(r_tr, x_tr, start=start)
    except Exception as e:
        print(f"[WARN] fold {str(test_months[0])} fit failed: {e}")
        continue

    param_store.append(rg.theta_)

    try:
        h_pred, x_pred = rg.forecast_oos(r_tr, x_tr, x_te)
    except Exception as e:
        print(f"[WARN] fold {str(test_months[0])} forecast failed: {e}")
        continue

    fold_df = pd.DataFrame(
        {
            "RV_true": y_te.values,
            "h_pred":  h_pred,
            "x_pred":  x_pred,
            # safer than Period.strftime across pandas versions
            "fold_month": str(test_months[0])
        },
        index=y_te.index,
    )

    fold_results.append(fold_df)

# Concatenate all out-of-sample predictions
if len(fold_results) == 0:
    raise RuntimeError("No successful folds were produced. Check that df has enough overlap and that RV/returns contain valid numbers.")
cv_rg = pd.concat(fold_results).sort_index()

print(f"Folds estimated: {len(fold_results)}; OOS days: {cv_rg.shape[0]}")


In [None]:
oos = cv_rg.copy()
mse_h,   ql_h = mse(oos["h_pred"], oos["RV_true"]), qlike(oos["h_pred"], oos["RV_true"])
mse_x,   ql_x = mse(oos["x_pred"], oos["RV_true"]), qlike(oos["x_pred"], oos["RV_true"])

print("OOS Realized GARCH vs RV_true")
print(f"  h_pred: MSE={mse_h:,.6f}  QLIKE={ql_h:,.6f}")
print(f"  x_pred: MSE={mse_x:,.6f}  QLIKE={ql_x:,.6f}")

plt.figure(figsize=(14,5))
plt.plot(oos.index, oos["RV_true"], label="Actual RV", linewidth=1)
plt.plot(oos.index, oos["h_pred"],  label="Predicted (h_t)", linewidth=1)
plt.title("Realized GARCH(1,1): OOS Predicted vs Actual (variance)")
plt.xlabel("Date"); plt.ylabel("Variance"); plt.legend(); plt.tight_layout(); plt.show()

plt.figure(figsize=(6,6))
plt.scatter(oos["RV_true"], oos["h_pred"], s=10, alpha=0.6)
lims = [float(min(oos["RV_true"].min(), oos["h_pred"].min())),
        float(max(oos["RV_true"].max(), oos["h_pred"].max()))]
plt.plot(lims, lims, linewidth=1)
plt.xlim(lims); plt.ylim(lims)
plt.xlabel("Actual RV"); plt.ylabel("Predicted (h_t)")
plt.title("OOS Predicted vs Actual (Realized GARCH)"); plt.tight_layout(); plt.show()
