In [1]:
# Import the class from the Python file (module)
import pandas as pd
import matplotlib.pyplot as plt
import os
# from dotenv import load_dotenv
# from pathlib import Path
from sklearn.preprocessing import StandardScaler
import seaborn as sns
from BinanceClient import BinanceClient
import numpy as np
from typing import Final
import joblib
from BatchFeatures import BatchFeatures
from datetime import datetime, timedelta
%matplotlib widget

## Load pair df

In [2]:
import os
from datetime import datetime, timedelta, timezone

def interval_slug(s: str) -> str:
    return s.strip().replace(" ", "").replace("/", "").lower()

def make_db_name(pair: str, interval: str, weeks: int) -> str:
    return f"{pair}_{interval_slug(interval)}_{weeks}weeks.db"

def load_or_fetch_pair_df(pair: str, interval: str, weeks: int) -> tuple[str, "pd.DataFrame"]:
    db_name = make_db_name(pair, interval, weeks)
    db_path = "./db/" + db_name

    print(f"[{pair}] DB: {db_path}")

    binance_client = BinanceClient(db_path)
    binance_client.set_interval(interval)

    df = None

    if os.path.exists(db_path):
        df = binance_client.fetch_data_from_db(pair)
        if df is not None and not df.empty:
            print(f"[{pair}] Loaded {len(df):,} rows from DB.")
        else:
            df = None

    if df is None:
        print(f"[{pair}] No usable DB data found -> fetching from Binance...")

        api_secret = os.getenv("BINANCE_SECRET_KEY")
        api_key = os.getenv("BINANCE_API_KEY")
        binance_client.make(api_key, api_secret)

        server_time = binance_client.get_server_time()
        end_dt = datetime.fromtimestamp(server_time["serverTime"] / 1000, tz=timezone.utc)
        start_dt = end_dt - timedelta(weeks=weeks)

        start_ms = int(start_dt.timestamp() * 1000)
        end_ms = int(end_dt.timestamp() * 1000)

        data = binance_client.fetch_data(pair, start_ms, end_ms)
        if data is None or data.empty:
            raise RuntimeError(f"[{pair}] No data returned from Binance for the requested range.")

        binance_client.store_data_to_db(pair, data)

        df = binance_client.fetch_data_from_db(pair)
        if df is None or df.empty:
            raise RuntimeError(f"[{pair}] Data fetched/stored but DB load returned empty.")

        print(f"[{pair}] Fetched + stored + loaded {len(df):,} rows.")

    df = df.sort_index()
    return db_path, df


## Load COINS, then align timestamps

In [3]:
import pandas as pd
import numpy as np

def detect_volume_events(
    df: pd.DataFrame,
    symbol: str,
    vol_win: int = 144,          # 12 hours on 5m
    impulse_k: int = 12,         # 60 min impulse
    rvol_thresh: float = 6.0,    # strict
    impulse_thresh: float = 0.04,# +4% over impulse_k
    lookahead: int = 24,         # 2 hours forward path
    cooldown: int = 12,          # avoid logging same burst repeatedly (60 min)
):
    """
    Logs candidate 'flow shock' events:
      - RVOL spike relative to rolling median
      - Positive impulse over last impulse_k bars
    Then measures forward path stats over lookahead bars.
    """
    d = df.copy().sort_index()
    d = d[["open","high","low","close","volume"]].dropna()

    vol_med = d["volume"].rolling(vol_win).median()
    rvol = d["volume"] / vol_med
    impulse = d["close"] / d["close"].shift(impulse_k) - 1.0

    out = []
    i = 0
    n = len(d)

    while i < n - lookahead:
        if (rvol.iloc[i] >= rvol_thresh) and (impulse.iloc[i] >= impulse_thresh):
            px0 = float(d["close"].iloc[i])
            ts0 = d.index[i]

            future = d["close"].iloc[i+1:i+1+lookahead]
            fmax = float(future.max())
            fmin = float(future.min())
            max_fwd_return = fmax / px0 - 1.0
            max_drawdown = fmin / px0 - 1.0

            # retrace from the peak within the lookahead window
            # find peak time then worst after that peak
            peak_idx = future.values.argmax()
            peak_px = float(future.iloc[peak_idx])
            after_peak = future.iloc[peak_idx:]  # includes peak bar
            trough_after_peak = float(after_peak.min())
            max_retrace = trough_after_peak / peak_px - 1.0  # negative means retrace

            # time to max retrace (bars after event)
            trough_idx = after_peak.values.argmin()
            time_to_max_retrace_bars = int(peak_idx + trough_idx + 1)

            out.append({
                "symbol": symbol,
                "event_ts": ts0,
                "close_event": px0,
                "rvol": float(rvol.iloc[i]),
                "impulse": float(impulse.iloc[i]),
                "max_fwd_return": max_fwd_return,
                "max_drawdown": max_drawdown,
                "max_retrace": max_retrace,
                "time_to_max_retrace_bars": time_to_max_retrace_bars,
            })

            i += cooldown  # skip ahead so we don't log every bar of the same burst
        else:
            i += 1

    return pd.DataFrame(out)


## Get all Binance coin pairs

In [4]:
import requests
import pandas as pd

BINANCE_REST = "https://api.binance.com"

def get_spot_usdt_symbols():
    """All Spot symbols that trade against USDT and are currently TRADING."""
    info = requests.get(f"{BINANCE_REST}/api/v3/exchangeInfo", timeout=20).json()
    syms = []
    for s in info["symbols"]:
        if s.get("status") != "TRADING":
            continue
        if s.get("isSpotTradingAllowed") is not True:
            continue
        if s.get("quoteAsset") != "USDT":
            continue

        sym = s["symbol"]

        # Exclude leveraged tokens & some common non-spot-like tickers
        bad_substrings = ["UPUSDT", "DOWNUSDT", "BULLUSDT", "BEARUSDT", "3LUSDT", "3SUSDT", "5LUSDT", "5SUSDT"]
        if any(sym.endswith(x) for x in bad_substrings):
            continue

        syms.append(sym)
    return sorted(set(syms))

def rank_symbols_by_quote_volume(symbols):
    """Return DataFrame of symbols with 24h quoteVolume (USDT) sorted desc."""
    tickers = requests.get(f"{BINANCE_REST}/api/v3/ticker/24hr", timeout=20).json()
    # Build a map for fast lookup
    wanted = set(symbols)

    rows = []
    for t in tickers:
        sym = t.get("symbol")
        if sym not in wanted:
            continue
        # quoteVolume is in quoteAsset units, here USDT
        qv = float(t.get("quoteVolume", 0.0))
        rows.append({
            "symbol": sym,
            "quoteVolumeUSDT_24h": qv,
            "lastPrice": float(t.get("lastPrice", 0.0)),
            "priceChangePercent": float(t.get("priceChangePercent", 0.0)),
            "count": int(t.get("count", 0)),  # trade count 24h
        })

    df = pd.DataFrame(rows)
    df = df.sort_values("quoteVolumeUSDT_24h", ascending=False).reset_index(drop=True)
    return df

def get_top_usdt_pairs(n=100, min_quote_vol_usdt=None):
    """Top-N by 24h quote volume; optionally filter by minimum quote volume."""
    syms = get_spot_usdt_symbols()
    ranked = rank_symbols_by_quote_volume(syms)

    if min_quote_vol_usdt is not None:
        ranked = ranked[ranked["quoteVolumeUSDT_24h"] >= float(min_quote_vol_usdt)].copy()

    top = ranked.head(n).copy()
    return top, ranked


In [5]:
top100, ranked_all = get_top_usdt_pairs(n=100)
pairs = top100["symbol"].tolist()

len(pairs)


100

In [6]:
interval = "5m"
weeks = 52

paths = {}
dfs = {}

for sym in pairs:
    db_path, df = load_or_fetch_pair_df(sym, interval, weeks)
    paths[sym] = db_path
    dfs[sym] = df
    dfs[sym] = df


[USDCUSDT] DB: ./db/USDCUSDT_5m_52weeks.db
[USDCUSDT] Loaded 104,832 rows from DB.
[BTCUSDT] DB: ./db/BTCUSDT_5m_52weeks.db
[BTCUSDT] Loaded 104,832 rows from DB.
[ETHUSDT] DB: ./db/ETHUSDT_5m_52weeks.db
[ETHUSDT] Loaded 104,832 rows from DB.
[FOGOUSDT] DB: ./db/FOGOUSDT_5m_52weeks.db
[FOGOUSDT] Loaded 2,907 rows from DB.
[SOLUSDT] DB: ./db/SOLUSDT_5m_52weeks.db
[SOLUSDT] Loaded 104,832 rows from DB.
[PAXGUSDT] DB: ./db/PAXGUSDT_5m_52weeks.db
[PAXGUSDT] Loaded 104,832 rows from DB.
[USD1USDT] DB: ./db/USD1USDT_5m_52weeks.db
[USD1USDT] Loaded 71,475 rows from DB.
[FDUSDUSDT] DB: ./db/FDUSDUSDT_5m_52weeks.db
[FDUSDUSDT] Loaded 104,832 rows from DB.
[XRPUSDT] DB: ./db/XRPUSDT_5m_52weeks.db
[XRPUSDT] Loaded 104,832 rows from DB.
[ZECUSDT] DB: ./db/ZECUSDT_5m_52weeks.db
[ZECUSDT] Loaded 104,832 rows from DB.
[BNBUSDT] DB: ./db/BNBUSDT_5m_52weeks.db
[BNBUSDT] Loaded 104,832 rows from DB.
[DOGEUSDT] DB: ./db/DOGEUSDT_5m_52weeks.db
[DOGEUSDT] Loaded 104,832 rows from DB.
[SUIUSDT] DB: ./db/SUI

## Detect Comporession state

In [7]:
import pandas as pd
import numpy as np

def detect_compression_state(
    df: pd.DataFrame,
    *,
    atr_short: int = 20,          # ~100 min on 5m
    atr_long: int = 100,          # ~8 hours on 5m
    vol_win: int = 144,           # volume median window (12 hours)
    vol_ratio_thresh: float = 0.6,
    rvol_thresh: float = 0.6,
    min_duration: int = 12        # bars of sustained compression (60 min)
):
    """
    Detects pre-shock compression state.

    Returns df with added columns:
      - atr
      - atr_med
      - vol_compression
      - rvol
      - volu_compression
      - compression_raw
      - compression_duration
      - is_compressed
    """

    d = df.copy().sort_index()
    d = d[["open","high","low","close","volume"]].dropna()

    # -----------------------
    # 1) Volatility (ATR)
    # -----------------------
    high = d["high"]
    low  = d["low"]
    close = d["close"]

    tr = pd.concat([
        high - low,
        (high - close.shift()).abs(),
        (low - close.shift()).abs()
    ], axis=1).max(axis=1)

    d["atr"] = tr.rolling(atr_short).mean()
    d["atr_med"] = d["atr"].rolling(atr_long).median()

    d["vol_compression"] = d["atr"] / d["atr_med"]
    d["is_vol_compressed"] = d["vol_compression"] <= vol_ratio_thresh

    # -----------------------
    # 2) Volume compression
    # -----------------------
    vol_med = d["volume"].rolling(vol_win).median()
    d["rvol"] = d["volume"] / vol_med
    d["is_volume_compressed"] = d["rvol"] <= rvol_thresh

    # -----------------------
    # 3) Raw compression flag
    # -----------------------
    d["compression_raw"] = (
        d["is_vol_compressed"] &
        d["is_volume_compressed"]
    )

    # -----------------------
    # 4) Duration counter
    # -----------------------
    duration = np.zeros(len(d), dtype=int)

    for i in range(1, len(d)):
        if d["compression_raw"].iloc[i]:
            duration[i] = duration[i-1] + 1
        else:
            duration[i] = 0

    d["compression_duration"] = duration

    # -----------------------
    # 5) Final state
    # -----------------------
    d["is_compressed"] = d["compression_duration"] >= min_duration

    return d


In [8]:
df = dfs["BTCUSDT"]
df_c = detect_compression_state(df)

df_c[["vol_compression","rvol","compression_duration","is_compressed"]].tail(30)


Unnamed: 0_level_0,vol_compression,rvol,compression_duration,is_compressed
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2026-01-24 12:50:00,0.868259,1.387158,0,False
2026-01-24 12:55:00,0.837331,1.140229,0,False
2026-01-24 13:00:00,0.797691,0.936581,0,False
2026-01-24 13:05:00,0.779997,0.651361,0,False
2026-01-24 13:10:00,0.775064,0.883699,0,False
2026-01-24 13:15:00,0.7865,2.852841,0,False
2026-01-24 13:20:00,0.698898,1.816785,0,False
2026-01-24 13:25:00,0.675804,0.651221,0,False
2026-01-24 13:30:00,0.65734,0.881418,0,False
2026-01-24 13:35:00,0.694816,0.81219,0,False


In [9]:
df_c["is_compressed"].mean()

np.float64(0.0025660103785103785)

In [12]:
def get_compression_times(df_c):
    """
    df_c = output of detect_compression_state(df)
    Returns DatetimeIndex of bars where compression starts
    (we take the FIRST bar of each compression block)
    """
    comp = df_c["is_compressed"]

    # compression start = is_compressed == True AND previous == False
    comp_start = comp & (~comp.shift(1).fillna(False))

    return df_c.index[comp_start]

def get_shock_times(events_sym):
    """
    events_sym = events filtered to one symbol
    Returns DatetimeIndex of shock timestamps
    """
    return pd.to_datetime(events_sym["event_ts"]).sort_values()

def compression_leads_shock(
    compression_times,
    shock_times,
    *,
    max_lead_minutes=120
):
    """
    For each compression start time, check whether
    a shock occurs within max_lead_minutes AFTER it.
    """
    lead_td = pd.Timedelta(minutes=max_lead_minutes)

    hits = 0
    for t in compression_times:
        if ((shock_times > t) & (shock_times <= t + lead_td)).any():
            hits += 1

    total = len(compression_times)

    return {
        "compression_events": total,
        "hits": hits,
        "hit_rate": hits / total if total > 0 else np.nan
    }



In [14]:
events = []

for sym, df in dfs.items():
    ev = detect_volume_events(df, sym)
    events.append(ev)

# Convert list of DataFrames into a single DataFrame
events = pd.concat(events, ignore_index=True)

type(events), events.head()


(pandas.core.frame.DataFrame,
     symbol            event_ts  close_event       rvol   impulse  \
 0  BTCUSDT 2025-03-02 16:15:00     89370.23  35.169505  0.049267   
 1  BTCUSDT 2025-03-02 17:20:00     92623.03  24.089593  0.040878   
 2  BTCUSDT 2025-04-07 14:15:00     80243.10  12.829945  0.040335   
 3  BTCUSDT 2025-04-09 17:20:00     80744.00  27.602926  0.040441   
 4  BTCUSDT 2025-10-10 22:15:00    113213.78   6.726021  0.088853   
 
    max_fwd_return  max_drawdown  max_retrace  time_to_max_retrace_bars  
 0        0.058518     -0.004590    -0.012570                        23  
 1        0.021344     -0.001134    -0.022009                        18  
 2       -0.010208     -0.030371    -0.020371                        11  
 3        0.026891      0.000949    -0.013591                        19  
 4        0.012397     -0.012771    -0.024860                        24  )

In [18]:
import numpy as np
import pandas as pd

# -----------------------------
# Helpers: ATR + linear slope
# -----------------------------
def _atr14(series_df: pd.DataFrame, n: int = 14) -> pd.Series:
    h = series_df["high"]
    l = series_df["low"]
    c = series_df["close"]
    tr = pd.concat(
        [(h - l), (h - c.shift()).abs(), (l - c.shift()).abs()],
        axis=1
    ).max(axis=1)
    return tr.rolling(n).mean()

def _lin_slope(y: np.ndarray) -> float:
    """Slope of y versus index (0..n-1). Returns 0 if too short or constant."""
    y = np.asarray(y, dtype=float)
    n = len(y)
    if n < 3 or np.all(~np.isfinite(y)):
        return np.nan
    x = np.arange(n, dtype=float)
    y = y - np.nanmean(y)
    x = x - x.mean()
    denom = np.nansum(x * x)
    return float(np.nansum(x * y) / denom) if denom > 0 else 0.0

# -----------------------------
# Candle overlap + wick/body
# -----------------------------
def _overlap_ratio(window: pd.DataFrame) -> float:
    """Mean normalized overlap between consecutive candles (0..1-ish)."""
    if len(window) < 2:
        return np.nan
    h = window["high"].to_numpy(dtype=float)
    l = window["low"].to_numpy(dtype=float)
    rng = (h - l)
    rng[rng == 0] = np.nan

    # overlap between t and t-1
    ov = np.minimum(h[1:], h[:-1]) - np.maximum(l[1:], l[:-1])
    ov = np.maximum(ov, 0.0)
    # normalize by current candle range
    ov_norm = ov / rng[1:]
    return float(np.nanmean(ov_norm))

def _wick_to_body_ratio(window: pd.DataFrame) -> float:
    """(upper+lower wick)/body averaged over window."""
    o = window["open"].to_numpy(dtype=float)
    c = window["close"].to_numpy(dtype=float)
    h = window["high"].to_numpy(dtype=float)
    l = window["low"].to_numpy(dtype=float)

    body = np.abs(c - o)
    body[body == 0] = np.nan

    upper = h - np.maximum(o, c)
    lower = np.minimum(o, c) - l
    wick = upper + lower
    return float(np.nanmean(wick / body))

# -----------------------------
# Feature extraction for one window
# -----------------------------
def extract_window_features(
    d: pd.DataFrame,
    t0: pd.Timestamp,
    *,
    pre_bars: int = 36,        # 180 min
    gap_bars: int = 2,         # exclude last 10 min
    atr_long_med_win: int = 144 # 12h baseline for median
) -> dict | None:
    """
    Compute pre-shock window features for a single event time t0.

    Window is: [t0 - pre_bars, t0 - gap_bars]
    Assumes d is indexed by datetime and has open/high/low/close/volume.
    """
    if not isinstance(d.index, pd.DatetimeIndex):
        raise ValueError("df must have a DatetimeIndex")

    # Ensure sorted
    d = d.sort_index()

    # Need baseline columns
    if "atr14" not in d.columns:
        d["atr14"] = _atr14(d, 14)
    if "vol_med_12h" not in d.columns:
        d["vol_med_12h"] = d["volume"].rolling(atr_long_med_win).median()
    if "rvol_12h" not in d.columns:
        d["rvol_12h"] = d["volume"] / d["vol_med_12h"]
    if "atr14_med_12h" not in d.columns:
        d["atr14_med_12h"] = d["atr14"].rolling(atr_long_med_win).median()

    # Locate t0
    if t0 not in d.index:
        # if exact timestamp missing, align to nearest previous bar
        loc = d.index.searchsorted(t0, side="right") - 1
        if loc < 0:
            return None
        t0 = d.index[loc]

    end_loc = d.index.get_loc(t0) - gap_bars
    start_loc = end_loc - pre_bars + 1

    if start_loc < 0 or end_loc <= start_loc:
        return None

    w = d.iloc[start_loc:end_loc+1].copy()
    if len(w) < max(10, pre_bars // 2):
        return None

    # Baselines at window end (most relevant "current regime")
    atr_med = d["atr14_med_12h"].iloc[end_loc]
    if not np.isfinite(atr_med) or atr_med <= 0:
        return None

    # Features
    atr_mean = float(np.nanmean(w["atr14"].to_numpy(dtype=float)))
    atr_mean_ratio = atr_mean / float(atr_med)

    atr_slope = _lin_slope(w["atr14"].to_numpy(dtype=float))
    rvol_mean = float(np.nanmean(w["rvol_12h"].to_numpy(dtype=float)))
    rvol_slope = _lin_slope(w["rvol_12h"].to_numpy(dtype=float))

    window_high = float(np.nanmax(w["high"]))
    window_low  = float(np.nanmin(w["low"]))
    window_close = float(w["close"].iloc[-1])
    range_pct = (window_high - window_low) / window_close if window_close != 0 else np.nan

    overlap_ratio = _overlap_ratio(w)
    wick_to_body = _wick_to_body_ratio(w)

    return {
        "t0": t0,
        "win_start": w.index[0],
        "win_end": w.index[-1],
        "atr_mean_ratio": atr_mean_ratio,
        "atr_slope": atr_slope,
        "rvol_mean": rvol_mean,
        "rvol_slope": rvol_slope,
        "range_pct": range_pct,
        "overlap_ratio": overlap_ratio,
        "wick_to_body_ratio": wick_to_body,
    }

# -----------------------------
# Build dataset: shock vs control
# -----------------------------
def build_preshock_dataset(
    dfs: dict[str, pd.DataFrame],
    events: pd.DataFrame,
    *,
    pre_bars: int = 36,
    gap_bars: int = 2,
    control_per_event: int = 1,
    control_lookahead_bars: int = 36,   # control must have NO shock in next 3h
    seed: int = 42,
) -> pd.DataFrame:
    """
    Returns a labeled dataset with:
      label=1 for pre-shock windows
      label=0 for matched control windows (same symbol, same window length)
    """
    rng = np.random.default_rng(seed)

    ev = events.copy()
    ev["event_ts"] = pd.to_datetime(ev["event_ts"])
    ev = ev.sort_values(["symbol", "event_ts"])

    rows = []

    # Precompute shock times per symbol for fast control selection
    shock_times_by_sym = {
        sym: pd.to_datetime(g["event_ts"]).sort_values().to_numpy(dtype="datetime64[ns]")
        for sym, g in ev.groupby("symbol")
    }

    for sym, g in ev.groupby("symbol"):
        if sym not in dfs or dfs[sym].empty:
            continue

        d = dfs[sym].copy().sort_index()
        d = d[["open","high","low","close","volume"]].dropna()
        if len(d) < (pre_bars + gap_bars + 200):
            continue

        # Make sure index is datetime
        if not isinstance(d.index, pd.DatetimeIndex):
            raise ValueError(f"{sym} df index must be DatetimeIndex")

        # Precompute baseline columns once per symbol
        d["atr14"] = _atr14(d, 14)
        d["vol_med_12h"] = d["volume"].rolling(144).median()
        d["rvol_12h"] = d["volume"] / d["vol_med_12h"]
        d["atr14_med_12h"] = d["atr14"].rolling(144).median()

        shock_times = shock_times_by_sym.get(sym, np.array([], dtype="datetime64[ns]"))

        for t0 in pd.to_datetime(g["event_ts"]):
            # --- Positive (pre-shock) ---
            feat = extract_window_features(
                d, t0, pre_bars=pre_bars, gap_bars=gap_bars
            )
            if feat is None:
                continue

            feat.update({"symbol": sym, "label": 1})
            rows.append(feat)

            # --- Controls (non-shock) ---
            # We sample control windows that are NOT too close to any shock:
            # specifically: no shock in [t_ctrl, t_ctrl + control_lookahead_bars]
            # and the control window itself must exist.
            if control_per_event <= 0:
                continue

            # Candidate range: pick end_loc such that start_loc >= 0 and end_loc within df
            end_min = pre_bars + gap_bars  # minimal index for t0 location
            end_max = len(d) - control_lookahead_bars - 1
            if end_max <= end_min:
                continue

            # Convert shock_times to pandas index positions for quick exclusion
            # We'll just exclude by time comparisons (simpler, good enough)
            attempts = 0
            got = 0
            while got < control_per_event and attempts < 200:
                attempts += 1
                end_loc = int(rng.integers(end_min, end_max))
                t_ctrl = d.index[end_loc]  # treat as "event time"

                # Exclude if within pre_bars of start or near this shock t0 to avoid leakage
                if abs((t_ctrl - feat["t0"]).total_seconds()) < 6 * 3600:
                    continue

                # Check: no shock in next lookahead window
                t1 = t_ctrl
                t2 = t_ctrl + pd.Timedelta(minutes=5 * control_lookahead_bars)

                if shock_times.size > 0:
                    # numpy datetime64 compare
                    t1n = np.datetime64(t1.to_datetime64())
                    t2n = np.datetime64(t2.to_datetime64())
                    has_future_shock = np.any((shock_times > t1n) & (shock_times <= t2n))
                    if has_future_shock:
                        continue

                feat_c = extract_window_features(
                    d, t_ctrl, pre_bars=pre_bars, gap_bars=gap_bars
                )
                if feat_c is None:
                    continue

                feat_c.update({"symbol": sym, "label": 0})
                rows.append(feat_c)
                got += 1

    return pd.DataFrame(rows)


In [19]:
# Build dataset
ds = build_preshock_dataset(
    dfs=dfs,
    events=events,
    pre_bars=36,            # 180 min
    gap_bars=2,             # exclude last 10 min
    control_per_event=1,    # start with 1:1 matching
    control_lookahead_bars=36,  # control has no shock in next 3h
    seed=42
)

ds.shape, ds["label"].value_counts()


((10076, 12),
 label
 1    5038
 0    5038
 Name: count, dtype: int64)

In [20]:
# Any NaNs?
ds.isna().mean().sort_values(ascending=False).head(10)

# Compare distributions quickly (shock vs control)
cols = ["atr_mean_ratio","atr_slope","rvol_mean","rvol_slope","range_pct","overlap_ratio","wick_to_body_ratio"]
ds.groupby("label")[cols].median().T


label,0,1
atr_mean_ratio,1.00518,1.209002
atr_slope,-8.918e-08,1.7e-05
rvol_mean,1.457157,2.520168
rvol_slope,-0.001365018,0.054635
range_pct,0.02537937,0.054166
overlap_ratio,0.5745321,0.579448
wick_to_body_ratio,1.448094,1.871671


In [21]:
def compute_instability_features_over_window(
    d: pd.DataFrame,
    *,
    pre_bars: int = 36,
    gap_bars: int = 2,
    vol_med_win: int = 144
) -> pd.DataFrame:
    """
    Adds rolling-window (per bar) pre-shock features aligned to each bar t0,
    computed from [t0-pre_bars, t0-gap_bars].
    Output columns are same as dataset: atr_mean_ratio, atr_slope, rvol_mean, rvol_slope,
    range_pct, wick_to_body_ratio.
    """
    df = d.copy().sort_index()
    df = df[["open","high","low","close","volume"]].dropna()

    # precompute base series
    df["atr14"] = _atr14(df, 14)
    df["atr14_med_12h"] = df["atr14"].rolling(vol_med_win).median()
    df["vol_med_12h"] = df["volume"].rolling(vol_med_win).median()
    df["rvol_12h"] = df["volume"] / df["vol_med_12h"]

    # container columns
    cols = ["atr_mean_ratio","atr_slope","rvol_mean","rvol_slope","range_pct","wick_to_body_ratio"]
    for c in cols:
        df[c] = np.nan

    # iterate bars (vectorizing slopes/wicks is possible later; keep correct first)
    for end_loc in range(pre_bars + gap_bars, len(df)):
        t0 = df.index[end_loc]
        # window ends at end_loc-gap_bars
        w_end = end_loc - gap_bars
        w_start = w_end - pre_bars + 1
        w = df.iloc[w_start:w_end+1]

        atr_med = df["atr14_med_12h"].iloc[w_end]
        if not np.isfinite(atr_med) or atr_med <= 0:
            continue

        atr_mean = float(np.nanmean(w["atr14"].to_numpy(dtype=float)))
        df.at[t0, "atr_mean_ratio"] = atr_mean / float(atr_med)

        df.at[t0, "atr_slope"] = _lin_slope(w["atr14"].to_numpy(dtype=float))

        df.at[t0, "rvol_mean"] = float(np.nanmean(w["rvol_12h"].to_numpy(dtype=float)))
        df.at[t0, "rvol_slope"] = _lin_slope(w["rvol_12h"].to_numpy(dtype=float))

        window_high = float(np.nanmax(w["high"]))
        window_low  = float(np.nanmin(w["low"]))
        window_close = float(w["close"].iloc[-1])
        df.at[t0, "range_pct"] = (window_high - window_low) / window_close if window_close else np.nan

        df.at[t0, "wick_to_body_ratio"] = _wick_to_body_ratio(w)

    return df


def classify_instability(df_feat: pd.DataFrame) -> pd.Series:
    """
    Returns boolean Series indexed like df_feat: True when in pre-shock instability state.
    """
    return (
        (df_feat["rvol_mean"] >= 2.0) &
        (df_feat["rvol_slope"] > 0.0) &
        (df_feat["atr_mean_ratio"] >= 1.10) &
        (df_feat["wick_to_body_ratio"] >= 1.60) &
        (df_feat["range_pct"] >= 0.035)
    )


In [22]:
sym = "BTCUSDT"
df_feat = compute_instability_features_over_window(dfs[sym])
df_feat["is_instability"] = classify_instability(df_feat)

df_feat["is_instability"].mean(), df_feat["is_instability"].tail(50)


(np.float64(0.007659874847374847),
 timestamp
 2026-01-24 11:10:00    False
 2026-01-24 11:15:00    False
 2026-01-24 11:20:00    False
 2026-01-24 11:25:00    False
 2026-01-24 11:30:00    False
 2026-01-24 11:35:00    False
 2026-01-24 11:40:00    False
 2026-01-24 11:45:00    False
 2026-01-24 11:50:00    False
 2026-01-24 11:55:00    False
 2026-01-24 12:00:00    False
 2026-01-24 12:05:00    False
 2026-01-24 12:10:00    False
 2026-01-24 12:15:00    False
 2026-01-24 12:20:00    False
 2026-01-24 12:25:00    False
 2026-01-24 12:30:00    False
 2026-01-24 12:35:00    False
 2026-01-24 12:40:00    False
 2026-01-24 12:45:00    False
 2026-01-24 12:50:00    False
 2026-01-24 12:55:00    False
 2026-01-24 13:00:00    False
 2026-01-24 13:05:00    False
 2026-01-24 13:10:00    False
 2026-01-24 13:15:00    False
 2026-01-24 13:20:00    False
 2026-01-24 13:25:00    False
 2026-01-24 13:30:00    False
 2026-01-24 13:35:00    False
 2026-01-24 13:40:00    False
 2026-01-24 13:45:00    

In [24]:
def best_threshold_youden(ds: pd.DataFrame, feature: str, higher_is_more_shock: bool = True):
    x = ds[feature].to_numpy(dtype=float)
    y = ds["label"].to_numpy(dtype=int)

    # candidate thresholds = percentiles (fast, robust)
    qs = np.linspace(5, 95, 91)
    thrs = np.percentile(x[np.isfinite(x)], qs)

    best = None
    for thr in thrs:
        pred = (x >= thr) if higher_is_more_shock else (x <= thr)
        pred = pred & np.isfinite(x)

        tp = np.sum((pred == 1) & (y == 1))
        fp = np.sum((pred == 1) & (y == 0))
        tn = np.sum((pred == 0) & (y == 0))
        fn = np.sum((pred == 0) & (y == 1))

        tpr = tp / (tp + fn) if (tp + fn) else 0
        fpr = fp / (fp + tn) if (fp + tn) else 0
        j = tpr - fpr

        if best is None or j > best["J"]:
            best = {"feature": feature, "thr": float(thr), "TPR": tpr, "FPR": fpr, "J": j}

    return best

features = ["rvol_mean","rvol_slope","atr_mean_ratio","wick_to_body_ratio","range_pct"]
best_list = [best_threshold_youden(ds, f, higher_is_more_shock=True) for f in features]
pd.DataFrame(best_list).sort_values("J", ascending=False)


Unnamed: 0,feature,thr,TPR,FPR,J
4,range_pct,0.036742,0.829297,0.270742,0.558555
1,rvol_slope,0.032369,0.607185,0.212783,0.394403
0,rvol_mean,2.099454,0.602422,0.257642,0.34478
2,atr_mean_ratio,1.116728,0.613934,0.306074,0.30786
3,wick_to_body_ratio,1.341384,0.751687,0.548233,0.203454


In [25]:
def mark_future_shock(df_feat: pd.DataFrame, events: pd.DataFrame, horizon_bars: int):
    """
    For each bar in df_feat, mark whether a shock occurs within the next horizon_bars.
    """
    shock_times = pd.to_datetime(events["event_ts"]).values

    df = df_feat.copy()
    df["future_shock"] = False

    idx = df.index.values
    for i in range(len(idx)):
        t0 = idx[i]
        t1 = idx[min(i + horizon_bars, len(idx) - 1)]
        # any shock in (t0, t1]?
        df.iloc[i, df.columns.get_loc("future_shock")] = np.any(
            (shock_times > t0) & (shock_times <= t1)
        )

    return df


In [27]:
def classify_pre_shock_instability_v1(df_feat: pd.DataFrame) -> pd.Series:
    """
    Rule-based pre-shock instability classifier.
    Calibrated via Youden J on your dataset.
    """
    return (
        (df_feat["range_pct"] >= 0.037) &
        (df_feat["rvol_slope"] >= 0.032) &
        (df_feat["rvol_mean"] >= 2.10)
    )


In [28]:
sym = "BTCUSDT"

df_feat = compute_instability_features_over_window(dfs[sym])
df_feat["is_instability"] = classify_pre_shock_instability_v1(df_feat)

ev = events.query("symbol == @sym")
df_feat = mark_future_shock(df_feat, ev, horizon_bars=24)  # 120 min

p_base = df_feat["future_shock"].mean()
p_cond = df_feat.loc[df_feat["is_instability"], "future_shock"].mean()

p_base, p_cond, p_cond / p_base


(np.float64(0.0010397588522588523),
 np.float64(0.04932735426008968),
 np.float64(47.44114864030937))

In [19]:
import numpy as np
import pandas as pd

# ------------------------------------------------------------
# Helpers: ATR and simple linear slope
# ------------------------------------------------------------
def _atr(series_df: pd.DataFrame, n: int = 14) -> pd.Series:
    h = series_df["high"]
    l = series_df["low"]
    c = series_df["close"]
    tr = pd.concat([(h - l), (h - c.shift()).abs(), (l - c.shift()).abs()], axis=1).max(axis=1)
    return tr.rolling(n).mean()

def _lin_slope(y: np.ndarray) -> float:
    y = np.asarray(y, dtype=float)
    n = len(y)
    if n < 3 or np.all(~np.isfinite(y)):
        return np.nan
    x = np.arange(n, dtype=float)
    ym = np.nanmean(y)
    y = y - ym
    x = x - x.mean()
    denom = np.nansum(x * x)
    return float(np.nansum(x * y) / denom) if denom > 0 else 0.0

def _wick_to_body_ratio(window: pd.DataFrame) -> float:
    o = window["open"].to_numpy(dtype=float)
    c = window["close"].to_numpy(dtype=float)
    h = window["high"].to_numpy(dtype=float)
    l = window["low"].to_numpy(dtype=float)

    body = np.abs(c - o)
    body[body == 0] = np.nan

    upper = h - np.maximum(o, c)
    lower = np.minimum(o, c) - l
    wick = upper + lower

    x = wick / body
    x = x[np.isfinite(x)]
    return float(np.mean(x)) if x.size else np.nan

# ------------------------------------------------------------
# Compute rolling/window features aligned to each bar t0
# NOTE: This is still the expensive step. We cache its output.
# ------------------------------------------------------------
def compute_instability_features_over_window(
    d: pd.DataFrame,
    *,
    pre_bars: int = 36,
    gap_bars: int = 2,
    vol_med_win: int = 144,   # 12h baseline
    atr_n: int = 14
) -> pd.DataFrame:
    df = d.copy().sort_index()
    df = df[["open","high","low","close","volume"]].dropna()

    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("df index must be DatetimeIndex")

    # base series (computed once)
    df["atr14"] = _atr(df, atr_n)
    df["atr14_med_12h"] = df["atr14"].rolling(vol_med_win).median()
    df["vol_med_12h"] = df["volume"].rolling(vol_med_win).median()
    df["rvol_12h"] = df["volume"] / df["vol_med_12h"]

    # features to fill
    cols = ["atr_mean_ratio","atr_slope","rvol_mean","rvol_slope","range_pct","wick_to_body_ratio"]
    for c in cols:
        df[c] = np.nan

    # Python loop (heavy). Keep correct first; optimize later if needed.
    for end_loc in range(pre_bars + gap_bars, len(df)):
        t0 = df.index[end_loc]
        w_end = end_loc - gap_bars
        w_start = w_end - pre_bars + 1

        w = df.iloc[w_start:w_end+1]
        atr_med = df["atr14_med_12h"].iloc[w_end]
        if not np.isfinite(atr_med) or atr_med <= 0:
            continue

        atr_mean = float(np.nanmean(w["atr14"].to_numpy(dtype=float)))
        df.at[t0, "atr_mean_ratio"] = atr_mean / float(atr_med)
        df.at[t0, "atr_slope"] = _lin_slope(w["atr14"].to_numpy(dtype=float))

        rvol_arr = w["rvol_12h"].to_numpy(dtype=float)
        df.at[t0, "rvol_mean"] = float(np.nanmean(rvol_arr))
        df.at[t0, "rvol_slope"] = _lin_slope(rvol_arr)

        window_high = float(np.nanmax(w["high"]))
        window_low  = float(np.nanmin(w["low"]))
        window_close = float(w["close"].iloc[-1])
        df.at[t0, "range_pct"] = (window_high - window_low) / window_close if window_close else np.nan

        df.at[t0, "wick_to_body_ratio"] = _wick_to_body_ratio(w)

    return df

# ------------------------------------------------------------
# Instability classifier (your v1, based on Youden J results)
# ------------------------------------------------------------
def classify_pre_shock_instability_v1(df_feat: pd.DataFrame) -> pd.Series:
    return (
        (df_feat["range_pct"] >= 0.037) &
        (df_feat["rvol_slope"] >= 0.032) &
        (df_feat["rvol_mean"] >= 2.10)
    )

# ------------------------------------------------------------
# FIX #2: Fast "future shock within horizon" flag (vectorized)
# ------------------------------------------------------------
def mark_future_shock_fast(df_feat: pd.DataFrame, events_sym: pd.DataFrame, horizon_bars: int) -> pd.DataFrame:
    df = df_feat.copy()
    df["future_shock"] = False

    if events_sym.empty:
        return df

    # Convert shock times -> indices in df.index
    shock_times = pd.to_datetime(events_sym["event_ts"])
    shock_pos = df.index.searchsorted(shock_times)

    # Keep only valid positions
    shock_pos = shock_pos[(shock_pos >= 0) & (shock_pos < len(df))]
    if len(shock_pos) == 0:
        return df

    shock_flag = np.zeros(len(df), dtype=int)
    shock_flag[shock_pos] = 1

    # any shock in the next horizon_bars?
    # We shift "backwards" by -horizon to align "future window starting now"
    future_any = (
        pd.Series(shock_flag, index=df.index)
        .rolling(horizon_bars, min_periods=1)
        .sum()
        .shift(-horizon_bars)
        .fillna(0)
        .gt(0)
    )

    df["future_shock"] = future_any
    return df

In [20]:
from tqdm.auto import tqdm
import time

# ============================================================
# FIX #1: Cache instability once per symbol (WITH PROGRESS)
# ============================================================
instability_cache = {}

t0 = time.time()

for sym in tqdm(pairs, desc="Computing instability per symbol"):
    if sym not in dfs:
        continue

    df = dfs[sym]
    if df is None or df.empty:
        continue

    df_feat = compute_instability_features_over_window(df)
    df_feat["is_instability"] = classify_pre_shock_instability_v1(df_feat)

    instability_cache[sym] = df_feat[["is_instability"]]

t1 = time.time()
print(f"Instability cache built in {(t1 - t0)/60:.2f} minutes")


Computing instability per symbol:   0%|          | 0/100 [00:00<?, ?it/s]

  return float(np.nanmean(wick / body))


KeyboardInterrupt: 

In [None]:
import time
import numpy as np
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm.auto import tqdm

# ---- Worker must be TOP-LEVEL for multiprocessing (do not nest it) ----
def _worker_compute_instability(sym: str, df: pd.DataFrame):
    """
    Returns (sym, is_instability_df) where is_instability_df has one column: ['is_instability']
    """
    if df is None or df.empty:
        return sym, None

    df_feat = compute_instability_features_over_window(df)
    df_feat["is_instability"] = classify_pre_shock_instability_v1(df_feat)

    # Return only the small piece to reduce inter-process overhead
    out = df_feat[["is_instability"]].copy()
    return sym, out


def build_instability_cache_parallel(dfs: dict, pairs: list, n_workers: int = 12):
    tasks = [(sym, dfs[sym]) for sym in pairs if sym in dfs and dfs[sym] is not None and not dfs[sym].empty]

    instability_cache = {}
    t0 = time.time()

    with ProcessPoolExecutor(max_workers=n_workers) as ex:
        futures = {ex.submit(_worker_compute_instability, sym, df): sym for sym, df in tasks}

        for fut in tqdm(as_completed(futures), total=len(futures), desc=f"Instability x{n_workers}"):
            sym = futures[fut]
            try:
                sym2, out = fut.result()
                if out is not None:
                    instability_cache[sym2] = out
            except Exception as e:
                print(f"[{sym}] failed: {e}")

    t1 = time.time()
    print(f"Instability cache built for {len(instability_cache)}/{len(tasks)} symbols in {(t1-t0)/60:.2f} min")
    return instability_cache


# Choose how many cores to use
N_WORKERS = 4  # you said 10–12 is fine
instability_cache = build_instability_cache_parallel(dfs, pairs, n_workers=N_WORKERS)


In [18]:
# ============================================================
# FIX #3: Conditional shock probability (FAST, WITH PROGRESS)
# ============================================================
rows = []
horizon_bars = 24

t0 = time.time()

for sym in tqdm(pairs, desc="Computing conditional shock probability"):
    if sym not in instability_cache:
        continue

    ev = events.query("symbol == @sym")
    if ev.empty:
        continue

    df_feat = instability_cache[sym]
    df_feat = mark_future_shock_fast(df_feat, ev, horizon_bars=horizon_bars)

    n_inst = int(df_feat["is_instability"].sum())
    if n_inst < 10:
        continue

    p_base = float(df_feat["future_shock"].mean())
    p_cond = float(df_feat.loc[df_feat["is_instability"], "future_shock"].mean())

    rows.append({
        "symbol": sym,
        "base_prob": p_base,
        "cond_prob": p_cond,
        "lift": (p_cond / p_base) if p_base > 0 else np.nan,
        "instability_rate": float(df_feat["is_instability"].mean()),
        "n_instability": n_inst,
        "n_shocks": len(ev),
    })

t1 = time.time()
print(f"Conditional probability pass in {(t1 - t0):.2f} seconds")

prob_df = pd.DataFrame(rows).sort_values("lift", ascending=False)
prob_df


KeyboardInterrupt: 

In [17]:
prob_df.describe()
prob_df[prob_df["lift"] > 2.0]


NameError: name 'prob_df' is not defined

In [16]:
prob_df.sort_values("instability_rate")


NameError: name 'prob_df' is not defined