In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Scalp Radar (Notebook/CLI Safe) — Live Multi-Exchange Scanner for Repetitive, Liquid Volatility

- Uses ccxt to fetch *live* OHLCV + tickers.
- Scores markets for short–moderate timeframe scalping.
- Jupyter-safe (ignores unknown args like "-f <kernel.json>").
"""

from __future__ import annotations
import os, sys, math, time, json, argparse, logging
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- tiny installer (if needed) ---
def _pip(pkgs: List[str]):
    import subprocess
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", *pkgs])
    except Exception as e:
        print("⚠ pip install failed:", e, file=sys.stderr)

try:
    import numpy as np
    import pandas as pd
except Exception:
    _pip(["numpy","pandas"]); import numpy as np; import pandas as pd

try:
    import ccxt
except Exception:
    _pip(["ccxt"]); import ccxt

try:
    from tabulate import tabulate
except Exception:
    _pip(["tabulate"]); from tabulate import tabulate

logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s")

# ---------------- Metrics ----------------
def realized_vol_pct(close: pd.Series) -> float:
    r = close.pct_change().dropna()
    return float(r.std() * np.sqrt(len(r)) * 100) if len(r) else 0.0

def atr_percent(df: pd.DataFrame, n: int = 14) -> float:
    h, l, c = df["high"], df["low"], df["close"]
    tr = pd.concat([(h-l).abs(), (h-c.shift()).abs(), (l-c.shift()).abs()], axis=1).max(axis=1)
    atr = tr.rolling(n, min_periods=max(5, n//2)).mean().iloc[-1] if len(tr) else np.nan
    last = float(c.iloc[-1]) if len(c) else np.nan
    if not np.isfinite(atr) or not np.isfinite(last) or last == 0: return 0.0
    return float(100 * atr / last)

def choppiness_index(df: pd.DataFrame, n: int = 14) -> float:
    h, l, c = df["high"], df["low"], df["close"]
    tr = pd.concat([(h-l).abs(), (h-c.shift()).abs(), (l-c.shift()).abs()], axis=1).max(axis=1)
    tr_sum = tr.rolling(n, min_periods=max(5, n//2)).sum()
    hh = h.rolling(n, min_periods=max(5, n//2)).max()
    ll = l.rolling(n, min_periods=max(5, n//2)).min()
    denom = (hh - ll).replace(0, np.nan)
    val = 100 * np.log10(tr_sum / denom) / np.log10(n)
    out = float(val.iloc[-1]) if len(val) else 0.0
    # Normalize to 0..1 (~38 trending .. ~61 choppy)
    return max(0.0, min(1.0, (out - 38) / (61 - 38)))

# ---------------- Helpers ----------------
DEFAULT_EXS = ["binance", "bybit", "okx", "kucoin", "kraken", "bitget", "gateio", "mexc"]

def build_ex(eid: str):
    cls = getattr(ccxt, eid)
    return cls({"enableRateLimit": True})

def pick_pairs(ex, prefer_quote=("USDT","USD")) -> List[str]:
    markets = ex.load_markets()
    out = []
    for s, m in markets.items():
        if m.get("active") is False: continue
        quote = (m.get("quote") or "").upper()
        base  = (m.get("base")  or "").upper()
        if quote in prefer_quote and (m.get("swap") or m.get("spot")):
            if any(x in base for x in ["DOWN","UP","BULL","BEAR","3L","3S"]):  # exclude leveraged tokens
                continue
            out.append(s)
    return sorted(list(dict.fromkeys(out)))

def fetch_ohlcv_safe(ex, symbol: str, timeframe: str, limit: int, retries: int = 2) -> pd.DataFrame:
    for i in range(retries+1):
        try:
            rows = ex.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
            if not rows: return pd.DataFrame()
            df = pd.DataFrame(rows, columns=["ts","open","high","low","close","volume"])
            df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
            return df
        except Exception:
            time.sleep((ex.rateLimit or 200)/1000.0)
    return pd.DataFrame()

def fetch_ticker_safe(ex, symbol: str) -> Dict[str, float]:
    try:
        t = ex.fetch_ticker(symbol)
        return {
            "last": float(t.get("last") or 0.0),
            "bid": float(t.get("bid") or 0.0),
            "ask": float(t.get("ask") or 0.0),
            "quoteVolume": float(t.get("quoteVolume") or t.get("info", {}).get("quoteVolume", 0.0) or 0.0),
            "baseVolume": float(t.get("baseVolume") or 0.0),
        }
    except Exception:
        return {"last":0.0,"bid":0.0,"ask":0.0,"quoteVolume":0.0,"baseVolume":0.0}

def compute_metrics(df: pd.DataFrame, ticker: Dict) -> Dict[str, float]:
    if df.empty: 
        return dict(rv=0, atrp=0, chop=0, spreadp=1, qvol=0)
    rv   = realized_vol_pct(df["close"])
    atrp = atr_percent(df, 14)
    chop = choppiness_index(df, 14)
    bid, ask, last = ticker.get("bid", 0.0), ticker.get("ask", 0.0), ticker.get("last", 0.0)
    spreadp = 100 * (ask - bid) / ask if (ask and bid and ask > bid) else (0.06 if last>0 else 1.0)
    qvol = float(ticker.get("quoteVolume", 0.0))
    return dict(rv=rv, atrp=atrp, chop=chop, spreadp=spreadp, qvol=qvol)

def rank_markets(rows: List[Dict], top: int) -> pd.DataFrame:
    if not rows: return pd.DataFrame(columns=["Exchange","Symbol","Last","RV%","ATR%","CHOP(0-1)","Spread%","QVol(24h)","ScalpScore"])
    df = pd.DataFrame(rows)
    # z-normalize for scoring
    def z(s): 
        s = pd.Series(s, dtype=float)
        return (s - s.mean())/(s.std(ddof=0) if s.std(ddof=0) else 1.0)
    score = 0.35*z(df["rv"]) + 0.25*z(df["atrp"]) + 0.20*z(df["chop"]) + 0.20*z(df["qvol"]) - 0.20*z(df["spreadp"])
    df["ScalpScore"] = score
    df = df.sort_values("ScalpScore", ascending=False)
    # pretty cols
    df["RV%"]       = df["rv"].round(3)
    df["ATR%"]      = df["atrp"].round(3)
    df["CHOP(0-1)"] = df["chop"].round(3)
    df["Spread%"]   = df["spreadp"].round(3)
    df["QVol(24h)"] = df["qvol"]
    df = df[["Exchange","Symbol","TF","Last","RV%","ATR%","CHOP(0-1)","Spread%","QVol(24h)","ScalpScore"]].head(top)
    return df

# ---------------- Scanner ----------------
def scan(exchanges: List[str], timeframe: str, hours: float, min_quote_vol: float, top: int, symbols_per_ex: int, max_workers: int = 8) -> pd.DataFrame:
    limit_map = {"1m": int(hours*60), "3m": int(hours*20), "5m": int(hours*12),
                 "15m": int(hours*4), "30m": int(hours*2), "1h": int(hours)}
    limit = min(1500, max(100, limit_map.get(timeframe, int(hours))))
    results: List[Dict] = []

    for eid in exchanges:
        try:
            ex = build_ex(eid)
        except Exception as e:
            print(f"⚠ Could not init {eid}: {e}")
            continue

        # markets & quick liquidity screen
        try:
            syms = pick_pairs(ex)
        except Exception as e:
            print(f"⚠ load_markets failed on {eid}: {e}")
            continue

        tickers = {}
        try:
            t_all = ex.fetch_tickers()
            for s, t in t_all.items():
                if s in syms:
                    tickers[s] = {
                        "quoteVolume": float(t.get("quoteVolume") or 0.0),
                        "bid": float(t.get("bid") or 0.0),
                        "ask": float(t.get("ask") or 0.0),
                        "last": float(t.get("last") or 0.0),
                    }
        except Exception:
            pass

        # prioritize by live quote volume
        syms_sorted = sorted(syms, key=lambda s: (tickers.get(s, {}).get("quoteVolume") or 0), reverse=True)
        if symbols_per_ex > 0:
            syms_sorted = syms_sorted[:symbols_per_ex]

        def work(symbol: str):
            df = fetch_ohlcv_safe(ex, symbol, timeframe, limit)
            if df.empty or len(df) < 50:
                return None
            t = fetch_ticker_safe(ex, symbol)
            if t["quoteVolume"] < min_quote_vol:
                return None
            m = compute_metrics(df, t)
            return {
                "Exchange": eid,
                "Symbol": symbol,
                "TF": timeframe,
                "Last": round(t["last"], 8),
                **m
            }

        # light concurrency per exchange (ccxt instances are mostly safe for read ops)
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futs = [pool.submit(work, s) for s in syms_sorted]
            for f in as_completed(futs):
                r = f.result()
                if r: results.append(r)

    return rank_markets(results, top)

# ---------------- I/O ----------------
def pretty_print(df: pd.DataFrame):
    if df is None or df.empty:
        print("\nNo markets passed filters. Try lower --min-quote-vol or different timeframe.")
        return
    tbl = df.copy()
    tbl["QVol(24h)"] = tbl["QVol(24h)"].map(lambda x: f"{float(x):,.0f}")
    tbl["ScalpScore"] = tbl["ScalpScore"].map(lambda x: f"{float(x):.3f}")
    print("\nTop scalpable markets (live)")
    print(tabulate(tbl.reset_index(drop=True), headers="keys", tablefmt="github", showindex=True))

def save_outputs(df: pd.DataFrame, out_base: str):
    if df is None or df.empty: 
        print("Nothing to save.")
        return
    csv_path  = f"{out_base}.csv"
    json_path = f"{out_base}.json"
    df.to_csv(csv_path, index=False)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(df.to_dict(orient="records"), f, indent=2)
    print(f"Saved: {csv_path}\nSaved: {json_path}")

# ---------------- CLI / Entry ----------------
def build_cli():
    p = argparse.ArgumentParser(description="Scalp Radar — rank pairs by repetitive, liquid volatility")
    p.add_argument("--ex", nargs="+", default=DEFAULT_EXS, help="Exchanges (ccxt ids)")
    p.add_argument("--timeframe", default="5m", choices=["1m","3m","5m","15m","30m","1h"], help="Candle timeframe")
    p.add_argument("--hours", type=float, default=24.0, help="Lookback window (hours)")
    p.add_argument("--min-quote-vol", type=float, default=3e6, help="Min 24h quote volume (USDT/USD)")
    p.add_argument("--top", type=int, default=25, help="Rows to show")
    p.add_argument("--symbols-per-ex", type=int, default=120, help="Max symbols per exchange to sample")
    p.add_argument("--save-base", default=None, help="If set, save CSV/JSON to this base path (e.g., scans/scalp_ranks)")
    p.add_argument("--workers", type=int, default=8, help="Max concurrent fetches per exchange")
    return p

def run(exchanges=DEFAULT_EXS, timeframe="5m", hours=24.0, min_quote_vol=3e6, top=25, symbols_per_ex=120, workers=8, save_base=None) -> pd.DataFrame:
    df = scan(
        exchanges=exchanges,
        timeframe=timeframe,
        hours=hours,
        min_quote_vol=min_quote_vol,
        top=top,
        symbols_per_ex=symbols_per_ex,
        max_workers=workers
    )
    pretty_print(df)
    if save_base: 
        os.makedirs(os.path.dirname(save_base) or ".", exist_ok=True)
        save_outputs(df, save_base)
    return df

def main():
    parser = build_cli()
    # JUPYTER-SAFE: ignore unknown args like "-f <kernel.json>"
    args, _unknown = parser.parse_known_args()
    df = run(
        exchanges=args.ex,
        timeframe=args.timeframe,
        hours=args.hours,
        min_quote_vol=args.min_quote_vol,
        top=args.top,
        symbols_per_ex=args.symbols_per_ex,
        workers=args.workers,
        save_base=args.save_base
    )

if __name__ == "__main__":
    main()



Top scalpable markets (live)
|    | Exchange   | Symbol          | TF   |          Last |    RV% |   ATR% |   CHOP(0-1) |   Spread% | QVol(24h)     |   ScalpScore |
|----|------------|-----------------|------|---------------|--------|--------|-------------|-----------|---------------|--------------|
|  0 | bybit      | BTC/USDT:USDT   | 5m   | 108154        |  1.746 |  0.213 |       0.283 |     0     | 4,678,448,862 |        2.639 |
|  1 | bitget     | K/USDT          | 5m   |      0.03655  | 96.472 |  1.046 |       0.797 |     0.382 | 9,715,622     |        2.536 |
|  2 | gateio     | RVV/USDT        | 5m   |      0.010756 | 64.368 |  2.431 |       0.939 |     0.102 | 17,803,172    |        2.481 |
|  3 | kucoin     | BAS/USDT        | 5m   |      0.02772  | 59.971 |  3.95  |       0.296 |     0.108 | 56,637,003    |        2.455 |
|  4 | kucoin     | RVV/USDT        | 5m   |      0.010639 | 64.399 |  2.604 |       0.74  |     0.085 | 21,254,333    |        2.437 |
|  5 | bitget     