Creating baseline model EGARCH for BTC 

In [60]:
import warnings
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss
from arch.univariate import EGARCH, ZeroMean, StudentsT, Normal
from statsmodels.tools.sm_exceptions import InterpolationWarning

Helper functions 

In [1]:
#need to import this for rmse and qlike
#import numpy as np


def rmse(y_true, y_pred):
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def qlike(y2, var_fc, eps=1e-12):
    ratio = (y2 + eps) / (var_fc + eps)
    return float(np.mean(ratio - np.log(ratio) - 1.0))


def clean_series(x, name="series"):
    x = pd.to_numeric(x, errors="coerce").astype(float)
    x = x.replace([np.inf, -np.inf], np.nan).dropna()
    if not x.index.is_monotonic_increasing:
        x = x.sort_index()
    return x


def scale_series(x, factor=100.0):
    if factor is None or factor == 1.0:
        return x, 1.0
    return x * factor, factor

Loading dataset 

In [71]:
df = pd.read_csv("dune_btc_hour.csv")
df['hour_utc'] = pd.to_datetime(df['hour_utc'], utc=True)


df = df.set_index('hour_utc')
df.index = pd.to_datetime(df.index)

df = df.sort_index()
df = df[~df.index.duplicated(keep='last')]

df['hourly_return'] = clean_series(df['hourly_return'], name="hourly_return_all")
r = df['hourly_return']

print("Index dtype:", df.index.dtype)
print("Columns:", df.columns)
print("Data shape:", df.shape)
print(f"\nUsing {len(r)} observations for EGARCH modelling.")

Index dtype: datetime64[ns, UTC]
Columns: Index(['btc_exchange_netflow_usd', 'active_sending_addresses',
       'active_receiving_addresses', 'onchain_volume_usd', 'open', 'low',
       'high', 'close', 'mint_reward_usd', 'total_fee_usd',
       'transaction_count', 'wallet_to_exchange_usd', 'realized_volatility',
       'RV_MA_1hr', 'RV_MA_3hr', 'RV_MA_12hr', 'hourly_return'],
      dtype='object')
Data shape: (8208, 17)

Using 8208 observations for EGARCH modelling.


Defining stationary tests 

In [63]:
def adf_kpss(x: pd.Series):
    x = x.dropna()
    adf_res = adfuller(x, autolag="AIC")
    kpss_level = kpss(x, regression="c", nlags="auto")
    kpss_trend = kpss(x, regression="ct", nlags="auto")
    return {
        "ADF": {"stat": adf_res[0], "p": adf_res[1]},
        "KPSS_level": {"stat": kpss_level[0], "p": kpss_level[1]},
        "KPSS_trend": {"stat": kpss_trend[0], "p": kpss_trend[1]},
    }

print("\n=== Stationarity Tests ===")
print(adf_kpss(r))


=== Stationarity Tests ===
{'ADF': {'stat': np.float64(-20.879240730752954), 'p': 0.0}, 'KPSS_level': {'stat': np.float64(0.1442031290166316), 'p': np.float64(0.1)}, 'KPSS_trend': {'stat': np.float64(0.10581848073629616), 'p': np.float64(0.1)}}


look-up table. The actual p-value is greater than the p-value returned.

  kpss_level = kpss(x, regression="c", nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_trend = kpss(x, regression="ct", nlags="auto")


Use table to show at least weak stationarity. 

In [76]:
def stationarity_table(x, alpha=0.05):
    x = pd.to_numeric(x, errors="coerce").astype(float).dropna()

    adf_stat, adf_p, adf_lags, adf_nobs, adf_crit, *_ = adfuller(x, autolag="AIC")

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=InterpolationWarning)
        kpss_level_stat, kpss_level_p, kpss_level_lags, kpss_level_crit = kpss(x, regression="c", nlags="auto")
        kpss_trend_stat, kpss_trend_p, kpss_trend_lags, kpss_trend_crit = kpss(x, regression="ct", nlags="auto")

    rows = [
        {
            "Test": "ADF",
            "Statistic": adf_stat,
            "p-value": adf_p,
            "H0": "Unit root (non-stationary)",
            f"Decision @ {int(alpha*100)}%": "Stationary (reject H0)" if adf_p < alpha else "Non-stationary (fail to reject)",
            "Lags used": adf_lags,
            "Obs used": adf_nobs,
        },
        {
            "Test": "KPSS (level)",
            "Statistic": kpss_level_stat,
            "p-value": kpss_level_p,
            "H0": "Level-stationary",
            f"Decision @ {int(alpha*100)}%": "Non-stationary (reject H0)" if kpss_level_p < alpha else "Stationary (fail to reject)",
            "Lags used": kpss_level_lags,
            "Obs used": len(x),
        },
        {
            "Test": "KPSS (trend)",
            "Statistic": kpss_trend_stat,
            "p-value": kpss_trend_p,
            "H0": "Trend-stationary",
            f"Decision @ {int(alpha*100)}%": "Non-stationary (reject H0)" if kpss_trend_p < alpha else "Stationary (fail to reject)",
            "Lags used": kpss_trend_lags,
            "Obs used": len(x),
        },
    ]

    df_out = pd.DataFrame(rows)
    return df_out

    num_cols = ["Statistic", "p-value"]
    df_out[num_cols] = df_out[num_cols].applymap(lambda v: np.nan if pd.isna(v) else float(v))
    return df_out

tbl = stationarity_table(r, alpha=0.05)
tbl

Unnamed: 0,Test,Statistic,p-value,H0,Decision @ 5%,Lags used,Obs used
0,ADF,-20.879241,0.0,Unit root (non-stationary),Stationary (reject H0),22,8184
1,KPSS (level),0.144203,0.1,Level-stationary,Stationary (fail to reject),6,8207
2,KPSS (trend),0.105818,0.1,Trend-stationary,Stationary (fail to reject),7,8207


Train/Validation/Test Split (70/15/15)

In [65]:
use_validation = True 
n = len(r)
if use_validation:
    n_train = int(0.70 * n)
    n_val   = int(0.15 * n)
    train_proto = r.iloc[:n_train]
    val_proto   = r.iloc[n_train:n_train+n_val]
    test_proto  = r.iloc[n_train+n_val:]
else:
    n_train = int(0.85 * n)
    train_proto = r.iloc[:n_train]
    val_proto   = None
    test_proto  = r.iloc[n_train:]

PURGE_HOURS = 12
if len(test_proto) == 0:
    raise ValueError("Empty test set after temporal split.")
boundary = test_proto.index.min()
purged_start = boundary + pd.Timedelta(hours=PURGE_HOURS)

train_val = r.loc[: purged_start - pd.Timedelta(hours=1)]
test      = r.loc[purged_start:]

print(f"Train/Val for CV: {len(train_val)} obs  |  Test (after 12h purge): {len(test)} obs")

Train/Val for CV: 6988 obs  |  Test (after 12h purge): 1220 obs


Rolling Window = 5 Splits 

In [66]:
def rolling_split(x, n_splits=5, min_train_size=100):
    n = len(x)
    sizes = np.full(n_splits, n // n_splits, dtype=int)
    sizes[: n % n_splits] += 1

    idx = np.arange(n)
    cur = 0
    for fs in sizes:
        test_start, test_end = cur, cur + fs
        cur = test_end
        if test_start < min_train_size:
            continue
        yield idx[:test_start], idx[test_start:test_end]

Fit EGARCH

In [67]:
def egarch_path_refit(train_ret, test_ret, dist="t", scale=100.0):
    tr = clean_series(train_ret, "train_ret")
    te = clean_series(test_ret, "test_ret")

    tr_s, s = scale_series(tr, scale)
    te_s, _ = scale_series(te, scale)

    all_s = pd.concat([tr_s, te_s])
    n_tr, n_te = len(tr_s), len(te_s)

    var_fc_s = np.empty(n_te)

    for j in range(n_te):
        y_fit = all_s.iloc[: n_tr + j]
        model = ZeroMean(y_fit)
        model.volatility = EGARCH(p=1, o=1, q=1)
        model.distribution = StudentsT() if dist == "t" else Normal()
        res = model.fit(disp="off")

        fcast = res.forecast(horizon=1, reindex=False)
        var_fc_s[j] = float(fcast.variance.values[-1, 0])

    var_fc = var_fc_s / (s ** 2)
    r2 = te.values ** 2

    return {
        "var_fc": var_fc,
        "r2": r2,
        "rmse": rmse(r2, var_fc),
        "qlike": qlike(r2, var_fc),
        "n_train": len(tr),
        "n_test": len(te),
    }

Cross Validation 

In [72]:
print("\n=== Rolling CV (5 splits)===")
cv_scores = []
for i, (tri, tei) in enumerate(rolling_split(train_val, n_splits=5, min_train_size=100), 1):
    tr_i = train_val.iloc[tri]
    te_i = train_val.iloc[tei]
    
    out = egarch_path_refit(tr_i, te_i, dist="t", scale=100.0)
    cv_scores.append((out["rmse"], out["qlike"]))
    print(f"Fold {i}: RMSE={out['rmse']:.6e} | QLIKE={out['qlike']:.6f} "
            f"| Train={out['n_train']} Test={out['n_test']}")
    

if cv_scores:
    mean_rmse  = float(np.mean([s[0] for s in cv_scores]))
    mean_qlike = float(np.mean([s[1] for s in cv_scores]))
    print(f"\nMean CV RMSE={mean_rmse:.6e} | Mean CV QLIKE={mean_qlike:.6f}")
else:
    print("No valid CV folds.")



=== Rolling CV (5 splits)===
Fold 1: RMSE=9.505968e-05 | QLIKE=1.729956 | Train=1397 Test=1398
Fold 2: RMSE=1.396738e-04 | QLIKE=1.696587 | Train=2795 Test=1398
Fold 3: RMSE=3.904055e-05 | QLIKE=1.731113 | Train=4193 Test=1397
Fold 4: RMSE=3.410794e-05 | QLIKE=1.741678 | Train=5590 Test=1397

Mean CV RMSE=7.697049e-05 | Mean CV QLIKE=1.724833


Final Evaluation 

In [69]:
final_train = train_val 
final_out = egarch_path_refit(final_train, test, dist="t", scale=100.0)
print("\n=== Final Test Results ===")
print(f"Test RMSE={final_out['rmse']:.6e} | QLIKE={final_out['qlike']:.6f}")


=== Final Test Results ===
Test RMSE=4.695800e-05 | QLIKE=1.849920


Check on why RMSE is so low - due to scaling since returns are of small magnitude 

In [70]:
print("Scale checks:")
print(f"mean|r|     = {np.mean(np.abs(r)):.3e}")
print(f"mean r^2    = {np.mean(r**2):.3e}")
print(f"std(r)^2    = {np.std(r)**2:.3e}")  


print("\nMetrics (scientific):")
print(f"RMSE       = {final_out['rmse']:.3e}")   
print(f"QLIKE      = {final_out['qlike']:.6f}")


vf = final_out['var_fc']
print("\nForecast variance summary:")
print(f"min={vf.min():.3e}  p50={np.median(vf):.3e}  mean={vf.mean():.3e}  max={vf.max():.3e}")


Scale checks:
mean|r|     = 3.077e-03
mean r^2    = 2.272e-05
std(r)^2    = 2.271e-05

Metrics (scientific):
RMSE       = 4.696e-05
QLIKE      = 1.849920

Forecast variance summary:
min=2.330e-06  p50=1.096e-05  mean=1.570e-05  max=2.477e-04
