
# `upstox_19sep_v4.ipynb` — **Buy-Only NIFTY Intraday Scalper**
Live‑first engine with NIFTY spot anchor, Greeks, IV‑z gating, marketable‑limit **V3**, live reconciliation, HTTP backfill, **buy‑only planner**, and **protective broker‑resident stops**.


In [None]:

import os, json, time, threading, math, traceback, uuid, re
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import numpy as np
import pandas as pd
from IPython.display import display

pd.set_option("display.width", 160)
pd.set_option("display.max_columns", 120)

try:
    import upstox_client
    from upstox_client.rest import ApiException
    UPSDK_AVAILABLE = True
except Exception as e:
    UPSDK_AVAILABLE = False
    print("Upstox SDK not available. Install to run live streaming and orders.")
    print("pip install upstox-python-sdk  # confirm exact package per Upstox docs")


In [None]:

# ---- Toggles & constants ----
SIMULATION_MODE = False
USE_LLM = True
USE_ROUTER = True

ORDERS_LIVE = False
EXIT_MANAGER_LIVE = False
RECONCILE_LIVE_PNL = True
BACKFILL_ENABLED = True

UNDERLYING = "NIFTY"
UNDERLYING_SPOT_TOKEN = "NSE_INDEX|Nifty 50"
SPAN_STRIKES = 2
WEBSOCKET_MODE = "full_d30"
IST = "Asia/Kolkata"

STRIKE_STEP = {"NIFTY": 50, "BANKNIFTY": 100, "FINNIFTY": 50}
MAX_QTY_PER_LEG = 300
MAX_OPEN_LEGS = 3

RECENTER_COOLDOWN_S = 60.0
RECENTER_LOG = True

# Buy-only scalper gates
DELTA_MIN, DELTA_MAX = 0.35, 0.55
DEPTH_IMB_MIN = 0.15
IV_Z_MAX = 2.0
IV_Z_MIN_COUNT = 30

# Execution
USE_MARKETABLE_LIMITS = True
LIMIT_BUFFER_TICKS = 1
ORDER_MIN_GAP_MS = 200

# LLM + Router (optional)
LLM_ENDPOINT = os.getenv("LLM_ENDPOINT", "")  # stub
LLM_TIMEOUT_S = float(os.getenv("LLM_TIMEOUT_S", "0.12"))
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://127.0.0.1:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "")
OLLAMA_TIMEOUT_S = float(os.getenv("OLLAMA_TIMEOUT_S", "0.20"))
OLLAMA_NUM_PREDICT = int(os.getenv("OLLAMA_NUM_PREDICT", "16"))
LLM_SCORE_THRESHOLD = 0.55
STUB_LOWER, STUB_UPPER = 0.35, 0.75

LATENCY_LOG = []; LATENCY_LOG_MAX = 5000
ROUTER_LOG = []; ROUTER_LOG_MAX = 5000


In [None]:

class CredentialUpstox:
    ACCESS_TOKEN = os.getenv("UPSTOX_ACCESS_TOKEN", "")

PRODUCT_MAP = {"MIS": "I", "NRML": "D"}
DEFAULT_PRODUCT = "MIS"


In [None]:

def to_ist_ms(ms) -> pd.Timestamp:
    try:
        return pd.to_datetime(int(ms), unit="ms", utc=True).tz_convert(IST)
    except Exception:
        return pd.NaT

def resolve_next_listed_expiry(df_instruments: pd.DataFrame, underlying: str, today=None) -> str:
    t = pd.Timestamp.now(IST).normalize() if today is None else pd.Timestamp(today, tz=IST).normalize()
    dfx = df_instruments[(df_instruments["segment"] == "NSE_FO") &
                         (df_instruments["name"].str.upper() == underlying.upper()) &
                         (df_instruments["instrument_type"].isin(["CE","PE"]))].copy()
    if dfx.empty: raise ValueError(f"No derivatives found for {underlying} in instruments master")
    dfx["_exp"] = pd.to_datetime(dfx["expiry"], errors="coerce")
    dfx = dfx[dfx["_exp"] >= t.tz_localize(None)]
    if dfx.empty: raise ValueError(f"No upcoming expiry >= {t.date()} for {underlying}")
    return dfx["_exp"].min().strftime("%Y-%m-%d")

def nearest_strikes_from_spot(spot_ltp: float, underlying: str, span: int = 2):
    step = STRIKE_STEP.get(underlying.upper(), 50)
    nearest = int(step * math.floor((spot_ltp + step/2) / step))
    return [nearest + i * step for i in range(-span, span+1)]

def get_upstox_quote_client():
    if not UPSDK_AVAILABLE: raise RuntimeError("Upstox SDK is not available.")
    if not CredentialUpstox.ACCESS_TOKEN: raise RuntimeError("ACCESS_TOKEN missing. Set UPSTOX_ACCESS_TOKEN.")
    configuration = upstox_client.Configuration(); configuration.access_token = CredentialUpstox.ACCESS_TOKEN
    return upstox_client.MarketQuoteV3Api(upstox_client.ApiClient(configuration))

def _extract_ltp_from_entry(entry: dict) -> float:
    if not isinstance(entry, dict): raise KeyError("Invalid LTP entry")
    for k in ("ltp","last_traded_price","last","close","price"):
        if k in entry and entry[k] is not None: return float(entry[k])
    if "ltpc" in entry and isinstance(entry["ltpc"], dict) and "ltp" in entry["ltpc"]:
        return float(entry["ltpc"]["ltp"])
    raise KeyError(f"No LTP field found in entry keys={list(entry.keys())}")

def get_index_spot_ltp(instrument_key: str = None) -> float:
    instrument_key = instrument_key or UNDERLYING_SPOT_TOKEN
    api = get_upstox_quote_client()
    try:
        api_response = api.get_ltp(instrument_key=[instrument_key] if not isinstance(instrument_key, list) else instrument_key)
        data_dict = api_response.to_dict() if hasattr(api_response, "to_dict") else dict(api_response)
        data = data_dict.get("data", {})
        entry = data.get(instrument_key) or (next(iter(data.values())) if data else {})
        ltp = _extract_ltp_from_entry(entry)
        if not np.isfinite(ltp): raise RuntimeError(f"LTP non-finite: {ltp}")
        return float(ltp)
    except ApiException as e:
        raise RuntimeError(f"Upstox get_ltp ApiException: {e}")


In [None]:

def load_instruments_live() -> pd.DataFrame:
    url = "https://assets.upstox.com/market-quote/instruments/exchange/NSE.json.gz"
    df = pd.read_json(url)
    if "expiry" in df:
        exp = pd.to_datetime(df["expiry"], unit="ms", errors="coerce")
        mask = exp.isna() & df["expiry"].notna()
        if mask.any():
            exp2 = pd.to_datetime(df.loc[mask, "expiry"], errors="coerce")
            exp.loc[mask] = exp2
        df["expiry"] = exp.dt.strftime("%Y-%m-%d")
    for col in ("strike_price","lot_size","tick_size","minimum_lot"):
        if col in df: df[col] = pd.to_numeric(df[col], errors="coerce")
    return df

df_futureOptions = load_instruments_live()
expiry_target = resolve_next_listed_expiry(df_futureOptions, UNDERLYING)
df_chain = df_futureOptions[(df_futureOptions["segment"]=="NSE_FO") &
                            (df_futureOptions["name"].str.upper()==UNDERLYING.upper()) &
                            (df_futureOptions["instrument_type"].isin(["CE","PE"])) &
                            (df_futureOptions["expiry"]==expiry_target)].copy()
assert not df_chain.empty, f"No options for {UNDERLYING} {expiry_target}"

try:
    spot_ltp_initial = get_index_spot_ltp(UNDERLYING_SPOT_TOKEN)
    print(f"NIFTY 50 spot LTP: {spot_ltp_initial:.2f}")
except Exception as e:
    print("Spot LTP lookup failed; fallback to chain median:", e)
    spot_ltp_initial = float(df_chain["strike_price"].median())

strike_list = nearest_strikes_from_spot(spot_ltp_initial, UNDERLYING, span=SPAN_STRIKES)
df_chain_sel = df_chain[df_chain["strike_price"].isin(strike_list)].sort_values(["strike_price","instrument_type"])
token_list = df_chain_sel["instrument_key"].dropna().astype(str).unique().tolist()
print(f"Selected strikes from spot {spot_ltp_initial:.2f}: {sorted(set(strike_list))}")
display(df_chain_sel.head(8))


In [None]:

df_feed = pd.DataFrame(columns=[
    "Token","Ltp","Ltq","Cp",
    "BidP1","BidQ1","AskP1","AskQ1",
    "Ltt","Oi","Iv","Atp","Tbq","Tsq",
    "Delta","Theta","Gamma","Vega","Rho"
])
df_feed_enriched = pd.DataFrame()
_df_lock = threading.Lock()

def enrich_feed(_df_feed: pd.DataFrame, _df_meta: pd.DataFrame, cols_to_add=None) -> pd.DataFrame:
    if cols_to_add is None:
        cols_to_add = ["lot_size","trading_symbol","strike_price","tick_size","instrument_type","expiry","name"]
    left = _df_feed.copy()
    right = _df_meta[["instrument_key"] + [c for c in cols_to_add if c in _df_meta.columns]].drop_duplicates("instrument_key")
    out = left.merge(right, left_on="Token", right_on="instrument_key", how="left", validate="m:1")
    out["Mid"] = np.where(out["BidP1"].notna() & out["AskP1"].notna(), (out["BidP1"] + out["AskP1"])/2.0, out["Ltp"])
    out["Spread"] = np.where(out["BidP1"].notna() & out["AskP1"].notna(), (out["AskP1"] - out["BidP1"]), np.nan)
    out["DepthImb"] = np.where(
        (out["BidQ1"].notna() & out["AskQ1"].notna() & ((out["BidQ1"] + out["AskQ1"]) > 0)),
        (out["BidQ1"] - out["AskQ1"]) / (out["BidQ1"] + out["AskQ1"]),
        np.nan,
    )
    return out

from collections import defaultdict
_iv_stats = defaultdict(lambda: {"n":0, "mean":0.0, "M2":0.0})
def update_iv_stats(token: str, iv_value: float):
    if iv_value is None or not np.isfinite(iv_value): return
    s = _iv_stats[token]
    n1 = s["n"] + 1
    delta = iv_value - s["mean"]
    mean = s["mean"] + delta / n1
    delta2 = iv_value - mean
    M2 = s["M2"] + delta * delta2
    s["n"], s["mean"], s["M2"] = n1, mean, M2

def iv_zscore_for(token: str, iv_value: float):
    s = _iv_stats[token]
    if s["n"] < max(IV_Z_MIN_COUNT, 2):
        return 0.0, True
    var = s["M2"] / max(s["n"] - 1, 1)
    std = math.sqrt(max(var, 1e-12))
    z = (iv_value - s["mean"]) / std if std > 0 else 0.0
    return z, (abs(z) <= IV_Z_MAX)

live_streamer = None
current_tokens = list(token_list)
spot_ltp_current = spot_ltp_initial
last_center_nearest = int(STRIKE_STEP[UNDERLYING] * math.floor((spot_ltp_initial + STRIKE_STEP[UNDERLYING]/2)/STRIKE_STEP[UNDERLYING]))
_last_recenter_ts = 0.0


In [None]:

def start_live_stream(tokens: List[str], mode: str = "full_d30"):
    global live_streamer
    if not UPSDK_AVAILABLE: raise RuntimeError("Upstox SDK not installed.")
    if not CredentialUpstox.ACCESS_TOKEN: raise RuntimeError("ACCESS_TOKEN missing.")

    configuration = upstox_client.Configuration()
    configuration.access_token = CredentialUpstox.ACCESS_TOKEN
    api_client = upstox_client.ApiClient(configuration)
    streamer = upstox_client.MarketDataStreamerV3(api_client, instrument_key=tokens, mode=mode)

    def _on_message(msg):
        global df_feed, df_feed_enriched, spot_ltp_current
        feeds = msg.get("feeds", {})
        for token, payload in feeds.items():
            ff = payload.get("fullFeed",{}).get("marketFF",{})
            ltpc = ff.get("ltpc",{})
            level = ff.get("marketLevel",{}).get("bidAskQuote",[{}])
            greeks = ff.get("optionGreeks",{}) or {}
            if token == UNDERLYING_SPOT_TOKEN:
                try:
                    if ltpc.get("ltp") is not None:
                        spot_ltp_current = float(ltpc.get("ltp"))
                except Exception: pass
                continue
            try:
                iv_val = ff.get("iv")
                if iv_val is not None:
                    update_iv_stats(token, float(iv_val))
            except Exception: pass
            row = {
                "Token": token,
                "Ltp": float(ltpc.get("ltp")) if ltpc.get("ltp") is not None else np.nan,
                "Ltq": float(ltpc.get("ltq")) if ltpc.get("ltq") is not None else np.nan,
                "Cp": float(ltpc.get("cp")) if ltpc.get("cp") is not None else np.nan,
                "BidP1": float(level[0].get("bidP")) if level and level[0].get("bidP") is not None else np.nan,
                "BidQ1": float(level[0].get("bidQ")) if level and level[0].get("bidQ") is not None else np.nan,
                "AskP1": float(level[0].get("askP")) if level and level[0].get("askP") is not None else np.nan,
                "AskQ1": float(level[0].get("askQ")) if level and level[0].get("askQ") is not None else np.nan,
                "Ltt": to_ist_ms(ltpc.get("ltt")),
                "Oi": float(ff.get("oi")) if ff.get("oi") is not None else np.nan,
                "Iv": float(ff.get("iv")) if ff.get("iv") is not None else np.nan,
                "Atp": float(ff.get("atp")) if ff.get("atp") is not None else np.nan,
                "Tbq": float(ff.get("tbq")) if ff.get("tbq") is not None else np.nan,
                "Tsq": float(ff.get("tsq")) if ff.get("tsq") is not None else np.nan,
                "Delta": float(greeks.get("delta")) if greeks.get("delta") is not None else np.nan,
                "Theta": float(greeks.get("theta")) if greeks.get("theta") is not None else np.nan,
                "Gamma": float(greeks.get("gamma")) if greeks.get("gamma") is not None else np.nan,
                "Vega":  float(greeks.get("vega"))  if greeks.get("vega")  is not None else np.nan,
                "Rho":   float(greeks.get("rho"))   if greeks.get("rho")   is not None else np.nan,
            }
            with _df_lock:
                if token in df_feed["Token"].values:
                    for k,v in row.items(): df_feed.loc[df_feed["Token"]==token, k] = v
                else:
                    df_feed = pd.concat([df_feed, pd.DataFrame([row])], ignore_index=True)
                df_feed_enriched = enrich_feed(df_feed, df_chain)

    streamer.on_message = _on_message
    streamer.on_open = lambda: print("Market WS opened")
    streamer.on_error = lambda e: print("Market WS error:", e)
    streamer.on_close = lambda: print("Market WS closed")
    streamer.connect()
    live_streamer = streamer
    return streamer

def stop_live_stream():
    global live_streamer
    try:
        if live_streamer is not None:
            if hasattr(live_streamer, "close"): live_streamer.close()
            elif hasattr(live_streamer, "disconnect"): live_streamer.disconnect()
            elif hasattr(live_streamer, "ws"): live_streamer.ws.close()
            print("Market stream stop requested.")
    except Exception as e:
        print("Error stopping stream:", e)
    finally:
        live_streamer = None


In [None]:

def compute_token_list_for_spot(spot_ltp: float):
    strikes = nearest_strikes_from_spot(spot_ltp, UNDERLYING, span=SPAN_STRIKES)
    df_sel = df_chain[df_chain["strike_price"].isin(strikes)].sort_values(["strike_price","instrument_type"])
    tokens = df_sel["instrument_key"].dropna().astype(str).unique().tolist()
    return tokens

def maybe_recenter_tokens():
    global last_center_nearest, _last_recenter_ts, current_tokens, df_chain_sel
    # Defer recenter while positions are open to avoid churn during risk
    try:
        if any(v.get("qty",0)>0 for v in open_positions.values()):
            return False
    except Exception:
        pass
    step = STRIKE_STEP.get(UNDERLYING.upper(), 50)
    nearest = int(step * math.floor((spot_ltp_current + step/2)/step))
    now = time.time()
    if abs(nearest - last_center_nearest) >= step and (now - _last_recenter_ts) >= RECENTER_COOLDOWN_S:
        try:
            new_tokens = compute_token_list_for_spot(spot_ltp_current)
            if not new_tokens: return False
            tokens_with_spot = list(dict.fromkeys(new_tokens + [UNDERLYING_SPOT_TOKEN]))
            if RECENTER_LOG:
                print(f"[RECENTER] spot={spot_ltp_current:.2f}, nearest={nearest}, old_center={last_center_nearest}")
                print(f"[RECENTER] tokens: {len(current_tokens)} → {len(new_tokens)}")
            stop_live_stream(); start_live_stream(tokens_with_spot, mode=WEBSOCKET_MODE)
            last_center_nearest = nearest; _last_recenter_ts = now; current_tokens = new_tokens
            df_chain_sel = df_chain[df_chain["instrument_key"].isin(new_tokens)].sort_values(["strike_price","instrument_type"])
            return True
        except Exception as e:
            print("Recenter failed:", e); traceback.print_exc()
    return False

def recenter_daemon():
    while True:
        try:
            time.sleep(1.0); maybe_recenter_tokens()
        except Exception:
            time.sleep(2.0); continue


In [None]:

def _compact_row(r: pd.Series):
    return {
        "token": str(r["Token"]),
        "tsym": str(r.get("trading_symbol", "")),
        "mid": float(r.get("Mid", np.nan)),
        "spread": float(r.get("Spread", np.nan)),
        "delta": float(r.get("Delta", np.nan)),
        "gamma": float(r.get("Gamma", np.nan)),
        "theta": float(r.get("Theta", np.nan)),
        "iv": float(r.get("Iv", np.nan)),
        "depthImb": float(r.get("DepthImb", np.nan)),
        "strike": float(r.get("strike_price", np.nan)),
        "type": str(r.get("instrument_type","")),
    }


In [None]:

def _router_log(entry: dict):
    entry = dict(entry); entry.setdefault("t", time.perf_counter())
    ROUTER_LOG.append(entry)
    if len(ROUTER_LOG) > ROUTER_LOG_MAX:
        del ROUTER_LOG[: len(ROUTER_LOG) - ROUTER_LOG_MAX]

def _post_stub(context: dict):
    t0 = time.perf_counter()
    if not LLM_ENDPOINT:
        return 0.6, (time.perf_counter() - t0)*1000.0, False
    try:
        import requests
        r = requests.post(LLM_ENDPOINT, json=context, timeout=LLM_TIMEOUT_S)
        s = float(r.json().get("score", 0.6)) if r.ok else 0.6
        return max(0.0, min(1.0, s)), (time.perf_counter() - t0)*1000.0, True
    except Exception:
        return 0.6, (time.perf_counter() - t0)*1000.0, False

def _post_ollama(context: dict):
    t0 = time.perf_counter()
    if not OLLAMA_MODEL:
        return 0.6, (time.perf_counter() - t0)*1000.0, False
    try:
        import requests
        prompt = (
           "You are a buy-only NIFTY options scalping scorer. Return JSON {"decision": one of ['buy_ce','buy_pe','pass'], "score": [0,1]}\n"
           "No explanation. Focus on mid/spread/Δ/Γ/Θ/iv/depthImb + spot trend.\n"
        )
        payload = {"model": OLLAMA_MODEL, "prompt": prompt + json.dumps(context), "stream": False,
                   "options": {"temperature": 0.1, "num_predict": OLLAMA_NUM_PREDICT}, "format": "json"}
        r = requests.post(f"{OLLAMA_HOST}/api/generate", json=payload, timeout=OLLAMA_TIMEOUT_S)
        txt = r.json().get("response","") if r.ok else ""
        try:
            obj = json.loads(txt)
            s = float(obj.get("score", 0.6))
            d = str(obj.get("decision","pass"))
        except Exception:
            s, d = 0.6, "pass"
        return max(0.0, min(1.0, s)), (time.perf_counter() - t0)*1000.0, True, d
    except Exception:
        return 0.6, (time.perf_counter() - t0)*1000.0, False, "pass"

def score_with_router_and_meta_long_only(context: dict):
    thr = LLM_SCORE_THRESHOLD
    context["mode"] = "long_only"
    stub_score, stub_ms, stub_ok = _post_stub(context)
    route = "no_stub"; ollama_score = None; ollama_ms = 0.0; decision = "pass"

    if stub_ok and (STUB_LOWER < stub_score < STUB_UPPER):
        o_score, o_ms, o_ok, decision = _post_ollama(context)
        if o_ok:
            final_score = o_score; route = "gray_ollama"
        else:
            final_score = stub_score; route = "gray_fallback_stub"; decision = "pass"
        ollama_score, ollama_ms = o_score, o_ms
    else:
        final_score = stub_score
        route = "stub_high" if stub_score >= STUB_UPPER else ("stub_low" if stub_score <= STUB_LOWER else "stub_neutral")
        decision = "pass"

    final_decision = (final_score >= thr)
    entry = {"route": route, "stub_score": stub_score, "stub_ms": round(stub_ms, 2),
             "ollama_score": ollama_score, "ollama_ms": round(ollama_ms, 2),
             "final_score": final_score, "final_decision": final_decision, "used_ollama": (route=="gray_ollama"),
             "decision": decision}
    _router_log(entry)
    return float(final_score), entry


In [None]:

def ask_llm_for_strategy_buy_only(df_snapshot: pd.DataFrame) -> Dict[str, Any]:
    if df_snapshot.empty:
        return {"legs": [], "meta": {"reason": "empty_snapshot"}}

    cols = ["Mid","strike_price","instrument_type","Delta","Spread","tick_size","DepthImb","Iv","Token","lot_size"]
    miss = [c for c in cols if c not in df_snapshot.columns]
    if miss:
        return {"legs": [], "meta": {"reason": f"missing_columns:{miss}"}}

    snap = df_snapshot.dropna(subset=["Mid","strike_price","instrument_type","Delta"]).copy()
    if snap.empty:
        return {"legs": [], "meta": {"reason": "no_valid_rows"}}

    snap["absDeltaGap"] = (snap["Delta"].abs() - 0.50).abs()

    def _spread_ok(r):
        tick = float(r.get("tick_size") or 0.05)
        spread = float(r.get("Spread") or np.inf)
        mid = float(r.get("Mid") or np.nan)
        ticks_spread = spread / max(tick,1e-6)
        rel_spread = spread / max(mid,1e-9) if np.isfinite(mid) else np.inf
        return (ticks_spread <= 3) and (rel_spread <= 0.004)

    def _ok_buy_row(r):
        if not _spread_ok(r):
            return False
        if not (DELTA_MIN <= abs(float(r["Delta"])) <= DELTA_MAX):
            return False
        imb = float(r.get("DepthImb") or 0.0)
        if r["instrument_type"]=="CE" and imb < +DEPTH_IMB_MIN: return False
        if r["instrument_type"]=="PE" and imb > -DEPTH_IMB_MIN: return False
        z, iv_ok = iv_zscore_for(r["Token"], float(r.get("Iv") or np.nan))
        return iv_ok

    ce = snap[(snap["instrument_type"]=="CE")]
    pe = snap[(snap["instrument_type"]=="PE")]
    ce_ok = ce[ce.apply(_ok_buy_row, axis=1)].sort_values(["absDeltaGap","Spread"]).head(1)
    pe_ok = pe[pe.apply(_ok_buy_row, axis=1)].sort_values(["absDeltaGap","Spread"]).head(1)

    pick = None
    if not ce_ok.empty and not pe_ok.empty:
        pick = ce_ok.iloc[0] if abs(float(ce_ok.iloc[0]["DepthImb"])) >= abs(float(pe_ok.iloc[0]["DepthImb"])) else pe_ok.iloc[0]
    elif not ce_ok.empty:
        pick = ce_ok.iloc[0]
    elif not pe_ok.empty:
        pick = pe_ok.iloc[0]
    else:
        return {"legs": [], "meta": {"reason": "no_candidate_passed_filters"}}

    lot = int(pick.get("lot_size") or 50)
    leg = {"token": str(pick["Token"]), "side": "BUY", "qty": lot, "product": DEFAULT_PRODUCT, "order_type": "LIMIT", "_row": pick.to_dict()}
    meta = {"mode": "long_only", "picked": pick["instrument_type"]}

    # Optional: route with LLM (long-only semantics)
    context = {"spot": float(spot_ltp_current), "row": _compact_row(pick), "rules": {"delta":[DELTA_MIN, DELTA_MAX], "ivZmax":IV_Z_MAX}}
    score, router_meta = score_with_router_and_meta_long_only(context)
    meta.update({"score": score, **router_meta})

    if USE_LLM and (score < LLM_SCORE_THRESHOLD):
        return {"legs": [], "meta": meta}
    return {"legs": [leg], "meta": meta}


## Upstox V3 Execution & Live Reconciliation (buy-only + protective stops)

In [None]:

UPSTOX_DOCS_BASE = 'https://upstox.com/developer/api-documentation/orders'
UPSTOX_WS_DOCS = 'https://upstox.com/developer/api-documentation/get-portfolio-stream-feed/'
VERBOSE_EXEC  = True
VERBOSE_RECON = True
ORDER_WS_LAST_TS = time.time()

class UpstoxV3Exec:
    def __init__(self, access_token: str):
        self.access_token = access_token
        configuration = upstox_client.Configuration(); configuration.access_token = self.access_token
        self._api_client = upstox_client.ApiClient(configuration)
        self.order_api_v3 = upstox_client.OrderApiV3(self._api_client)

    @staticmethod
    def _now_ms():
        return int(time.time() * 1000)

    def marketable_limit(self, best_bid: float, best_ask: float, side: str, tick_size: float, buffer_ticks: int = 1) -> float:
        # BUY: ceil; SELL (to close): floor
        ts = max(tick_size, 1e-6)
        if side.upper() == "BUY":
            raw = (best_ask + buffer_ticks * ts); ticks = math.ceil(raw/ts)
        else:
            raw = (best_bid - buffer_ticks * ts); ticks = math.floor(raw/ts)
        return float(ticks * ts)

    def place_order_v3(self, *, instrument_token: str, side: str, quantity: int,
                       best_bid: float, best_ask: float, price: float = None,
                       product: str = "I", validity: str = "DAY", disclosed_quantity: int = 0,
                       trigger_price: float = 0.0, is_amo: bool = False,
                       marketable_limit: bool = True, buffer_ticks: int = 1,
                       tick_size: float = 0.05, tag: str = None) -> dict:
        if tag is None:
            tag = f"router-{uuid.uuid4().hex[:8]}"
        if price is None and marketable_limit:
            price = self.marketable_limit(best_bid, best_ask, side, tick_size, buffer_ticks)
        req = upstox_client.PlaceOrderV3Request(
            quantity=int(quantity), product=product, validity=validity, price=float(price if price is not None else 0.0),
            tag=tag, instrument_token=instrument_token,
            order_type='LIMIT' if price else 'MARKET', transaction_type=side.upper(),
            disclosed_quantity=int(disclosed_quantity), trigger_price=float(trigger_price), is_amo=bool(is_amo), slice=False
        )
        t0 = self._now_ms()
        try:
            resp = self.order_api_v3.place_order(req)
            t1 = self._now_ms()
            order_id = None
            if hasattr(resp, "data") and resp.data:
                if isinstance(resp.data, dict) and "order_id" in resp.data: order_id = resp.data["order_id"]
                elif isinstance(resp.data, dict) and "order_ids" in resp.data: 
                    order_ids = resp.data.get("order_ids") or []; order_id = order_ids[0] if order_ids else None
            info = {"ok": True, "order_id": order_id, "sent_ts": t0, "ack_ts": t1, "broker_latency_ms": (t1 - t0), "tag": tag}
            if VERBOSE_EXEC: print("[V3/place] ok", info)
            return info
        except ApiException as e:
            t1 = self._now_ms()
            err = {"ok": False, "error": str(e), "sent_ts": t0, "ack_ts": t1}
            print("[V3/place] ERROR", err); return err

# Protective stop & cancel helpers
def place_stop_exit_v3(self, *, instrument_token: str, exit_side: str, trigger_price: float,
                       quantity: int, product: str = "I", validity: str = "DAY", tag: str = None) -> dict:
    req = upstox_client.PlaceOrderV3Request(
        quantity=int(quantity), product=product, validity=validity, order_type='SL-M',
        trigger_price=float(trigger_price), instrument_token=instrument_token,
        transaction_type=exit_side.upper(), tag=(tag or f"prot-stop-{uuid.uuid4().hex[:8]}"),
        slice=False, is_amo=False
    )
    t0 = int(time.time()*1000)
    try:
        resp = self.order_api_v3.place_order(req)
        oid = (resp.data or {}).get("order_id") if hasattr(resp, "data") else None
        return {"ok": True, "order_id": oid, "sent_ts": t0}
    except ApiException as e:
        return {"ok": False, "error": str(e), "sent_ts": t0}

def cancel_order_v3(self, order_id: str) -> dict:
    t0 = int(time.time()*1000)
    try:
        if hasattr(self.order_api_v3, "cancel_order"):
            resp = self.order_api_v3.cancel_order(order_id)
        elif hasattr(self.order_api_v3, "cancelOrder"):
            resp = self.order_api_v3.cancelOrder(order_id)
        else:
            raise RuntimeError("cancel_order method not found in SDK")
        return {"ok": True, "order_id": order_id, "sent_ts": t0}
    except Exception as e:
        return {"ok": False, "error": str(e), "sent_ts": t0}

UpstoxV3Exec.place_stop_exit_v3 = place_stop_exit_v3
UpstoxV3Exec.cancel_order_v3 = cancel_order_v3

exec_v3 = UpstoxV3Exec(CredentialUpstox.ACCESS_TOKEN) if (UPSDK_AVAILABLE and CredentialUpstox.ACCESS_TOKEN) else None


In [None]:

from collections import defaultdict

FILL_LOG_COLS = ['ts', 'order_id', 'status', 'filled_qty', 'avg_price', 'trading_symbol', 'instrument_token', 'transaction_type', 'raw']
FILL_LOG = pd.DataFrame(columns=FILL_LOG_COLS)

open_positions = {}

class UpstoxPortfolioReconciler:
    def __init__(self, access_token: str):
        configuration = upstox_client.Configuration(); configuration.access_token = access_token
        api_client = upstox_client.ApiClient(configuration)
        self.streamer = upstox_client.PortfolioDataStreamer(api_client, order_update=True, position_update=False, holding_update=False, gtt_update=False)

        self.order_meta = {}
        self.last_seen_fill_qty = defaultdict(int)
        self.last_seen_fill_val = defaultdict(float)
        self.exit_wap = defaultdict(lambda: {"num": 0.0, "den": 0, "orders": {}})

    def attach_meta(self, order_id: str, meta: dict):
        self.order_meta[order_id] = meta or {}

    def _append_fill_log(self, record: dict):
        global FILL_LOG
        FILL_LOG.loc[len(FILL_LOG)] = [
            record.get('ts'), record.get('order_id'), record.get('status'), record.get('filled_quantity'),
            record.get('average_price'), record.get('trading_symbol'), record.get('instrument_token'),
            record.get('transaction_type'), json.dumps(record)
        ]

    @staticmethod
    def _extract_update(message) -> dict:
        try:
            data = message
            if isinstance(message, (bytes, str)): data = json.loads(message)
        except Exception: data = message
        d = data.get('data') if isinstance(data, dict) and isinstance(data.get('data'), dict) else (data if isinstance(data, dict) else {})
        flat = {
            'ts': d.get('order_timestamp') or d.get('exchange_timestamp') or int(time.time()*1000),
            'order_id': d.get('order_id') or d.get('id') or d.get('orderId'),
            'status': (d.get('status') or '').lower(),
            'filled_quantity': d.get('filled_quantity') or d.get('filledQuantity') or d.get('quantity') or 0,
            'average_price': d.get('average_price') or d.get('avg_price') or d.get('averagePrice') or None,
            'trading_symbol': d.get('trading_symbol') or d.get('tradingsymbol') or d.get('symbol'),
            'instrument_token': d.get('instrument_token') or d.get('instrumentKey') or d.get('instrument'),
            'transaction_type': d.get('transaction_type') or d.get('side') or d.get('transactionType')
        }
        return flat

    def _on_message(self, message):
        global ORDER_WS_LAST_TS
        ORDER_WS_LAST_TS = time.time()

        upd = self._extract_update(message)
        oid = upd.get('order_id')
        if not oid:
            if VERBOSE_RECON: print("[recon] no order_id:", message)
            return

        cur_qty = int(upd.get('filled_quantity') or 0)
        cur_avg = float(upd.get('average_price') or 0.0)
        cur_val = cur_qty * cur_avg
        prev_qty = self.last_seen_fill_qty[oid]
        prev_val = self.last_seen_fill_val[oid]
        d_qty = max(cur_qty - prev_qty, 0)
        d_val = max(cur_val - prev_val, 0.0)
        self.last_seen_fill_qty[oid] = cur_qty
        self.last_seen_fill_val[oid] = cur_val

        self._append_fill_log(upd)
        if d_qty > 0 and VERBOSE_RECON:
            print(f"[recon] FILL {oid} Δqty={d_qty} cum={cur_qty} @ {cur_avg}")

        meta = self.order_meta.get(oid, {})
        eidx = meta.get("exit_of_entry_idx")

        # Protective stop fill finalization
        sidx = meta.get("stop_for_entry_idx")
        if sidx is not None:
            status = (upd.get("status") or "").lower()
            if status in ("complete","completed"):
                try:
                    rec = TRADE_LOG[sidx]
                    avg_px = float(upd.get("average_price") or rec.get("exit_price") or 0.0)
                    rec["exit_reason"] = rec.get("exit_reason") or "stop"
                    rec["exit_price"] = float(avg_px) if avg_px else rec.get("exit_price")
                    rec["exit_ts"] = pd.Timestamp.now(tz=IST)
                    entry = float(rec.get("entry_price") or 0.0)
                    side  = str(rec.get("side") or "BUY").upper()
                    pnl_leg = (avg_px - entry) if side=="BUY" else (entry - avg_px)
                    rec["pnl_abs"] = float(pnl_leg)
                    rec["pnl_pct"] = float(pnl_leg / entry) if entry else rec.get("pnl_pct")
                    rec["remaining_qty"] = 0
                    stk = TRADE_OPEN_STACK.get(rec["token"], [])
                    if stk and sidx in stk:
                        try:
                            stk.remove(sidx)
                        except ValueError:
                            pass
                    rec["stop_active"] = False
                except Exception as e:
                    if VERBOSE_RECON: print("[recon] stop finalize error:", e)

        if eidx is not None:
            agg = self.exit_wap[eidx]
            ostate = agg["orders"].setdefault(oid, {"cum_qty": 0, "cum_val": 0.0})
            ostate["cum_qty"] = cur_qty
            ostate["cum_val"] = cur_val

            if d_qty > 0 and d_val > 0:
                agg["num"] += d_val
                agg["den"] += d_qty
                exit_wap = agg["num"] / max(agg["den"], 1)
                try:
                    rec = TRADE_LOG[eidx]
                    rec["exit_price"] = float(exit_wap)
                    rec["exit_filled_qty"] = int(agg["den"])
                    entry = float(rec.get("entry_price") or 0.0)
                    side  = str(rec.get("side") or "SELL").upper()
                    pnl_leg = (exit_wap - entry) if side=="BUY" else (entry - exit_wap)
                    rec["pnl_abs"] = float(pnl_leg)
                    rec["pnl_pct"] = float(pnl_leg / entry) if entry else None
                except Exception as e:
                    if VERBOSE_RECON: print("[recon] ledger update error:", e)

            status = (upd.get("status") or "").lower()
            if status in ("complete", "completed", "cancelled", "rejected"):
                try:
                    rec = TRADE_LOG[eidx]
                    rec["exit_fill_ts"] = pd.Timestamp.now(tz=IST)
                    # Cancel protective stop if still active
                    stop_id = rec.get("stop_order_id")
                    if stop_id and rec.get("stop_active"):
                        if 'exec_v3' in globals() and exec_v3 is not None:
                            exec_v3.cancel_order_v3(stop_id)
                        rec["stop_active"] = False
                except Exception as e:
                    if VERBOSE_RECON: print("[recon] stop cancel error:", e)

        cb = globals().get("on_upstox_order_update")
        if callable(cb):
            try: cb(upd, meta)
            except Exception as e:
                if VERBOSE_RECON: print("[ledger-hook] error:", e)

    def start(self):
        self.streamer.on('message', self._on_message)
        self.streamer.auto_reconnect(True, 5, 20)
        self.streamer.connect()

recon = UpstoxPortfolioReconciler(CredentialUpstox.ACCESS_TOKEN) if (UPSDK_AVAILABLE and CredentialUpstox.ACCESS_TOKEN) else None

def start_reconciler():
    if recon is None:
        print("Reconciler not initialized (SDK/token missing)."); return
    print("Starting Portfolio Stream Feed reconciler…"); recon.start()


In [None]:

# Ledger & multi-entry (still supported, but BUY-only for entries)
TRADE_LOG = []
TRADE_OPEN_STACK = {}

def _now_ts():
    return pd.Timestamp.now(tz=IST)

def _open_count(token: str) -> int:
    return len([i for i in TRADE_OPEN_STACK.get(token, []) if TRADE_LOG[i].get("exit_ts") is None])

def log_trade_open(token: str, side: str, qty: int, price: float, plan_meta: dict, row_data: dict,
                   target_pct=None, stop_pct=None, order_id: Optional[str]=None):
    rec = {
        "token": token, "side": side, "qty": int(qty),
        "remaining_qty": int(qty),
        "entry_price": float(price), "entry_ts": _now_ts(),
        "target_pct": float(target_pct if target_pct is not None else 0.20),
        "stop_pct": float(stop_pct if stop_pct is not None else 0.40),
        "router_route": plan_meta.get("route") if plan_meta else None,
        "score_final": plan_meta.get("final_score") if plan_meta else None,
        "score_stub": plan_meta.get("stub_score") if plan_meta else None,
        "score_ollama": plan_meta.get("ollama_score") if plan_meta else None,
        "stub_ms": plan_meta.get("stub_ms"), "ollama_ms": plan_meta.get("ollama_ms"),
        "order_id": order_id,
        "stop_order_id": None, "stop_active": False,
        "exit_reason": None, "exit_price": None, "exit_ts": None,
        "pnl_abs": None, "pnl_pct": None, "hold_s": None,
    }
    TRADE_LOG.append(rec)
    idx = len(TRADE_LOG) - 1
    TRADE_OPEN_STACK.setdefault(token, []).append(idx)
    return idx

def log_trade_exit(token: str, reason: str, exit_price: float, match="LIFO", exit_qty=None):
    stack = TRADE_OPEN_STACK.get(token, [])
    if not stack: return None
    idx = stack[-1] if match == "LIFO" else stack[0]
    rec = TRADE_LOG[idx]
    if rec.get("exit_ts") is not None:
        stack.pop(-1 if match=="LIFO" else 0)
        return log_trade_exit(token, reason, exit_price, match, exit_qty)
    rem = int(rec.get("remaining_qty", rec["qty"])); take = rem if (exit_qty is None or exit_qty >= rem) else int(exit_qty)
    new_rem = rem - take
    side = rec["side"].upper(); entry = rec["entry_price"]
    pnl_leg = (exit_price - entry) if side=="BUY" else (entry - exit_price)
    realized_pct = pnl_leg / entry if entry else 0.0
    if new_rem <= 0:
        rec["remaining_qty"] = 0; rec["exit_reason"] = reason; rec["exit_price"] = float(exit_price); rec["exit_ts"] = _now_ts()
        rec["pnl_abs"] = float(pnl_leg); rec["pnl_pct"] = float(realized_pct); rec["hold_s"] = float((rec["exit_ts"] - rec["entry_ts"]).total_seconds())
        stack.pop(-1 if match=="LIFO" else 0)
        return idx
    else:
        rec["remaining_qty"] = new_rem
        return idx


In [None]:

@dataclass
class ExitConfig:
    dry_run: bool = True
    target_pct: float = 0.2
    stop_pct: float = 0.4
    check_interval_s: float = 1.0

_exit_thread = None; _exit_stop_evt = threading.Event()

def eval_exits_for_token(token: str, row: pd.Series) -> list:
    out = []
    for idx in list(TRADE_OPEN_STACK.get(token, [])):
        rec = TRADE_LOG[idx]
        if rec.get("exit_ts") is not None: continue
        rem = int(rec.get("remaining_qty", rec["qty"]))
        if rem <= 0: continue
        side = rec["side"].upper(); entry = float(rec["entry_price"])
        px = float(row.get("Mid") if np.isfinite(row.get("Mid", np.nan)) else row.get("Ltp"))
        if not np.isfinite(px) or entry <= 0: continue
        pnl_pct = (px - entry)/entry if side=="BUY" else (entry - px)/entry
        if pnl_pct >= rec["target_pct"]:
            out.append({"action":"EXIT","reason":"target","token":token,"qty":rem,"side":"SELL","exit_price":px,"idx":idx})
        elif pnl_pct <= -rec["stop_pct"]:
            out.append({"action":"EXIT","reason":"stop","token":token,"qty":rem,"side":"SELL","exit_price":px,"idx":idx})
    return out

def _exit_worker(cfg: ExitConfig):
    while not _exit_stop_evt.is_set():
        time.sleep(cfg.check_interval_s)
        with _df_lock:
            snapshot = df_feed_enriched.copy()
        for token, pos in list(open_positions.items()):
            row = snapshot.loc[snapshot["Token"]==token]
            if row.empty: 
                continue
            row = row.iloc[0]
            signals = eval_exits_for_token(token, row)
            for sig in signals:
                # Decision-time logging
                log_trade_exit(token, sig["reason"], sig["exit_price"], match="LIFO", exit_qty= sig["qty"])
                if cfg.dry_run or (exec_v3 is None):
                    print(f"[EXIT-SIM] {sig}")
                else:
                    best_bid = float(row.get("BidP1")) if np.isfinite(row.get("BidP1", np.nan)) else float(row.get("Mid", 0.0))
                    best_ask = float(row.get("AskP1")) if np.isfinite(row.get("AskP1", np.nan)) else float(row.get("Mid", 0.0))
                    tick = float(row.get("tick_size") or 0.05)
                    tag = f"LLM-EXIT-e{sig['idx']}"
                    info = exec_v3.place_order_v3(
                        instrument_token=token, side=sig["side"], quantity=int(sig["qty"]),
                        best_bid=best_bid, best_ask=best_ask, tick_size=tick, product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"),
                        marketable_limit=True, buffer_ticks=LIMIT_BUFFER_TICKS, tag=tag
                    )
                    print("[EXIT-LIVE] sent:", info)
                    if recon is not None and info.get("ok") and info.get("order_id"):
                        recon.attach_meta(info["order_id"], {
                            "exit_of_entry_idx": sig["idx"],
                            "planned_qty": int(sig["qty"]),
                            "token": token,
                            "side": sig["side"],
                            "tag": tag,
                            "sent_ts": info.get("sent_ts"),
                            "ack_ts": info.get("ack_ts"),
                        })
            if signals:
                open_positions[token] = {"qty": 0, "side": pos.get("side","BUY"), "avg_price": pos.get("avg_price", 0.0)}

def start_exit_manager(cfg: ExitConfig):
    global _exit_thread, _exit_stop_evt
    _exit_stop_evt = threading.Event()
    _exit_thread = threading.Thread(target=_exit_worker, args=(cfg,), daemon=True)
    _exit_thread.start()

def stop_exit_manager():
    global _exit_thread, _exit_stop_evt
    _exit_stop_evt.set()
    if _exit_thread is not None:
        _exit_thread.join(timeout=5)


In [None]:

_last_order_ts = 0.0
def _lat_log(event: str, **kwargs):
    ts = time.perf_counter()
    LATENCY_LOG.append({"t": ts, "event": event, **kwargs})
    if len(LATENCY_LOG) > LATENCY_LOG_MAX:
        del LATENCY_LOG[: len(LATENCY_LOG) - LATENCY_LOG_MAX]


In [None]:

def place_orders(plan: Dict[str, Any], df_enriched: pd.DataFrame, dry_run: bool = True) -> List[Dict[str, Any]]:
    results = []
    if not plan or "legs" not in plan:
        return results
    plan_meta = plan.get("meta", {})

    global _last_order_ts
    for leg in plan["legs"]:
        token = str(leg["token"])
        row = df_enriched.loc[df_enriched["Token"]==token]
        if row.empty:
            results.append({"status":"rejected","reason":"token_not_found","leg":leg}); continue
        row = row.iloc[0]
        lot = int(row.get("lot_size") or 0); qty = int(leg.get("qty", 0))
        if lot and qty % lot != 0:
            results.append({"status":"rejected","reason":f"qty_not_multiple_of_lot({lot})","leg":leg}); continue
        if qty <= 0 or qty > MAX_QTY_PER_LEG:
            results.append({"status":"rejected","reason":"qty_bounds","leg":leg}); continue

        side = leg["side"].upper()
        if side != "BUY":
            results.append({"status":"rejected","reason":"buy_only_guard"}); continue

        product_code = PRODUCT_MAP.get(leg.get("product", DEFAULT_PRODUCT), PRODUCT_MAP[DEFAULT_PRODUCT])
        best_bid = float(row.get("BidP1")) if np.isfinite(row.get("BidP1", np.nan)) else float(row.get("Mid", 0.0))
        best_ask = float(row.get("AskP1")) if np.isfinite(row.get("AskP1", np.nan)) else float(row.get("Mid", 0.0))
        tick = float(row.get("tick_size") or 0.05)

        now = time.perf_counter()
        delay_ms = ORDER_MIN_GAP_MS - (now - _last_order_ts) * 1000.0
        if delay_ms > 0: time.sleep(delay_ms / 1000.0)
        _last_order_ts = time.perf_counter()
        _lat_log("order_send", token=token, side=side, qty=qty, order_type="LIMIT")

        if dry_run or (exec_v3 is None):
            fill = float(row.get("Mid")) if np.isfinite(row.get("Mid", np.nan)) else float(row.get("Ltp", 0.0))
            results.append({"status":"simulated","token":token,"qty":qty,"side":side,"product":product_code,
                            "order_type":"LIMIT","limit_price":fill, "fill_price":fill, "router_meta": plan_meta})
            pos = open_positions.get(token, {"qty": 0, "side": side, "avg_price": 0.0})
            new_qty = pos["qty"] + qty if side == pos["side"] else pos["qty"] - qty
            new_avg = (pos["avg_price"]*pos["qty"] + fill*qty)/max(new_qty,1) if new_qty>0 and side==pos["side"] else fill
            open_positions[token] = {"qty": max(new_qty,0), "side": side, "avg_price": new_avg}
            entry_idx = log_trade_open(token, side, qty, fill, plan_meta, leg.get("_row", {}), order_id=None)
            _lat_log("order_ack", token=token)
        else:
            info = exec_v3.place_order_v3(
                instrument_token=token, side=side, quantity=qty,
                best_bid=best_bid, best_ask=best_ask, tick_size=tick,
                product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"), validity="DAY",
                marketable_limit=True, buffer_ticks=LIMIT_BUFFER_TICKS, tag="BUY-ENTRY"
            )
            if not info.get("ok"):
                results.append({"status":"rejected","reason":"v3_place_error","detail":info}); continue
            order_id = info.get("order_id")
            fill = float(row.get("Mid") or row.get("Ltp") or 0.0)
            pos = open_positions.get(token, {"qty": 0, "side": side, "avg_price": 0.0})
            new_qty = pos["qty"] + qty if side == pos["side"] else pos["qty"] - qty
            new_avg = (pos["avg_price"]*pos["qty"] + fill*qty)/max(new_qty,1) if new_qty>0 and side==pos["side"] else fill
            open_positions[token] = {"qty": max(new_qty,0), "side": side, "avg_price": new_avg}
            entry_idx = log_trade_open(token, side, qty, fill, plan_meta, leg.get("_row", {}), order_id=str(order_id))
            if recon is not None and order_id:
                recon.attach_meta(order_id, {"entry_idx": entry_idx, "token": token, "side": side, "qty": qty,
                                             "sent_ts": info.get("sent_ts"), "ack_ts": info.get("ack_ts"), "tag": info.get("tag")})
            # --- Protective SELL stop (SL-M) to close the long ---
            entry_price = float(fill)
            stop_trig = entry_price * (1 - float(TRADE_LOG[entry_idx]["stop_pct"]))
            stop_trig = math.floor(stop_trig / max(tick,1e-6)) * max(tick,1e-6)
            stop_info = exec_v3.place_stop_exit_v3(
                instrument_token=token, exit_side="SELL", trigger_price=stop_trig,
                quantity=qty, product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"),
                tag=f"STOP-e{entry_idx}"
            )
            if stop_info.get("ok"):
                TRADE_LOG[entry_idx]["stop_order_id"] = stop_info["order_id"]
                TRADE_LOG[entry_idx]["stop_active"] = True
                if recon is not None:
                    recon.attach_meta(stop_info["order_id"], {"stop_for_entry_idx": entry_idx, "token": token})
            results.append({"status":"placed","order_id":order_id,"token":token,"qty":qty,"side":side,"limit_price":None, "router_meta": plan_meta})
            _lat_log("order_ack", token=token)

    return results


In [None]:

def validate_strategy(plan: Dict[str, Any], df_enriched: pd.DataFrame) -> Optional[Dict[str, Any]]:
    if not plan or not plan.get("legs"): return None
    if len(plan["legs"]) > MAX_OPEN_LEGS: return None
    for leg in plan["legs"]:
        if leg.get("side","BUY").upper() != "BUY": return None  # buy-only guard
        t = str(leg["token"])
        if df_enriched.loc[df_enriched["Token"]==t].empty: return None
    return plan


### HTTP Backfill Daemon (Order Book + Trades + Positions)

In [None]:

BACKFILL_MIN_INTERVAL_S = 90.0
BACKFILL_TRIGGER_NO_WS_S = 15.0
BACKFILL_EOD_GUARD = True
BACKFILL_WINDOW_MIN = 6
UPSTOX_API_BASE = os.getenv("UPSTOX_API_BASE", "https://api.upstox.com/v2")

_last_backfill_ts = 0.0
_backfill_thread = None
_backfill_stop_evt = threading.Event()

def _http_get_json(url: str, params=None, timeout=2.5):
    try:
        import requests
    except Exception as e:
        raise RuntimeError("Install 'requests' for backfill.") from e
    headers = {"Accept": "application/json", "Authorization": f"Bearer {CredentialUpstox.ACCESS_TOKEN}"}
    r = requests.get(url, headers=headers, params=params, timeout=timeout)
    r.raise_for_status()
    return r.json()

def fetch_order_book_today():
    url = f"{UPSTOX_API_BASE}/order/retrieve-all"
    return _http_get_json(url)

def fetch_trades_today():
    url = f"{UPSTOX_API_BASE}/order/trades/get-trades-for-day"
    return _http_get_json(url)

def fetch_positions():
    url = f"{UPSTOX_API_BASE}/portfolio/short-term-positions"
    return _http_get_json(url)

def _parse_epoch_ms(v):
    try: return int(float(v))
    except Exception: return None

def reconcile_via_http():
    global open_positions
    ob = fetch_order_book_today()
    tr = fetch_trades_today()
    pos = fetch_positions()

    trades = tr.get("data") or tr.get("trades") or []
    trades_by_tok_side = {}
    for t in trades:
        tok = str(t.get("instrument_token") or t.get("instrumentKey") or "")
        side = str(t.get("transaction_type") or t.get("transactionType") or "").upper()
        trades_by_tok_side.setdefault((tok, side), []).append({
            "quantity": int(t.get("quantity") or 0),
            "price": float(t.get("average_price") or t.get("price") or 0.0),
            "ts": _parse_epoch_ms(t.get("exchange_timestamp") or t.get("timestamp"))
        })

    def _wap(lst, cap_qty=None):
        if not lst: return None
        num, den = 0.0, 0
        rem = cap_qty if cap_qty is not None else None
        for x in sorted(lst, key=lambda z: z["ts"] or 0):
            q = int(x["quantity"]); p = float(x["price"])
            if rem is None:
                num += p*q; den += q
            else:
                take = min(q, max(rem, 0))
                if take <= 0: break
                num += p*take; den += take; rem -= take
                if rem <= 0: break
        return (num/den) if den>0 else None

    n_entry_adj, n_exit_adj = 0, 0
    for rec in TRADE_LOG:
        tok = str(rec.get("token"))
        side = str(rec.get("side") or "BUY").upper()
        qty  = int(rec.get("qty") or 0)

        ent = _wap(trades_by_tok_side.get((tok, side), []), cap_qty=qty)
        if ent and rec.get("entry_price") != float(ent):
            rec["entry_price"] = float(ent); n_entry_adj += 1

        if rec.get("exit_ts") is not None:
            opp = "SELL"  # closing a buy
            lst = trades_by_tok_side.get((tok, opp), [])
            if not lst: continue
            exit_ts_ms = int(rec["exit_ts"].value/1e6) if hasattr(rec["exit_ts"], "value") else None
            if exit_ts_ms is None: continue
            win_ms = BACKFILL_WINDOW_MIN*60*1000
            filt = [x for x in lst if x["ts"] is not None and abs(x["ts"] - exit_ts_ms) <= win_ms]
            ex_wap = _wap(filt, cap_qty=qty)
            if ex_wap and rec.get("exit_price") != float(ex_wap):
                rec["exit_price"] = float(ex_wap)
                entry = float(rec.get("entry_price") or 0.0)
                pnl_leg = (ex_wap - entry)
                rec["pnl_abs"] = float(pnl_leg)
                rec["pnl_pct"] = float(pnl_leg / entry) if entry else None
                n_exit_adj += 1

    try:
        pdata = pos.get("data") or pos.get("positions") or []
        broker_net = {}
        for p in pdata:
            tk = str(p.get("instrument_token") or p.get("instrumentKey") or "")
            if not tk: continue
            netq = int(p.get("quantity") or p.get("net_qty") or p.get("net_quantity") or 0)
            avgp = float(p.get("average_price") or p.get("avg_price") or 0.0)
            broker_net[tk] = {"qty": netq, "avg_price": avgp}
        for tk, v in broker_net.items():
            side = "BUY" if v["qty"] >= 0 else "SELL"
            open_positions[tk] = {"qty": abs(int(v["qty"])), "side": side, "avg_price": float(v["avg_price"])}
    except Exception as e:
        if VERBOSE_RECON: print("[backfill] positions parse error:", e)

    return {"entry_adjusted": n_entry_adj, "exit_adjusted": n_exit_adj}

def _have_live_things():
    if any((r.get("exit_ts") is None and int(r.get("remaining_qty", r.get("qty",0)))>0) for r in TRADE_LOG):
        return True
    if any(v.get("qty",0)>0 for v in open_positions.values()):
        return True
    return False

def backfill_once():
    _lat_log("backfill_start")
    try:
        info = reconcile_via_http()
        _lat_log("backfill_end", **info)
        if VERBOSE_RECON:
            print(f"[backfill] entries_adj={info['entry_adjusted']} exits_adj={info['exit_adjusted']}")
        return info
    except Exception as e:
        _lat_log("backfill_end", error=str(e))
        if VERBOSE_RECON:
            print("[backfill] ERROR:", e)
        return {"error": str(e)}

def _backfill_worker():
    global _last_backfill_ts
    while not _backfill_stop_evt.is_set():
        try:
            time.sleep(1.0)
            now = time.time()
            if not BACKFILL_ENABLED:
                continue
            if (now - _last_backfill_ts) < BACKFILL_MIN_INTERVAL_S:
                continue
            hhmm = pd.Timestamp.now(tz=IST).strftime("%H:%M")
            eod_guard = BACKFILL_EOD_GUARD and (hhmm >= "15:29" and hhmm <= "15:30")
            stale_ws = (now - ORDER_WS_LAST_TS) >= BACKFILL_TRIGGER_NO_WS_S
            if eod_guard or (stale_ws and _have_live_things()):
                backfill_once()
                _last_backfill_ts = now
        except Exception:
            time.sleep(2.0)
            continue

def start_backfill_daemon():
    global _backfill_thread, _backfill_stop_evt
    _backfill_stop_evt = threading.Event()
    _backfill_thread = threading.Thread(target=_backfill_worker, daemon=True)
    _backfill_thread.start()
    print("Backfill daemon started.")

def stop_backfill_daemon():
    global _backfill_thread, _backfill_stop_evt
    _backfill_stop_evt.set()
    if _backfill_thread is not None:
        _backfill_thread.join(timeout=5)
    print("Backfill daemon stopped.")


In [None]:

if SIMULATION_MODE:
    raise RuntimeError("This notebook is live-first. Set SIMULATION_MODE=False to proceed.")

tokens_with_spot = list(dict.fromkeys(token_list + [UNDERLYING_SPOT_TOKEN]))
streamer = start_live_stream(tokens_with_spot, mode=WEBSOCKET_MODE)
time.sleep(3.0)

with _df_lock:
    snapshot = df_feed_enriched.copy()
print("Feed snapshot rows:", len(snapshot))
display(snapshot.tail(6))

try:
    if RECONCILE_LIVE_PNL: start_reconciler()
except Exception as _e:
    print("Reconciler start skipped:", _e)

try:
    if BACKFILL_ENABLED: start_backfill_daemon()
except Exception as _e:
    print("Backfill daemon start skipped:", _e)

def latency_report(n_tail: int = 50) -> pd.DataFrame:
    return pd.DataFrame(LATENCY_LOG[-n_tail:])

def router_report(n_tail: int = 200):
    tail = ROUTER_LOG[-n_tail:]
    df = pd.DataFrame(tail)
    if df.empty: return df, {}
    agree = df["final_decision"].mean()
    used_ollama_rate = df["used_ollama"].mean()
    avg_stub = df["stub_ms"].mean()
    avg_ollama = df.loc[df["used_ollama"], "ollama_ms"].mean() if (df["used_ollama"].any()) else float("nan")
    routes = df["route"].value_counts().to_dict()
    summary = {"n": int(len(df)), "decision_rate": float(round(agree, 4)), "used_ollama_rate": float(round(used_ollama_rate, 4)),
               "avg_stub_ms": float(round(avg_stub, 2)), "avg_ollama_ms": float(round(avg_ollama, 2)) if avg_ollama == avg_ollama else None, "routes": routes}
    return df, summary

_lat_log("decision_start", rows=len(snapshot))
plan = ask_llm_for_strategy_buy_only(snapshot)
_lat_log("decision_end", legs=len(plan.get("legs", [])))

try:
    _lat_log("validate_start")
    validated = validate_strategy(plan, snapshot)
    _lat_log("validate_end", ok=True)
except Exception as e:
    validated = None; _lat_log("validate_end", ok=False, err=str(e)); print("Validation failed:", e)

orders = []
if validated:
    _lat_log("place_start", nlegs=len(validated["legs"]))
    orders = place_orders(validated, snapshot, dry_run=(not ORDERS_LIVE))
    _lat_log("place_end", norders=len(orders))
    print("Order results:"); print(json.dumps(orders, indent=2))

cfg = ExitConfig(dry_run=(not EXIT_MANAGER_LIVE))
start_exit_manager(cfg)

_recenter_thread = threading.Thread(target=recenter_daemon, daemon=True); _recenter_thread.start()

print("Live scalper pipeline running. Use the Stop cell to close sockets and exit manager.")


In [None]:

with _df_lock:
    mon = df_feed_enriched.copy()
display(mon.tail(12))
df_router, router_summary = router_report(500)
display(df_router.tail(10))
print("Router summary:", router_summary)


In [None]:

def _time_bucket(ts):
    if pd.isna(ts): return "unknown"
    t = ts.tz_convert(IST).time() if ts.tzinfo else ts.tz_localize(IST).time()
    if t >= pd.to_datetime("09:15").time() and t < pd.to_datetime("10:00").time():
        return "open"
    if t >= pd.to_datetime("14:30").time() and t <= pd.to_datetime("15:30").time():
        return "close"
    return "mid"

def _iv_regime(z):
    if pd.isna(z): return "unknown"
    a = abs(float(z))
    if a <= 1.0: return "stable"
    if a <= 2.0: return "elevated"
    return "spike"

def router_performance_report():
    df = pd.DataFrame([r for r in TRADE_LOG if r.get("exit_ts") is not None])
    if df.empty:
        return df, {}, pd.DataFrame(), {}
    df["hit"] = (df["exit_reason"]=="target").astype(int)
    df["pnl_pct"] = df["pnl_pct"].astype(float)
    grp = df.groupby("router_route", dropna=False)

    def _agg_expect(g):
        win = g[g["pnl_pct"]>0]["pnl_pct"].mean()
        loss = (-g[g["pnl_pct"]<0]["pnl_pct"]).mean()
        win_rate = (g["pnl_pct"]>0).mean()
        exp = g["pnl_pct"].mean()
        return pd.Series({
            "n": len(g),
            "hit_rate": g["hit"].mean(),
            "win_rate": win_rate,
            "avg_win_pct": win if pd.notna(win) else 0.0,
            "avg_loss_pct": loss if pd.notna(loss) else 0.0,
            "expectancy_pct": exp,
            "volatility_pct": g["pnl_pct"].std(),
            "avg_hold_s": g["hold_s"].mean()
        })

    perf = grp.apply(_agg_expect).reset_index()
    return df, perf, df.sort_values("exit_ts"), {}

df_trades, perf, _, _ = router_performance_report()
display(df_trades.tail(10))
print("Performance by route:"); display(perf)


In [None]:

# Stop & cleanup
try:
    stop_exit_manager()
except Exception: pass
try:
    stop_live_stream()
except Exception: pass
try:
    stop_backfill_daemon()
except Exception: pass
print("Shutdown requested.")



# --- v4.1 PATCHES (Buy-only) ---
# This cell and the following override/extend earlier definitions to improve:
# - Stop escalation correctness (modify vs convert; no unconditional widen)
# - Dynamic SL offsets (latency EWM + ticks-based micro-vol)
# - Unified V3 order wrappers for SL and SL-M + latency logging
# - DepthImb smoothing + persistence; microprice drift and spot micro-momentum
# - Delta/spread gates adapt to regime; trading window guard + per-token cooldown
# - Reconciler status set, partial stop fill tracking, meta on conversion


In [None]:

# === v4.1 toggles & params ===
TRADING_WINDOW = ("09:18","15:20")     # IST trading window for new entries
PER_TOKEN_COOLDOWN_S = 90.0            # cooldown after an exit before re-entry on same token
USE_DEPTH_IMB_PERSIST = True
DEPTH_IMB_MIN_TICKS = 3                # require this many consecutive ticks of directional imbalance
MIN_GAMMA_EFF = 0.0                    # optional floor on Gamma * |spot_slope|
MAX_ABS_THETA = None                   # e.g., 30.0 to skip highly negative theta; set None to disable

EOD_FLATTEN_ENABLED = True
EOD_FLATTEN_AT = "15:24"               # force exits at 15:24 IST if any qty remains
BROKER_FEE_PER_LOT = 0.0               # for simulation adjustments (if SIM mode used)


In [None]:

import json, time, math, threading, numpy as np, pandas as pd
from collections import deque

def log_event(**kv):
    try:
        kv["ts"] = float(time.time())
        with open("/mnt/data/trade_events.jsonl","a") as f:
            f.write(json.dumps(kv, default=str) + "\n")
    except Exception:
        pass

def _in_trading_window():
    try:
        now = pd.Timestamp.now(tz=IST).strftime("%H:%M")
        return TRADING_WINDOW[0] <= now <= TRADING_WINDOW[1]
    except Exception:
        return True

def _last_exit_ts_for(token: str):
    # Search TRADE_LOG from end for last exit on token
    for rec in reversed(TRADE_LOG):
        if rec.get("token")==token and rec.get("exit_ts") is not None:
            try:
                return rec["exit_ts"]
            except Exception:
                return None
    return None


In [None]:

def _recent_latency_ms(default=60.0):
    try:
        df = pd.DataFrame(LATENCY_LOG)
        s = df.loc[df["event"]=="broker_latency_ms","ms"].tail(200)
        if s.empty: return float(default)
        return float(s.ewm(alpha=0.2).mean().iloc[-1])
    except Exception:
        return float(default)


In [None]:

# --- UpstoxV3Exec v4.1 extensions ---
try:
    _UpExec = UpstoxV3Exec
except NameError:
    _UpExec = None

if _UpExec is not None:
    # add modify
    def _modify_order_v3(self, order_id: str, *, price: float = None, trigger_price: float = None,
                         order_type: str = None, validity: str = None, quantity: int = None) -> dict:
        try:
            req = upstox_client.ModifyOrderV3Request(
                order_id=order_id,
                price=float(price) if price is not None else None,
                trigger_price=float(trigger_price) if trigger_price is not None else None,
                order_type=order_type, validity=validity,
                quantity=int(quantity) if quantity is not None else None
            )
            t0 = self._now_ms()
            resp = self.order_api_v3.modify_order(req)
            t1 = self._ow_ms() if hasattr(self, "_ow_ms") else self._now_ms()
            return {"ok": True, "order_id": order_id, "broker_latency_ms": (t1 - t0)}
        except Exception as e:
            return {"ok": False, "error": str(e), "order_id": order_id}

    UpstoxV3Exec.modify_order_v3 = _modify_order_v3

    # add place stop-limit
    def _place_stop_limit_exit_v3(self, *, instrument_token: str, exit_side: str, trigger_price: float,
                                  limit_price: float, quantity: int, product: str = "I", validity: str = "DAY",
                                  tag: str = None) -> dict:
        req = upstox_client.PlaceOrderV3Request(
            quantity=int(quantity), product=product, validity=validity, order_type='SL',
            price=float(limit_price), trigger_price=float(trigger_price),
            instrument_token=instrument_token, transaction_type=exit_side.upper(), tag=(tag or f"prot-sl-{uuid.uuid4().hex[:8]}"),
            slice=False, is_amo=False
        )
        t0 = self._now_ms()
        try:
            resp = self.order_api_v3.place_order(req); t1 = self._now_ms()
            oid = (resp.data or {}).get("order_id") if hasattr(resp,"data") else None
            info = {"ok": True, "order_id": oid, "sent_ts": t0, "ack_ts": t1, "broker_latency_ms": (t1 - t0)}
            LATENCY_LOG.append({"t": time.perf_counter(), "event":"broker_latency_ms", "ms": info["broker_latency_ms"]})
            return info
        except ApiException as e:
            return {"ok": False, "error": str(e), "sent_ts": t0}

    UpstoxV3Exec.place_stop_limit_exit_v3 = _place_stop_limit_exit_v3

    # wrap place_order_v3 to log broker latency
    try:
        _orig_place = UpstoxV3Exec.place_order_v3
        def _place_order_v3_with_lat(self, *args, **kwargs):
            info = _orig_place(self, *args, **kwargs)
            try:
                if info and isinstance(info, dict) and "broker_latency_ms" in info:
                    LATENCY_LOG.append({"t": time.perf_counter(), "event":"broker_latency_ms", "ms": info["broker_latency_ms"]})
            except Exception:
                pass
            return info
        UpstoxV3Exec.place_order_v3 = _place_order_v3_with_lat
    except Exception:
        pass


In [None]:

# --- Microstructure analytics (v4.1) ---
from collections import defaultdict, deque
_tick_var = defaultdict(lambda: {"m_prev": None, "var": 1.0})
_imb_ewm = defaultdict(float)
_imb_persist = defaultdict(int)
_micro_prev = defaultdict(lambda: None)

SPOT_BUF = deque(maxlen=64)  # (ts, ltp)

def update_tick_vol(token: str, mid: float, tick: float):
    if not np.isfinite(mid): return
    s = _tick_var[token]
    if s["m_prev"] is not None:
        d = (mid - s["m_prev"]) / max(tick,1e-6)
        s["var"] = 0.9*s["var"] + 0.1*(d*d)
    s["m_prev"] = mid

def vol_ticks_for(token: str) -> float:
    return math.sqrt(max(_tick_var[token]["var"], 1e-6))

def smooth_imb(token: str, raw: float) -> float:
    v = _imb_ewm[token]
    if np.isfinite(raw):
        v = 0.8*v + 0.2*raw
        _imb_ewm[token] = v
    return _imb_ewm[token]

def update_persist(token: str, imb_ewm: float, side: str):
    want = (imb_ewm >= +DEPTH_IMB_MIN) if side=="CE" else (imb_ewm <= -DEPTH_IMB_MIN)
    _imb_persist[token] = (_imb_persist[token] + 1) if want else 0

def microprice(bidp, bidq, askp, askq):
    try:
        qsum = float(bidq) + float(askq)
        if qsum <= 0: return float("nan")
        return (askp*bidq + bidp*askq) / qsum
    except Exception:
        return float("nan")

def spot_slope_and_r2():
    # simple OLS slope on last few points
    if len(SPOT_BUF) < 6: return 0.0, 0.0
    xs = np.arange(len(SPOT_BUF), dtype=float)
    ys = np.array([p[1] for p in SPOT_BUF], dtype=float)
    x = xs - xs.mean(); y = ys - ys.mean()
    denom = (x**2).sum()
    if denom <= 0: return 0.0, 0.0
    slope = (x*y).sum() / denom
    yhat = x*slope
    ss_tot = (y**2).sum(); ss_res = ((y - yhat)**2).sum()
    r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0
    return float(slope), float(r2)


In [None]:

# --- Override start_live_stream with v4.1 analytics & faster updates ---
def start_live_stream(tokens: list, mode: str = "full_d30"):
    global live_streamer, df_feed, df_feed_enriched, spot_ltp_current
    if not UPSDK_AVAILABLE: raise RuntimeError("Upstox SDK not installed.")
    if not CredentialUpstox.ACCESS_TOKEN: raise RuntimeError("ACCESS_TOKEN missing.")

    configuration = upstox_client.Configuration()
    configuration.access_token = CredentialUpstox.ACCESS_TOKEN
    api_client = upstox_client.ApiClient(configuration)
    streamer = upstox_client.MarketDataStreamerV3(api_client, instrument_key=tokens, mode=mode)

    feed_index = {}  # token -> row index for df_feed

    def _on_message(msg):
        global df_feed, df_feed_enriched, spot_ltp_current
        feeds = msg.get("feeds", {})
        for token, payload in feeds.items():
            ff = payload.get("fullFeed",{}).get("marketFF",{})
            ltpc = ff.get("ltpc",{})
            level = ff.get("marketLevel",{}).get("bidAskQuote",[{}])
            greeks = ff.get("optionGreeks",{}) or {}
            if token == UNDERLYING_SPOT_TOKEN:
                try:
                    if ltpc.get("ltp") is not None:
                        spot_ltp_current = float(ltpc.get("ltp"))
                        SPOT_BUF.append((time.time(), spot_ltp_current))
                except Exception: pass
                continue
            # build row
            bidp = float(level[0].get("bidP")) if level and level[0].get("bidP") is not None else np.nan
            bidq = float(level[0].get("bidQ")) if level and level[0].get("bidQ") is not None else np.nan
            askp = float(level[0].get("askP")) if level and level[0].get("askP") is not None else np.nan
            askq = float(level[0].get("askQ")) if level and level[0].get("askQ") is not None else np.nan
            mid = (bidp + askp)/2.0 if np.isfinite(bidp) and np.isfinite(askp) else (float(ltpc.get("ltp")) if ltpc.get("ltp") is not None else np.nan)
            raw_imb = ((bidq - askq) / (bidq + askq)) if (np.isfinite(bidq) and np.isfinite(askq) and (bidq+askq)>0) else np.nan
            imb_ewm = smooth_imb(token, raw_imb)
            mic = microprice(bidp, bidq, askp, askq)

            row = {
                "Token": token,
                "Ltp": float(ltpc.get("ltp")) if ltpc.get("ltp") is not None else np.nan,
                "Ltq": float(ltpc.get("ltq")) if ltpc.get("ltq") is not None else np.nan,
                "Cp": float(ltpc.get("cp")) if ltpc.get("cp") is not None else np.nan,
                "BidP1": bidp, "BidQ1": bidq, "AskP1": askp, "AskQ1": askq,
                "Ltt": to_ist_ms(ltpc.get("ltt")),
                "Oi": float(ff.get("oi")) if ff.get("oi") is not None else np.nan,
                "Iv": float(ff.get("iv")) if ff.get("iv") is not None else np.nan,
                "Atp": float(ff.get("atp")) if ff.get("atp") is not None else np.nan,
                "Tbq": float(ff.get("tbq")) if ff.get("tbq") is not None else np.nan,
                "Tsq": float(ff.get("tsq")) if ff.get("tsq") is not None else np.nan,
                "Delta": float(greeks.get("delta")) if greeks.get("delta") is not None else np.nan,
                "Theta": float(greeks.get("theta")) if greeks.get("theta") is not None else np.nan,
                "Gamma": float(greeks.get("gamma")) if greeks.get("gamma") is not None else np.nan,
                "Vega":  float(greeks.get("vega"))  if greeks.get("vega")  is not None else np.nan,
                "Rho":   float(greeks.get("rho"))   if greeks.get("rho")   is not None else np.nan,
                "Mid": mid, "DepthImb": raw_imb, "DepthImbEWM": imb_ewm, "MicroP": mic
            }
            with _df_lock:
                if token in feed_index:
                    idx = feed_index[token]
                    for k,v in row.items(): df_feed.at[idx, k] = v
                else:
                    idx = len(df_feed)
                    df_feed = pd.concat([df_feed, pd.DataFrame([row])], ignore_index=True)
                    feed_index[token] = idx
                # enrich
                df_loc = df_feed  # already has Mid/Spread etc from previous enrich; recompute spread quickly
                if np.isfinite(bidp) and np.isfinite(askp):
                    df_loc.at[idx,"Spread"] = (askp - bidp)
                # attach meta columns from chain on full rebuild periodically to avoid heavy merge each tick
                # cheap projection for now:
                df_feed_enriched = enrich_feed(df_feed, df_chain)
                # update vol ticks
                try:
                    tick = float(df_feed_enriched.loc[df_feed_enriched["Token"]==token, "tick_size"].iloc[0])
                except Exception:
                    tick = 0.05
                update_tick_vol(token, mid, tick)

    streamer.on_message = _on_message
    streamer.on_open = lambda: print("Market WS opened (v4.1)")
    streamer.on_error = lambda e: print("Market WS error:", e)
    streamer.on_close = lambda: print("Market WS closed")
    streamer.connect()
    live_streamer = streamer
    return streamer


In [None]:

def ask_llm_for_strategy_buy_only(df_snapshot: pd.DataFrame) -> Dict[str, Any]:
    if df_snapshot.empty or not _in_trading_window():
        return {"legs": [], "meta": {"reason": "empty_or_outside_window"}}

    cols = ["Mid","strike_price","instrument_type","Delta","Spread","tick_size","DepthImbEWM","Token","lot_size",
            "BidP1","BidQ1","AskP1","AskQ1","Gamma","Theta"]
    miss = [c for c in cols if c not in df_snapshot.columns]
    if miss:
        return {"legs": [], "meta": {"reason": f"missing_columns:{miss}"}}

    # compute spot slope & r2 and adapt delta band
    slope, r2 = spot_slope_and_r2()
    if abs(slope) > 0 and r2 >= 0.35:
        dmin, dmax = 0.45, 0.60   # trending → slightly higher delta
    else:
        dmin, dmax = DELTA_MIN, DELTA_MAX

    snap = df_snapshot.dropna(subset=["Mid","strike_price","instrument_type","Delta"]).copy()
    if snap.empty:
        return {"legs": [], "meta": {"reason": "no_valid_rows"}}

    snap["absDeltaGap"] = (snap["Delta"].abs() - 0.50).abs()

    def _spread_caps(token: str):
        vt = vol_ticks_for(token)
        cap_ticks = min(5, max(2, round(0.6 * vt)))
        cap_pct = 0.004 if vt < 3 else 0.006
        return cap_ticks, cap_pct

    def _spread_ok(r):
        tick = float(r.get("tick_size") or 0.05)
        spread = float(r.get("Spread") or np.inf)
        mid = float(r.get("Mid") or np.nan)
        cap_ticks, cap_pct = _spread_caps(r["Token"])
        ticks_spread = spread / max(tick,1e-6)
        rel_spread = spread / max(mid,1e-9) if np.isfinite(mid) else np.inf
        return (ticks_spread <= cap_ticks) and (rel_spread <= cap_pct)

    def _ok_buy_row(r):
        token = r["Token"]
        if not _spread_ok(r): return False
        if not (dmin <= abs(float(r["Delta"])) <= dmax): return False
        imb = float(r.get("DepthImbEWM") or 0.0)
        # persistence
        pers_ok = True
        if USE_DEPTH_IMB_PERSIST:
            pers = _imb_persist.get(token, 0)
            side = r["instrument_type"]
            # update counter by latest imb (approx); require minimum
            want = (imb >= +DEPTH_IMB_MIN) if side=="CE" else (imb <= -DEPTH_IMB_MIN)
            if not want or pers < DEPTH_IMB_MIN_TICKS:
                pers_ok = False
        if not pers_ok: return False
        # Gamma/Theta consideration (optional)
        if MAX_ABS_THETA is not None:
            th = abs(float(r.get("Theta") or 0.0))
            if th > MAX_ABS_THETA: return False
        if MIN_GAMMA_EFF > 0.0:
            geff = abs(float(r.get("Gamma") or 0.0)) * abs(slope)
            if geff < MIN_GAMMA_EFF: return False
        # Cooldown after last exit
        lastx = _last_exit_ts_for(token)
        if lastx is not None:
            try:
                if (pd.Timestamp.now(tz=IST) - lastx).total_seconds() < PER_TOKEN_COOLDOWN_S:
                    return False
            except Exception:
                pass
        # IV z‑gate remains (reuse existing function)
        z, iv_ok = iv_zscore_for(token, float(r.get("Iv") or np.nan))
        return iv_ok

    ce = snap[(snap["instrument_type"]=="CE")]
    pe = snap[(snap["instrument_type"]=="PE")]
    ce_ok = ce[ce.apply(_ok_buy_row, axis=1)].sort_values(["absDeltaGap","Spread"]).head(1)
    pe_ok = pe[pe.apply(_ok_buy_row, axis=1)].sort_values(["absDeltaGap","Spread"]).head(1)

    pick = None
    if not ce_ok.empty and not pe_ok.empty:
        # choose by stronger absolute imbalance and microprice drift
        def _score(r):
            imb = abs(float(r["DepthImbEWM"]))
            bq, aq = float(r.get("BidQ1") or 0.0), float(r.get("AskQ1") or 0.0)
            mp = microprice(float(r.get("BidP1") or 0.0), bq, float(r.get("AskP1") or 0.0), aq)
            return imb + (0.01 if np.isfinite(mp) else 0.0)
        r1, r2 = ce_ok.iloc[0], pe_ok.iloc[0]
        pick = r1 if _score(r1) >= _score(r2) else r2
    elif not ce_ok.empty:
        pick = ce_ok.iloc[0]
    elif not pe_ok.empty:
        pick = pe_ok.iloc[0]
    else:
        return {"legs": [], "meta": {"reason": "no_candidate_passed_filters"}}

    lot = int(pick.get("lot_size") or 50)
    leg = {"token": str(pick["Token"]), "side": "BUY", "qty": lot, "product": DEFAULT_PRODUCT, "order_type": "LIMIT", "_row": pick.to_dict()}
    meta = {"mode": "long_only", "picked": pick["instrument_type"], "score": 0.60, "dmin": dmin, "dmax": dmax, "slope": slope, "r2": r2}

    # Optional router scoring (unchanged)
    context = {"spot": float(spot_ltp_current), "row": _compact_row(pick), "rules": {"delta":[dmin, dmax], "ivZmax":IV_Z_MAX}}
    score, router_meta = score_with_router_and_meta_long_only(context)
    meta.update({"score": score, **router_meta})

    if USE_LLM and (score < LLM_SCORE_THRESHOLD):
        return {"legs": [], "meta": meta}
    return {"legs": [leg], "meta": meta}


In [None]:

# --- Override compute_dynamic_stop to use EWM latency and ticks-based vol ---
def compute_dynamic_stop(entry_price: float, side: str, tick: float, spread: float, bidq1: float,
                         iv_z: float, latency_ms: float, stop_pct: float, token: str = None):
    trigger = entry_price * (1 - stop_pct)
    ticks_spread = spread / max(tick, 1e-6)
    vt = vol_ticks_for(token) if token else 1.0
    lat_ticks = max(0.0, _recent_latency_ms(latency_ms) / 25.0)
    offset_ticks = int(max(1, round(0.5 * ticks_spread + 0.4 * vt + 0.1 * lat_ticks)))
    limit = trigger - offset_ticks * tick
    limit = math.floor(limit / max(tick,1e-6)) * max(tick,1e-6)
    use_sl = (spread <= 0.30) and (bidq1 >= 1) and (abs(iv_z) <= 2.0)
    return use_sl, float(trigger), float(limit), {"offset_ticks": offset_ticks, "vt": vt}


In [None]:

def _place_protective_stop_for_entry(entry_idx: int, token: str, qty: int, entry_price: float, row: pd.Series):
    tick = float(row.get("tick_size") or 0.05)
    spread = float(row.get("Spread") or 0.0)
    bidq1 = float(row.get("BidQ1") or 0.0)
    iv = float(row.get("Iv") or np.nan)
    iv_z, _ok = iv_zscore_for(token, iv)
    lat_ms = _recent_latency_ms(60.0)
    stop_pct = float(TRADE_LOG[entry_idx]["stop_pct"])
    use_sl, trig, limit, meta = compute_dynamic_stop(entry_price, "BUY", tick, spread, bidq1, iv_z, lat_ms, stop_pct, token=token)

    if not USE_SL_PROTECT:
        use_sl = False
    if exec_v3 is None:
        return {"ok": False, "error": "exec_v3 not initialized"}

    if use_sl:
        info = exec_v3.place_stop_limit_exit_v3(instrument_token=token, exit_side="SELL", trigger_price=trig,
                                                limit_price=limit, quantity=qty, product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"),
                                                tag=f"STOP-L-e{entry_idx}")
        if info.get("ok"):
            oid = info.get("order_id")
            TRADE_LOG[entry_idx]["stop_order_id"] = oid
            TRADE_LOG[entry_idx]["stop_active"] = True
            TRADE_LOG[entry_idx]["stop_mode"] = "SL"
            if recon is not None:
                recon.attach_meta(oid, {"stop_for_entry_idx": entry_idx, "token": token, "planned_qty": qty})
            register_stop_sm(oid, token, entry_idx, limit, trig, tick)
            return {"ok": True, "order_id": oid, "mode": "SL", **meta}
        # fallthrough to SL-M if placement failed

    info = exec_v3.place_stop_exit_v3(instrument_token=token, exit_side="SELL", trigger_price=trig,
                                      quantity=qty, product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"),
                                      tag=f"STOP-e{entry_idx}")
    if info.get("ok"):
        TRADE_LOG[entry_idx]["stop_order_id"] = info["order_id"]
        TRADE_LOG[entry_idx]["stop_active"] = True
        TRADE_LOG[entry_idx]["stop_mode"] = "SL-M"
        if recon is not None:
            recon.attach_meta(info["order_id"], {"stop_for_entry_idx": entry_idx, "token": token, "planned_qty": qty})
        register_stop_sm(info["order_id"], token, entry_idx, limit=trig, trigger=trig, tick=tick)
        STOP_SM[info["order_id"]]["active"] = False
    return info


In [None]:

def _stop_escalator_worker():
    while not _stop_escalate_evt.is_set():
        time.sleep(0.1)
        now = time.perf_counter()
        for oid in list(STOP_SM.keys()):
            st = STOP_SM.get(oid)
            if not st or not st.get("active"):
                continue
            # planned and filled qty from TRADE_LOG if available
            eidx = st.get("entry_idx")
            planned = int(TRADE_LOG[eidx]["qty"]) if (eidx is not None and eidx < len(TRADE_LOG)) else None
            filled = int(TRADE_LOG[eidx].get("exit_filled_qty") or 0) if (eidx is not None and eidx < len(TRADE_LOG)) else 0
            if planned is not None and filled >= planned:
                st["active"] = False; continue

            if (now - st["placed_ts"]) * 1000.0 < SL_GRACE_MS:
                continue
            best_bid = _best_bid_for(st["token"])
            if best_bid is None:
                continue
            move_away = (best_bid < (st["limit"] - st["tick"]))  # for SELL, worse is below limit
            if move_away and st["widens"] < SL_MAX_WIDENS:
                new_limit = st["limit"] - SL_WIDEN_TICKS * st["tick"]
                ok = exec_v3.modify_order_v3(oid, price=new_limit).get("ok", False) if exec_v3 is not None else False
                if ok:
                    st["limit"] = new_limit
                    st["widens"] += 1
                    st["placed_ts"] = now
                    if VERBOSE_RECON:
                        print(f"[stop-sm] widened {oid} -> {new_limit:.2f} widens={st['widens']}")
                    continue
            # convert to SL-M as last resort
            if exec_v3 is not None:
                try:
                    exec_v3.cancel_order_v3(oid)
                except Exception:
                    pass
                info = exec_v3.place_stop_exit_v3(instrument_token=st["token"], exit_side="SELL",
                                                  trigger_price=st["trigger"], quantity=int(TRADE_LOG[st["entry_idx"]]["qty"]),
                                                  product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"),
                                                  tag=f"STOP-SLM-e{st['entry_idx']}")
                if info.get("ok"):
                    new_oid = info.get("order_id")
                    STOP_SM[new_oid] = dict(STOP_SM[oid], placed_ts=time.perf_counter(), widens=0)
                    STOP_SM[oid]["active"] = False
                    try:
                        TRADE_LOG[eidx]["stop_converted_to_slm"] = True
                        TRADE_LOG[eidx]["stop_mode"] = "SL→SL-M"
                    except Exception:
                        pass
                    if recon is not None:
                        recon.attach_meta(new_oid, {"stop_for_entry_idx": eidx, "token": st["token"]})
                    if VERBOSE_RECON:
                        print(f"[stop-sm] converted {oid} to SL-M")
                else:
                    st["active"] = False
            else:
                st["active"] = False


In [None]:

# --- Reconciler v4.1: broader statuses, partial stop tracking, pre-cancel on target ---
_terminal_filled = {"complete","completed","filled","fully_filled"}
_terminal_ended  = _terminal_filled | {"cancelled","rejected"}

def _recon_on_message_v41(self, message):
    global ORDER_WS_LAST_TS
    ORDER_WS_LAST_TS = time.time()

    upd = self._extract_update(message)
    oid = upd.get('order_id')
    if not oid:
        if VERBOSE_RECON: print("[recon] no order_id:", message)
        return

    cur_qty = int(upd.get('filled_quantity') or 0)
    cur_avg = float(upd.get('average_price') or 0.0)
    cur_val = cur_qty * cur_avg
    prev_qty = self.last_seen_fill_qty[oid]
    prev_val = self.last_seen_fill_val[oid]
    d_qty = max(cur_qty - prev_qty, 0)
    d_val = max(cur_val - prev_val, 0.0)
    self.last_seen_fill_qty[oid] = cur_qty
    self.last_seen_fill_val[oid] = cur_val

    self._append_fill_log(upd)
    if d_qty > 0 and VERBOSE_RECON:
        print(f"[recon] FILL {oid} Δqty={d_qty} cum={cur_qty} @ {cur_avg}")

    meta = self.order_meta.get(oid, {})
    eidx = meta.get("exit_of_entry_idx")
    sidx = meta.get("stop_for_entry_idx")
    status = (upd.get("status") or "").lower()

    # Track partial stop fills against planned qty
    if sidx is not None:
        try:
            TRADE_LOG[sidx]["exit_filled_qty"] = int(cur_qty)
        except Exception:
            pass
        if status in _terminal_filled:
            try:
                rec = TRADE_LOG[sidx]
                avg_px = float(upd.get("average_price") or rec.get("exit_price") or 0.0)
                rec["exit_reason"] = rec.get("exit_reason") or "stop"
                rec["exit_price"] = float(avg_px) if avg_px else rec.get("exit_price")
                rec["exit_ts"] = pd.Timestamp.now(tz=IST)
                entry = float(rec.get("entry_price") or 0.0)
                side  = str(rec.get("side") or "BUY").upper()
                pnl_leg = (avg_px - entry) if side=="BUY" else (entry - avg_px)
                rec["pnl_abs"] = float(pnl_leg); rec["pnl_pct"] = float(pnl_leg / entry) if entry else rec.get("pnl_pct")
                rec["remaining_qty"] = 0; rec["stop_active"] = False
                stk = TRADE_OPEN_STACK.get(rec["token"], [])
                if stk and sidx in stk:
                    try: stk.remove(sidx)
                    except ValueError: pass
                # mark STOP_SM inactive
                if oid in STOP_SM: STOP_SM[oid]["active"] = False
            except Exception as e:
                if VERBOSE_RECON: print("[recon] stop finalize error:", e)

    # Exit WAP aggregation for target/other exits
    if eidx is not None:
        agg = self.exit_wap[eidx]
        ostate = agg["orders"].setdefault(oid, {"cum_qty": 0, "cum_val": 0.0})
        ostate["cum_qty"] = cur_qty; ostate["cum_val"] = cur_val
        if d_qty > 0 and d_val > 0:
            agg["num"] += d_val; agg["den"] += d_qty
            exit_wap = agg["num"] / max(agg["den"], 1)
            try:
                rec = TRADE_LOG[eidx]
                rec["exit_price"] = float(exit_wap)
                rec["exit_filled_qty"] = int(agg["den"])
                entry = float(rec.get("entry_price") or 0.0)
                side  = str(rec.get("side") or "SELL").upper()
                pnl_leg = (exit_wap - entry) if side=="BUY" else (entry - exit_wap)
                rec["pnl_abs"] = float(pnl_leg); rec["pnl_pct"] = float(pnl_leg / entry) if entry else None
            except Exception as e:
                if VERBOSE_RECON: print("[recon] ledger update error:", e)

        if status in _terminal_ended:
            try:
                rec = TRADE_LOG[eidx]; rec["exit_fill_ts"] = pd.Timestamp.now(tz=IST)
                # Cancel protective stop if still active
                stop_id = rec.get("stop_order_id")
                if stop_id and rec.get("stop_active"):
                    if 'exec_v3' in globals() and exec_v3 is not None:
                        exec_v3.cancel_order_v3(stop_id)
                    rec["stop_active"] = False
                    if stop_id in STOP_SM: STOP_SM[stop_id]["active"] = False
            except Exception as e:
                if VERBOSE_RECON: print("[recon] stop cancel error:", e)

# Monkey patch class method
if 'UpstoxPortfolioReconciler' in globals():
    UpstoxPortfolioReconciler._on_message = _recon_on_message_v41


In [None]:

def _flatten_all_positions_eod():
    if exec_v3 is None: 
        print("[EOD] exec_v3 not init; skip flatten"); 
        return
    with _df_lock:
        snapshot = df_feed_enriched.copy()
    for token, pos in list(open_positions.items()):
        q = int(pos.get("qty") or 0)
        if q <= 0: continue
        row = snapshot.loc[snapshot["Token"]==token]
        if row.empty: continue
        row = row.iloc[0]
        best_bid = float(row.get("BidP1") or row.get("Mid") or 0.0)
        best_ask = float(row.get("AskP1") or row.get("Mid") or 0.0)
        tick = float(row.get("tick_size") or 0.05)
        info = exec_v3.place_order_v3(instrument_token=token, side="SELL", quantity=q,
                                      best_bid=best_bid, best_ask=best_ask, tick_size=tick,
                                      product=PRODUCT_MAP.get(DEFAULT_PRODUCT,"I"), marketable_limit=True,
                                      buffer_ticks=LIMIT_BUFFER_TICKS, tag="EOD-FLAT")
        print("[EOD] flatten sent:", token, info)

# Hook into backfill worker timing by redefining _backfill_worker with EOD flatten gate
try:
    _orig_backfill_worker = _backfill_worker
    def _backfill_worker():
        global _last_backfill_ts
        while not _backfill_stop_evt.is_set():
            time.sleep(1.0)
            now = time.time()
            # EOD flatten
            try:
                hhmm = pd.Timestamp.now(tz=IST).strftime("%H:%M")
                if EOD_FLATTEN_ENABLED and hhmm == EOD_FLATTEN_AT and _have_live_things():
                    _flatten_all_positions_eod()
            except Exception:
                pass
            if not BACKFILL_ENABLED: continue
            if (now - _last_backfill_ts) < BACKFILL_MIN_INTERVAL_S: continue
            eod_guard = (hhmm >= "15:29" and hhmm <= "15:30")
            stale_ws = (now - ORDER_WS_LAST_TS) >= BACKFILL_TRIGGER_NO_WS_S
            if eod_guard or (stale_ws and _have_live_things()):
                backfill_once(); _last_backfill_ts = now
except Exception:
    pass
