## Imports

In [113]:
from pathlib import Path
import os, sys, shutil
import pandas as pd
import plotly.graph_objects as go
import statsmodels.api as sm
from statsmodels.api import OLS, add_constant
from statsmodels.tsa.stattools import adfuller, kpss, coint, zivot_andrews
from statsmodels.tsa.vector_ar.vecm import coint_johansen
import numpy as np

try:
    from arch.unitroot import PhillipsPerron, DFGLS
    from arch.unitroot.cointegration import phillips_ouliaris
    _HAVE_ARCH = True
except Exception:
    _HAVE_ARCH = False

%matplotlib inline

OUT_DIR = Path("arbitrage/data")
OUT_DIR.mkdir(parents=True, exist_ok=True)

sys.path.insert(0, '..')  # add project root
from src.binance_perp_ingest_full import fetch_perpetual_klines

## Functions

In [None]:
def load_perp_csv(symbol: str, interval: str, out_dir: Path) -> pd.DataFrame:
    path = out_dir / f"{symbol}_{interval}_PERP.csv"
    cols = ["open_time","open","high","low","close","volume","close_time",
            "quote_volume","trade_count","taker_base_volume","taker_quote_volume","ignore"]
    df = pd.read_csv(path, names=cols, header=0)
    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms", utc=True).dt.tz_convert(None)
    for c in ["open","high","low","close","volume"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    df = df.set_index("open_time").sort_index()
    
    return df

In [115]:
def _align_xy(x: pd.Series, y: pd.Series) -> pd.DataFrame:
    df = pd.DataFrame({"x": x, "y": y}).astype(float).dropna()
    if df.empty:
        raise ValueError("After alignment/dropna, there are no overlapping observations.")
    return df

def _adf_report(s: pd.Series, alpha=0.05):
    stat, p, usedlag, nobs, crit, icbest = adfuller(s, regression="c", autolag="AIC")
    decision = "Reject unit root → stationary" if (p < alpha and stat < crit["5%"]) else "Fail to reject unit root"
    return {
        "name": "ADF (residual, nc)",
        "stat": stat, "p": p, "crit": crit, "lags": usedlag, "nobs": nobs,
        "decision": decision
    }

def _kpss_report(s: pd.Series, alpha=0.05):
    # KPSS null: stationary
    stat, p, lags, crit = kpss(s, regression="c", nlags="auto")
    decision = "Fail to reject stationarity" if p > alpha else "Reject stationarity"
    return {"name":"KPSS (level)", "stat": stat, "p": p, "crit": crit, "lags": lags, "decision": decision}

def _pp_report(s: pd.Series, alpha=0.05):
    if not _HAVE_ARCH:
        return {"name": "Phillips–Perron", "skipped": True, "reason": "arch not installed"}
    res = PhillipsPerron(s.dropna())
    p = float(res.pvalue); stat = float(res.stat)
    crit = {"1%": res.critical_values["1%"], "5%": res.critical_values["5%"], "10%": res.critical_values["10%"]}
    decision = "Reject unit root → stationary" if p < alpha else "Fail to reject unit root"
    return {"name":"Phillips–Perron", "stat": stat, "p": p, "crit": crit, "lags": res.lags, "decision": decision}

def _dfgls_report(s: pd.Series, alpha=0.05):
    if not _HAVE_ARCH:
        return {"name": "DF-GLS", "skipped": True, "reason": "arch not installed"}
    res = DFGLS(s.dropna(), trend="c")
    p = float(res.pvalue); stat = float(res.stat)
    crit = {"1%": res.critical_values["1%"], "5%": res.critical_values["5%"], "10%": res.critical_values["10%"]}
    decision = "Reject unit root → stationary" if p < alpha else "Fail to reject unit root"
    return {"name":"DF-GLS", "stat": stat, "p": p, "crit": crit, "lags": res.lags, "decision": decision}

def _eg_report(y: pd.Series, x: pd.Series, alpha=0.05):
    tstat, p, crit = coint(y, x, trend="c", autolag="AIC")
    decision = "Reject no-cointegration" if p < alpha else "Fail to reject no-cointegration"
    return {"name":"Engle–Granger", "stat": tstat, "p": p, "crit": crit, "decision": decision}

def _po_report(y: pd.Series, x: pd.Series, alpha=0.05):
    if not _HAVE_ARCH:
        return {"name":"Phillips–Ouliaris", "skipped": True, "reason": "arch not installed"}
    res = phillips_ouliaris(y, x)  # residual-based tests; res.stat, res.pvalue, res.critical_values
    p = float(res.pvalue); stat = float(res.stat)
    crit = res.critical_values
    decision = "Reject no-cointegration" if p < alpha else "Fail to reject no-cointegration"
    return {"name":"Phillips–Ouliaris", "stat": stat, "p": p, "crit": crit, "decision": decision}

def _johansen_report(y: pd.Series, x: pd.Series):
    Z = np.column_stack([y.to_numpy(), x.to_numpy()])
    joh = coint_johansen(Z, det_order=0, k_ar_diff=1)
    # Compare stats to crit values (row-wise)
    trace = pd.DataFrame(joh.trace_stat, columns=["trace_stat"])
    trace["cv_90"], trace["cv_95"], trace["cv_99"] = joh.trace_stat_crit_vals.T
    maxeig = pd.DataFrame(joh.max_eig_stat, columns=["max_eig_stat"])
    maxeig["cv_90"], maxeig["cv_95"], maxeig["cv_99"] = joh.max_eig_stat_crit_vals.T
    rank_trace = int((trace["trace_stat"] > trace["cv_95"]).sum())  # number of rejections at 5%
    rank_max = int((maxeig["max_eig_stat"] > maxeig["cv_95"]).sum())
    return {
        "name":"Johansen",
        "rank_trace_5pct": rank_trace,
        "rank_max_5pct": rank_max,
        "trace_table": trace,
        "maxeig_table": maxeig
    }

def _za_report(s: pd.Series, alpha=0.05):
    s = pd.Series(s).dropna().astype(float)
    za = zivot_andrews(s, trim=0.15, maxlag=None, regression="c", autolag="AIC")

    # Works across statsmodels versions
    za_stat = float(getattr(za, "stat",  za[0] if hasattr(za, "__iter__") else np.nan))
    za_p    = float(getattr(za, "pvalue", za[1] if hasattr(za, "__iter__") else np.nan))
    za_cv   =      getattr(za, "critical_values", za[2] if hasattr(za, "__iter__") else {})
    za_lags = int( getattr(za, "lags", za[3] if hasattr(za, "__iter__") else -1))
    za_bp   = int( getattr(za, "breakpoint", za[4] if hasattr(za, "__iter__") else -1))

    # If you have a DatetimeIndex, show the break date too
    break_at = (s.index[za_bp] if 0 <= za_bp < len(s) else None)

    decision = "Reject unit root with break" if za_p < alpha else "Fail to reject unit root with break"
    return {
        "name": "Zivot–Andrews",
        "stat": za_stat, "p": za_p, "crit": za_cv, "lags": za_lags,
        "break_index": za_bp, "break_at": break_at,
        "decision": decision
    }

def _ou_half_life(s: pd.Series):
    # AR(1): s_t = a + rho s_{t-1} + e_t → half-life = -ln(2)/ln(rho) if 0<rho<1
    s = s.dropna().astype(float)
    y = s.iloc[1:].values
    x = s.shift(1).iloc[1:].values
    rho_model = sm.OLS(y, sm.add_constant(x)).fit()
    rho = float(rho_model.params[1])
    hl = np.inf
    if 0 < rho < 1:
        hl = -np.log(2.0) / np.log(rho)
    return {"rho": rho, "half_life_bars": hl, "reg_summary": rho_model}

def _hurst_rs(s: pd.Series, min_window=16, max_window=None, num_windows=20, seed=0):
    # Simple rescaled-range H estimate
    s = pd.Series(s).dropna().astype(float)
    n = len(s)
    if max_window is None:
        max_window = max(64, n // 6)
    rng = np.geomspace(min_window, max_window, num=num_windows).astype(int)
    rng = np.unique(rng[rng >= 8])
    RS = []
    for m in rng:
        k = n // m
        if k < 2:
            continue
        segs = s.iloc[: k*m].values.reshape(k, m)
        rs_vals = []
        for seg in segs:
            seg = seg - seg.mean()
            z = np.cumsum(seg)
            R = z.max() - z.min()
            S = seg.std(ddof=1)
            if S > 0:
                rs_vals.append(R / S)
        if rs_vals:
            RS.append((m, np.mean(rs_vals)))
    if len(RS) < 2:
        return {"H": np.nan, "points": RS}
    ln_m = np.log([p[0] for p in RS]); ln_rs = np.log([p[1] for p in RS])
    H = float(sm.OLS(ln_rs, sm.add_constant(ln_m)).fit().params[1])
    return {"H": H, "points": RS}



def pairs_trading_report(y: pd.Series, x: pd.Series, spread: pd.Series, alpha=0.05, title="Pairs Trading Report"):
    df = _align_xy(x, y)

    # Run tests
    reports = []
    reports.append(_eg_report(df["y"], df["x"], alpha=alpha))
    reports.append(_po_report(df["y"], df["x"], alpha=alpha))
    reports.append(_adf_report(spread, alpha=alpha))
    reports.append(_kpss_report(spread, alpha=alpha))
    reports.append(_pp_report(spread, alpha=alpha))
    reports.append(_dfgls_report(spread, alpha=alpha))
    reports.append(_za_report(spread, alpha=alpha))
    joh = _johansen_report(df["y"], df["x"])
    ou = _ou_half_life(spread)
    hurst = _hurst_rs(spread)

    # Build human-readable text
    lines = []
    lines += [title, f"Observations: {len(df)}",
              f"Spread mean={spread.mean():.6f}, std={spread.std(ddof=1):.6f}", "---"]

    for r in reports:
        if r.get("skipped"):
            lines += [f"{r['name']}: SKIPPED ({r['reason']})"]
            continue
        name = r["name"]; stat = r["stat"]; p = r["p"]
        if "crit" in r and isinstance(r["crit"], dict):
            crit = r["crit"]
            crit_txt = f"  crit (1%,5%,10%): {crit.get('1%')}, {crit.get('5%')}, {crit.get('10%')}"
        else:
            crit_txt = ""
        extra = []
        if "lags" in r: extra.append(f"lags={r['lags']}")
        if "nobs" in r: extra.append(f"nobs={r['nobs']}")
        lines += [f"{name}: stat={stat:.6f}, p={p:.6g}" + (f" [{', '.join(extra)}]" if extra else ""),
                  crit_txt, f"→ {r['decision']}", "---"]

    lines += [f"Johansen: rank (trace, 5%)={joh['rank_trace_5pct']}, rank (max-eig, 5%)={joh['rank_max_5pct']}",
              "---",
              f"OU half-life (bars): {ou['half_life_bars']:.3f}  (rho={ou['rho']:.6f})",
              f"Hurst (R/S): H={hurst['H']:.3f}",
              "---"]

    text = "\n".join([ln for ln in lines if ln])
    print(text)

    # Return rich object
    return {
        "title": title,
        "nobs": len(df),
        "spread": spread,
        "tests": reports, "johansen": joh, "ou": ou, "hurst": hurst,
        "text": text
    }

## Data ingestion

In [116]:
SYMBOL_A = "BTCUSDT"
SYMBOL_B = "BCHUSDT"
INTERVAL = "1h"
LOOKBACK_MINUTES = 1314000
RUN_DOWNLOAD = True

In [117]:
if RUN_DOWNLOAD:
    results = fetch_perpetual_klines(
        symbols=[SYMBOL_A, SYMBOL_B],
        out_dir=str(OUT_DIR),
        interval=INTERVAL,
        lookback_minutes=LOOKBACK_MINUTES,
        resume=True,
        pause=0.12,
        pause_between_symbols=0.25,
        retries=5,
        verbose=True,
        insecure_ssl=True,
        with_funding=False
    )
    print("Rows written:", results)
else:
    print("RUN_DOWNLOAD=False — skipping download step.")

Found 2 USDT-M perpetual symbols
Interval: 1h (step 3600s)
Global default range: start_ms=1681413177859 end_ms=1760253177859 (LOOKBACK_MINUTES=1314000)
Resume mode: ON
Output dir: arbitrage/data
[1/2] BTCUSDT: fetching klines 1681413177859..1760253177859 -> arbitrage/data/BTCUSDT_1h.csv
[1/2] BTCUSDT: wrote 21900 kline rows
[2/2] BCHUSDT: fetching klines 1681413177859..1760253177859 -> arbitrage/data/BCHUSDT_1h.csv
[2/2] BCHUSDT: wrote 21900 kline rows
Rows written: {'BTCUSDT': 21900, 'BCHUSDT': 21900}


In [118]:
df_a = load_perp_csv(SYMBOL_A, INTERVAL, OUT_DIR)
df_b = load_perp_csv(SYMBOL_B, INTERVAL, OUT_DIR)

common_index = df_a.index.intersection(df_b.index)
df_a = df_a.loc[common_index]
df_b = df_b.loc[common_index]

In [119]:
df_a

Unnamed: 0_level_0,open,high,low,close,volume,close_time,quote_volume,trade_count,taker_base_volume,taker_quote_volume,ignore
open_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2023-04-13 20:00:00,30323.0,30323.1,30185.0,30233.0,16141.207,1681419599999,4.881630e+08,155999,6978.600,2.110603e+08,0
2023-04-13 21:00:00,30233.0,30314.5,30225.5,30264.9,5611.276,1681423199999,1.698644e+08,80433,2899.549,8.777937e+07,0
2023-04-13 22:00:00,30265.0,30385.2,30264.9,30287.7,9426.236,1681426799999,2.858054e+08,104598,4714.084,1.429417e+08,0
2023-04-13 23:00:00,30287.8,30380.0,30262.0,30362.7,7863.663,1681430399999,2.384230e+08,86777,4032.303,1.222755e+08,0
2023-04-14 00:00:00,30362.7,30822.8,30293.0,30741.3,61770.177,1681433999999,1.893021e+09,434485,33659.410,1.031596e+09,0
...,...,...,...,...,...,...,...,...,...,...,...
2025-10-12 03:00:00,110072.2,111424.0,109950.0,110643.3,8994.562,1760241599999,9.963455e+08,304020,4898.488,5.427025e+08,0
2025-10-12 04:00:00,110643.3,111909.5,110561.8,111486.1,6082.448,1760245199999,6.772519e+08,210532,3629.325,4.041160e+08,0
2025-10-12 05:00:00,111486.1,111755.7,111142.8,111642.4,3736.053,1760248799999,4.163024e+08,128682,1848.756,2.060351e+08,0
2025-10-12 06:00:00,111642.4,112110.0,111546.0,111634.8,4509.080,1760252399999,5.039046e+08,123654,2168.702,2.423785e+08,0


In [120]:
df_b

Unnamed: 0_level_0,open,high,low,close,volume,close_time,quote_volume,trade_count,taker_base_volume,taker_quote_volume,ignore
open_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2023-04-13 20:00:00,130.98,130.98,130.31,130.47,12198.569,1681419599999,1.593672e+06,6167,5431.580,7.095668e+05,0
2023-04-13 21:00:00,130.47,131.13,130.41,130.79,9627.733,1681423199999,1.259167e+06,4095,6647.837,8.695685e+05,0
2023-04-13 22:00:00,130.79,131.17,130.67,131.02,9632.162,1681426799999,1.260419e+06,4470,5281.029,6.909712e+05,0
2023-04-13 23:00:00,131.02,131.15,130.72,131.02,9545.774,1681430399999,1.250241e+06,4354,4743.941,6.213648e+05,0
2023-04-14 00:00:00,131.03,133.92,130.97,133.61,69054.897,1681433999999,9.164248e+06,25409,33753.174,4.477242e+06,0
...,...,...,...,...,...,...,...,...,...,...,...
2025-10-12 03:00:00,500.20,513.67,499.82,510.00,21447.351,1760241599999,1.089024e+07,48913,11312.478,5.745627e+06,0
2025-10-12 04:00:00,509.99,517.29,509.99,512.79,15433.952,1760245199999,7.939346e+06,36622,8396.951,4.319574e+06,0
2025-10-12 05:00:00,512.79,524.09,512.66,520.27,19076.412,1760248799999,9.887886e+06,42238,10208.482,5.290620e+06,0
2025-10-12 06:00:00,520.27,521.37,517.55,520.69,24754.437,1760252399999,1.286362e+07,31734,16902.668,8.785705e+06,0


## Plots

In [121]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_a.index, y=df_a["close"], mode="lines", name=f"{SYMBOL_A} ({INTERVAL}) — Close"))
fig.update_layout(
title=f"{SYMBOL_A} ({INTERVAL}) — Close",
xaxis_title="Time",
yaxis_title="Price (USDT)",
hovermode="x unified"
)

In [122]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_b.index, y=df_a["close"], mode="lines", name=f"{SYMBOL_B} ({INTERVAL}) — Close"))
fig.update_layout(
title=f"{SYMBOL_B} ({INTERVAL}) — Close",
xaxis_title="Time",
yaxis_title="Price (USDT)",
hovermode="x unified"
)

## Linear relationship

In [123]:
x = df_a["close"].astype(float)
y = df_b["close"].astype(float)

In [124]:
X = add_constant(x)
model = OLS(y, X).fit()
beta = model.params["close"]
intercept = model.params["const"]

In [125]:
# assumes x, y, model, beta, intercept are already defined
xs = np.linspace(float(x.min()), float(x.max()), 300)
pred = model.get_prediction(sm.add_constant(xs))
ci_low, ci_high = pred.conf_int().T
ys = intercept + beta * xs

fig_xy = go.Figure()

# data
fig_xy.add_trace(go.Scattergl(
    x=x, y=y, mode="markers", name="observations",
    marker=dict(size=4, opacity=0.5)
))

# 95% CI band
fig_xy.add_trace(go.Scatter(
    x=np.concatenate([xs, xs[::-1]]),
    y=np.concatenate([ci_high, ci_low[::-1]]),
    fill="toself", name="95% CI", hoverinfo="skip",
    line=dict(width=0), opacity=0.2
))

# OLS fit
fig_xy.add_trace(go.Scatter(
    x=xs, y=ys, mode="lines",
    name=f"OLS: y = {intercept:.3f} + {beta:.3f}·x (R²={model.rsquared:.3f})",
    line=dict(width=2)
))

fig_xy.update_layout(
    title="ETH vs BTC — OLS Fit",
    xaxis_title="BTC close",
    yaxis_title="ETH close",
    hovermode="closest"
)
fig_xy.show()

## Spread

Spread = ETH - (beta * BTC + intercept) using OLS hedge rato

In [126]:
spread = y - (beta * x + intercept)

In [127]:
fig_spread = go.Figure()
fig_spread.add_trace(go.Scatter(x=spread.index, y=spread, mode="lines", name=f"Spread = {SYMBOL_B} - (β*{SYMBOL_A} + c)"))
fig_spread.update_layout(
    title="Pairs Spread",
    xaxis_title="Time",
    yaxis_title="Spread (USDT)",
    hovermode="x unified"
)
fig_spread.show()

## Statistical tests

In [128]:
spread.describe()

count    2.190000e+04
mean     9.701277e-14
std      7.454768e+01
min     -1.610639e+02
25%     -3.600046e+01
50%     -1.984434e+00
75%      3.459688e+01
max      3.533656e+02
Name: close, dtype: float64

In [129]:
result = pairs_trading_report(y, x, spread, alpha=0.05, title=f"Pairs Trading Report: {SYMBOL_B} vs {SYMBOL_A}")



numba is not available, and this function is being executed without JIT
compilation. Either install numba or reinstalling after installing Cython
is strongly recommended.


The test statistic is outside of the range of p-values available in the
look-up table. The actual p-value is smaller than the p-value returned.




Pairs Trading Report: BCHUSDT vs BTCUSDT
Observations: 21900
Spread mean=0.000000, std=74.547679
---
Engle–Granger: stat=-2.917874, p=0.13104
→ Fail to reject no-cointegration
---
Phillips–Ouliaris: stat=-2.887473, p=0.140108
→ Fail to reject no-cointegration
---
ADF (residual, nc): stat=-2.918052, p=0.0432887 [lags=47, nobs=21852]
  crit (1%,5%,10%): -3.4306492892336635, -2.861672275940045, -2.5668404067612385
→ Reject unit root → stationary
---
KPSS (level): stat=2.217472, p=0.01 [lags=90]
  crit (1%,5%,10%): 0.739, 0.463, 0.347
→ Reject stationarity
---
Phillips–Perron: stat=-2.882574, p=0.0474123 [lags=47]
  crit (1%,5%,10%): -3.4306486468187884, -2.8616719920282048, -2.566840255640472
→ Reject unit root → stationary
---
DF-GLS: stat=-1.749334, p=0.0790576 [lags=47]
  crit (1%,5%,10%): -2.5687583098676403, -1.9446269962853815, -1.6210468183465423
→ Fail to reject unit root
---
Zivot–Andrews: stat=-4.060029, p=0.315633 [lags=47]
  crit (1%,5%,10%): -5.27644, -4.81067, -4.56618
→ Fai

## Bollinger bands

In [130]:
bb_window = 24
bb_k = 2.5

mid = spread.rolling(bb_window, min_periods=bb_window//3).mean()
std = spread.rolling(bb_window, min_periods=bb_window//3).std()
upper = mid + bb_k * std
lower = mid - bb_k * std

long_sig  = (spread < lower)
short_sig = (spread > upper)


fig = go.Figure()
fig.add_trace(go.Scatter(x=spread.index, y=spread, name="Spread", mode="lines"))
fig.add_trace(go.Scatter(x=mid.index,    y=mid,    name=f"Mid (MA {bb_window})", mode="lines"))
fig.add_trace(go.Scatter(x=upper.index, y=upper, name=f"Upper (+{bb_k}\u03C3)", mode="lines"))
fig.add_trace(go.Scatter(
    x=lower.index, y=lower, name=f"Lower (-{bb_k}\u03C3)", mode="lines",
    fill="tonexty", fillcolor="rgba(0,0,0,0.08)"
))

fig.add_trace(go.Scatter(
    x=spread.index[long_sig], y=spread[long_sig],
    mode="markers", name="Long spread", marker=dict(symbol="triangle-up", size=8)
))
fig.add_trace(go.Scatter(
    x=spread.index[short_sig], y=spread[short_sig],
    mode="markers", name="Short spread", marker=dict(symbol="triangle-down", size=8)
))

fig.update_layout(
    title=f"Spread with Bollinger Bands (window={bb_window}, k={bb_k})",
    xaxis_title="Time", yaxis_title="Spread (USDT)",
    hovermode="x unified"
)
fig.show()