In [None]:
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
from datetime import timedelta

# ============================================================
# LOAD CSV
# ============================================================
# Read the flattened snapshot export; the 'LTT' column is parsed as datetimes.
df = pd.read_csv("flattened_snapshots.csv", low_memory=False, parse_dates=['LTT'])

# Detect which premium column exists: prefer "*_Call_Premium", otherwise use
# the "*_Call_ltp" name.
# NOTE(review): the "*_ltp" fallback is chosen without checking that it is
# actually present in the file -- confirm one of the two always exists.
prev_p = "Previous_Call_Premium" if "Previous_Call_Premium" in df else "Previous_Call_ltp"
curr_p = "Current_Call_Premium"  if "Current_Call_Premium"  in df else "Current_Call_ltp"
next_p = "Next_Call_Premium"     if "Next_Call_Premium"     in df else "Next_Call_ltp"

# Strike-price column names, in the same previous/current/next order as the
# premium columns above.
prev_str, curr_str, next_str = "Previous_Strikeprice","Current_Strikeprice","Next_Strikeprice"

# ============================================================
# BUILD STRIKE → (timestamp, premium) TIMESERIES
# ============================================================
# strike (int) -> list of (timestamp, premium) observations
strike_series = defaultdict(list)

# Every row carries up to three (strike, premium) pairs: previous, current
# and next. Each usable pair is filed under its own strike.
for _, row in df.iterrows():
    t = row["LTT"]
    for sc, pc in [(prev_str, prev_p), (curr_str, curr_p), (next_str, next_p)]:
        # Skip pairs whose columns are absent or whose cells are empty.
        if sc not in row or pc not in row:
            continue
        if pd.isna(row[sc]) or pd.isna(row[pc]):
            continue
        try:
            s = int(row[sc])
            p = float(row[pc])
        except (TypeError, ValueError, OverflowError):
            # Non-numeric or infinite cell: drop this pair only. A bare
            # `except:` here would also hide KeyboardInterrupt and real bugs.
            continue
        strike_series[s].append((t, p))

# ============================================================
# SUMMARY + FORECAST FUNCTIONS
# ============================================================
def summarize(series):
    """Summarise a premium time series.

    `series` is an iterable of (timestamp, premium) pairs. The pairs are
    ordered by timestamp, then the first, last, highest and lowest premium,
    the absolute and percentage change from first to last, and the number of
    observations are returned as a dict.

    Returns None for an empty series. The percentage change is reported as 0
    when the first premium is 0.
    """
    premiums = [pair[1] for pair in sorted(series, key=lambda pair: pair[0])]
    if len(premiums) == 0:
        return None

    opening = premiums[0]
    closing = premiums[-1]
    delta = closing - opening

    # Avoid dividing by a zero opening premium.
    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = 0

    stats = {}
    stats["first_premium"] = opening
    stats["last_premium"] = closing
    stats["peak_premium"] = max(premiums)
    stats["trough_premium"] = min(premiums)
    stats["abs_change"] = delta
    stats["pct_change"] = pct
    stats["n_obs"] = len(premiums)
    return stats

def forecast(stats):
    """Turn a summary dict into 5- and 10-minute premium ranges.

    Reads "pct_change" and "last_premium" from `stats` and returns
    ((low_5, high_5), (low_10, high_10)). The offsets added to the last
    premium depend on the percentage-change tier: >= 25, >= 8, > 0, or
    anything else (including NaN).
    """
    growth = stats["pct_change"]
    base = stats["last_premium"]

    # Offsets are (5min low, 5min high, 10min low, 10min high).
    if growth >= 25:
        offsets = (15, 35, 25, 60)
    elif growth >= 8:
        offsets = (6, 18, 12, 30)
    elif growth > 0:
        offsets = (2, 8, 5, 15)
    else:
        offsets = (-5, 2, -8, 5)

    lo5, hi5, lo10, hi10 = (base + delta for delta in offsets)
    return (lo5, hi5), (lo10, hi10)

# ============================================================
# BUILD SUMMARY FOR ALL STRIKES
# ============================================================
records = []

for s, ts in strike_series.items():
    # Strikes with fewer than three observations are not summarised.
    if len(ts) < 3:
        continue

    st = summarize(ts)
    if st is None:
        continue

    # f5 / f10 are (low, high) premium ranges for 5 and 10 minutes.
    f5, f10 = forecast(st)

    rec = {
        "strike": s,
        **st,
        "5min_low":  f5[0],
        "5min_high": f5[1],
        "10min_low": f10[0],
        "10min_high":f10[1],
        # Same ranges expressed as offsets from the last premium.
        "p5_expected_lo": f5[0] - st["last_premium"],
        "p5_expected_hi": f5[1] - st["last_premium"],
        "p10_expected_lo": f10[0] - st["last_premium"],
        "p10_expected_hi": f10[1] - st["last_premium"]
    }

    records.append(rec)

# pd.DataFrame([]) has no columns at all, so the later summary["strike"]
# lookup would raise KeyError when no strike qualified. Keep an (empty)
# "strike" column in that case so the rest of the script still runs.
summary = pd.DataFrame(records) if records else pd.DataFrame(columns=["strike"])

# ============================================================
# STRATEGY TAGS + MONEYFLOW EXTRACTION
# ============================================================
tag_keywords = [
    "RSI", "MACD", "VWAP", "RSI_MACD", "VWAP_Divergence", "OI_Support_Call",
    "Put Buying", "Call Writing", "PnL", "Momentum", "Breakout"
]

# Per-strike accumulators, only for strikes that made it into `summary`.
strike_info = {
    int(s): {
        "tags": Counter(),
        "call_moneyflow": 0.0,
        "put_moneyflow": 0.0
    }
    for s in summary["strike"]
}

# Which columns exist and the lower-cased keywords are the same for every
# row, so work them out once instead of inside the row loop.
tag_cols = [
    col for col in ("Previous_StrategyTag", "Current_StrategyTag", "Next_StrategyTag")
    if col in df
]
flow_cols = [
    col for col in (
        "Previous_CallMoneyFlow", "Current_CallMoneyFlow", "Next_CallMoneyFlow",
        "Previous_PutMoneyFlow", "Current_PutMoneyFlow", "Next_PutMoneyFlow",
    )
    if col in df
]
keyword_pairs = [(kw, kw.lower()) for kw in tag_keywords]

# NOTE(review): every tag and money-flow column of a row is credited to each
# of the row's three strikes, not just the strike sharing the column's
# prefix. That behaviour is preserved here -- confirm it is intended.
for _, row in df.iterrows():
    for sc in [prev_str, curr_str, next_str]:
        if sc not in row or pd.isna(row[sc]):
            continue
        try:
            s = int(row[sc])
        except (TypeError, ValueError, OverflowError):
            # Non-numeric strike cell: nothing to attribute. (A bare
            # `except:` would also swallow KeyboardInterrupt.)
            continue

        if s not in strike_info:
            continue
        info = strike_info[s]

        # Extract tags. Matching is by case-insensitive substring, so a tag
        # containing "RSI_MACD" also counts towards "RSI" and "MACD".
        for col in tag_cols:
            text = row[col]
            if not isinstance(text, str):
                continue
            text = text.lower()
            for kw, kw_lower in keyword_pairs:
                if kw_lower in text:
                    info["tags"][kw] += 1

        # Moneyflow: call columns and put columns are summed separately.
        for col in flow_cols:
            if pd.isna(row[col]):
                continue
            val = float(row[col])
            if "Call" in col:
                info["call_moneyflow"] += val
            else:
                info["put_moneyflow"] += val

# ============================================================
# HIGH CONVICTION SIGNAL HIT RATE
# ============================================================
# Column holding the high-conviction flag for the current strike.
highconv_col = "Current_IsHighConvictionSignal"

# ============================================================
# BUILD FINAL OUTPUT
# ============================================================
final_rows = []

# One output row per summarised strike: the summary/forecast columns plus
# money flow, tag counts, a textual reason, a recommended action and
# high-conviction hit statistics.
for _, r in summary.iterrows():
    # Cast to int so the value matches the int keys of strike_info.
    s = int(r["strike"])
    info = strike_info[s]

    # Build reasoning text
    tags = info["tags"]
    reasons = []

    if tags.get("RSI",0) or tags.get("RSI_MACD",0):
        reasons.append("RSI/MACD bullish pattern")
    if tags.get("VWAP_Divergence",0):
        reasons.append("VWAP divergence support")
    if tags.get("OI_Support_Call",0):
        reasons.append("OI call support detected")
    if tags.get("Put Buying",0):
        reasons.append("Put side hedging activity")

    if not reasons:
        reasons.append("No strong signals")

    # Recommended Action: more than +5% change -> BUY_CALL, less than -5%
    # -> BUY_PUT, otherwise HOLD.
    if r["pct_change"] > 5:
        act = "BUY_CALL"
    elif r["pct_change"] < -5:
        act = "BUY_PUT"
    else:
        act = "HOLD"

    # High-conviction stats: rows where this strike is the current strike
    # and the high-conviction flag is True.
    # NOTE(review): df.get() returns None when the flag column is absent;
    # presumably the comparison then selects no rows -- confirm.
    hc_rows = df[(df[curr_str]==s) & (df.get(highconv_col)==True)]
    hc_total = hc_rows.shape[0]
    hc_success = 0

    for _, row2 in hc_rows.iterrows():
        t0 = row2["LTT"]
        p0 = row2[curr_p]

        # Rows for the same current strike from the signal time up to three
        # minutes later (the signal row itself falls inside this window).
        window = df[
            (df["LTT"] >= t0) &
            (df["LTT"] <= t0 + timedelta(minutes=3)) &
            (df[curr_str] == s)
        ]

        # A hit: the highest premium in the window is strictly above the
        # premium at the signal.
        if not window.empty:
            if window[curr_p].max() > p0:
                hc_success += 1

    final_rows.append({
        **r.to_dict(),
        "call_moneyflow": info["call_moneyflow"],
        "put_moneyflow":  info["put_moneyflow"],
        "tags": ";".join([f"{k}:{v}" for k,v in tags.items()]),
        "reasons": "; ".join(reasons),
        "recommended_action": act,
        "highconv_total": hc_total,
        "highconv_success": hc_success,
        # None (rather than 0) when there were no high-conviction rows.
        "highconv_hit_rate":
            (hc_success / hc_total) if hc_total > 0 else None,

        # Strike repeated under the raw column name (explicitly requested).
        "Current_Strikeprice": s
    })

final_df = pd.DataFrame(final_rows)

# SAVE OUTPUT
final_df.to_csv("FULL_STRIKE_FORECAST_OUTPUT.csv", index=False)
print("Generated FULL_STRIKE_FORECAST_OUTPUT.csv successfully!")


Generated FULL_STRIKE_FORECAST_OUTPUT.csv successfully!


In [None]:
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
from datetime import timedelta

# ============================================================
# LOAD CSV
# ============================================================
# Read the flattened snapshot export; the 'LTT' column is parsed as datetimes.
df = pd.read_csv("flattened_snapshots.csv", low_memory=False, parse_dates=['LTT'])

# Detect which premium column exists: prefer "*_Call_Premium", otherwise use
# the "*_Call_ltp" name.
# NOTE(review): the "*_ltp" fallback is chosen without checking that it is
# actually present in the file -- confirm one of the two always exists.
prev_p = "Previous_Call_Premium" if "Previous_Call_Premium" in df else "Previous_Call_ltp"
curr_p = "Current_Call_Premium"  if "Current_Call_Premium"  in df else "Current_Call_ltp"
next_p = "Next_Call_Premium"     if "Next_Call_Premium"     in df else "Next_Call_ltp"

# Strike-price column names, in the same previous/current/next order as the
# premium columns above.
prev_str, curr_str, next_str = "Previous_Strikeprice","Current_Strikeprice","Next_Strikeprice"

# ============================================================
# BUILD STRIKE → (timestamp, premium) TIMESERIES
# ============================================================
# strike (int) -> list of (timestamp, premium) observations
strike_series = defaultdict(list)

# Every row carries up to three (strike, premium) pairs: previous, current
# and next. Each usable pair is filed under its own strike.
for _, row in df.iterrows():
    t = row["LTT"]
    for sc, pc in [(prev_str, prev_p), (curr_str, curr_p), (next_str, next_p)]:
        # Skip pairs whose columns are absent or whose cells are empty.
        if sc not in row or pc not in row:
            continue
        if pd.isna(row[sc]) or pd.isna(row[pc]):
            continue
        try:
            s = int(row[sc])
            p = float(row[pc])
        except (TypeError, ValueError, OverflowError):
            # Non-numeric or infinite cell: drop this pair only. A bare
            # `except:` here would also hide KeyboardInterrupt and real bugs.
            continue
        strike_series[s].append((t, p))

# ============================================================
# SUMMARY + FORECAST FUNCTIONS
# ============================================================
def summarize(series):
    """Condense (timestamp, premium) pairs into summary statistics.

    The pairs are put in timestamp order first. The result dict holds the
    first, last, peak and trough premium, the absolute and percentage change
    between first and last, and the observation count.

    An empty input yields None; a first premium of 0 yields a percentage
    change of 0.
    """
    chronological = sorted(series, key=lambda obs: obs[0])
    values = [obs[1] for obs in chronological]
    if len(values) == 0:
        return None

    start = values[0]
    end = values[-1]
    change = end - start

    # Guard the division against a zero starting premium.
    if start != 0:
        pct = change / start * 100
    else:
        pct = 0

    return dict(
        first_premium=start,
        last_premium=end,
        peak_premium=max(values),
        trough_premium=min(values),
        abs_change=change,
        pct_change=pct,
        n_obs=len(values),
    )

def forecast(stats):
    """Derive 5- and 10-minute premium ranges from a summary dict.

    Uses stats["pct_change"] to pick a tier (>= 25, >= 8, > 0, otherwise)
    and adds that tier's offsets to stats["last_premium"]. Returns
    ((low_5, high_5), (low_10, high_10)).
    """
    move = stats["pct_change"]
    anchor = stats["last_premium"]

    # Each tier: (5-minute offsets), (10-minute offsets).
    if move >= 25:
        near, far = (15, 35), (25, 60)
    elif move >= 8:
        near, far = (6, 18), (12, 30)
    elif move > 0:
        near, far = (2, 8), (5, 15)
    else:
        near, far = (-5, 2), (-8, 5)

    five_min = (anchor + near[0], anchor + near[1])
    ten_min = (anchor + far[0], anchor + far[1])
    return five_min, ten_min

# ============================================================
# BUILD SUMMARY FOR ALL STRIKES
# ============================================================
records = []

for s, ts in strike_series.items():
    # Strikes with fewer than three observations are not summarised.
    if len(ts) < 3:
        continue

    st = summarize(ts)
    if st is None:
        continue

    # f5 / f10 are (low, high) premium ranges for 5 and 10 minutes.
    f5, f10 = forecast(st)

    rec = {
        "strike": s,
        **st,
        "5min_low":  f5[0],
        "5min_high": f5[1],
        "10min_low": f10[0],
        "10min_high": f10[1],
        # Same ranges expressed as offsets from the last premium.
        "p5_expected_lo": f5[0] - st["last_premium"],
        "p5_expected_hi": f5[1] - st["last_premium"],
        "p10_expected_lo": f10[0] - st["last_premium"],
        "p10_expected_hi": f10[1] - st["last_premium"]
    }

    records.append(rec)

# pd.DataFrame([]) has no columns at all, so the later summary["strike"]
# lookup would raise KeyError when no strike qualified. Keep an (empty)
# "strike" column in that case so the rest of the script still runs.
summary = pd.DataFrame(records) if records else pd.DataFrame(columns=["strike"])

# ============================================================
# STRATEGY TAGS + MONEYFLOW EXTRACTION
# ============================================================
tag_keywords = [
    "RSI", "MACD", "VWAP", "RSI_MACD", "VWAP_Divergence", "OI_Support_Call",
    "Put Buying", "Call Writing", "PnL", "Momentum", "Breakout"
]

# Per-strike accumulators, only for strikes that made it into `summary`.
strike_info = {
    int(s): {
        "tags": Counter(),
        "call_moneyflow": 0.0,
        "put_moneyflow": 0.0
    }
    for s in summary["strike"]
}

# Which columns exist and the lower-cased keywords are the same for every
# row, so work them out once instead of inside the row loop.
tag_cols = [
    col for col in ("Previous_StrategyTag", "Current_StrategyTag", "Next_StrategyTag")
    if col in df
]
flow_cols = [
    col for col in (
        "Previous_CallMoneyFlow", "Current_CallMoneyFlow", "Next_CallMoneyFlow",
        "Previous_PutMoneyFlow", "Current_PutMoneyFlow", "Next_PutMoneyFlow",
    )
    if col in df
]
keyword_pairs = [(kw, kw.lower()) for kw in tag_keywords]

# NOTE(review): every tag and money-flow column of a row is credited to each
# of the row's three strikes, not just the strike sharing the column's
# prefix. That behaviour is preserved here -- confirm it is intended.
for _, row in df.iterrows():
    for sc in [prev_str, curr_str, next_str]:
        if sc not in row or pd.isna(row[sc]):
            continue
        try:
            s = int(row[sc])
        except (TypeError, ValueError, OverflowError):
            # Non-numeric strike cell: nothing to attribute. (A bare
            # `except:` would also swallow KeyboardInterrupt.)
            continue

        if s not in strike_info:
            continue
        info = strike_info[s]

        # Extract tags. Matching is by case-insensitive substring, so a tag
        # containing "RSI_MACD" also counts towards "RSI" and "MACD".
        for col in tag_cols:
            text = row[col]
            if not isinstance(text, str):
                continue
            text = text.lower()
            for kw, kw_lower in keyword_pairs:
                if kw_lower in text:
                    info["tags"][kw] += 1

        # Moneyflow: call columns and put columns are summed separately.
        for col in flow_cols:
            if pd.isna(row[col]):
                continue
            val = float(row[col])
            if "Call" in col:
                info["call_moneyflow"] += val
            else:
                info["put_moneyflow"] += val

# ============================================================
# HIGH CONVICTION SIGNAL HIT RATE
# ============================================================
# Column holding the high-conviction flag for the current strike.
highconv_col = "Current_IsHighConvictionSignal"

# ============================================================
# BUILD FINAL OUTPUT
# ============================================================
final_rows = []

# One output row per summarised strike: the summary/forecast columns plus
# money flow, tag counts, a textual reason, a recommended action and
# high-conviction hit statistics.
for _, r in summary.iterrows():
    # Cast to int so the value matches the int keys of strike_info.
    s = int(r["strike"])
    info = strike_info[s]

    # Build reasoning text
    tags = info["tags"]
    reasons = []

    if tags.get("RSI",0) or tags.get("RSI_MACD",0):
        reasons.append("RSI/MACD bullish pattern")
    if tags.get("VWAP_Divergence",0):
        reasons.append("VWAP divergence support")
    if tags.get("OI_Support_Call",0):
        reasons.append("OI call support detected")
    if tags.get("Put Buying",0):
        reasons.append("Put side hedging activity")

    if not reasons:
        reasons.append("No strong signals")

    # Recommended Action: more than +5% change -> BUY_CALL, less than -5%
    # -> BUY_PUT, otherwise HOLD.
    if r["pct_change"] > 5:
        act = "BUY_CALL"
    elif r["pct_change"] < -5:
        act = "BUY_PUT"
    else:
        act = "HOLD"

    # High-conviction stats: rows where this strike is the current strike
    # and the high-conviction flag is True.
    # NOTE(review): df.get() returns None when the flag column is absent;
    # presumably the comparison then selects no rows -- confirm.
    hc_rows = df[(df[curr_str]==s) & (df.get(highconv_col)==True)]
    hc_total = hc_rows.shape[0]
    hc_success = 0

    for _, row2 in hc_rows.iterrows():
        t0 = row2["LTT"]
        p0 = row2[curr_p]

        # Rows for the same current strike from the signal time up to three
        # minutes later (the signal row itself falls inside this window).
        window = df[
            (df["LTT"] >= t0) &
            (df["LTT"] <= t0 + timedelta(minutes=3)) &
            (df[curr_str] == s)
        ]

        # A hit: the highest premium in the window is strictly above the
        # premium at the signal.
        if not window.empty:
            if window[curr_p].max() > p0:
                hc_success += 1

    final_rows.append({
        **r.to_dict(),
        "call_moneyflow": info["call_moneyflow"],
        "put_moneyflow": info["put_moneyflow"],
        "tags": ";".join([f"{k}:{v}" for k,v in tags.items()]),
        "reasons": "; ".join(reasons),
        "recommended_action": act,
        "highconv_total": hc_total,
        "highconv_success": hc_success,
        # None (rather than 0) when there were no high-conviction rows.
        "highconv_hit_rate": (hc_success / hc_total) if hc_total > 0 else None,
        # Strike repeated under the raw column name.
        "Current_Strikeprice": s
    })

final_df = pd.DataFrame(final_rows)

# SAVE OUTPUT
final_df.to_csv("FULL_STRIKE_FORECAST_OUTPUT.csv", index=False)
print("Generated FULL_STRIKE_FORECAST_OUTPUT.csv successfully!")


Generated FULL_STRIKE_FORECAST_OUTPUT.csv successfully!


In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline.py
"""
Full CE + PE forecast pipeline:
- Input: flattened_snapshots.csv (or path provided)
- Outputs:
    - MERGED_CE_PE_FORECAST.csv
    - TOP_BUY_PUTS.csv
    - SPOT_VS_MOMENTUM.png
    - PUT_REVERSAL_ALERTS.csv (written only when alerts are found; alerts are also printed)
"""

import os
import sys
import math
from collections import defaultdict, Counter
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# -------------------------
# Config / input file
# -------------------------
INPUT_CSV = "flattened_snapshots.csv"  # change if needed
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_PLOT = "SPOT_VS_MOMENTUM.png"
OUT_ALERTS = "PUT_REVERSAL_ALERTS.csv"

# -------------------------
# Safety check: file exists
# -------------------------
if not os.path.exists(INPUT_CSV):
    print(f"ERROR: input file not found: {INPUT_CSV}")
    sys.exit(1)

# -------------------------
# Load data
# -------------------------
# infer_datetime_format is deprecated (pandas 2.x warns when it is passed and
# infers a consistent format on its own), so it is no longer supplied.
df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'])

# -------------------------
# Auto-detect premium column names (calls and puts)
# -------------------------
def pick_col(candidates):
    """Return the first name in `candidates` that is a column of `df`, or None."""
    available = df.columns
    return next((name for name in candidates if name in available), None)

# Resolve the call premium columns from the known spellings; each name is
# None when none of its candidates is present in df.
# NOTE(review): "Next_Call_Premium" (and "Next_Put_Premium" below) is listed
# twice in its candidate list -- harmless, but possibly a typo for another
# spelling.
prev_call_p = pick_col(["Previous_Call_Premium","Previous_Call_ltp","Prev_Call_LTP","Prev_Call_Premium"])
curr_call_p = pick_col(["Current_Call_Premium","Current_Call_ltp","Current_Call_LTP","Curr_Call_Premium"])
next_call_p = pick_col(["Next_Call_Premium","Next_Call_ltp","Next_Call_LTP","Next_Call_Premium"])

# Same resolution for the put premium columns.
prev_put_p = pick_col(["Previous_Put_Premium","Previous_Put_ltp","Prev_Put_LTP","Prev_Put_Premium"])
curr_put_p = pick_col(["Current_Put_Premium","Current_Put_ltp","Current_Put_LTP","Curr_Put_Premium"])
next_put_p = pick_col(["Next_Put_Premium","Next_Put_ltp","Next_Put_LTP","Next_Put_Premium"])

# Strike columns: "*_Strikeprice" when present, otherwise the "*_Strike" name
# (used without checking that it exists).
prev_str = "Previous_Strikeprice" if "Previous_Strikeprice" in df.columns else "Previous_Strike"
curr_str = "Current_Strikeprice" if "Current_Strikeprice" in df.columns else "Current_Strike"
next_str = "Next_Strikeprice" if "Next_Strikeprice" in df.columns else "Next_Strike"

# Report the resolved column names.
print("Detected columns:")
print(f" Call premiums: {prev_call_p}, {curr_call_p}, {next_call_p}")
print(f" Put premiums : {prev_put_p}, {curr_put_p}, {next_put_p}")
print(f" Strike cols  : {prev_str}, {curr_str}, {next_str}")

# -------------------------
# Helpers: build series, summarize, forecast
# -------------------------
def build_series(pcols):
    """Build strike -> list of (timestamp, premium) pairs using triple columns mapping.

    `pcols` is a (previous, current, next) triple of premium column names.
    Each is paired with the strike column of the same position; a pair is
    ignored when its premium name is empty/None or either column is missing
    from `df`. Rows with a missing 'LTT', strike or premium contribute
    nothing for that pair, as do cells that cannot be converted to a number.

    Returns a defaultdict(list) keyed by int strike.
    """
    series = defaultdict(list)
    triples = [(prev_str, pcols[0]), (curr_str, pcols[1]), (next_str, pcols[2])]
    # Whether a (strike column, premium column) pair is usable does not
    # depend on the row, so decide it once instead of for every row.
    usable = [
        (sc, pc)
        for sc, pc in triples
        if pc and sc in df.columns and pc in df.columns
    ]
    if not usable:
        return series

    for _, row in df.iterrows():
        t = row.get('LTT')
        if pd.isna(t):
            continue
        for sc, pc in usable:
            if pd.isna(row[sc]) or pd.isna(row[pc]):
                continue
            try:
                s = int(row[sc])
                p = float(row[pc])
            except (TypeError, ValueError, OverflowError):
                # ignore parsing issues (non-numeric or infinite cells)
                continue
            series[s].append((t, p))
    return series

def summarize(series_list):
    """Summarise a list of (t, p) pairs ordered by t.

    Returns a dict with first/last/peak/trough premium, absolute change,
    percentage change and the number of points, or None when the list is
    empty. The percentage change is NaN when the first premium is 0.
    """
    ordered = sorted(series_list, key=lambda pair: pair[0])
    premiums = [pair[1] for pair in ordered]
    if not premiums:
        return None

    opening, closing = premiums[0], premiums[-1]
    delta = closing - opening
    if opening != 0:
        relative = delta / opening * 100
    else:
        # No meaningful percentage from a zero starting premium.
        relative = float('nan')

    return {
        "first": opening,
        "last": closing,
        "peak": max(premiums),
        "trough": min(premiums),
        "abs_change": delta,
        "pct_change": relative,
        "n": len(premiums),
    }

def forecast_from_pct(last, pct):
    """Return (5min_low, 5min_high), (10min_low, 10min_high) around `last`.

    The offsets added to `last` are chosen by the `pct` tier: >= 25, >= 8,
    > 0, or anything else (which includes NaN).
    """
    if pct >= 25:
        band5, band10 = (15, 35), (25, 60)
    elif pct >= 8:
        band5, band10 = (6, 18), (12, 30)
    elif pct > 0:
        band5, band10 = (2, 8), (5, 15)
    else:
        band5, band10 = (-5, 2), (-8, 5)
    five = tuple(last + step for step in band5)
    ten = tuple(last + step for step in band10)
    return five, ten

# -------------------------
# Build call & put series
# -------------------------
# strike -> [(timestamp, premium), ...] for calls and for puts.
call_series = build_series((prev_call_p, curr_call_p, next_call_p))
put_series  = build_series((prev_put_p, curr_put_p, next_put_p))

# -------------------------
# Summarize all strikes for calls
# -------------------------
call_records = []
for strike, ts in call_series.items():
    # Strikes with fewer than three observations are skipped.
    if len(ts) < 3:
        continue
    st = summarize(ts)
    if st is None:
        continue
    # f5 / f10 are (low, high) ranges for 5 and 10 minutes.
    f5, f10 = forecast_from_pct(st['last'], st['pct_change'])
    call_records.append({
        "strike": strike,
        "n_obs_call": st['n'],
        "first_call_premium": st['first'],
        "last_call_premium": st['last'],
        "peak_call_premium": st['peak'],
        "trough_call_premium": st['trough'],
        "abs_change_call": st['abs_change'],
        "pct_change_call": st['pct_change'],
        "call_5min_low": f5[0], "call_5min_high": f5[1],
        "call_10min_low": f10[0], "call_10min_high": f10[1],
        # The same ranges as offsets from the last premium.
        "call_p5_lo": f5[0] - st['last'], "call_p5_hi": f5[1] - st['last'],
        "call_p10_lo": f10[0] - st['last'], "call_p10_hi": f10[1] - st['last'],
    })

# Indexed by strike; a plain empty frame when no strike qualified.
call_df = pd.DataFrame(call_records).set_index('strike') if call_records else pd.DataFrame()

# -------------------------
# Summarize all strikes for puts
# -------------------------
# Put-side counterpart of the call summary: one record per strike that has at
# least three put observations.
put_records = []
for strike, ts in put_series.items():
    if len(ts) < 3:
        continue
    st = summarize(ts)
    if st is None:
        continue
    # f5 / f10 are (low, high) ranges for 5 and 10 minutes.
    f5, f10 = forecast_from_pct(st['last'], st['pct_change'])
    put_records.append({
        "strike": strike,
        "n_obs_put": st['n'],
        "first_put_premium": st['first'],
        "last_put_premium": st['last'],
        "peak_put_premium": st['peak'],
        "trough_put_premium": st['trough'],
        "abs_change_put": st['abs_change'],
        "pct_change_put": st['pct_change'],
        "put_5min_low": f5[0], "put_5min_high": f5[1],
        "put_10min_low": f10[0], "put_10min_high": f10[1],
        # The same ranges as offsets from the last premium.
        "put_p5_lo": f5[0] - st['last'], "put_p5_hi": f5[1] - st['last'],
        "put_p10_lo": f10[0] - st['last'], "put_p10_hi": f10[1] - st['last'],
    })

# Indexed by strike; a plain empty frame when no strike qualified.
put_df = pd.DataFrame(put_records).set_index('strike') if put_records else pd.DataFrame()

# -------------------------
# Merge CE + PE summary frames
# -------------------------
# Combine the call and put summaries into one frame with a 'strike' column.
if not call_df.empty and not put_df.empty:
    # Both sides present: place them side by side on the strike index.
    # The rename covers the case where reset_index() names the column 'index'.
    merged = pd.concat([call_df, put_df], axis=1, sort=True).reset_index().rename(columns={'index':'strike'})
elif not call_df.empty:
    # Calls only.
    merged = call_df.reset_index().rename(columns={'index':'strike'})
    merged['strike'] = merged['strike'].astype(int)
    # ensure put columns exist
    for col in ['pct_change_put','last_put_premium','put_moneyflow']:
        if col not in merged.columns:
            merged[col] = np.nan
elif not put_df.empty:
    # Puts only; add the call columns as NaN.
    merged = put_df.reset_index().rename(columns={'index':'strike'})
    merged['strike'] = merged['strike'].astype(int)
    for col in ['pct_change_call','last_call_premium','call_moneyflow']:
        if col not in merged.columns:
            merged[col] = np.nan
else:
    # Neither side produced a summary.
    merged = pd.DataFrame(columns=['strike'])

# -------------------------
# Extract tags and moneyflow from the raw df
# Generic: any column that contains "Tag" or "MoneyFlow" or "CallMoneyFlow"/"PutMoneyFlow"
# -------------------------
tag_keywords = ["RSI","MACD","VWAP","RSI_MACD","VWAP_Divergence","OI_Support_Call","Put Buying","Call Writing","Bearish","Bullish"]

# initialize strike_info
strike_info = {}
for s in merged['strike'].dropna().astype(int).unique():
    strike_info[int(s)] = {"tags": Counter(), "call_moneyflow": 0.0, "put_moneyflow": 0.0}

# Which columns count as strike / tag / money-flow columns depends only on
# their names, so classify them once instead of re-testing every column name
# for every row and strike.
strike_cols = [sc for sc in (prev_str, curr_str, next_str) if sc in df.columns]
tag_cols = [col for col in df.columns if "Tag" in col or "tag" in col]
call_flow_cols = [col for col in df.columns if "Call" in col and "MoneyFlow" in col]
put_flow_cols = [col for col in df.columns if "Put" in col and "MoneyFlow" in col]
keyword_pairs = [(kw, kw.lower()) for kw in tag_keywords]

# scan raw dataframe for tag/moneyflow columns
# NOTE(review): every tag and money-flow column of a row is credited to each
# of the row's strikes, regardless of the column's Previous/Current/Next
# prefix. That behaviour is preserved here -- confirm it is intended.
for _, row in df.iterrows():
    for sc in strike_cols:
        raw_strike = row.get(sc)
        if pd.isna(raw_strike):
            continue
        try:
            s = int(raw_strike)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric strike cell: nothing to attribute.
            continue
        info = strike_info.get(s)
        if info is None:
            continue

        # tags: case-insensitive substring match, so "RSI_MACD" in a tag
        # also counts towards "RSI" and "MACD".
        for col in tag_cols:
            v = row.get(col)
            if not isinstance(v, str):
                continue
            v_lower = v.lower()
            for kw, kw_lower in keyword_pairs:
                if kw_lower in v_lower:
                    info['tags'][kw] += 1

        # moneyflow columns (broad match); non-numeric cells are skipped
        # on a best-effort basis.
        for col in call_flow_cols:
            val = row.get(col)
            if pd.isna(val):
                continue
            try:
                amount = float(val)
            except (TypeError, ValueError):
                continue
            info['call_moneyflow'] += amount
        for col in put_flow_cols:
            val = row.get(col)
            if pd.isna(val):
                continue
            try:
                amount = float(val)
            except (TypeError, ValueError):
                continue
            info['put_moneyflow'] += amount

# attach tags and moneyflow to merged
def format_tags(s):
    """Render the tag counts recorded for strike `s` as "tag:count;tag:count".

    Returns an empty string when the strike is unknown to `strike_info` or
    has no tag counts.
    """
    counts = strike_info.get(int(s), {}).get('tags', {})
    if not counts:
        return ""
    return ";".join(f"{name}:{hits}" for name, hits in counts.items())

# Add per-strike tag text and money-flow totals as columns of `merged`.
# NOTE(review): only the 'tags' line guards against a NaN strike; the two
# money-flow lines call int(s) unconditionally.
merged['tags'] = merged['strike'].apply(lambda s: format_tags(s) if not pd.isna(s) else "")
merged['call_moneyflow'] = merged['strike'].apply(lambda s: strike_info.get(int(s), {}).get('call_moneyflow', 0.0))
merged['put_moneyflow']  = merged['strike'].apply(lambda s: strike_info.get(int(s), {}).get('put_moneyflow', 0.0))

# -------------------------
# Decide recommended_action using both call & put momentum + tag boosts
# -------------------------
def decide_action(row):
    """Pick "BUY_PUT", "BUY_CALL" or "HOLD" for one merged row.

    `row` only needs a dict-style `.get`. Missing or NaN momentum counts as
    0. Put momentum is checked before call momentum: above 8 always
    triggers, above 5 triggers only together with a matching tag.
    """
    def momentum(key):
        value = row.get(key)
        return 0 if pd.isna(value) else value

    call_pct = momentum('pct_change_call')
    put_pct = momentum('pct_change_put')

    tag_text = (row.get('tags') or "").lower()
    bearish = any(marker in tag_text for marker in ('put buying', 'call writing', 'bearish'))
    bullish = any(marker in tag_text for marker in ('call buying', 'bullish', 'oi_support_call'))

    # Puts take priority when both sides qualify.
    if put_pct > 8 or (put_pct > 5 and bearish):
        return "BUY_PUT"
    if call_pct > 8 or (call_pct > 5 and bullish):
        return "BUY_CALL"
    return "HOLD"

# Row-wise recommendation for every strike.
merged['recommended_action'] = merged.apply(decide_action, axis=1)

# -------------------------
# Top BUY_PUTs
# -------------------------
# Up to 100 BUY_PUT rows, strongest put momentum first when that column exists.
top_buy_puts = merged[merged['recommended_action'] == 'BUY_PUT'].copy()
if 'pct_change_put' in top_buy_puts.columns:
    top_buy_puts = top_buy_puts.sort_values('pct_change_put', ascending=False).head(100)
else:
    top_buy_puts = top_buy_puts.head(100)

# -------------------------
# Save merged CSVs
# -------------------------
merged.to_csv(OUT_MERGED, index=False)
top_buy_puts.to_csv(OUT_TOP_PUTS, index=False)
print(f"Saved merged output -> {OUT_MERGED}")
print(f"Saved top buy-put output -> {OUT_TOP_PUTS}")

# -------------------------
# Spot vs momentum chart (summary)
# -------------------------
# Get last spot if exists
# Last non-null spot price by 'LTT' order; NaN when the column is missing or
# entirely empty.
if 'SpotPrice' in df.columns:
    df2 = df.sort_values('LTT')
    last_spot = df2['SpotPrice'].dropna().iloc[-1] if len(df2['SpotPrice'].dropna())>0 else np.nan
else:
    last_spot = np.nan

# Mean percentage change across strikes, ignoring infinities and NaN.
avg_call_mom = merged['pct_change_call'].replace([np.inf, -np.inf], np.nan).dropna().mean() if 'pct_change_call' in merged.columns else np.nan
avg_put_mom  = merged['pct_change_put'].replace([np.inf, -np.inf], np.nan).dropna().mean() if 'pct_change_put' in merged.columns else np.nan

# Three bars on one axis; missing values are drawn as 0.
# NOTE(review): the spot price and the percentage averages share one scale.
plt.figure(figsize=(8,4))
plt.title("Spot (last) vs Avg Call/Put %change (momentum summary)")
plt.bar([0,1,2], [last_spot if not math.isnan(last_spot) else 0,
                  avg_call_mom if not pd.isna(avg_call_mom) else 0,
                  avg_put_mom if not pd.isna(avg_put_mom) else 0],
        tick_label=['Last Spot','Avg Call %','Avg Put %'])
plt.tight_layout()
plt.savefig(OUT_PLOT, dpi=150)
plt.close()
print(f"Saved momentum plot -> {OUT_PLOT}")

# -------------------------
# Alerts: Put reversal candidates
# criteria: pct_change_put > 8% AND (put buying tag or put_moneyflow > 0 or 'bearish' tag)
# -------------------------
# Collect one alert per strike whose put momentum exceeds 8% and that also
# has a 'put buying' / 'bearish' tag or a positive put money flow.
alerts = []
for _, row in merged.iterrows():
    # Rows without a usable put momentum value are skipped.
    pct_put = row.get('pct_change_put') if 'pct_change_put' in row else None
    if pct_put is None or (isinstance(pct_put, float) and np.isnan(pct_put)):
        continue
    tags = (row.get('tags') or "").lower()
    put_moneyflow = row.get('put_moneyflow', 0.0) if 'put_moneyflow' in row else 0.0
    if pct_put > 8 and (('put buying' in tags) or ('bearish' in tags) or (put_moneyflow and put_moneyflow > 0)):
        alerts.append({
            "strike": int(row['strike']),
            "pct_change_put": pct_put,
            "put_moneyflow": put_moneyflow,
            "tags": row.get('tags', ""),
            "recommended_action": row.get('recommended_action', ""),
            "reason": "Put momentum >8% and bearish tag/moneyflow"
        })

# The alerts file is written only when at least one alert exists; the first
# 20 alerts are also printed.
if alerts:
    alerts_df = pd.DataFrame(alerts)
    alerts_df.to_csv(OUT_ALERTS, index=False)
    print(f"Put reversal alerts saved -> {OUT_ALERTS}")
    print("Sample alerts:")
    print(alerts_df.head(20).to_string(index=False))
else:
    print("No put reversal alerts detected.")

# -------------------------
# Done
# -------------------------
print("Pipeline finished.")


  df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'], infer_datetime_format=True)


Detected columns:
 Call premiums: Previous_Call_Premium, Current_Call_Premium, Next_Call_Premium
 Put premiums : Previous_Put_Premium, Current_Put_Premium, Next_Put_Premium
 Strike cols  : Previous_Strikeprice, Current_Strikeprice, Next_Strikeprice
Saved merged output -> MERGED_CE_PE_FORECAST.csv
Saved top buy-put output -> TOP_BUY_PUTS.csv
Saved momentum plot -> SPOT_VS_MOMENTUM.png
Put reversal alerts saved -> PUT_REVERSAL_ALERTS.csv
Sample alerts:
 strike  pct_change_put  put_moneyflow                                                                                       tags recommended_action                                     reason
  58300      276.490439        15400.0  RSI:11407;MACD:11407;VWAP:24842;RSI_MACD:11407;VWAP_Divergence:24842;OI_Support_Call:8690            BUY_PUT Put momentum >8% and bearish tag/moneyflow
  58400       51.577564        59230.5 VWAP:50812;VWAP_Divergence:50812;OI_Support_Call:18047;RSI:20310;MACD:20310;RSI_MACD:20310            BUY_PUT Put momentum

In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline_use_ltp.py
import os, sys, math
from collections import defaultdict, Counter
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------- CONFIG ----------
INPUT_CSV = "flattened_snapshots.csv"   # your uploaded file
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_ALERTS = "PUT_REVERSAL_ALERTS.csv"
OUT_PLOT = "SPOT_VS_MOMENTUM.png"

# Stop early with a clear message when the input file is missing.
if not os.path.exists(INPUT_CSV):
    print("ERROR: input file not found:", INPUT_CSV)
    sys.exit(1)

# ---------- LOAD ----------
# infer_datetime_format is deprecated (pandas 2.x warns when it is passed and
# infers a consistent format on its own), so it is no longer supplied.
df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'])

# ---------- USE LTP COLUMNS (explicit) ----------
# Call LTP columns (preferred)
# Names of the previous/current/next call LTP columns.
PREV_CALL_LTP = "Previous_Call_ltp"
CURR_CALL_LTP = "Current_Call_ltp"
NEXT_CALL_LTP = "Next_Call_ltp"

# Put LTP columns (preferred)
PREV_PUT_LTP  = "Previous_Put_ltp"
CURR_PUT_LTP  = "Current_Put_ltp"
NEXT_PUT_LTP  = "Next_Put_ltp"

# Strike columns (explicit); these names are used as-is, with no fallback.
PREV_STR = "Previous_Strikeprice"
CURR_STR  = "Current_Strikeprice"
NEXT_STR  = "Next_Strikeprice"

# Validate existence (fall back to *_Premium if LTP missing)
def ensure(col_ltp, col_premium):
    """Return `col_ltp` if `df` has that column, else `col_premium` if present, else None."""
    for name in (col_ltp, col_premium):
        if name in df.columns:
            return name
    return None

# LTP columns are preferred. The matching *_Premium column is used only when
# the LTP column is absent, and None when neither exists. This goes through
# the ensure() helper instead of repeating the same check inline six times.
prev_call_col = ensure(PREV_CALL_LTP, "Previous_Call_Premium")
curr_call_col = ensure(CURR_CALL_LTP, "Current_Call_Premium")
next_call_col = ensure(NEXT_CALL_LTP, "Next_Call_Premium")

prev_put_col  = ensure(PREV_PUT_LTP, "Previous_Put_Premium")
curr_put_col  = ensure(CURR_PUT_LTP, "Current_Put_Premium")
next_put_col  = ensure(NEXT_PUT_LTP, "Next_Put_Premium")

# Show what we're using
print("Using columns (LTP prioritized):")
print("Calls:", prev_call_col, curr_call_col, next_call_col)
print("Puts :", prev_put_col,  curr_put_col,  next_put_col)
print("Strikes:", PREV_STR, CURR_STR, NEXT_STR)

# ---------- BUILD SERIES ----------
def build_series(prev_col, curr_col, next_col, prev_str_col=PREV_STR, curr_str_col=CURR_STR, next_str_col=NEXT_STR):
    """Group ``(timestamp, price)`` observations of the module-level ``df`` by strike.

    Each row can contribute up to three observations, one per
    (strike column, price column) pairing. Rows without an ``LTT`` value, and
    pairings whose strike or price is missing or not convertible, are skipped.

    Returns a ``defaultdict(list)`` keyed by ``int`` strike; each value is the
    list of ``(LTT, float price)`` tuples in row order.
    """
    # A pairing with a missing name or an absent column can never produce
    # data, so it is filtered once instead of being re-checked on every row.
    usable_pairs = [
        (strike_col, price_col)
        for strike_col, price_col in (
            (prev_str_col, prev_col),
            (curr_str_col, curr_col),
            (next_str_col, next_col),
        )
        if strike_col is not None
        and price_col is not None
        and strike_col in df.columns
        and price_col in df.columns
    ]

    grouped = defaultdict(list)
    for _, record in df.iterrows():
        stamp = record.get("LTT")
        if pd.isna(stamp):
            continue
        for strike_col, price_col in usable_pairs:
            if pd.isna(record.get(strike_col)) or pd.isna(record.get(price_col)):
                continue
            try:
                strike = int(record.get(strike_col))
                price = float(record.get(price_col))
            except Exception:
                # Unparseable strike/price: drop just this observation.
                continue
            grouped[strike].append((stamp, price))
    return grouped

# Build the per-strike (timestamp, price) series for each option side.
call_series = build_series(prev_call_col, curr_call_col, next_call_col)
put_series  = build_series(prev_put_col, curr_put_col, next_put_col)

# Each key is one distinct integer strike, so len() is the strike count.
print("Detected call strikes:", len(call_series))
print("Detected put strikes :", len(put_series))

# ---------- SUMMARIZE & FORECAST HELPERS ----------
def summarize(pairs):
    """Summarise a list of ``(timestamp, price)`` pairs.

    The pairs are ordered by timestamp (ties keep their input order) and the
    first/last/peak/trough prices, absolute change, percentage change and
    observation count are returned as a dict. ``pct_change`` is NaN when the
    first price is 0. Returns ``None`` for an empty input.
    """
    ordered = sorted(pairs, key=lambda item: item[0])
    if not ordered:
        return None

    prices = [price for _, price in ordered]
    opening = prices[0]
    closing = prices[-1]
    delta = closing - opening
    if opening != 0:
        relative = delta / opening * 100
    else:
        # A percentage of a zero base is undefined.
        relative = np.nan

    return {
        "first": opening,
        "last": closing,
        "peak": max(prices),
        "trough": min(prices),
        "abs_change": delta,
        "pct_change": relative,
        "n_obs": len(prices),
    }

def forecast_from_pct(last, pct):
    """Return ``((lo5, hi5), (lo10, hi10))`` price bands around ``last``.

    The band offsets are picked from the percentage change ``pct``:
    ``>= 25``, ``>= 8``, ``> 0``, or anything else. A NaN ``pct`` fails every
    comparison and therefore lands in the final (non-positive) tier.
    """
    if pct >= 25:
        near, far = (15, 35), (25, 60)
    elif pct >= 8:
        near, far = (6, 18), (12, 30)
    elif pct > 0:
        near, far = (2, 8), (5, 15)
    else:
        near, far = (-5, 2), (-8, 5)

    five_min = (last + near[0], last + near[1])
    ten_min = (last + far[0], last + far[1])
    return five_min, ten_min

# ---------- BUILD CALL & PUT SUMMARIES ----------
def _side_records(series_by_strike, side):
    """Return one summary + forecast record per strike for ``side`` ("call"/"put").

    Column names carry the side so the call and put frames can later be
    merged on "strike" without clashing.
    """
    rows = []
    for strike, observations in series_by_strike.items():
        if not observations:
            continue
        stats = summarize(observations)
        if stats is None:
            continue
        last = stats["last"]
        (lo5, hi5), (lo10, hi10) = forecast_from_pct(last, stats["pct_change"])
        rows.append({
            "strike": strike,
            f"n_obs_{side}": stats["n_obs"],
            f"first_{side}_ltp": stats["first"],
            f"last_{side}_ltp": last,
            f"peak_{side}_ltp": stats["peak"],
            f"trough_{side}_ltp": stats["trough"],
            f"abs_change_{side}": stats["abs_change"],
            f"pct_change_{side}": stats["pct_change"],
            f"{side}_5min_low": lo5, f"{side}_5min_high": hi5,
            f"{side}_10min_low": lo10, f"{side}_10min_high": hi10,
            # Forecast bounds expressed as offsets from the last price.
            f"{side}_p5_lo": lo5 - last, f"{side}_p5_hi": hi5 - last,
            f"{side}_p10_lo": lo10 - last, f"{side}_p10_hi": hi10 - last,
        })
    return rows


call_records = _side_records(call_series, "call")
put_records = _side_records(put_series, "put")

call_df = pd.DataFrame(call_records)
put_df  = pd.DataFrame(put_records)

# ---------- OUTER MERGE (call + put) ----------
# Both sides present -> outer join on strike. Only one side present -> keep it
# and add NaN placeholders for the other side's key columns. Neither present ->
# an empty frame that still has a "strike" column.
has_calls = not call_df.empty
has_puts = not put_df.empty

if has_calls and has_puts:
    merged = call_df.merge(put_df, on="strike", how="outer", sort=True)
elif has_calls or has_puts:
    if has_calls:
        merged = call_df.copy()
        placeholders = ("pct_change_put", "last_put_ltp", "put_moneyflow")
    else:
        merged = put_df.copy()
        placeholders = ("pct_change_call", "last_call_ltp", "call_moneyflow")
    for name in placeholders:
        if name not in merged.columns:
            merged[name] = np.nan
else:
    merged = pd.DataFrame(columns=["strike"])

# ---------- EXTRACT TAGS & MONEYFLOW (use explicit columns you provided) ----------
tag_cols = ["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"]
call_money_cols = ["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow",
                   "Previous_TotalcallMoneyFlow","Current_TotalcallMoneyFlow","Next_TotalcallMoneyFlow"]
put_money_cols  = ["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow",
                   "Previous_TotalputMoneyFlow","Current_TotalputMoneyFlow","Next_TotalputMoneyFlow"]

# initialize strike_info: one accumulator per strike present in `merged`
strike_info = {}
for s in merged["strike"].dropna().astype(int).unique():
    strike_info[int(s)] = {"tags":Counter(), "call_moneyflow":0.0, "put_moneyflow":0.0}

# Column presence cannot change during the scan, so resolve it once up front
# instead of re-testing `col in df.columns` for every row.
present_strike_cols = [c for c in (PREV_STR, CURR_STR, NEXT_STR) if c in df.columns]
present_tag_cols = [c for c in tag_cols if c in df.columns]
present_call_money_cols = [c for c in call_money_cols if c in df.columns]
present_put_money_cols = [c for c in put_money_cols if c in df.columns]


def _to_float_or_none(value):
    """Return ``float(value)``, or ``None`` when it is missing or not numeric."""
    try:
        if pd.isna(value):
            return None
        return float(value)
    except (TypeError, ValueError):
        return None


# scan
# NOTE(review): every strike found in a row is credited with the tags and money
# flows of all three positions (Previous/Current/Next) and with the Total*
# columns as well. That existing behaviour is preserved; confirm whether
# per-position attribution was intended.
for _, row in df.iterrows():
    for sc in present_strike_cols:
        raw_strike = row.get(sc)
        if pd.isna(raw_strike):
            continue
        try:
            s = int(raw_strike)
        except (TypeError, ValueError, OverflowError):
            continue
        info = strike_info.get(s)
        if info is None:
            continue
        # tags: "|" and ";" both separate tokens; blank tokens are dropped
        for col in present_tag_cols:
            v = row.get(col)
            if isinstance(v, str) and v.strip():
                for token in v.replace("|", ";").split(";"):
                    token = token.strip()
                    if token:
                        info["tags"][token] += 1
        # moneyflows: missing or non-numeric cells contribute nothing
        for col in present_call_money_cols:
            amount = _to_float_or_none(row.get(col))
            if amount is not None:
                info["call_moneyflow"] += amount
        for col in present_put_money_cols:
            amount = _to_float_or_none(row.get(col))
            if amount is not None:
                info["put_moneyflow"] += amount
# attach tags/moneyflow to merged
def fmt_tags(s):
    """Render the tag counts of strike ``s`` as ``"tag:count;tag:count"``.

    Returns an empty string when the strike is unknown to ``strike_info`` or
    has no tags.
    """
    counts = strike_info.get(int(s), {}).get("tags", {})
    if not counts:
        return ""
    return ";".join([f"{name}:{count}" for name, count in counts.items()])


def _flow_lookup(field):
    """Build a strike -> accumulated ``field`` getter (0.0 for unknown strikes)."""
    def lookup(strike):
        return strike_info.get(int(strike), {}).get(field, 0.0)
    return lookup


merged["tags"] = merged["strike"].apply(lambda strike: "" if pd.isna(strike) else fmt_tags(strike))
merged["call_moneyflow"] = merged["strike"].apply(_flow_lookup("call_moneyflow"))
merged["put_moneyflow"]  = merged["strike"].apply(_flow_lookup("put_moneyflow"))

# ---------- DECIDE RECOMMENDED ACTION (dual-side logic) ----------
def decide_action(row):
    """Classify one merged strike row as ``"BUY_PUT"``, ``"BUY_CALL"`` or ``"HOLD"``.

    ``row`` is a mapping-like object (e.g. a ``pd.Series``) that may carry
    ``pct_change_call``, ``pct_change_put`` and ``tags``. A missing or NaN
    percentage counts as 0, and a missing or non-string ``tags`` value counts
    as no tags. The put side is evaluated first, so it wins when both sides
    qualify.
    """
    def _pct(name):
        # Absent key and NaN are both treated as "no momentum".
        value = row.get(name)
        return 0 if value is None or pd.isna(value) else value

    call_pct = _pct("pct_change_call")
    put_pct = _pct("pct_change_put")

    # Only a real string can be searched; NaN/None tags mean "no tags".
    raw_tags = row.get("tags")
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ""
    bear_boost = ("put buying" in tags) or ("call writing" in tags) or ("bearish" in tags)
    bull_boost = ("call buying" in tags) or ("bullish" in tags) or ("oi_support_call" in tags)

    # priority: put momentum (bearish) or call momentum (bullish)
    if (put_pct > 8) or (put_pct > 5 and bear_boost):
        return "BUY_PUT"
    if (call_pct > 8) or (call_pct > 5 and bull_boost):
        return "BUY_CALL"
    return "HOLD"

# Built row by row instead of with DataFrame.apply(axis=1): when `merged` has
# no rows, apply can hand back a DataFrame rather than a Series and the column
# assignment then fails. An empty list assigns cleanly.
merged["recommended_action"] = [decide_action(r) for _, r in merged.iterrows()]

# ---------- TOP BUY_PUTS ----------
# Up to 200 BUY_PUT rows, strongest put percentage change first when available.
top_buy_puts = merged[merged["recommended_action"]=="BUY_PUT"].copy()
if "pct_change_put" in top_buy_puts.columns:
    top_buy_puts = top_buy_puts.sort_values("pct_change_put", ascending=False).head(200)
else:
    top_buy_puts = top_buy_puts.head(200)

# ---------- SAVE CSVs ----------
merged.to_csv(OUT_MERGED, index=False)
top_buy_puts.to_csv(OUT_TOP_PUTS, index=False)
print("Saved:", OUT_MERGED, OUT_TOP_PUTS)

# ---------- SPOT vs momentum plot ----------
def _finite_mean(frame, column):
    """Mean of ``frame[column]`` ignoring NaN and +/-inf; NaN if the column is absent."""
    if column not in frame.columns:
        return np.nan
    return frame[column].replace([np.inf, -np.inf], np.nan).dropna().mean()


# Last non-null spot price, or NaN when the column is missing or entirely null.
if "SpotPrice" in df.columns and not df["SpotPrice"].dropna().empty:
    last_spot = df["SpotPrice"].dropna().iloc[-1]
else:
    last_spot = np.nan
avg_call_pct = _finite_mean(merged, "pct_change_call")
avg_put_pct = _finite_mean(merged, "pct_change_put")

plt.figure(figsize=(8,4))
plt.title("Spot (last) vs Avg CE/PE %change")
# Missing values are drawn as zero-height bars.
vals = [
    0 if math.isnan(last_spot) else last_spot,
    0 if pd.isna(avg_call_pct) else avg_call_pct,
    0 if pd.isna(avg_put_pct) else avg_put_pct,
]
plt.bar(["Spot Last","Avg CE %","Avg PE %"], vals)
plt.tight_layout()
plt.savefig(OUT_PLOT, dpi=150)
plt.close()
print("Saved:", OUT_PLOT)

# ---------- ALERTS: PUT REVERSAL CANDIDATES ----------
# A strike raises an alert when its put percentage change exceeds 8 and it is
# backed by a "put buying"/"bearish" tag or a positive put money flow.
alerts = []
for _, rec in merged.iterrows():
    put_change = rec.get("pct_change_put")
    if put_change is None or (isinstance(put_change, float) and np.isnan(put_change)):
        continue
    tag_text = (rec.get("tags") or "").lower()
    put_flow = rec.get("put_moneyflow", 0.0)
    if not put_change > 8:
        continue
    if not (("put buying" in tag_text) or ("bearish" in tag_text) or (put_flow and put_flow > 0)):
        continue
    alerts.append({
        "strike": int(rec["strike"]),
        "pct_change_put": put_change,
        "put_moneyflow": put_flow,
        "tags": rec.get("tags",""),
        "recommended_action": rec.get("recommended_action",""),
        "reason": "Put momentum >8% and bearish tag/moneyflow"
    })

alerts_df = pd.DataFrame(alerts)
if alerts_df.empty:
    print("No put reversal alerts found.")
else:
    alerts_df.to_csv(OUT_ALERTS, index=False)
    print("Saved:", OUT_ALERTS)
    print("Sample alerts:\n", alerts_df.head(20).to_string(index=False))

print("Pipeline complete.")


  df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'], infer_datetime_format=True)


Using columns (LTP prioritized):
Calls: Previous_Call_ltp Current_Call_ltp Next_Call_ltp
Puts : Previous_Put_ltp Current_Put_ltp Next_Put_ltp
Strikes: Previous_Strikeprice Current_Strikeprice Next_Strikeprice
Detected call strikes: 8
Detected put strikes : 8
Saved: MERGED_CE_PE_FORECAST.csv TOP_BUY_PUTS.csv
Saved: SPOT_VS_MOMENTUM.png
Saved: PUT_REVERSAL_ALERTS.csv
Sample alerts:
  strike  pct_change_put  put_moneyflow                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline_use_ltp_enriched.py
import os, sys, math
from collections import defaultdict, Counter
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------- CONFIG ----------
INPUT_CSV = "flattened_snapshots.csv"   # your uploaded file
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_ALERTS = "PUT_REVERSAL_ALERTS.csv"
OUT_PLOT = "SPOT_VS_MOMENTUM.png"

# Fail fast with a readable message instead of a read_csv traceback.
if not os.path.exists(INPUT_CSV):
    print("ERROR: input file not found:", INPUT_CSV)
    sys.exit(1)

# ---------- LOAD ----------
# `infer_datetime_format` is intentionally not passed: pandas deprecated it
# (it produced the warning this cell used to print) and parse_dates already
# infers the timestamp format on its own.
df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'])

# ---------- USE LTP COLUMNS (explicit) ----------
PREV_CALL_LTP = "Previous_Call_ltp"
CURR_CALL_LTP = "Current_Call_ltp"
NEXT_CALL_LTP = "Next_Call_ltp"

PREV_PUT_LTP  = "Previous_Put_ltp"
CURR_PUT_LTP  = "Current_Put_ltp"
NEXT_PUT_LTP  = "Next_Put_ltp"

PREV_STR = "Previous_Strikeprice"
CURR_STR  = "Current_Strikeprice"
NEXT_STR  = "Next_Strikeprice"

# choose LTP (fallback to _Premium only if ltp missing); None means the
# side/position has no usable price column at all
prev_call_col = PREV_CALL_LTP if PREV_CALL_LTP in df.columns else ("Previous_Call_Premium" if "Previous_Call_Premium" in df.columns else None)
curr_call_col = CURR_CALL_LTP if CURR_CALL_LTP in df.columns else ("Current_Call_Premium"  if "Current_Call_Premium"  in df.columns else None)
next_call_col = NEXT_CALL_LTP if NEXT_CALL_LTP in df.columns else ("Next_Call_Premium"     if "Next_Call_Premium"     in df.columns else None)

prev_put_col  = PREV_PUT_LTP  if PREV_PUT_LTP  in df.columns else ("Previous_Put_Premium" if "Previous_Put_Premium" in df.columns else None)
curr_put_col  = CURR_PUT_LTP  if CURR_PUT_LTP  in df.columns else ("Current_Put_Premium"  if "Current_Put_Premium"  in df.columns else None)
next_put_col  = NEXT_PUT_LTP  if NEXT_PUT_LTP  in df.columns else ("Next_Put_Premium"     if "Next_Put_Premium"     in df.columns else None)

print("Using columns (LTP prioritized):")
print("Calls:", prev_call_col, curr_call_col, next_call_col)
print("Puts :", prev_put_col,  curr_put_col,  next_put_col)
print("Strikes:", PREV_STR, CURR_STR, NEXT_STR)

# ---------- BUILD SERIES ----------
def build_series(prev_col, curr_col, next_col, prev_str_col=PREV_STR, curr_str_col=CURR_STR, next_str_col=NEXT_STR):
    """Group ``(timestamp, price)`` observations of the module-level ``df`` by strike.

    Each row can contribute up to three observations, one per
    (strike column, price column) pairing. Rows without an ``LTT`` value, and
    pairings whose strike or price is missing or not convertible, are skipped.

    Returns a ``defaultdict(list)`` keyed by ``int`` strike; each value is the
    list of ``(LTT, float price)`` tuples in row order.
    """
    # A pairing with a missing name or an absent column can never produce
    # data, so it is filtered once instead of being re-checked on every row.
    usable_pairs = [
        (strike_col, price_col)
        for strike_col, price_col in (
            (prev_str_col, prev_col),
            (curr_str_col, curr_col),
            (next_str_col, next_col),
        )
        if strike_col is not None
        and price_col is not None
        and strike_col in df.columns
        and price_col in df.columns
    ]

    grouped = defaultdict(list)
    for _, record in df.iterrows():
        stamp = record.get("LTT")
        if pd.isna(stamp):
            continue
        for strike_col, price_col in usable_pairs:
            if pd.isna(record.get(strike_col)) or pd.isna(record.get(price_col)):
                continue
            try:
                strike = int(record.get(strike_col))
                price = float(record.get(price_col))
            except Exception:
                # Unparseable strike/price: drop just this observation.
                continue
            grouped[strike].append((stamp, price))
    return grouped

# Build the per-strike (timestamp, price) series for each option side.
call_series = build_series(prev_call_col, curr_call_col, next_call_col)
put_series  = build_series(prev_put_col, curr_put_col, next_put_col)

# Each key is one distinct integer strike, so len() is the strike count.
print("Detected call strikes:", len(call_series))
print("Detected put strikes :", len(put_series))

# ---------- SUMMARIZE & FORECAST HELPERS ----------
def summarize(pairs):
    """Summarise a list of ``(timestamp, price)`` pairs.

    The pairs are ordered by timestamp (ties keep their input order) and the
    first/last/peak/trough prices, absolute change, percentage change and
    observation count are returned as a dict. ``pct_change`` is NaN when the
    first price is 0. Returns ``None`` for an empty input.
    """
    ordered = sorted(pairs, key=lambda item: item[0])
    if not ordered:
        return None

    prices = [price for _, price in ordered]
    opening = prices[0]
    closing = prices[-1]
    delta = closing - opening
    if opening != 0:
        relative = delta / opening * 100
    else:
        # A percentage of a zero base is undefined.
        relative = np.nan

    return {
        "first": opening,
        "last": closing,
        "peak": max(prices),
        "trough": min(prices),
        "abs_change": delta,
        "pct_change": relative,
        "n_obs": len(prices),
    }

def forecast_from_pct(last, pct):
    """Return ``((lo5, hi5), (lo10, hi10))`` price bands around ``last``.

    The band offsets are picked from the percentage change ``pct``:
    ``>= 25``, ``>= 8``, ``> 0``, or anything else. A NaN ``pct`` fails every
    comparison and therefore lands in the final (non-positive) tier.
    """
    if pct >= 25:
        near, far = (15, 35), (25, 60)
    elif pct >= 8:
        near, far = (6, 18), (12, 30)
    elif pct > 0:
        near, far = (2, 8), (5, 15)
    else:
        near, far = (-5, 2), (-8, 5)

    five_min = (last + near[0], last + near[1])
    ten_min = (last + far[0], last + far[1])
    return five_min, ten_min

# ---------- BUILD CALL & PUT SUMMARIES ----------
def _side_records(series_by_strike, side):
    """Return one summary + forecast record per strike for ``side`` ("call"/"put").

    Column names carry the side so the call and put frames can later be
    merged on "strike" without clashing.
    """
    rows = []
    for strike, observations in series_by_strike.items():
        if not observations:
            continue
        stats = summarize(observations)
        if stats is None:
            continue
        last = stats["last"]
        (lo5, hi5), (lo10, hi10) = forecast_from_pct(last, stats["pct_change"])
        rows.append({
            "strike": strike,
            f"n_obs_{side}": stats["n_obs"],
            f"first_{side}_ltp": stats["first"],
            f"last_{side}_ltp": last,
            f"peak_{side}_ltp": stats["peak"],
            f"trough_{side}_ltp": stats["trough"],
            f"abs_change_{side}": stats["abs_change"],
            f"pct_change_{side}": stats["pct_change"],
            f"{side}_5min_low": lo5, f"{side}_5min_high": hi5,
            f"{side}_10min_low": lo10, f"{side}_10min_high": hi10,
            # Forecast bounds expressed as offsets from the last price.
            f"{side}_p5_lo": lo5 - last, f"{side}_p5_hi": hi5 - last,
            f"{side}_p10_lo": lo10 - last, f"{side}_p10_hi": hi10 - last,
        })
    return rows


call_records = _side_records(call_series, "call")
put_records = _side_records(put_series, "put")

call_df = pd.DataFrame(call_records)
put_df  = pd.DataFrame(put_records)

# ---------- OUTER MERGE (call + put) ----------
# Both sides present -> outer join on strike. Only one side present -> keep it
# and add NaN placeholders for the other side's key columns. Neither present ->
# an empty frame that still has a "strike" column.
has_calls = not call_df.empty
has_puts = not put_df.empty

if has_calls and has_puts:
    merged = call_df.merge(put_df, on="strike", how="outer", sort=True)
elif has_calls or has_puts:
    if has_calls:
        merged = call_df.copy()
        placeholders = ("pct_change_put", "last_put_ltp", "put_moneyflow", "n_obs_put", "first_put_ltp")
    else:
        merged = put_df.copy()
        placeholders = ("pct_change_call", "last_call_ltp", "call_moneyflow", "n_obs_call", "first_call_ltp")
    for name in placeholders:
        if name not in merged.columns:
            merged[name] = np.nan
else:
    merged = pd.DataFrame(columns=["strike"])

# ---------- EXTRACT TAGS & MONEYFLOW ----------
tag_cols = ["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"]
# Each column is listed exactly once: a repeated name would be summed twice
# by the scan below and inflate the accumulated money flow.
call_money_cols = ["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow",
                   "Previous_TotalcallMoneyFlow","Current_TotalcallMoneyFlow","Next_TotalcallMoneyFlow"]
put_money_cols  = ["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow",
                   "Previous_TotalputMoneyFlow","Current_TotalputMoneyFlow","Next_TotalputMoneyFlow"]

# initialize strike_info: one accumulator per strike present in `merged`
strike_info = {}
for s in merged["strike"].dropna().astype(int).unique():
    strike_info[int(s)] = {"tags":Counter(), "call_moneyflow":0.0, "put_moneyflow":0.0}

# Column presence cannot change during the scan, so resolve it once up front
# instead of re-testing `col in df.columns` for every row.
present_strike_cols = [c for c in (PREV_STR, CURR_STR, NEXT_STR) if c in df.columns]
present_tag_cols = [c for c in tag_cols if c in df.columns]
present_call_money_cols = [c for c in call_money_cols if c in df.columns]
present_put_money_cols = [c for c in put_money_cols if c in df.columns]


def _to_float_or_none(value):
    """Return ``float(value)``, or ``None`` when it is missing or not numeric."""
    try:
        if pd.isna(value):
            return None
        return float(value)
    except (TypeError, ValueError):
        return None


# NOTE(review): every strike found in a row is credited with the tags and money
# flows of all three positions (Previous/Current/Next) and with the Total*
# columns as well. That existing behaviour is preserved; confirm whether
# per-position attribution was intended.
for idx, row in df.iterrows():
    for sc in present_strike_cols:
        raw_strike = row.get(sc)
        if pd.isna(raw_strike):
            continue
        try:
            s = int(raw_strike)
        except (TypeError, ValueError, OverflowError):
            continue
        info = strike_info.get(s)
        if info is None:
            continue
        # tags: "|" and ";" both separate tokens; blank tokens are dropped
        for col in present_tag_cols:
            v = row.get(col)
            if isinstance(v, str) and v.strip():
                for token in v.replace("|", ";").split(";"):
                    token = token.strip()
                    if token:
                        info["tags"][token] += 1
        # moneyflows: missing or non-numeric cells contribute nothing
        for col in present_call_money_cols:
            amount = _to_float_or_none(row.get(col))
            if amount is not None:
                info["call_moneyflow"] += amount
        for col in present_put_money_cols:
            amount = _to_float_or_none(row.get(col))
            if amount is not None:
                info["put_moneyflow"] += amount

# attach tags/moneyflow to merged
def fmt_tags(s):
    """Render the tag counts of strike ``s`` as ``"tag:count;tag:count"``.

    Returns an empty string when the strike is unknown to ``strike_info`` or
    has no tags.
    """
    counts = strike_info.get(int(s), {}).get("tags", {})
    if not counts:
        return ""
    return ";".join([f"{name}:{count}" for name, count in counts.items()])


def _flow_lookup(field):
    """Build a strike -> accumulated ``field`` getter (0.0 for unknown strikes)."""
    def lookup(strike):
        return strike_info.get(int(strike), {}).get(field, 0.0)
    return lookup


merged["tags"] = merged["strike"].apply(lambda strike: "" if pd.isna(strike) else fmt_tags(strike))
merged["call_moneyflow"] = merged["strike"].apply(_flow_lookup("call_moneyflow"))
merged["put_moneyflow"]  = merged["strike"].apply(_flow_lookup("put_moneyflow"))

# ---------- DECIDE RECOMMENDED ACTION (dual-side logic) ----------
def decide_action(row):
    """Classify one merged strike row as ``"BUY_PUT"``, ``"BUY_CALL"`` or ``"HOLD"``.

    ``row`` is a mapping-like object (e.g. a ``pd.Series``) that may carry
    ``pct_change_call``, ``pct_change_put`` and ``tags``. A missing or NaN
    percentage counts as 0, and a missing or non-string ``tags`` value counts
    as no tags. The put side is evaluated first, so it wins when both sides
    qualify.
    """
    def _pct(name):
        # Absent key and NaN are both treated as "no momentum".
        value = row.get(name)
        return 0 if value is None or pd.isna(value) else value

    call_pct = _pct("pct_change_call")
    put_pct = _pct("pct_change_put")

    # Only a real string can be searched; NaN/None tags mean "no tags".
    raw_tags = row.get("tags")
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ""
    bear_boost = ("put buying" in tags) or ("call writing" in tags) or ("bearish" in tags)
    bull_boost = ("call buying" in tags) or ("bullish" in tags) or ("oi_support_call" in tags)

    if (put_pct > 8) or (put_pct > 5 and bear_boost):
        return "BUY_PUT"
    if (call_pct > 8) or (call_pct > 5 and bull_boost):
        return "BUY_CALL"
    return "HOLD"

# Built row by row instead of with DataFrame.apply(axis=1): when `merged` has
# no rows, apply can hand back a DataFrame rather than a Series and the column
# assignment then fails. An empty list assigns cleanly.
merged["recommended_action"] = [decide_action(r) for _, r in merged.iterrows()]

# ---------- ENRICH: unified summary columns + reasons + high-conviction stats ----------
# Builds one output record per row of `merged`: the call/put columns are
# collapsed into single "unified" fields (call value preferred), a reasons
# string is derived from tags/moneyflow/percentage change, and high-conviction
# signal statistics are computed from the raw snapshot frame `df`.
final_rows = []
for _, r in merged.iterrows():
    s = int(r["strike"])
    # Unnamed: 0 (if present) - pick first matching row sample
    # `sample` is every snapshot row where this strike appears in any of the
    # three strike columns.
    sample = df[
        ((df.get("Current_Strikeprice") == s) if "Current_Strikeprice" in df.columns else False) |
        ((df.get("Previous_Strikeprice") == s) if "Previous_Strikeprice" in df.columns else False) |
        ((df.get("Next_Strikeprice") == s) if "Next_Strikeprice" in df.columns else False)
    ]
    unnamed_val = sample["Unnamed: 0"].iloc[0] if ("Unnamed: 0" in df.columns and not sample.empty) else np.nan

    # n_obs (prefer aggregated call+put counts)
    # A missing/NaN count (strike seen on one side only) contributes 0.
    n_obs_call = int(r.get("n_obs_call")) if pd.notna(r.get("n_obs_call")) else 0
    n_obs_put  = int(r.get("n_obs_put"))  if pd.notna(r.get("n_obs_put"))  else 0
    n_obs = n_obs_call + n_obs_put

    # Choose preferred premium fields: prefer CALL summary, else PUT
    # Closure over the current row `r`; returns NaN when neither field has a value.
    def choose(field_call, field_put):
        if field_call in r and pd.notna(r.get(field_call)):
            return r.get(field_call)
        if field_put in r and pd.notna(r.get(field_put)):
            return r.get(field_put)
        return np.nan

    # Each unified field is chosen independently, so a row could in principle
    # mix call and put values if only some call fields are NaN.
    first_premium = choose("first_call_ltp", "first_put_ltp")
    last_premium  = choose("last_call_ltp", "last_put_ltp")
    peak_premium  = choose("peak_call_ltp", "peak_put_ltp")
    trough_premium= choose("trough_call_ltp", "trough_put_ltp")
    abs_change    = choose("abs_change_call", "abs_change_put")
    pct_change    = choose("pct_change_call", "pct_change_put")

    # 5/10min forecast unified
    five_low  = choose("call_5min_low", "put_5min_low")
    five_high = choose("call_5min_high", "put_5min_high")
    ten_low   = choose("call_10min_low", "put_10min_low")
    ten_high  = choose("call_10min_high", "put_10min_high")

    # expected deltas: forecast bound minus the unified last premium
    # (NaN when either operand is missing)
    p5_expected_lo = (five_low - last_premium) if (pd.notna(five_low) and pd.notna(last_premium)) else np.nan
    p5_expected_hi = (five_high - last_premium) if (pd.notna(five_high) and pd.notna(last_premium)) else np.nan
    p10_expected_lo = (ten_low - last_premium) if (pd.notna(ten_low) and pd.notna(last_premium)) else np.nan
    p10_expected_hi = (ten_high - last_premium) if (pd.notna(ten_high) and pd.notna(last_premium)) else np.nan

    # reasons: build from tags + pct change + moneyflow
    # These are plain substring tests on the formatted tag string.
    # NOTE(review): "oi" / "OI" match any tag that merely contains those
    # letters - confirm that is acceptable for the tag vocabulary in use.
    tags_text = r.get("tags","") or ""
    reasons = []
    if "RSI" in tags_text or "Rsi" in tags_text or "rsi" in tags_text:
        reasons.append("RSI momentum")
    if "MACD" in tags_text or "macd" in tags_text:
        reasons.append("MACD confirmation")
    if "VWAP" in tags_text or "vwap" in tags_text:
        reasons.append("VWAP divergence")
    if "OI" in tags_text or "oi" in tags_text:
        reasons.append("OI support/resistance")
    # moneyflow hints: a positive accumulated flow is reported as net buying
    call_mf = r.get("call_moneyflow", 0.0) if "call_moneyflow" in r else 0.0
    put_mf  = r.get("put_moneyflow", 0.0)  if "put_moneyflow" in r else 0.0
    try:
        if float(call_mf) > 0:
            reasons.append("Call net buying")
    except:
        pass
    try:
        if float(put_mf) > 0:
            reasons.append("Put net buying")
    except:
        pass
    # pct-based reason (only positive moves are labelled: >10 strong, >3 moderate)
    try:
        if pd.notna(pct_change):
            if pct_change > 10:
                reasons.append("Strong premium move")
            elif pct_change > 3:
                reasons.append("Moderate premium move")
    except:
        pass
    if not reasons:
        reasons.append("No strong signals")

    reasons_txt = "; ".join(dict.fromkeys(reasons))  # dedupe preserving order

    # high-conviction stats (use Current_IsHighConvictionSignal column if present)
    # hc_total counts every flagged row for this strike, including rows that
    # are skipped below for lacking a timestamp or premium, so those rows
    # lower the hit rate.
    hc_col = "Current_IsHighConvictionSignal"
    hc_total = 0
    hc_success = 0
    if hc_col in df.columns:
        hc_rows = df[(df.get("Current_Strikeprice")==s) & (df.get(hc_col)==True)]
        hc_total = int(hc_rows.shape[0])
        for _, hrow in hc_rows.iterrows():
            t0 = hrow.get("LTT")
            # pick which premium to measure based on recommended_action for this strike
            action = r.get("recommended_action","HOLD")
            p0 = None
            if action == "BUY_CALL" and curr_call_col in df.columns:
                p0 = hrow.get(curr_call_col)
            elif action == "BUY_PUT" and curr_put_col in df.columns:
                p0 = hrow.get(curr_put_col)
            else:
                # fallback to call premium if exists, else put
                p0 = hrow.get(curr_call_col) if curr_call_col in df.columns else hrow.get(curr_put_col) if curr_put_col in df.columns else None

            if pd.isna(t0) or p0 is None or pd.isna(p0):
                continue
            # Rows for the same current strike from t0 (inclusive) to t0 + 3 minutes.
            # The signal row itself is inside the window; success needs a strictly
            # higher premium somewhere in it.
            window = df[(df["LTT"] >= t0) & (df["LTT"] <= t0 + timedelta(minutes=3)) & ((df.get("Current_Strikeprice")==s) if "Current_Strikeprice" in df.columns else False)]
            if window.empty:
                continue
            # success = premium increased in the next 3 minutes (for both call & put we want premium ↑ for success)
            # use the same column we selected for p0
            # NOTE(review): when action is BUY_CALL/HOLD but only a put column
            # exists, p0 comes from the put column while this branch checks the
            # call column and therefore records no success - confirm intent.
            if action == "BUY_PUT" and curr_put_col in df.columns:
                try:
                    if window[curr_put_col].max() > p0:
                        hc_success += 1
                except:
                    pass
            else:
                # assume call
                if curr_call_col in df.columns:
                    try:
                        if window[curr_call_col].max() > p0:
                            hc_success += 1
                    except:
                        pass

    # None (not 0) when the strike never had a high-conviction signal.
    hc_rate = (hc_success / hc_total) if hc_total > 0 else None

    final_rows.append({
        # unified fields (user requested)
        "strike": s,
        "Unnamed: 0": unnamed_val,
        "n_obs": n_obs,
        "first_premium": first_premium,
        "last_premium": last_premium,
        "peak_premium": peak_premium,
        "trough_premium": trough_premium,
        "abs_change": abs_change,
        "pct_change": pct_change,
        "5min_low": five_low,
        "5min_high": five_high,
        "10min_low": ten_low,
        "10min_high": ten_high,
        "p5_expected_lo": p5_expected_lo,
        "p5_expected_hi": p5_expected_hi,
        "p10_expected_lo": p10_expected_lo,
        "p10_expected_hi": p10_expected_hi,

        # moneyflow & tags + reasons + recommended action
        "call_moneyflow": r.get("call_moneyflow", 0.0),
        "put_moneyflow":  r.get("put_moneyflow", 0.0),
        "tags": r.get("tags",""),
        "reasons": reasons_txt,
        "recommended_action": r.get("recommended_action","HOLD"),

        # high conviction
        "highconv_total": hc_total,
        "highconv_success": hc_success,
        "highconv_hit_rate": hc_rate,

        # keep reference original fields (helpful)
        "Current_Strikeprice": s
    })

final_df = pd.DataFrame(final_rows)

# preserve original merged columns as well (optional): merge back the merged DF by strike to keep call/put detail
# Every column of `merged` is re-attached with a "merged_" prefix, so the
# per-side detail sits next to the unified fields without name clashes.
final_df = final_df.merge(merged.add_prefix("merged_"), left_on="strike", right_on="merged_strike", how="left")

# ---------- SAVE CSVs ----------
final_df.to_csv(OUT_MERGED, index=False)

# Up to 200 BUY_PUT rows, strongest put percentage change first when available.
top_buy_puts = merged.loc[merged["recommended_action"] == "BUY_PUT"].copy()
if "pct_change_put" in top_buy_puts.columns:
    top_buy_puts = top_buy_puts.sort_values("pct_change_put", ascending=False)
top_buy_puts = top_buy_puts.head(200)
top_buy_puts.to_csv(OUT_TOP_PUTS, index=False)
print("Saved:", OUT_MERGED, OUT_TOP_PUTS)

# ---------- SPOT vs momentum plot ----------
def _finite_mean(frame, column):
    """Mean of ``frame[column]`` ignoring NaN and +/-inf; NaN if the column is absent."""
    if column not in frame.columns:
        return np.nan
    return frame[column].replace([np.inf, -np.inf], np.nan).dropna().mean()


# Last non-null spot price, or NaN when the column is missing or entirely null.
if "SpotPrice" in df.columns and not df["SpotPrice"].dropna().empty:
    last_spot = df["SpotPrice"].dropna().iloc[-1]
else:
    last_spot = np.nan
avg_call_pct = _finite_mean(merged, "pct_change_call")
avg_put_pct = _finite_mean(merged, "pct_change_put")

plt.figure(figsize=(8,4))
plt.title("Spot (last) vs Avg CE/PE %change")
# Missing values are drawn as zero-height bars.
vals = [
    0 if math.isnan(last_spot) else last_spot,
    0 if pd.isna(avg_call_pct) else avg_call_pct,
    0 if pd.isna(avg_put_pct) else avg_put_pct,
]
plt.bar(["Spot Last","Avg CE %","Avg PE %"], vals)
plt.tight_layout()
plt.savefig(OUT_PLOT, dpi=150)
plt.close()
print("Saved:", OUT_PLOT)

# ---------- ALERTS: PUT REVERSAL CANDIDATES ----------
# A strike raises an alert when its put percentage change exceeds 8 and it is
# backed by a "put buying"/"bearish" tag or a positive put money flow.
alerts = []
for _, rec in merged.iterrows():
    put_change = rec.get("pct_change_put")
    if put_change is None or (isinstance(put_change, float) and np.isnan(put_change)):
        continue
    tag_text = (rec.get("tags") or "").lower()
    put_flow = rec.get("put_moneyflow", 0.0)
    if not put_change > 8:
        continue
    if not (("put buying" in tag_text) or ("bearish" in tag_text) or (put_flow and put_flow > 0)):
        continue
    alerts.append({
        "strike": int(rec["strike"]),
        "pct_change_put": put_change,
        "put_moneyflow": put_flow,
        "tags": rec.get("tags",""),
        "recommended_action": rec.get("recommended_action",""),
        "reason": "Put momentum >8% and bearish tag/moneyflow"
    })

alerts_df = pd.DataFrame(alerts)
if alerts_df.empty:
    print("No put reversal alerts found.")
else:
    alerts_df.to_csv(OUT_ALERTS, index=False)
    print("Saved:", OUT_ALERTS)
    print("Sample alerts:\n", alerts_df.head(20).to_string(index=False))

print("Pipeline complete.")


  df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'], infer_datetime_format=True)


Using columns (LTP prioritized):
Calls: Previous_Call_ltp Current_Call_ltp Next_Call_ltp
Puts : Previous_Put_ltp Current_Put_ltp Next_Put_ltp
Strikes: Previous_Strikeprice Current_Strikeprice Next_Strikeprice
Detected call strikes: 8
Detected put strikes : 8
Saved: MERGED_CE_PE_FORECAST.csv TOP_BUY_PUTS.csv
Saved: SPOT_VS_MOMENTUM.png
Saved: PUT_REVERSAL_ALERTS.csv
Sample alerts:
  strike  pct_change_put  put_moneyflow                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline_use_ltp_enriched_plus_extras.py
import os, sys, math, json
from collections import defaultdict, Counter
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------- CONFIG ----------
# Input snapshot CSV; the script exits early below when it does not exist.
INPUT_CSV = "flattened_snapshots.csv"   # your uploaded file
# Output file names for the per-strike tables, alerts and plot written by this cell.
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_TOP_CALLS = "TOP_BUY_CALLS.csv"
OUT_ALL_ACTIONS = "ALL_ACTIONS.csv"
OUT_TOP_HOLD = "TOP_HOLD_BREAKOUTS.csv"
OUT_CE_ONLY = "TOP_CE_ONLY.csv"
OUT_PE_ONLY = "TOP_PE_ONLY.csv"
OUT_ALERTS = "PUT_REVERSAL_ALERTS.csv"
OUT_PLOT = "SPOT_VS_MOMENTUM.png"

# Output file names for the extra stages (reversals, IV crush, heatmap, auto-trade JSON).
OUT_REVERSALS = "TOP_REVERSALS.csv"
OUT_IV_CRUSH = "IV_CRUSH.csv"
OUT_HEATMAP = "STRENGTH_HEATMAP.png"
OUT_AUTOTRADE = "AUTO_TRADES.json"

# thresholds (tweakable)
REVERSAL_DROP_PCT = 8.0          # reversal if premium drops this % from recent peak
REVERSAL_WINDOW_MIN = 6          # minutes to lookback for peak
IV_CRUSH_ABS = 5.0               # absolute IV drop considered crush (percentage points)
IV_CRUSH_REL_PCT = 15.0          # relative IV drop percent
AUTO_TRADE_STRENGTH = 8.0        # minimum strength_score to export auto-trade
DEFAULT_LOTSIZE = 25             # lot size used when no usable Current_Lotsize value is found

# Abort early with a clear message when the snapshot file is missing.
if not os.path.exists(INPUT_CSV):
    print("ERROR: input file not found:", INPUT_CSV)
    sys.exit(1)

# ---------- LOAD ----------
# `infer_datetime_format` is deprecated in pandas (it produced the warning
# echoed in this cell's output); parse_dates alone parses the LTT column.
df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'])

# ---------- USE LTP COLUMNS (explicit) ----------
PREV_CALL_LTP, CURR_CALL_LTP, NEXT_CALL_LTP = "Previous_Call_ltp", "Current_Call_ltp", "Next_Call_ltp"
PREV_PUT_LTP, CURR_PUT_LTP, NEXT_PUT_LTP = "Previous_Put_ltp", "Current_Put_ltp", "Next_Put_ltp"
PREV_STR, CURR_STR, NEXT_STR = "Previous_Strikeprice", "Current_Strikeprice", "Next_Strikeprice"


def _ltp_or_premium(ltp_name, premium_name):
    """Return `ltp_name` if df has that column, else `premium_name` if present, else None."""
    for candidate in (ltp_name, premium_name):
        if candidate in df.columns:
            return candidate
    return None


# choose LTP (fallback to _Premium only if ltp missing)
prev_call_col = _ltp_or_premium(PREV_CALL_LTP, "Previous_Call_Premium")
curr_call_col = _ltp_or_premium(CURR_CALL_LTP, "Current_Call_Premium")
next_call_col = _ltp_or_premium(NEXT_CALL_LTP, "Next_Call_Premium")

prev_put_col = _ltp_or_premium(PREV_PUT_LTP, "Previous_Put_Premium")
curr_put_col = _ltp_or_premium(CURR_PUT_LTP, "Current_Put_Premium")
next_put_col = _ltp_or_premium(NEXT_PUT_LTP, "Next_Put_Premium")

# Report the resolved column names so a wrong fallback is visible in the log.
print("Using columns (LTP prioritized):")
print("Calls:", prev_call_col, curr_call_col, next_call_col)
print("Puts :", prev_put_col,  curr_put_col,  next_put_col)
print("Strikes:", PREV_STR, CURR_STR, NEXT_STR)

# ---------- BUILD SERIES ----------
def build_series(prev_col, curr_col, next_col, prev_str_col=PREV_STR, curr_str_col=CURR_STR, next_str_col=NEXT_STR):
    """Collect (LTT, premium) observations per strike from the global ``df``.

    Each of the three (strike column, premium column) pairs is read from every
    row; rows with a missing LTT, strike or premium are skipped, as are values
    that cannot be converted to ``int`` (strike) / ``float`` (premium).

    Returns a ``defaultdict(list)`` mapping int strike -> list of (LTT, premium)
    tuples in row order (not sorted by time).
    """
    candidates = (
        (prev_str_col, prev_col),
        (curr_str_col, curr_col),
        (next_str_col, next_col),
    )
    # Column availability is the same for every row, so resolve it once up front.
    usable = [
        (strike_col, price_col)
        for strike_col, price_col in candidates
        if strike_col is not None and price_col is not None
        and strike_col in df.columns and price_col in df.columns
    ]

    series = defaultdict(list)
    for _, row in df.iterrows():
        stamp = row.get("LTT")
        if pd.isna(stamp):
            continue
        for strike_col, price_col in usable:
            raw_strike = row.get(strike_col)
            raw_price = row.get(price_col)
            if pd.isna(raw_strike) or pd.isna(raw_price):
                continue
            try:
                strike, premium = int(raw_strike), float(raw_price)
            except Exception:
                continue
            series[strike].append((stamp, premium))
    return series

# Build one strike -> [(LTT, premium), ...] map per option side.
call_series = build_series(prev_call_col, curr_call_col, next_call_col)
put_series  = build_series(prev_put_col, curr_put_col, next_put_col)

# Number of distinct strikes that produced at least one observation.
print("Detected call strikes:", len(call_series))
print("Detected put strikes :", len(put_series))

# ---------- SUMMARIZE & FORECAST HELPERS ----------
def summarize(pairs):
    """Summarize a list of (time, premium) pairs.

    The pairs are ordered by time; the result holds the first/last/peak/trough
    premium, the absolute and percentage change from first to last (NaN when
    the first premium is 0), the observation count and the time-sorted pairs.
    Returns None for an empty input.
    """
    ordered = sorted(pairs, key=lambda item: item[0])
    if not ordered:
        return None

    premiums = [premium for _, premium in ordered]
    opening = premiums[0]
    closing = premiums[-1]
    delta = closing - opening
    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = np.nan  # percentage change is undefined from a zero base

    return {
        "first": opening,
        "last": closing,
        "peak": max(premiums),
        "trough": min(premiums),
        "abs_change": delta,
        "pct_change": pct,
        "n_obs": len(premiums),
        "series_sorted": ordered,
    }

def forecast_from_pct(last, pct):
    """Return ((low, high) 5-min band, (low, high) 10-min band) around ``last``.

    The band offsets are picked from fixed tiers by ``pct``: >= 25, >= 8, > 0,
    and everything else (including NaN, for which every comparison is false).
    """
    if pct >= 25:
        offsets = ((15, 35), (25, 60))
    elif pct >= 8:
        offsets = ((6, 18), (12, 30))
    elif pct > 0:
        offsets = ((2, 8), (5, 15))
    else:
        offsets = ((-5, 2), (-8, 5))

    (lo5, hi5), (lo10, hi10) = offsets
    return (last + lo5, last + hi5), (last + lo10, last + hi10)

# ---------- BUILD CALL & PUT SUMMARIES ----------
def _side_records(series_map, side):
    """Build one summary dict per strike for ``side`` ("call" or "put").

    Strikes whose series summarizes to None (no observations) are skipped.
    Column names carry the side so the call and put tables can be merged later.
    """
    rows = []
    for strike, points in series_map.items():
        stats = summarize(points)
        if stats is None:
            continue
        last = stats["last"]
        band5, band10 = forecast_from_pct(last, stats["pct_change"])
        rows.append({
            "strike": strike,
            f"n_obs_{side}": stats["n_obs"],
            f"first_{side}_ltp": stats["first"],
            f"last_{side}_ltp": last,
            f"peak_{side}_ltp": stats["peak"],
            f"trough_{side}_ltp": stats["trough"],
            f"abs_change_{side}": stats["abs_change"],
            f"pct_change_{side}": stats["pct_change"],
            f"{side}_5min_low": band5[0], f"{side}_5min_high": band5[1],
            f"{side}_10min_low": band10[0], f"{side}_10min_high": band10[1],
            # band edges expressed as offsets from the last premium
            f"{side}_p5_lo": band5[0] - last, f"{side}_p5_hi": band5[1] - last,
            f"{side}_p10_lo": band10[0] - last, f"{side}_p10_hi": band10[1] - last,
            f"{side}_series_sorted": stats["series_sorted"],
        })
    return rows


call_records = _side_records(call_series, "call")
put_records = _side_records(put_series, "put")

call_df = pd.DataFrame(call_records)
put_df  = pd.DataFrame(put_records)

# ---------- OUTER MERGE (call + put) ----------
# With both sides present, join on strike; with only one side, copy it and add
# NaN placeholders for the other side's key columns; with neither, start empty.
have_calls = not call_df.empty
have_puts = not put_df.empty

if have_calls and have_puts:
    merged = pd.merge(call_df, put_df, on="strike", how="outer", sort=True)
elif have_calls or have_puts:
    if have_calls:
        merged = call_df.copy()
        placeholder_cols = ["pct_change_put","last_put_ltp","put_moneyflow","n_obs_put","first_put_ltp"]
    else:
        merged = put_df.copy()
        placeholder_cols = ["pct_change_call","last_call_ltp","call_moneyflow","n_obs_call","first_call_ltp"]
    for c in placeholder_cols:
        if c not in merged.columns:
            merged[c] = np.nan
else:
    merged = pd.DataFrame(columns=["strike"])

# ---------- EXTRACT TAGS & MONEYFLOW ----------
# Columns scanned per row; any that are absent from df are skipped below.
tag_cols = ["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"]
call_money_cols = ["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow",
                   "Previous_TotalcallMoneyFlow","Current_TotalcallMoneyFlow","Next_TotalcallMoneyFlow"]
put_money_cols  = ["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow",
                   "Previous_TotalputMoneyFlow","Current_TotalputMoneyFlow","Next_TotalputMoneyFlow"]

# initialize strike_info
# One accumulator per strike present in `merged`: tag counts plus summed call/put money-flow.
strike_info = {}
for s in merged["strike"].dropna().astype(int).unique():
    strike_info[int(s)] = {"tags":Counter(), "call_moneyflow":0.0, "put_moneyflow":0.0}

# NOTE(review): for every strike found in any of the three strike columns of a
# row, ALL Previous_/Current_/Next_ tag and money-flow columns of that row are
# accumulated onto that strike (not only the columns of the matching position).
# Confirm this attribution is intended.
for idx, row in df.iterrows():
    for sc in (PREV_STR, CURR_STR, NEXT_STR):
        if sc not in df.columns:
            continue
        if pd.isna(row.get(sc)):
            continue
        try:
            s = int(row.get(sc))
        except:
            continue
        # Only strikes that made it into `merged` are tracked.
        if s not in strike_info:
            continue
        # tags
        # A tag cell may hold several tokens separated by ";" or "|"; each token is counted.
        for col in tag_cols:
            if col in df.columns:
                v = row.get(col)
                if isinstance(v, str) and v.strip():
                    for token in [t.strip() for t in v.replace("|",";").split(";") if t.strip()]:
                        strike_info[s]["tags"][token] += 1
        # moneyflows
        # Non-NaN values are summed; values that cannot be converted to float are ignored.
        for col in call_money_cols:
            if col in df.columns:
                try:
                    val = row.get(col)
                    if not pd.isna(val):
                        strike_info[s]["call_moneyflow"] += float(val)
                except:
                    pass
        for col in put_money_cols:
            if col in df.columns:
                try:
                    val = row.get(col)
                    if not pd.isna(val):
                        strike_info[s]["put_moneyflow"] += float(val)
                except:
                    pass

# attach tags/moneyflow to merged
def fmt_tags(s):
    """Render the tag counts stored for strike ``s`` as "tag:count;tag:count".

    Returns "" when the strike is unknown to ``strike_info`` or has no tags.
    """
    entry = strike_info.get(int(s), {})
    counts = entry.get("tags", {})
    if not counts:
        return ""
    return ";".join(f"{tag}:{count}" for tag, count in counts.items())

# Copy the per-strike accumulators onto `merged`; strikes missing from
# strike_info get "" for tags and 0.0 for both money-flow columns.
merged["tags"] = merged["strike"].apply(lambda s: fmt_tags(s) if not pd.isna(s) else "")
merged["call_moneyflow"] = merged["strike"].apply(lambda s: strike_info.get(int(s), {}).get("call_moneyflow", 0.0))
merged["put_moneyflow"]  = merged["strike"].apply(lambda s: strike_info.get(int(s), {}).get("put_moneyflow", 0.0))

# ---------- DECIDE RECOMMENDED ACTION (dual-side logic) ----------
def decide_action(row):
    """Return "BUY_PUT", "BUY_CALL" or "HOLD" for one merged strike row.

    A side triggers when its percentage change exceeds 8, or exceeds 5 while
    the row's tags contain a supporting phrase for that side. The put side is
    checked first, so it wins when both sides qualify. Missing/NaN percentage
    changes count as 0.
    """
    def _pct(key):
        value = row.get(key, 0)
        return 0 if pd.isna(value) else value

    tags = (row.get("tags") or "").lower()

    def _mentions(*phrases):
        return any(phrase in tags for phrase in phrases)

    # A supporting tag lowers the trigger threshold from 8 to 5.
    put_threshold = 5 if _mentions("put buying", "call writing", "bearish") else 8
    if _pct("pct_change_put") > put_threshold:
        return "BUY_PUT"

    call_threshold = 5 if _mentions("call buying", "bullish", "oi_support_call") else 8
    if _pct("pct_change_call") > call_threshold:
        return "BUY_CALL"

    return "HOLD"

# Label every strike row with BUY_PUT / BUY_CALL / HOLD.
merged["recommended_action"] = merged.apply(decide_action, axis=1)

# ---------- ENRICH: unified summary columns + reasons + high-conviction stats ----------
# For each merged strike row, build one flat record with side-agnostic premium
# stats, human-readable reasons and a hit-rate for high-conviction signals.
final_rows = []
for _, r in merged.iterrows():
    s = int(r["strike"])
    # Raw snapshot rows where this strike appears in any of the three strike columns.
    sample = df[
        ((df.get("Current_Strikeprice") == s) if "Current_Strikeprice" in df.columns else False) |
        ((df.get("Previous_Strikeprice") == s) if "Previous_Strikeprice" in df.columns else False) |
        ((df.get("Next_Strikeprice") == s) if "Next_Strikeprice" in df.columns else False)
    ]
    # "Unnamed: 0" value of the first matching snapshot row (NaN when unavailable).
    unnamed_val = sample["Unnamed: 0"].iloc[0] if ("Unnamed: 0" in df.columns and not sample.empty) else np.nan

    n_obs_call = int(r.get("n_obs_call")) if pd.notna(r.get("n_obs_call")) else 0
    n_obs_put  = int(r.get("n_obs_put"))  if pd.notna(r.get("n_obs_put"))  else 0
    n_obs = n_obs_call + n_obs_put

    # Prefer the call-side value; fall back to the put-side value; NaN if neither is set.
    def choose(field_call, field_put):
        if field_call in r and pd.notna(r.get(field_call)):
            return r.get(field_call)
        if field_put in r and pd.notna(r.get(field_put)):
            return r.get(field_put)
        return np.nan

    first_premium = choose("first_call_ltp", "first_put_ltp")
    last_premium  = choose("last_call_ltp", "last_put_ltp")
    peak_premium  = choose("peak_call_ltp", "peak_put_ltp")
    trough_premium= choose("trough_call_ltp", "trough_put_ltp")
    abs_change    = choose("abs_change_call", "abs_change_put")
    pct_change    = choose("pct_change_call", "pct_change_put")

    five_low  = choose("call_5min_low", "put_5min_low")
    five_high = choose("call_5min_high", "put_5min_high")
    ten_low   = choose("call_10min_low", "put_10min_low")
    ten_high  = choose("call_10min_high", "put_10min_high")

    # Forecast band edges expressed as offsets from the last premium.
    p5_expected_lo = (five_low - last_premium) if (pd.notna(five_low) and pd.notna(last_premium)) else np.nan
    p5_expected_hi = (five_high - last_premium) if (pd.notna(five_high) and pd.notna(last_premium)) else np.nan
    p10_expected_lo = (ten_low - last_premium) if (pd.notna(ten_low) and pd.notna(last_premium)) else np.nan
    p10_expected_hi = (ten_high - last_premium) if (pd.notna(ten_high) and pd.notna(last_premium)) else np.nan

    # Reasons are derived from case-insensitive substring matches on the tag text,
    # the sign of the money-flow sums and the size of the chosen pct_change.
    tags_text = r.get("tags","") or ""
    reasons = []
    if any(k.lower() in tags_text.lower() for k in ["rsi","RSI"]):
        reasons.append("RSI momentum")
    if "macd" in tags_text.lower():
        reasons.append("MACD confirmation")
    if "vwap" in tags_text.lower():
        reasons.append("VWAP divergence")
    if "oi" in tags_text.lower():
        reasons.append("OI support/resistance")

    call_mf = r.get("call_moneyflow", 0.0) if "call_moneyflow" in r else 0.0
    put_mf  = r.get("put_moneyflow", 0.0)  if "put_moneyflow" in r else 0.0
    try:
        if float(call_mf) > 0:
            reasons.append("Call net buying")
    except:
        pass
    try:
        if float(put_mf) > 0:
            reasons.append("Put net buying")
    except:
        pass
    try:
        if pd.notna(pct_change):
            if pct_change > 10:
                reasons.append("Strong premium move")
            elif pct_change > 3:
                reasons.append("Moderate premium move")
    except:
        pass
    if not reasons:
        reasons.append("No strong signals")
    # dict.fromkeys drops duplicate reasons while keeping their first-seen order.
    reasons_txt = "; ".join(dict.fromkeys(reasons))

    # High-conviction stats: count rows flagged True for this strike, and how many
    # were followed (within 3 minutes, same Current_Strikeprice) by a higher premium.
    hc_col = "Current_IsHighConvictionSignal"
    hc_total = 0
    hc_success = 0
    if hc_col in df.columns:
        hc_rows = df[(df.get("Current_Strikeprice")==s) & (df.get(hc_col)==True)]
        hc_total = int(hc_rows.shape[0])
        for _, hrow in hc_rows.iterrows():
            t0 = hrow.get("LTT")
            action = r.get("recommended_action","HOLD")
            # Premium at signal time: put premium for BUY_PUT, call premium for BUY_CALL,
            # otherwise the call premium when available, else the put premium.
            p0 = None
            if action == "BUY_CALL" and curr_call_col in df.columns:
                p0 = hrow.get(curr_call_col)
            elif action == "BUY_PUT" and curr_put_col in df.columns:
                p0 = hrow.get(curr_put_col)
            else:
                p0 = hrow.get(curr_call_col) if curr_call_col in df.columns else hrow.get(curr_put_col) if curr_put_col in df.columns else None

            if pd.isna(t0) or p0 is None or pd.isna(p0):
                continue
            # Rows from t0 to t0 + 3 minutes (inclusive) for the same current strike.
            window = df[(df["LTT"] >= t0) & (df["LTT"] <= t0 + timedelta(minutes=3)) & ((df.get("Current_Strikeprice")==s) if "Current_Strikeprice" in df.columns else False)]
            if window.empty:
                continue
            # Success = the window's maximum premium is strictly above the signal-time premium.
            if action == "BUY_PUT" and curr_put_col in df.columns:
                try:
                    if window[curr_put_col].max() > p0:
                        hc_success += 1
                except:
                    pass
            else:
                if curr_call_col in df.columns:
                    try:
                        if window[curr_call_col].max() > p0:
                            hc_success += 1
                    except:
                        pass

    # None (not 0) when the strike had no high-conviction rows at all.
    hc_rate = (hc_success / hc_total) if hc_total > 0 else None

    final_rows.append({
        "strike": s,
        "Unnamed: 0": unnamed_val,
        "n_obs": n_obs,
        "first_premium": first_premium,
        "last_premium": last_premium,
        "peak_premium": peak_premium,
        "trough_premium": trough_premium,
        "abs_change": abs_change,
        "pct_change": pct_change,
        "5min_low": five_low,
        "5min_high": five_high,
        "10min_low": ten_low,
        "10min_high": ten_high,
        "p5_expected_lo": p5_expected_lo,
        "p5_expected_hi": p5_expected_hi,
        "p10_expected_lo": p10_expected_lo,
        "p10_expected_hi": p10_expected_hi,
        "call_moneyflow": r.get("call_moneyflow", 0.0),
        "put_moneyflow":  r.get("put_moneyflow", 0.0),
        "tags": r.get("tags",""),
        "reasons": reasons_txt,
        "recommended_action": r.get("recommended_action","HOLD"),
        "highconv_total": hc_total,
        "highconv_success": hc_success,
        "highconv_hit_rate": hc_rate,
        "Current_Strikeprice": s,
        # include merged details for debugging
        "merged_row": r.to_dict()
    })

final_df = pd.DataFrame(final_rows)

# preserve original merged columns as well
merged_prefixed = merged.add_prefix("merged_")
final_df = final_df.merge(merged_prefixed, left_on="strike", right_on="merged_strike", how="left")

# ---------- SAVE CSVs ----------
final_df.to_csv(OUT_MERGED, index=False)


def _top_rows(action, pct_col, limit=200):
    """Rows of `merged` recommended as `action`, best `pct_col` first, capped at `limit`."""
    picked = merged[merged["recommended_action"] == action].copy()
    if pct_col in picked.columns:
        picked = picked.sort_values(pct_col, ascending=False)
    return picked.head(limit)


# top puts
top_buy_puts = _top_rows("BUY_PUT", "pct_change_put")
top_buy_puts.to_csv(OUT_TOP_PUTS, index=False)

# top calls
top_buy_calls = _top_rows("BUY_CALL", "pct_change_call")
top_buy_calls.to_csv(OUT_TOP_CALLS, index=False)

# all actions
full_actions = merged.copy()
def score_row(r):
    """Strength score of one row: the pct change of the recommended side.

    BUY_CALL rows score their call pct change, BUY_PUT rows their put pct
    change, and any other row scores min(call pct, put pct) / 10. Missing or
    NaN percentages count as 0.
    """
    def _pct(key):
        value = r.get(key, 0)
        return 0 if pd.isna(value) else value

    call_pct = _pct("pct_change_call")
    put_pct = _pct("pct_change_put")
    action = r.get("recommended_action")

    if action == "BUY_CALL":
        return call_pct
    if action == "BUY_PUT":
        return put_pct
    return min(call_pct, put_pct) / 10
# Rank every strike by its strength score, strongest first.
full_actions["strength_score"] = full_actions.apply(score_row, axis=1)
full_actions = full_actions.sort_values("strength_score", ascending=False)
full_actions.to_csv(OUT_ALL_ACTIONS, index=False)

# CE only / PE only
# Keep strikes that actually have observations and a defined pct change on that side.
has_call_data = merged["pct_change_call"].notna() & (merged["n_obs_call"].fillna(0) > 0)
ce_only = merged[has_call_data].copy()
ce_only.to_csv(OUT_CE_ONLY, index=False)

has_put_data = merged["pct_change_put"].notna() & (merged["n_obs_put"].fillna(0) > 0)
pe_only = merged[has_put_data].copy()
pe_only.to_csv(OUT_PE_ONLY, index=False)

print("Saved:", OUT_MERGED, OUT_TOP_PUTS, OUT_TOP_CALLS, OUT_ALL_ACTIONS, OUT_CE_ONLY, OUT_PE_ONLY)

# ---------- SPOT vs momentum plot ----------
def _finite_mean(column):
    """Mean of a `merged` column ignoring NaN and +/-inf; NaN when the column is absent."""
    if column not in merged.columns:
        return np.nan
    return merged[column].replace([np.inf, -np.inf], np.nan).dropna().mean()


# Last non-missing spot price, or NaN when there is none.
last_spot = np.nan
if "SpotPrice" in df.columns:
    spot_values = df["SpotPrice"].dropna()
    if not spot_values.empty:
        last_spot = spot_values.iloc[-1]

avg_call_pct = _finite_mean("pct_change_call")
avg_put_pct = _finite_mean("pct_change_put")

# Missing values are drawn as zero-height bars.
vals = [
    0 if math.isnan(last_spot) else last_spot,
    0 if pd.isna(avg_call_pct) else avg_call_pct,
    0 if pd.isna(avg_put_pct) else avg_put_pct,
]

plt.figure(figsize=(8,4))
plt.title("Spot (last) vs Avg CE/PE %change")
plt.bar(["Spot Last","Avg CE %","Avg PE %"], vals)
plt.tight_layout()
plt.savefig(OUT_PLOT, dpi=150)
plt.close()
print("Saved:", OUT_PLOT)

# ---------- ALERTS: PUT REVERSAL CANDIDATES ----------
# A strike is flagged when its put premium rose more than 8% AND there is a
# bearish hint: a "put buying"/"bearish" tag or a positive put money-flow.
alerts = []
for _, r in merged.iterrows():
    pct_put = r.get("pct_change_put")
    pct_missing = pct_put is None or (isinstance(pct_put, float) and np.isnan(pct_put))
    if pct_missing:
        continue

    tags = (r.get("tags") or "").lower()
    put_money = r.get("put_moneyflow", 0.0) if "put_moneyflow" in r else 0.0

    if not pct_put > 8:
        continue
    bearish_hint = ("put buying" in tags) or ("bearish" in tags) or (put_money and put_money > 0)
    if not bearish_hint:
        continue

    alerts.append({
        "strike": int(r["strike"]),
        "pct_change_put": pct_put,
        "put_moneyflow": put_money,
        "tags": r.get("tags",""),
        "recommended_action": r.get("recommended_action",""),
        "reason": "Put momentum >8% and bearish tag/moneyflow"
    })

alerts_df = pd.DataFrame(alerts)
if alerts_df.empty:
    print("No put reversal alerts found.")
else:
    alerts_df.to_csv(OUT_ALERTS, index=False)
    print("Saved:", OUT_ALERTS)
    print("Sample alerts:\n", alerts_df.head(20).to_string(index=False))

# ---------- OPTIONAL HOLD/CE/PE tables (already saved above) ----------
# Strikes with no buy signal; they are ranked by breakout_score further below.
hold_df = merged[merged["recommended_action"] == "HOLD"].copy()

# breakout score (already used earlier)
def breakout_score(row):
    """Score how close a row is to breaking out.

    The score is the sum of:
      * the positive part of the call and put percentage changes,
      * 5 for each of call/put money-flow that is greater than 0,
      * 10 each for "breakout"/"momentum" and 5 each for "rsi"/"macd"
        appearing (case-insensitively) in the row's tags.

    ``row`` only needs a ``.get`` method (a pandas Series or a dict).
    """
    score = 0

    for pct_key in ("pct_change_call", "pct_change_put"):
        pct = row.get(pct_key)
        if pd.notna(pct):
            score += max(pct, 0)

    # Each money-flow is checked on its own so that a non-comparable value on
    # one side (e.g. None) no longer suppresses the bonus for the other side.
    for flow_key in ("call_moneyflow", "put_moneyflow"):
        try:
            if row.get(flow_key, 0) > 0:
                score += 5
        except TypeError:
            continue

    tags = str(row.get("tags", "")).lower()
    for keyword, bonus in (("breakout", 10), ("momentum", 10), ("rsi", 5), ("macd", 5)):
        if keyword in tags:
            score += bonus
    return score

# Rank the HOLD strikes by breakout score (highest first) and save them.
hold_df["breakout_score"] = hold_df.apply(breakout_score, axis=1)
hold_df = hold_df.sort_values("breakout_score", ascending=False)
hold_df.to_csv(OUT_TOP_HOLD, index=False)
print("Saved:", OUT_TOP_HOLD)

# ---------- REVERSAL DETECTION ----------
# Heuristic: find recent peak in last REVERSAL_WINDOW_MIN minutes in strike series and if
# premium dropped by REVERSAL_DROP_PCT% or more from that peak -> reversal candidate
reversals = []
# NOTE: call_records/put_records are lists of dicts (keys 'strike' and
# 'call_series_sorted'/'put_series_sorted', among others). The former placeholder
# loop `for s, rec in (call_records + put_records): pass` tried to unpack each
# dict into two names and aborted the run with
# "ValueError: too many values to unpack (expected 2)". It performed no work,
# so it has been removed; detection runs on call_series/put_series below.

# work with call_series & put_series which contain sorted lists (unsorted originally)
def detect_reversals_for_series(series_map, side_name, window_min=None, drop_pct=None):
    """Find strikes whose latest premium fell sharply from its recent peak.

    Parameters
    ----------
    series_map : mapping of strike -> list of (timestamp, premium) pairs
        (any order; sorted by timestamp here).
    side_name : label copied into each result ("CALL" / "PUT").
    window_min : look-back length in minutes, measured back from the strike's
        last timestamp. Defaults to the module-level REVERSAL_WINDOW_MIN.
    drop_pct : minimum percentage drop from the window peak to the last
        premium. Defaults to the module-level REVERSAL_DROP_PCT.

    Returns
    -------
    list of dicts with keys strike, side, peak_in_window, last_price,
    drop_pct and window_minutes. Strikes with fewer than 3 observations are
    ignored.
    """
    if window_min is None:
        window_min = REVERSAL_WINDOW_MIN
    if drop_pct is None:
        drop_pct = REVERSAL_DROP_PCT
    lookback = pd.Timedelta(minutes=window_min)

    out = []
    for s, pairs in series_map.items():
        sr = sorted(pairs, key=lambda x: x[0])
        if len(sr) < 3:
            continue
        cutoff = sr[-1][0] - lookback
        window_prices = [p for t, p in sr if t >= cutoff]
        if not window_prices:
            continue
        peak_p = max(window_prices)
        last_p = window_prices[-1]
        # A zero peak would divide by zero; treat it as "no drop".
        fall = (peak_p - last_p) / peak_p * 100 if peak_p != 0 else 0.0
        if fall >= drop_pct:
            out.append({
                "strike": s,
                "side": side_name,
                "peak_in_window": peak_p,
                "last_price": last_p,
                "drop_pct": fall,
                "window_minutes": window_min
            })
    return out

# call_series & put_series are maps of strike -> [(t,p),...]
call_reversals = detect_reversals_for_series(call_series, "CALL")
put_reversals  = detect_reversals_for_series(put_series, "PUT")

# Save both sides together; nothing is written when no strike qualified.
rev_df = pd.DataFrame(call_reversals + put_reversals)
if rev_df.empty:
    print("No strong reversals detected by heuristic.")
else:
    rev_df.to_csv(OUT_REVERSALS, index=False)
    print("Saved:", OUT_REVERSALS)

# ---------- IV-CRUSH DETECTION ----------
# Heuristic: for each strike, compute recent IV change (Current_IV vs previous IV)
# NOTE(review): elsewhere in this file the Previous_/Current_/Next_ prefixes
# appear to denote neighbouring strikes within one snapshot row rather than
# earlier/later points in time. If so, the comparison below measures the IV
# difference between adjacent strikes, not a drop over time - confirm.
iv_crush_list = []
iv_cols_candidates = [
    ("Previous_Call_IV","Current_Call_IV","Next_Call_IV"),
    ("Previous_Put_IV","Current_Put_IV","Next_Put_IV")
]

# We'll operate per-strike, scanning rows where Current_Strikeprice == s and checking IV drops between previous/current or current/next when present
# (Only the previous -> current pair is actually compared below; next_col is unpacked but unused.)
for idx, row in df.iterrows():
    for side, (prev_col, curr_col, next_col) in (("CALL", iv_cols_candidates[0]), ("PUT", iv_cols_candidates[1])):
        if curr_col not in df.columns:
            continue
        s = row.get("Current_Strikeprice") if "Current_Strikeprice" in df.columns else None
        if pd.isna(s):
            continue
        try:
            s = int(s)
        except:
            continue
        iv_curr = row.get(curr_col)
        iv_prev = row.get(prev_col) if prev_col in df.columns else None
        # if both present, compute absolute and relative drop
        if pd.notna(iv_curr) and pd.notna(iv_prev):
            try:
                abs_drop = float(iv_prev) - float(iv_curr)
                rel_drop = (abs_drop / float(iv_prev) * 100) if float(iv_prev) != 0 else 0.0
            except:
                # Non-numeric IV values are treated as "no drop".
                abs_drop = 0.0
                rel_drop = 0.0
            # Either threshold alone is enough to record a candidate.
            if (abs_drop >= IV_CRUSH_ABS) or (rel_drop >= IV_CRUSH_REL_PCT):
                iv_crush_list.append({
                    "strike": s,
                    "side": side,
                    "iv_prev": iv_prev,
                    "iv_curr": iv_curr,
                    "abs_drop": abs_drop,
                    "rel_drop_pct": rel_drop,
                    "row_index": idx,
                    "LTT": row.get("LTT")
                })

# One output row per (snapshot row, side) that met a threshold.
iv_crush_df = pd.DataFrame(iv_crush_list)
if not iv_crush_df.empty:
    iv_crush_df.to_csv(OUT_IV_CRUSH, index=False)
    print("Saved:", OUT_IV_CRUSH)
else:
    print("No IV crush candidates found with thresholds.")

# ---------- STRENGTH HEATMAP ----------
# Build a small matrix: for each strike (rows) show [avg pct_change_call, avg pct_change_put, strength]
heat_columns = ["strike", "avg_pct_call", "avg_pct_put", "strength"]
heat_rows = []
for _, r in merged.iterrows():
    call_pct = r.get("pct_change_call")
    put_pct = r.get("pct_change_put")
    heat_rows.append({
        "strike": int(r["strike"]),
        "avg_pct_call": 0.0 if pd.isna(call_pct) else call_pct,
        "avg_pct_put": 0.0 if pd.isna(put_pct) else put_pct,
        # reuse earlier scoring
        "strength": score_row(r),
    })

# Passing the column list keeps a "strike" column even when there are no rows;
# without it sort_values("strike") raised KeyError on an empty table and the
# "Heatmap skipped" branch below could never be reached.
heat_df = pd.DataFrame(heat_rows, columns=heat_columns).sort_values("strike", ascending=True)
if not heat_df.empty:
    # create heatmap data: columns = [avg_pct_call, avg_pct_put, strength]
    hm = heat_df[["avg_pct_call","avg_pct_put","strength"]].fillna(0).to_numpy()
    # Figure height grows with the number of strikes, with a minimum of 6.
    plt.figure(figsize=(6, max(6, len(hm)/20)))
    plt.title("Strike strength heatmap (rows=strikes)")
    # imshow without custom colors (matplotlib default)
    plt.imshow(hm, aspect='auto', interpolation='nearest')
    plt.colorbar(label='value')
    plt.yticks(range(len(heat_df)), heat_df["strike"].astype(str))
    plt.xticks([0,1,2], ["avg_pct_call","avg_pct_put","strength"])
    plt.tight_layout()
    plt.savefig(OUT_HEATMAP, dpi=150)
    plt.close()
    print("Saved:", OUT_HEATMAP)
else:
    print("Heatmap skipped (no heat data).")

# ---------- AUTO-TRADE EXPORT (JSON) ----------
# Export simple order suggestions for strong signals: one order per BUY_CALL /
# BUY_PUT row whose strength_score reaches AUTO_TRADE_STRENGTH.
auto_trades = []
for _, r in full_actions.iterrows():
    strength = r.get("strength_score", 0)
    if pd.isna(strength) or strength < AUTO_TRADE_STRENGTH:
        continue
    action = r.get("recommended_action", "HOLD")
    if action not in ("BUY_CALL","BUY_PUT"):
        continue
    strike = int(r["strike"])

    # choose price: prefer last_call_ltp/last_put_ltp then merged fields
    price_col = "last_call_ltp" if action == "BUY_CALL" else "last_put_ltp"
    price = r.get(price_col)
    if pd.isna(price):
        fallback_col = "merged_" + price_col
        price = r.get(fallback_col) if fallback_col in r else None

    lot = DEFAULT_LOTSIZE
    # if df has a Current_Lotsize column, prefer the most recent value for this strike
    if "Current_Lotsize" in df.columns:
        if "Current_Strikeprice" in df.columns:
            sample = df[df["Current_Strikeprice"] == strike]
        elif "Previous_Strikeprice" in df.columns:
            sample = df[df["Previous_Strikeprice"] == strike]
        else:
            sample = None
        if sample is not None and not sample.empty:
            try:
                lot = int(sample["Current_Lotsize"].iloc[-1])
            except (TypeError, ValueError, OverflowError):
                # Missing (NaN), non-numeric or infinite lot size: keep the default.
                # Narrowed from a bare `except`, which also swallowed
                # KeyboardInterrupt/SystemExit.
                lot = DEFAULT_LOTSIZE

    order = {
        "strike": strike,
        "side": "CE" if action=="BUY_CALL" else "PE",
        "action": action,
        "suggested_price": float(price) if price is not None and not pd.isna(price) else None,
        "lots": lot,
        "strength_score": float(strength)
    }
    auto_trades.append(order)

# write JSON
with open(OUT_AUTOTRADE, "w") as fh:
    json.dump(auto_trades, fh, indent=2, default=str)

print("Saved:", OUT_AUTOTRADE, " (", len(auto_trades), "orders )")

print("Pipeline complete.")


  df = pd.read_csv(INPUT_CSV, low_memory=False, parse_dates=['LTT'], infer_datetime_format=True)


Using columns (LTP prioritized):
Calls: Previous_Call_ltp Current_Call_ltp Next_Call_ltp
Puts : Previous_Put_ltp Current_Put_ltp Next_Put_ltp
Strikes: Previous_Strikeprice Current_Strikeprice Next_Strikeprice
Detected call strikes: 8
Detected put strikes : 8
Saved: MERGED_CE_PE_FORECAST.csv TOP_BUY_PUTS.csv TOP_BUY_CALLS.csv ALL_ACTIONS.csv TOP_CE_ONLY.csv TOP_PE_ONLY.csv
Saved: SPOT_VS_MOMENTUM.png
Saved: PUT_REVERSAL_ALERTS.csv
Sample alerts:
  strike  pct_change_put  put_moneyflow                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                

ValueError: too many values to unpack (expected 2)

In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline_use_ltp_enriched_final.py

import os, sys, math, json
from collections import defaultdict, Counter
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------------------- CONFIG ---------------------- #
# Snapshot CSV read in the LOAD section below.
INPUT_CSV = "flattened_snapshots.csv"

# Output file names, grouped by kind: merged/ranked tables ...
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_TOP_CALLS = "TOP_BUY_CALLS.csv"
OUT_ALL_ACTIONS = "ALL_ACTIONS.csv"

# ... single-side and HOLD tables ...
OUT_CE_ONLY = "TOP_CE_ONLY.csv"
OUT_PE_ONLY = "TOP_PE_ONLY.csv"
OUT_HOLD_BREAKOUTS = "TOP_HOLD_BREAKOUTS.csv"

# ... detection results ...
OUT_REVERSALS = "REVERSALS.csv"
OUT_IV_CRUSH = "IV_CRUSH.csv"

# ... and plots / JSON export.
OUT_PLOT = "SPOT_VS_MOMENTUM.png"
OUT_HEATMAP = "PREMIUM_HEATMAP.png"
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json"

# Detection thresholds.
REVERSAL_WINDOW_MIN = 5       # presumably the look-back window in minutes - TODO confirm against its use
REVERSAL_DROP_PCT = 12        # % drop from peak to last
IV_CRUSH_DROP = 15            # % drop in IV


# ---------------------- LOAD ---------------------- #
# Stop with exit code 1 when the input file is missing.
if not os.path.exists(INPUT_CSV):
    print("ERROR: input file not found:", INPUT_CSV)
    sys.exit(1)

# Read the whole CSV, parsing the LTT column as datetimes.
df = pd.read_csv(
    INPUT_CSV,
    low_memory=False,
    parse_dates=['LTT']  # infer_datetime_format is deprecated
)

# ---------------------- USE LTP COLUMNS ---------------------- #
# Preferred premium columns for the call side (previous / current / next position).
PREV_CALL_LTP = "Previous_Call_ltp"
CURR_CALL_LTP = "Current_Call_ltp"
NEXT_CALL_LTP = "Next_Call_ltp"

# Preferred premium columns for the put side.
PREV_PUT_LTP  = "Previous_Put_ltp"
CURR_PUT_LTP  = "Current_Put_ltp"
NEXT_PUT_LTP  = "Next_Put_ltp"

# Strike-price columns paired with the premium columns above.
PREV_STR = "Previous_Strikeprice"
CURR_STR = "Current_Strikeprice"
NEXT_STR = "Next_Strikeprice"

# Fallback to Premium if LTP missing (rare)
def pick(col_ltp, col_pre):
    """Return the first of ``col_ltp``, ``col_pre`` that is a column of df, else None."""
    for candidate in (col_ltp, col_pre):
        if candidate in df.columns:
            return candidate
    return None

# Resolve the premium column to use for each position; None when neither the
# _ltp nor the _Premium variant exists in df.
prev_call_col = pick(PREV_CALL_LTP, "Previous_Call_Premium")
curr_call_col = pick(CURR_CALL_LTP, "Current_Call_Premium")
next_call_col = pick(NEXT_CALL_LTP, "Next_Call_Premium")

prev_put_col = pick(PREV_PUT_LTP, "Previous_Put_Premium")
curr_put_col = pick(CURR_PUT_LTP, "Current_Put_Premium")
next_put_col = pick(NEXT_PUT_LTP, "Next_Put_Premium")

# Log the resolved names so an unexpected fallback is visible.
print("Using Call columns:", prev_call_col, curr_call_col, next_call_col)
print("Using Put  columns:", prev_put_col,  curr_put_col,  next_put_col)


# ---------------------- BUILD SERIES ---------------------- #
def build_series(prev_col, curr_col, next_col):
    """Collect a ``(timestamp, premium)`` series per strike from ``df``.

    Each row of ``df`` contributes up to three observations, one per leg:
    the leg's strike column (``PREV_STR`` / ``CURR_STR`` / ``NEXT_STR``)
    paired with the given premium column.

    Parameters:
        prev_col, curr_col, next_col: premium column names for the previous,
            current and next leg. A leg whose premium column (or strike
            column) is not in ``df`` is skipped, which includes ``None``.

    Returns:
        ``defaultdict(list)`` mapping ``int`` strike to a list of
        ``(row["LTT"], float premium)`` tuples in row order.
    """
    # Column presence does not change from row to row, so decide once which
    # legs can contribute instead of re-checking inside the row loop.
    legs = [
        (sc, pc)
        for sc, pc in [(PREV_STR, prev_col), (CURR_STR, curr_col), (NEXT_STR, next_col)]
        if sc in df.columns and pc in df.columns
    ]

    series = defaultdict(list)
    for _, row in df.iterrows():
        t = row["LTT"]
        for sc, pc in legs:
            if pd.isna(row[sc]) or pd.isna(row[pc]):
                continue
            try:
                s = int(row[sc])
                p = float(row[pc])
            except (TypeError, ValueError, OverflowError):
                # Non-numeric strike/premium: skip this observation. Only
                # conversion errors are caught, so KeyboardInterrupt and
                # SystemExit are no longer swallowed as with a bare except.
                continue
            series[s].append((t, p))
    return series


# Per-strike (timestamp, premium) series, built separately for calls and puts
# from the columns resolved above.
call_series = build_series(prev_call_col, curr_call_col, next_call_col)
put_series = build_series(prev_put_col, curr_put_col, next_put_col)

# Number of distinct strikes collected on each side.
print("Detected call strikes:", len(call_series))
print("Detected put strikes :", len(put_series))


# ---------------------- SUMMARY + FORECAST ---------------------- #
def summarize(pairs):
    """Summarise a sequence of ``(timestamp, premium)`` tuples.

    The pairs are ordered by timestamp first. Returns ``None`` for an empty
    input, otherwise a dict with the first/last/peak/trough premium, the
    absolute and percentage change from first to last (percentage is NaN
    when the first premium is 0) and the number of observations.
    """
    ordered = sorted(pairs, key=lambda item: item[0])
    premiums = [premium for _stamp, premium in ordered]
    if len(premiums) == 0:
        return None

    opening = premiums[0]
    closing = premiums[-1]
    delta = closing - opening
    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = np.nan

    return dict(
        first=opening,
        last=closing,
        peak=max(premiums),
        trough=min(premiums),
        abs_change=delta,
        pct_change=pct,
        n_obs=len(premiums),
    )


def forecast(last, pct):
    """Return ``((5min_low, 5min_high), (10min_low, 10min_high))`` bands.

    The bands are ``last`` plus fixed offsets chosen by the ``pct`` tier:
    ``>= 25``, ``>= 8``, ``> 0``, or anything else (including NaN, for which
    every comparison is false).
    """
    if pct >= 25:
        offsets = ((15, 35), (25, 60))
    elif pct >= 8:
        offsets = ((6, 18), (12, 30))
    elif pct > 0:
        offsets = ((2, 8), (5, 15))
    else:
        offsets = ((-5, 2), (-8, 5))

    (lo5, hi5), (lo10, hi10) = offsets
    return (last + lo5, last + hi5), (last + lo10, last + hi10)


# ---------------------- CALL / PUT SUMMARY TABLES ---------------------- #
def _side_columns(side):
    """Return the ordered column names of one side's summary table."""
    return [
        "strike",
        f"n_obs_{side}",
        f"first_{side}_ltp",
        f"last_{side}_ltp",
        f"peak_{side}_ltp",
        f"trough_{side}_ltp",
        f"abs_change_{side}",
        f"pct_change_{side}",
        f"{side}_5min_low", f"{side}_5min_high",
        f"{side}_10min_low", f"{side}_10min_high",
    ]


def _side_records(series_map, side):
    """Build one summary + forecast record per strike for ``side``.

    ``side`` is ``"call"`` or ``"put"`` and only selects the column names;
    the call and put tables are otherwise produced by the same logic.
    Strikes for which summarize() returns nothing are skipped.
    """
    records = []
    for s, ts in series_map.items():
        st = summarize(ts)
        if not st:
            continue
        f5, f10 = forecast(st["last"], st["pct_change"])
        values = [
            s,
            st["n_obs"],
            st["first"],
            st["last"],
            st["peak"],
            st["trough"],
            st["abs_change"],
            st["pct_change"],
            f5[0], f5[1],
            f10[0], f10[1],
        ]
        records.append(dict(zip(_side_columns(side), values)))
    return records


call_records = _side_records(call_series, "call")
put_records = _side_records(put_series, "put")

# Explicit columns keep the "strike" key present even when a side produced
# no records, so the outer merge below does not raise KeyError on empty input.
call_df = pd.DataFrame(call_records, columns=_side_columns("call"))
put_df  = pd.DataFrame(put_records, columns=_side_columns("put"))

# One row per strike seen on either side; the missing side's columns are NaN.
merged = pd.merge(call_df, put_df, on="strike", how="outer")


# ---------------------- TAGS + MONEYFLOW ---------------------- #
tag_cols = ["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"]
call_mflow = ["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow"]
put_mflow  = ["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow"]

# Per-strike accumulator: tag token counts plus summed call/put money flow.
strike_info = {int(s):{"tags":Counter(),"call_mf":0,"put_mf":0} for s in merged["strike"]}

# Each leg's strike is paired with that same leg's tag and money-flow columns.
# Previously every strike in a row received the tags and flows of all three
# legs, so e.g. Next_CallMoneyFlow was also credited to the Previous strike.
leg_columns = list(zip([PREV_STR, CURR_STR, NEXT_STR], tag_cols, call_mflow, put_mflow))

for _, row in df.iterrows():
    for sc, tc, call_col, put_col in leg_columns:
        strike_val = row.get(sc)
        if pd.isna(strike_val):
            continue
        try:
            s = int(strike_val)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric strike: build_series() skips these as well.
            continue
        info = strike_info.get(s)
        if info is None:
            continue

        # tags: "|" and ";" both separate tokens; blank tokens are dropped
        tag_val = row.get(tc)
        if isinstance(tag_val, str):
            for token in tag_val.replace("|", ";").split(";"):
                token = token.strip()
                if token:
                    info["tags"][token] += 1

        # moneyflow (row.get yields None for an absent column, which isna treats as missing)
        call_val = row.get(call_col)
        if not pd.isna(call_val):
            info["call_mf"] += float(call_val)
        put_val = row.get(put_col)
        if not pd.isna(put_val):
            info["put_mf"] += float(put_val)

# Attach the aggregates; tags are rendered as "token:count" joined by ";".
merged["tags"] = merged["strike"].apply(lambda s: ";".join([f"{k}:{v}" for k,v in strike_info[int(s)]["tags"].items()]))
merged["call_moneyflow"] = merged["strike"].apply(lambda s: strike_info[int(s)]["call_mf"])
merged["put_moneyflow"]  = merged["strike"].apply(lambda s: strike_info[int(s)]["put_mf"])


# ---------------------- ACTION DECIDER ---------------------- #
def decide(row):
    """Return ``"BUY_PUT"``, ``"BUY_CALL"`` or ``"HOLD"`` for one merged row.

    A side is triggered by its "... buying" phrase in the lower-cased tags
    or by a % change above 8. The put side takes priority over the call side.
    """
    call_pct = row.get("pct_change_call", 0) or 0
    put_pct = row.get("pct_change_put", 0) or 0
    tag_text = row.get("tags", "").lower()

    bullish = True if "call buying" in tag_text else call_pct > 8
    bearish = True if "put buying" in tag_text else put_pct > 8

    if bearish:
        return "BUY_PUT"
    return "BUY_CALL" if bullish else "HOLD"

# Row-wise classification: one of "BUY_PUT", "BUY_CALL" or "HOLD" per strike.
merged["recommended_action"] = merged.apply(decide, axis=1)


# ---------------------- REVERSAL DETECTION (corrected) ---------------------- #
def detect_reversals(series_map, side):
    """Flag strikes whose premium fell sharply from its recent peak.

    For every strike with at least three observations, the observations
    inside the last ``REVERSAL_WINDOW_MIN`` minutes (counted back from the
    strike's latest timestamp) are examined. A record is emitted when the
    drop from the window's peak to its last premium is at least
    ``REVERSAL_DROP_PCT`` percent.

    Returns a list of dicts with keys ``strike``, ``side``, ``peak``,
    ``last`` and ``drop_pct``.
    """
    lookback = timedelta(minutes=REVERSAL_WINDOW_MIN)
    hits = []
    for strike, points in series_map.items():
        ordered = sorted(points, key=lambda item: item[0])
        if len(ordered) < 3:
            continue

        cutoff = ordered[-1][0] - lookback
        recent = [premium for stamp, premium in ordered if stamp >= cutoff]
        if not recent:
            continue

        top = max(recent)
        final = recent[-1]
        if top > 0:
            fall = (top - final) / top * 100
        else:
            fall = 0

        if fall >= REVERSAL_DROP_PCT:
            hits.append({
                "strike": strike,
                "side": side,
                "peak": top,
                "last": final,
                "drop_pct": fall,
            })
    return hits

# Reversal candidates from both sides, written as a single CSV.
call_reversals = detect_reversals(call_series, "CALL")
put_reversals = detect_reversals(put_series, "PUT")
rev_df = pd.DataFrame(call_reversals + put_reversals)
rev_df.to_csv(OUT_REVERSALS, index=False)


# ---------------------- IV CRUSH DETECTION ---------------------- #
iv_cols_prev = ["Previous_Call_IV","Previous_Put_IV"]
iv_cols_curr = ["Current_Call_IV","Current_Put_IV"]

# (previous-leg IV column, current-leg IV column, side label)
iv_pairs = [
    ("Previous_Call_IV", "Current_Call_IV", "CALL"),
    ("Previous_Put_IV", "Current_Put_IV", "PUT"),
]

iv_crush = []

# NOTE(review): both IV values come from the same row (Previous_* vs
# Current_* columns) - confirm that this is the intended comparison.
for _, snap in df.iterrows():
    for prev_name, curr_name, side in iv_pairs:
        if prev_name not in snap or curr_name not in snap:
            continue
        before = snap[prev_name]
        after = snap[curr_name]
        if pd.isna(before) or pd.isna(after):
            continue
        if not before > 0:
            continue
        pct_drop = (before - after) / before * 100
        if not pct_drop >= IV_CRUSH_DROP:
            continue
        iv_crush.append({
            "LTT": snap["LTT"],
            "strike": snap.get("Current_Strikeprice"),
            "side": side,
            "prev_iv": before,
            "curr_iv": after,
            "drop_pct": pct_drop,
        })

iv_crush_df = pd.DataFrame(iv_crush)
iv_crush_df.to_csv(OUT_IV_CRUSH, index=False)


# ---------------------- HEATMAP ---------------------- #
# Two-column image: one row per merged strike, columns are the call and put
# % change (missing values drawn as 0). Saved to OUT_HEATMAP.
plt.figure(figsize=(10,8))
heatmap_df = merged[["pct_change_call","pct_change_put"]].fillna(0)
plt.imshow(heatmap_df.values, aspect='auto')
plt.colorbar(label="Premium % Change")
plt.title("CALL/PUT Premium Heatmap")
plt.savefig(OUT_HEATMAP, dpi=150)
# Close the figure so it is not kept open after saving.
plt.close()


# ---------------------- AUTO-TRADE EXPORT ---------------------- #
# One signal per strike whose recommended action is a buy. "strength" is the
# % change of the side being bought.
signals = []

for _, r in merged.iterrows():
    action = r["recommended_action"]
    if action not in ("BUY_CALL", "BUY_PUT"):
        continue

    pct_col = "pct_change_call" if action == "BUY_CALL" else "pct_change_put"
    strength = float(r.get(pct_col, 0))
    # A tag-triggered signal can have a NaN % change (side missing or first
    # premium 0). json.dump would emit it as a bare NaN token, which is not
    # valid JSON, so it is exported as 0.0 instead.
    if not np.isfinite(strength):
        strength = 0.0

    signals.append({
        "strike": int(r["strike"]),
        "action": action,
        "strength": strength,
        "reason": r.get("tags", ""),
    })

with open(OUT_AUTOTRADE,"w") as f:
    json.dump(signals, f, indent=4)


# ---------------------- WRITE OUTPUT CSVs ---------------------- #
# Full merged table, then the top 200 BUY_PUT / BUY_CALL rows ranked by the
# % change of the side being bought. These are written before the "strength"
# column is added, so they do not contain it.
merged.to_csv(OUT_MERGED,index=False)
merged[merged["recommended_action"]=="BUY_PUT"].sort_values("pct_change_put",ascending=False).head(200).to_csv(OUT_TOP_PUTS,index=False)
merged[merged["recommended_action"]=="BUY_CALL"].sort_values("pct_change_call",ascending=False).head(200).to_csv(OUT_TOP_CALLS,index=False)

# All actions sorted by strength.
# strength = larger of the call / put % change, a missing side counting as 0.
# The earlier row-wise max(a, b) returned NaN whenever the call value was NaN
# (NaN is truthy, so "or 0" did not replace it), hiding a valid put move.
merged["strength"] = merged[["pct_change_call", "pct_change_put"]].fillna(0).max(axis=1)
merged.sort_values("strength",ascending=False).to_csv(OUT_ALL_ACTIONS,index=False)

# CE-only & PE-only: rows that have a call (resp. put) % change.
merged[merged["pct_change_call"].notna()].to_csv(OUT_CE_ONLY,index=False)
merged[merged["pct_change_put"].notna()].to_csv(OUT_PE_ONLY,index=False)

# HOLD breakout: HOLD rows ranked by the sum of their positive % changes.
hold_df = merged[merged["recommended_action"]=="HOLD"].copy()
hold_df["breakout_score"] = (
    hold_df["pct_change_call"].clip(lower=0).fillna(0)
    + hold_df["pct_change_put"].clip(lower=0).fillna(0)
)
hold_df.sort_values("breakout_score",ascending=False).to_csv(OUT_HOLD_BREAKOUTS,index=False)

print("✔ All processing complete.")
print("Generated:")
# OUT_PLOT is intentionally not listed: no statement in this cell writes that
# file, so reporting it as generated was misleading.
for out_path in (
    OUT_MERGED,
    OUT_TOP_PUTS,
    OUT_TOP_CALLS,
    OUT_ALL_ACTIONS,
    OUT_CE_ONLY,
    OUT_PE_ONLY,
    OUT_HOLD_BREAKOUTS,
    OUT_REVERSALS,
    OUT_IV_CRUSH,
    OUT_HEATMAP,
    OUT_AUTOTRADE,
):
    print("-", out_path)


Using Call columns: Previous_Call_ltp Current_Call_ltp Next_Call_ltp
Using Put  columns: Previous_Put_ltp Current_Put_ltp Next_Put_ltp
Detected call strikes: 8
Detected put strikes : 8
✔ All processing complete.
Generated:
- MERGED_CE_PE_FORECAST.csv
- TOP_BUY_PUTS.csv
- TOP_BUY_CALLS.csv
- ALL_ACTIONS.csv
- TOP_CE_ONLY.csv
- TOP_PE_ONLY.csv
- TOP_HOLD_BREAKOUTS.csv
- REVERSALS.csv
- IV_CRUSH.csv
- SPOT_VS_MOMENTUM.png
- PREMIUM_HEATMAP.png
- AUTO_TRADE_SIGNALS.json


In [None]:
#!/usr/bin/env python3
# full_ce_pe_pipeline_use_ltp_enriched_final_restored.py
"""
Consolidated CE+PE pipeline using LTP where available.
Produces a merged enriched CSV with all requested columns,
plus auxiliary outputs: top calls/puts, alerts, reversals, IV-crush,
heatmap, auto-trade export, CE-only / PE-only lists, hold-breakouts.

Drop this file next to your flattened_snapshots.csv and run:
    python full_ce_pe_pipeline_use_ltp_enriched_final_restored.py
"""

import os
import sys
import json
from collections import defaultdict, Counter
from datetime import timedelta
import math

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# ---------------------- CONFIG ---------------------- #
# Input snapshot CSV read by the LOAD step.
INPUT_CSV = "flattened_snapshots.csv"

# Output artefact file names (CSV tables).
OUT_MERGED = "MERGED_CE_PE_FORECAST.csv"
OUT_TOP_PUTS = "TOP_BUY_PUTS.csv"
OUT_TOP_CALLS = "TOP_BUY_CALLS.csv"
OUT_ALL_ACTIONS = "ALL_ACTIONS.csv"

OUT_CE_ONLY = "TOP_CE_ONLY.csv"
OUT_PE_ONLY = "TOP_PE_ONLY.csv"
OUT_HOLD_BREAKOUTS = "TOP_HOLD_BREAKOUTS.csv"

OUT_REVERSALS = "REVERSALS.csv"
OUT_IV_CRUSH = "IV_CRUSH.csv"

# Figures and the JSON signal export.
OUT_PLOT = "SPOT_VS_MOMENTUM.png"
OUT_HEATMAP = "PREMIUM_HEATMAP.png"
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json"

# Look-back (minutes, measured from a strike's last observation) scanned by
# detect_reversals_in_map().
REVERSAL_WINDOW_MIN = 5
REVERSAL_DROP_PCT = 12        # % drop from peak to last to mark reversal
IV_CRUSH_DROP = 15            # % drop in IV to mark IV crush

# ---------------------- LOAD ---------------------- #
# Abort with exit status 1 when the input file is absent.
if not os.path.exists(INPUT_CSV):
    print(f"ERROR: input file not found: {INPUT_CSV}")
    sys.exit(1)

# Parse LTT as datetime if present; if not, we'll proceed without times.
df = pd.read_csv(INPUT_CSV, low_memory=False)
if "LTT" in df.columns:
    # Unparseable values become NaT rather than leaving the whole column as
    # raw strings: build_series() already falls back to an index-based
    # timestamp for missing LTT, whereas string timestamps break the later
    # "t - timedelta(...)" arithmetic.
    parsed_ltt = pd.to_datetime(df["LTT"], errors="coerce")
    unparsed = int((parsed_ltt.isna() & df["LTT"].notna()).sum())
    if unparsed:
        print(f"WARNING: {unparsed} LTT value(s) could not be parsed as datetimes; treating them as missing.")
    df["LTT"] = parsed_ltt

# ---------------------- COLUMN SELECTION (LTP preferred) ---------------------- #
# Preferred LTP column names
# Call-side premium columns for the three legs (Previous / Current / Next).
PREV_CALL_LTP = "Previous_Call_ltp"
CURR_CALL_LTP = "Current_Call_ltp"
NEXT_CALL_LTP = "Next_Call_ltp"

# Put-side premium columns for the same three legs.
PREV_PUT_LTP = "Previous_Put_ltp"
CURR_PUT_LTP = "Current_Put_ltp"
NEXT_PUT_LTP = "Next_Put_ltp"

# Strike columns
PREV_STR = "Previous_Strikeprice"
CURR_STR = "Current_Strikeprice"
NEXT_STR = "Next_Strikeprice"

# fallback prefer _Premium if ltp missing
def pick(col_ltp, col_prem):
    """Choose the premium column to use for one leg.

    Returns ``col_ltp`` when it is a column of ``df``, otherwise ``col_prem``
    when that one is, otherwise ``None``.
    """
    return next(
        (name for name in (col_ltp, col_prem) if name in df.columns),
        None,
    )

# Resolve each leg's premium column (LTP name first, *_Premium as fallback).
prev_call_col, curr_call_col, next_call_col = [
    pick(ltp_name, premium_name)
    for ltp_name, premium_name in [
        (PREV_CALL_LTP, "Previous_Call_Premium"),
        (CURR_CALL_LTP, "Current_Call_Premium"),
        (NEXT_CALL_LTP, "Next_Call_Premium"),
    ]
]

prev_put_col, curr_put_col, next_put_col = [
    pick(ltp_name, premium_name)
    for ltp_name, premium_name in [
        (PREV_PUT_LTP, "Previous_Put_Premium"),
        (CURR_PUT_LTP, "Current_Put_Premium"),
        (NEXT_PUT_LTP, "Next_Put_Premium"),
    ]
]

print("Using Call columns:", prev_call_col, curr_call_col, next_call_col)
print("Using Put  columns:", prev_put_col, curr_put_col, next_put_col)
print("Using Strike columns:", PREV_STR, CURR_STR, NEXT_STR)

# ---------------------- BUILD TIMESERIES PER STRIKE ---------------------- #
def build_series(str_col_prev, str_col_curr, str_col_next, val_prev, val_curr, val_next):
    """Collect a ``(timestamp, value)`` series per strike from ``df``.

    Parameters:
        str_col_prev, str_col_curr, str_col_next: strike column of each leg.
        val_prev, val_curr, val_next: value (premium) column of each leg.
            A leg is skipped when either of its names is ``None`` or is not
            a column of ``df``.

    Returns:
        ``defaultdict(list)`` mapping ``int`` strike to a list of
        ``(timestamp, float value)`` tuples in row order. The timestamp is
        the row's ``LTT``; when that is absent or missing,
        ``pd.Timestamp(<row index>)`` is used as a stand-in.
    """
    # Which legs are usable depends only on the column set, so decide once
    # instead of re-checking for every row.
    usable_legs = [
        (sc, vc)
        for sc, vc in (
            (str_col_prev, val_prev),
            (str_col_curr, val_curr),
            (str_col_next, val_next),
        )
        if sc is not None and vc is not None
        and sc in df.columns and vc in df.columns
    ]

    series = defaultdict(list)
    for idx, row in df.iterrows():
        t = row.get("LTT", None)
        for sc, vc in usable_legs:
            scv = row.get(sc)
            vcv = row.get(vc)
            if pd.isna(scv) or pd.isna(vcv):
                continue
            try:
                s = int(scv)
                p = float(vcv)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric strike or value: skip this observation.
                continue
            # keep timestamp if present, else use the row index as a proxy
            if t is None or pd.isna(t):
                t_use = pd.Timestamp(idx)
            else:
                t_use = t
            series[s].append((t_use, p))
    return series

# Per-strike (timestamp, premium) series for calls and puts; both sides share
# the same three strike columns and differ only in the premium columns.
call_series = build_series(PREV_STR, CURR_STR, NEXT_STR, prev_call_col, curr_call_col, next_call_col)
put_series  = build_series(PREV_STR, CURR_STR, NEXT_STR, prev_put_col, curr_put_col, next_put_col)

# Number of distinct strikes collected on each side.
print("Detected call strikes:", len(call_series))
print("Detected put strikes :", len(put_series))

# ---------------------- SUMMARY & FORECAST HELPERS ---------------------- #
def summarize(pairs):
    """Summarise ``(timestamp, price)`` pairs after ordering them by timestamp.

    Returns ``None`` when there are no pairs. Otherwise returns a dict with
    ``first``, ``last``, ``peak``, ``trough``, ``abs_change``, ``pct_change``
    (NaN when the first price is 0), ``n_obs`` and ``series_sorted`` (the
    time-ordered pairs).
    """
    ordered = sorted(pairs, key=lambda point: point[0])
    if not ordered:
        return None

    values = [value for _, value in ordered]
    start, end = values[0], values[-1]
    change = end - start

    summary = {
        "first": start,
        "last": end,
        "peak": max(values),
        "trough": min(values),
        "abs_change": change,
    }
    summary["pct_change"] = (change / start * 100) if start != 0 else np.nan
    summary["n_obs"] = len(values)
    summary["series_sorted"] = ordered
    return summary

def forecast_from_pct(last, pct):
    """Return ((5min_low,5min_high),(10min_low,10min_high)).

    The bands are ``last`` plus fixed offsets selected by the ``pct`` tier
    (``>= 25``, ``>= 8``, ``> 0``, otherwise neutral/slightly negative).
    A missing ``pct`` is treated as 0.
    """
    if pd.isna(pct):
        pct = 0.0

    # (threshold, threshold is inclusive, 5-minute offsets, 10-minute offsets)
    tiers = (
        (25, True, (15, 35), (25, 60)),
        (8, True, (6, 18), (12, 30)),
        (0, False, (2, 8), (5, 15)),
    )
    for floor, inclusive, five, ten in tiers:
        reached = pct >= floor if inclusive else pct > floor
        if reached:
            break
    else:
        # neutral/slight negative
        five, ten = (-5, 2), (-8, 5)

    return (last + five[0], last + five[1]), (last + ten[0], last + ten[1])

# Build per-side summaries
# strike -> summary dict, keeping only strikes for which summarize() returned
# a result.
call_summary = {
    strike: stats
    for strike, stats in ((s, summarize(pairs)) for s, pairs in call_series.items())
    if stats
}

put_summary = {
    strike: stats
    for strike, stats in ((s, summarize(pairs)) for s, pairs in put_series.items())
    if stats
}

# All unique strikes from both sides
all_strikes = sorted(set(call_summary) | set(put_summary))

# ---------------------- BUILD MERGED ENRICHED ROWS ---------------------- #
# The same first "Unnamed: 0" value is stamped on every output row, so it is
# looked up once. The length guard keeps an empty frame from raising here.
unnamed_first = (
    df["Unnamed: 0"].iloc[0]
    if "Unnamed: 0" in df.columns and len(df) > 0
    else np.nan
)

# NaN stand-in for a side that has no summary at a given strike.
_NO_SIDE = dict.fromkeys(
    ("first", "last", "peak", "trough", "abs_change", "pct_change"), np.nan
)

rows = []
for s in all_strikes:
    cs = call_summary.get(s)
    ps = put_summary.get(s)

    # n_obs unified
    n_obs_call = int(cs["n_obs"]) if cs else 0
    n_obs_put  = int(ps["n_obs"]) if ps else 0
    n_obs = n_obs_call + n_obs_put

    # Headline premium stats come from the side with more observations
    # (call wins a tie); the other side is used only when the call side is
    # absent or has fewer observations.
    if cs and n_obs_call >= n_obs_put:
        primary = cs
    elif ps:
        primary = ps
    else:
        primary = None

    if primary is not None:
        first_premium  = primary["first"]
        last_premium   = primary["last"]
        peak_premium   = primary["peak"]
        trough_premium = primary["trough"]
        abs_change     = primary["abs_change"]
        pct_change     = primary["pct_change"]
        f5, f10 = forecast_from_pct(last_premium, pct_change)
    else:
        first_premium = last_premium = peak_premium = trough_premium = abs_change = pct_change = np.nan
        f5 = (np.nan, np.nan)
        f10 = (np.nan, np.nan)

    # p5/p10 expected deltas relative to the last premium. Subtraction
    # already yields NaN when either operand is NaN, so no explicit guard.
    p5_expected_lo = f5[0] - last_premium
    p5_expected_hi = f5[1] - last_premium
    p10_expected_lo = f10[0] - last_premium
    p10_expected_hi = f10[1] - last_premium

    call_side = cs or _NO_SIDE
    put_side = ps or _NO_SIDE

    rows.append({
        "strike": s,
        "Unnamed: 0": unnamed_first,
        "n_obs": n_obs,
        "first_premium": first_premium,
        "last_premium": last_premium,
        "peak_premium": peak_premium,
        "trough_premium": trough_premium,
        "abs_change": abs_change,
        "pct_change": pct_change,
        "5min_low": f5[0],
        "5min_high": f5[1],
        "10min_low": f10[0],
        "10min_high": f10[1],
        "p5_expected_lo": p5_expected_lo,
        "p5_expected_hi": p5_expected_hi,
        "p10_expected_lo": p10_expected_lo,
        "p10_expected_hi": p10_expected_hi,
        # per-side details (NaN when that side has no data for the strike)
        "n_obs_call": n_obs_call,
        "n_obs_put": n_obs_put,
        "first_call_ltp": call_side["first"],
        "last_call_ltp": call_side["last"],
        "peak_call_ltp": call_side["peak"],
        "trough_call_ltp": call_side["trough"],
        "abs_change_call": call_side["abs_change"],
        "pct_change_call": call_side["pct_change"],
        "first_put_ltp": put_side["first"],
        "last_put_ltp": put_side["last"],
        "peak_put_ltp": put_side["peak"],
        "trough_put_ltp": put_side["trough"],
        "abs_change_put": put_side["abs_change"],
        "pct_change_put": put_side["pct_change"],
    })

merged_df = pd.DataFrame(rows)

# ---------------------- EXTRACT TAGS & MONEYFLOW FOR EACH STRIKE ---------------------- #
tag_cols = ["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"]
call_money_cols = ["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow",
                   "Previous_TotalcallMoneyFlow","Current_TotalcallMoneyFlow","Next_TotalcallMoneyFlow"]
put_money_cols  = ["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow",
                   "Previous_TotalputMoneyFlow","Current_TotalputMoneyFlow","Next_TotalputMoneyFlow"]

# initialize: per-strike tag counts and summed call/put money flow
strike_info = {int(s): {"tags":Counter(), "call_moneyflow":0.0, "put_moneyflow":0.0} for s in merged_df["strike"].astype(int)}


def _leg_columns(prefix, candidates):
    """Return the ``candidates`` that belong to one leg and exist in ``df``."""
    return [c for c in candidates if c.startswith(prefix) and c in df.columns]


def _add_flows(info, key, row, cols):
    """Add every usable numeric value of ``row[cols]`` to ``info[key]``."""
    for col in cols:
        v = row.get(col)
        try:
            if not pd.isna(v):
                info[key] += float(v)
        except (TypeError, ValueError):
            # Best effort: a non-numeric money-flow cell is ignored.
            pass


# Each leg's strike is paired with that same leg's tag and money-flow columns.
# Previously every strike in a row received the values of all three legs, so
# e.g. Next_CallMoneyFlow was also credited to the Previous strike.
# NOTE(review): *_CallMoneyFlow and *_TotalcallMoneyFlow (likewise for puts)
# are both summed into the same total - confirm this is not double counting.
leg_plan = [
    (
        sc,
        _leg_columns(prefix, tag_cols),
        _leg_columns(prefix, call_money_cols),
        _leg_columns(prefix, put_money_cols),
    )
    for sc, prefix in ((PREV_STR, "Previous_"), (CURR_STR, "Current_"), (NEXT_STR, "Next_"))
    if sc in df.columns
]

# iterate source df once and aggregate
for idx, row in df.iterrows():
    for sc, leg_tags, leg_call, leg_put in leg_plan:
        scval = row.get(sc)
        if pd.isna(scval):
            continue
        try:
            s = int(scval)
        except (TypeError, ValueError, OverflowError):
            continue
        info = strike_info.get(s)
        if info is None:
            continue
        # tags: "|" and ";" both separate tokens; blank tokens are dropped
        for col in leg_tags:
            v = row.get(col)
            if isinstance(v, str) and v.strip():
                for token in v.replace("|", ";").split(";"):
                    token = token.strip()
                    if token:
                        info["tags"][token] += 1
        _add_flows(info, "call_moneyflow", row, leg_call)
        _add_flows(info, "put_moneyflow", row, leg_put)

def tags_to_text(s):
    """Render a strike's tag counts as ``"token:count"`` joined by ``";"``.

    Returns ``""`` when the strike is unknown or has no tags.
    """
    t = strike_info.get(int(s), {}).get("tags", {})
    if not t:
        return ""
    return ";".join([f"{k}:{v}" for k,v in t.items()])

# Attach the aggregates to the merged table.
merged_df["tags"] = merged_df["strike"].apply(lambda s: tags_to_text(s))
merged_df["call_moneyflow"] = merged_df["strike"].apply(lambda s: strike_info.get(int(s), {}).get("call_moneyflow", 0.0))
merged_df["put_moneyflow"] = merged_df["strike"].apply(lambda s: strike_info.get(int(s), {}).get("put_moneyflow", 0.0))

# ---------------------- REASONS (derived) ---------------------- #
def build_reasons(row):
    """Return a ``"; "``-joined list of human-readable reasons for one row.

    Reasons come from substrings of the ``tags`` text, from positive
    call/put money flow, and from the size of ``pct_change``. When nothing
    matches the result is ``"No strong signals"``.
    """
    tag_blob = str(row.get("tags", ""))
    tag_blob_lower = tag_blob.lower()
    found = []

    # tag based (the first test is case-insensitive, the other two are not)
    if any(marker in tag_blob_lower for marker in ("rsimacd", "rsi", "macd")):
        found.append("RSI/MACD momentum")
    if any(marker in tag_blob for marker in ("VWAP", "vwap")):
        found.append("VWAP divergence")
    if any(marker in tag_blob for marker in ("OI", "oi")):
        found.append("OI support/resistance")

    # moneyflow
    for flow_key, label in (
        ("call_moneyflow", "Call net buying"),
        ("put_moneyflow", "Put net buying"),
    ):
        if row.get(flow_key, 0) > 0:
            found.append(label)

    # pct-based; an unconvertible value contributes no reason
    try:
        move = float(row.get("pct_change", 0) or 0)
    except Exception:
        move = None
    if move is not None:
        if move > 10:
            found.append("Strong premium move")
        elif move > 3:
            found.append("Moderate premium move")

    if not found:
        found = ["No strong signals"]

    # dedupe while keeping first-seen order
    return "; ".join(dict.fromkeys(found))

# Row-wise: derive the "; "-joined reasons text for every strike.
merged_df["reasons"] = merged_df.apply(build_reasons, axis=1)

# ---------------------- RECOMMENDED ACTION (dual-side) ---------------------- #
def decide_action(row):
    """Return ``"BUY_PUT"``, ``"BUY_CALL"`` or ``"HOLD"`` for one merged row.

    A side fires when its % change exceeds 8, or exceeds 5 while the
    lower-cased tags contain one of that side's keywords. Missing % changes
    count as 0 and the put side is evaluated first.
    """
    def _pct(key):
        raw = row.get(key)
        return 0 if pd.isna(raw) else float(raw)

    call_move = _pct("pct_change_call")
    put_move = _pct("pct_change_put")
    tag_text = (row.get("tags") or "").lower()

    bullish_hint = any(kw in tag_text for kw in ("call buying", "oi_support_call", "bull"))
    bearish_hint = any(kw in tag_text for kw in ("put buying", "call writing", "bear"))

    # priority rules
    if put_move > 8 or (bearish_hint and put_move > 5):
        return "BUY_PUT"
    if call_move > 8 or (bullish_hint and call_move > 5):
        return "BUY_CALL"
    return "HOLD"

# Row-wise classification: one of "BUY_PUT", "BUY_CALL" or "HOLD" per strike.
merged_df["recommended_action"] = merged_df.apply(decide_action, axis=1)

# ---------------------- HIGH-CONVICTION STATISTICS ---------------------- #
hc_col = "Current_IsHighConvictionSignal"

def compute_highconv_stats(strike, action):
    """Count high-conviction signals for ``strike`` and how many "worked".

    A signal is a row of ``df`` whose ``Current_Strikeprice`` equals
    ``strike`` and whose ``hc_col`` flag is true. It counts as a success
    when, within the 3 minutes starting at the signal's ``LTT``, some row
    for the same strike shows a higher premium than the signal row.

    Parameters:
        strike: strike price to evaluate.
        action: ``"BUY_PUT"`` / ``"BUY_CALL"`` select the put / call premium
            column; any other value prefers the call column, then the put.

    Returns:
        ``(total, success, rate)`` where ``rate`` is ``success / total`` or
        ``None`` when there are no signals (or the flag column is absent).
    """
    if hc_col not in df.columns or "Current_Strikeprice" not in df.columns:
        return 0, 0, None

    strike_mask = df["Current_Strikeprice"] == strike
    # "== True" is an element-wise comparison on the flag column.
    hc_rows = df[strike_mask & (df[hc_col] == True)]
    total = int(hc_rows.shape[0])

    # pick premium column based on action preference (same for every signal)
    if action == "BUY_PUT" and curr_put_col in df.columns:
        base_col = curr_put_col
    elif action == "BUY_CALL" and curr_call_col in df.columns:
        base_col = curr_call_col
    elif curr_call_col in df.columns:
        base_col = curr_call_col
    elif curr_put_col in df.columns:
        base_col = curr_put_col
    else:
        base_col = None

    success = 0
    if base_col is not None and "LTT" in df.columns:
        # Only rows quoting this strike are comparable: the Current_* premium
        # of a row with a different Current_Strikeprice belongs to another
        # contract and must not count towards a success.
        same_strike = df[strike_mask]
        for _, hr in hc_rows.iterrows():
            t0 = hr.get("LTT")
            if pd.isna(t0):
                continue
            p0 = hr.get(base_col)
            if pd.isna(p0):
                continue
            # window 3 minutes forward
            in_window = (same_strike["LTT"] >= t0) & (same_strike["LTT"] <= (t0 + timedelta(minutes=3)))
            window = same_strike.loc[in_window, base_col]
            if window.empty:
                continue
            try:
                if window.max() > p0:
                    success += 1
            except (TypeError, ValueError):
                # Premiums that cannot be compared are not counted.
                pass

    rate = (success/total) if total > 0 else None
    return total, success, rate

# One (total, success, rate) triple per merged row, in row order.
hc_stats = [
    compute_highconv_stats(int(r["strike"]), r.get("recommended_action", "HOLD"))
    for _, r in merged_df.iterrows()
]
hc_totals = [stat[0] for stat in hc_stats]
hc_successes = [stat[1] for stat in hc_stats]
hc_rates = [stat[2] for stat in hc_stats]

merged_df["highconv_total"] = hc_totals
merged_df["highconv_success"] = hc_successes
merged_df["highconv_hit_rate"] = hc_rates

# add Current_Strikeprice column (explicit)
merged_df["Current_Strikeprice"] = merged_df["strike"]

# ---------------------- REVERSAL DETECTION ---------------------- #
def detect_reversals_in_map(series_map, side_label):
    """Return reversal records for every strike in ``series_map``.

    Strikes with fewer than three observations are ignored. For the rest,
    only observations within ``REVERSAL_WINDOW_MIN`` minutes before the
    strike's latest timestamp are considered; a record is produced when the
    last price sits at least ``REVERSAL_DROP_PCT`` percent below the
    window's peak.

    Each record has the keys ``strike``, ``side``, ``peak``, ``last`` and
    ``drop_pct``.
    """
    span = timedelta(minutes=REVERSAL_WINDOW_MIN)
    found = []
    for strike, pairs in series_map.items():
        timeline = sorted(pairs, key=lambda entry: entry[0])
        if len(timeline) < 3:
            continue

        # window lookback
        earliest = timeline[-1][0] - span
        recent_prices = [price for when, price in timeline if when >= earliest]
        if not recent_prices:
            continue

        high = max(recent_prices)
        latest = recent_prices[-1]
        fall_pct = 0
        if high > 0:
            fall_pct = (high - latest) / high * 100

        if fall_pct >= REVERSAL_DROP_PCT:
            found.append(dict(
                strike=strike,
                side=side_label,
                peak=high,
                last=latest,
                drop_pct=fall_pct,
            ))
    return found

# Reversal candidates from both sides; an empty result still produces a CSV
# with the expected header.
rev_list = [
    *detect_reversals_in_map(call_series, "CALL"),
    *detect_reversals_in_map(put_series, "PUT"),
]
rev_df = pd.DataFrame(rev_list)
if rev_df.empty:
    # create empty file
    pd.DataFrame(columns=["strike","side","peak","last","drop_pct"]).to_csv(OUT_REVERSALS, index=False)
else:
    rev_df.to_csv(OUT_REVERSALS, index=False)

# ---------------------- IV CRUSH DETECTION ---------------------- #
# Only IV column pairs that are both present in df are scanned.
# NOTE(review): both IV values come from the same row (Previous_* vs
# Current_* columns) - confirm that this is the intended comparison.
iv_pairs = [
    (prev_name, curr_name, side)
    for prev_name, curr_name, side in (
        ("Previous_Call_IV", "Current_Call_IV", "CALL"),
        ("Previous_Put_IV", "Current_Put_IV", "PUT"),
    )
    if prev_name in df.columns and curr_name in df.columns
]

iv_crush_events = []
for idx, row in df.iterrows():
    for prev_name, curr_name, side in iv_pairs:
        prev_iv = row.get(prev_name)
        curr_iv = row.get(curr_name)
        if pd.isna(prev_iv) or pd.isna(curr_iv):
            continue
        if not prev_iv > 0:
            continue
        drop_pct = (prev_iv - curr_iv) / prev_iv * 100
        if not drop_pct >= IV_CRUSH_DROP:
            continue
        iv_crush_events.append({
            "LTT": row.get("LTT"),
            "strike": row.get("Current_Strikeprice"),
            "side": side,
            "prev_iv": prev_iv,
            "curr_iv": curr_iv,
            "drop_pct": drop_pct,
        })

iv_crush_df = pd.DataFrame(iv_crush_events)
if iv_crush_df.empty:
    pd.DataFrame(columns=["LTT","strike","side","prev_iv","curr_iv","drop_pct"]).to_csv(OUT_IV_CRUSH, index=False)
else:
    iv_crush_df.to_csv(OUT_IV_CRUSH, index=False)

# ---------------------- HEATMAP (CALL/PUT pct change) ---------------------- #
# Prepare simple heatmap matrix: rows = strikes, cols = [pct_change_call, pct_change_put]
# Missing % changes are drawn as 0.
heatmap_df = merged_df[["pct_change_call","pct_change_put"]].fillna(0)
if not heatmap_df.empty:
    # Figure height grows with the number of rows, with a floor of 3.
    plt.figure(figsize=(8, max(3, len(heatmap_df)/4)))
    plt.imshow(heatmap_df.values, aspect='auto', interpolation='nearest')
    plt.colorbar(label="Premium % Change")
    plt.title("CALL/PUT Premium % Change Heatmap (rows=strikes)")
    # The y axis is the row position in merged_df, not the strike price.
    plt.ylabel("strike index (not price)")
    plt.xlabel("0=CE_pct_change, 1=PE_pct_change")
    plt.tight_layout()
    plt.savefig(OUT_HEATMAP, dpi=150)
    plt.close()
else:
    # produce an empty placeholder
    plt.figure(figsize=(6,2)); plt.text(0.5,0.5,"No data"); plt.axis('off'); plt.savefig(OUT_HEATMAP); plt.close()

# ---------------------- SPOT VS MOMENTUM PLOT ---------------------- #
# Last non-missing spot price, or NaN when the column is absent or empty.
last_spot = np.nan
if "SpotPrice" in df.columns:
    spot_values = df["SpotPrice"].dropna()
    if not spot_values.empty:
        last_spot = spot_values.iloc[-1]


def _finite_mean(column):
    """Mean of a merged_df column with +/-inf and NaN removed (NaN if absent)."""
    if column not in merged_df.columns:
        return np.nan
    return merged_df[column].replace([np.inf, -np.inf], np.nan).dropna().mean()


avg_ce_pct = _finite_mean("pct_change_call")
avg_pe_pct = _finite_mean("pct_change_put")

# Missing values are drawn as 0-height bars.
vals = [0 if pd.isna(value) else value for value in (last_spot, avg_ce_pct, avg_pe_pct)]
labels = ["Spot Last","Avg CE %","Avg PE %"]
plt.figure(figsize=(7,4))
plt.title("Spot (last) vs Avg CE/PE %change")
plt.bar(labels, vals)
plt.tight_layout()
plt.savefig(OUT_PLOT, dpi=150)
plt.close()

# ---------------------- AUTO-TRADE EXPORT ---------------------- #
# One signal per strike whose recommended action is a buy.
auto_signals = []
for idx, r in merged_df.iterrows():
    action = r.get("recommended_action","HOLD")
    if action not in ("BUY_CALL","BUY_PUT"):
        continue

    # Strength is the % change of the side being bought. The previous
    # "call or put or 0" chain reported the call % change for BUY_PUT
    # signals whenever it was non-zero, and let NaN through because NaN is
    # truthy.
    pct_key = "pct_change_call" if action == "BUY_CALL" else "pct_change_put"
    try:
        strength = float(r.get(pct_key, 0))
    except (TypeError, ValueError):
        strength = 0.0
    # NaN/inf would be written as bare NaN/Infinity tokens, which are not
    # valid JSON; export 0.0 instead.
    if not math.isfinite(strength):
        strength = 0.0

    auto_signals.append({
        "strike": int(r["strike"]),
        "action": action,
        "strength": strength,
        "reason": r.get("reasons",""),
        "tags": r.get("tags","")
    })
with open(OUT_AUTOTRADE, "w") as fh:
    json.dump(auto_signals, fh, indent=2, default=str)

# ---------------------- WRITE OUTPUTS: main merged + slices ---------------------- #
# Save merged enriched table with all requested columns
# Ensure column order to match user's expectation
# Preferred column order of the merged CSV.
cols_order = [
    "strike","Unnamed: 0","n_obs","n_obs_call","n_obs_put",
    "first_premium","last_premium","peak_premium","trough_premium",
    "abs_change","pct_change",
    "5min_low","5min_high","10min_low","10min_high",
    "p5_expected_lo","p5_expected_hi","p10_expected_lo","p10_expected_hi",
    "call_moneyflow","put_moneyflow",
    "tags","reasons","recommended_action",
    "highconv_total","highconv_success","highconv_hit_rate",
    "Current_Strikeprice",
    # add call/put detailed columns too
    "first_call_ltp","last_call_ltp","peak_call_ltp","trough_call_ltp","abs_change_call","pct_change_call",
    "first_put_ltp","last_put_ltp","peak_put_ltp","trough_put_ltp","abs_change_put","pct_change_put"
]

# Preferred columns that actually exist come first; every remaining column
# follows in its current order so no detail is dropped.
present = set(merged_df.columns)
cols_existing = [name for name in cols_order if name in present]
placed = set(cols_existing)
other_cols = [name for name in merged_df.columns if name not in placed]
final_cols = [*cols_existing, *other_cols]

merged_df.to_csv(OUT_MERGED, index=False, columns=final_cols)

# Top BUY_PUTS (by pct_change_put), at most 200 rows
is_buy_put = merged_df["recommended_action"] == "BUY_PUT"
top_puts = merged_df.loc[is_buy_put].copy()
if "pct_change_put" in top_puts.columns and not top_puts.empty:
    top_puts = top_puts.sort_values("pct_change_put", ascending=False).head(200)
top_puts.to_csv(OUT_TOP_PUTS, index=False)

# Top BUY_CALLS (by pct_change_call), at most 200 rows
is_buy_call = merged_df["recommended_action"] == "BUY_CALL"
top_calls = merged_df.loc[is_buy_call].copy()
if "pct_change_call" in top_calls.columns and not top_calls.empty:
    top_calls = top_calls.sort_values("pct_change_call", ascending=False).head(200)
top_calls.to_csv(OUT_TOP_CALLS, index=False)

# All actions sorted by strength (max of CE/PE pct)
def compute_strength(r):
    """Return the larger of a row's call and put percentage changes.

    `r` is any mapping-like row exposing ``.get`` (a dict or a pandas Series).
    Missing, None, NaN and non-numeric values count as 0, so a NaN on one side
    can no longer mask a real move on the other side (``NaN or 0`` evaluates
    to NaN, and ``max`` with a NaN operand depends on argument order).
    Returns 0 when `r` has no usable ``.get``.
    """
    def _as_number(value):
        # Coerce to float; anything that is not a real number means "no move".
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return 0
        # NaN is the only float that differs from itself.
        return 0 if number != number else number

    try:
        call_pct = r.get("pct_change_call", 0)
        put_pct = r.get("pct_change_put", 0)
    except (AttributeError, TypeError):
        return 0
    return max(_as_number(call_pct), _as_number(put_pct))

# Score every row, then export the whole table strongest-first.
merged_df["strength"] = merged_df.apply(compute_strength, axis=1)
_ranked_by_strength = merged_df.sort_values(by="strength", ascending=False)
_ranked_by_strength.to_csv(OUT_ALL_ACTIONS, index=False)

# CE-only / PE-only tables: rows with a non-null pct change on that side,
# or an empty frame when the pct column does not exist at all.
for _pct_col, _side_path in (("pct_change_call", OUT_CE_ONLY),
                             ("pct_change_put", OUT_PE_ONLY)):
    if _pct_col in merged_df.columns:
        _side_rows = merged_df[merged_df[_pct_col].notna()]
    else:
        _side_rows = pd.DataFrame()
    _side_rows.to_csv(_side_path, index=False)

# HOLD breakout candidates
hold_df = merged_df[merged_df["recommended_action"] == "HOLD"].copy()


def _positive_pct(frame, column):
    """Return `column` with negatives clipped to 0 and NaN filled with 0.

    When `frame` has no such column an all-zero Series on the same index is
    returned, so the sum below still works (``frame.get(column, 0)`` would
    hand back the int 0, which has no ``.clip``).
    """
    if column in frame.columns:
        return frame[column].clip(lower=0).fillna(0)
    return pd.Series(0.0, index=frame.index)


# breakout_score = sum of the positive call and put percentage changes
hold_df["breakout_score"] = (_positive_pct(hold_df, "pct_change_call")
                             + _positive_pct(hold_df, "pct_change_put"))
hold_df.sort_values("breakout_score", ascending=False).to_csv(OUT_HOLD_BREAKOUTS, index=False)

# Announce every output path this pipeline is expected to have produced.
_generated_outputs = (
    OUT_MERGED, OUT_TOP_PUTS, OUT_TOP_CALLS, OUT_ALL_ACTIONS,
    OUT_CE_ONLY, OUT_PE_ONLY, OUT_HOLD_BREAKOUTS,
    OUT_REVERSALS, OUT_IV_CRUSH, OUT_PLOT, OUT_HEATMAP, OUT_AUTOTRADE,
)
print("✔ Pipeline complete. Files generated:")
for _out_path in _generated_outputs:
    print("-", _out_path)

# Show (at most the first 50) merged columns for immediate verification
print("\nMerged CSV columns (sample):")
_sample_columns = list(merged_df.columns[:50])
print(_sample_columns)


Using Call columns: Previous_Call_ltp Current_Call_ltp Next_Call_ltp
Using Put  columns: Previous_Put_ltp Current_Put_ltp Next_Put_ltp
Using Strike columns: Previous_Strikeprice Current_Strikeprice Next_Strikeprice
Detected call strikes: 8
Detected put strikes : 8
✔ Pipeline complete. Files generated:
- MERGED_CE_PE_FORECAST.csv
- TOP_BUY_PUTS.csv
- TOP_BUY_CALLS.csv
- ALL_ACTIONS.csv
- TOP_CE_ONLY.csv
- TOP_PE_ONLY.csv
- TOP_HOLD_BREAKOUTS.csv
- REVERSALS.csv
- IV_CRUSH.csv
- SPOT_VS_MOMENTUM.png
- PREMIUM_HEATMAP.png
- AUTO_TRADE_SIGNALS.json

Merged CSV columns (sample):
['strike', 'Unnamed: 0', 'n_obs', 'first_premium', 'last_premium', 'peak_premium', 'trough_premium', 'abs_change', 'pct_change', '5min_low', '5min_high', '10min_low', '10min_high', 'p5_expected_lo', 'p5_expected_hi', 'p10_expected_lo', 'p10_expected_hi', 'n_obs_call', 'n_obs_put', 'first_call_ltp', 'last_call_ltp', 'peak_call_ltp', 'trough_call_ltp', 'abs_change_call', 'pct_change_call', 'first_put_ltp', 'last_put_l

In [None]:
#!/usr/bin/env python3
"""
realtime_ce_pe_engine.py

Tail a live text file of JSON "snapshots" (one JSON per line or a JSON list),
parse incremental snapshots, maintain a sliding-window in-memory DataFrame,
compute per-strike CE/PE summaries & forecasts using LTP when available,
detect signals (BUY_CALL / BUY_PUT), reversals, IV crush, and export
AUTO_TRADE_SIGNALS.json continuously.

No CSVs are written (except optional historical dump). Uses minimal pandas,
keeps memory bounded by sliding window.

Configurable parameters at top.
"""

import os
import sys
import time
import json
import signal
from collections import defaultdict, Counter, deque
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import pandas as pd
import numpy as np

# ---------------------- CONFIG ---------------------- #
# Tunables for the realtime engine; edit here rather than inside the functions.
INPUT_FILE = "14112025_BANK_PNL.txt"      # path to live appended snapshots (text)
REFRESH_SECONDS = 1.0                     # tail poll interval (seconds); also throttles how often analysis runs
WINDOW_MINUTES = 15                       # sliding window size for analysis (minutes)
START_FROM_END = True                     # if True, ignore historical lines and start tailing from EOF
WRITE_OUTPUT_JSON = True                  # write AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json" # output path for the BUY_CALL / BUY_PUT signal list
OUT_LATEST = "LATEST_MERGED.json"         # output path for the merged table plus reversals / IV-crush events
ALERT_LOG = None                          # path to an alert logfile or None to only print to stdout
IV_CRUSH_DROP = 15.0                      # percent IV drop threshold
REVERSAL_DROP_PCT = 12.0                  # reversal detection threshold (percent drop from peak)
REVERSAL_WINDOW_MIN = 5                   # look-back window for reversal check (minutes)
MAX_SNAPSHOTS_STORE = 10000               # maximum snapshots to keep in memory (safety)

# ---------------------- GLOBALS ---------------------- #
# Loop-control flag: tail_file() and run_realtime() keep going only while this is True.
running = True

# ---------------------- HELPERS ---------------------- #
def graceful_exit(signum, frame):
    """Signal handler: clear the module-level `running` flag and announce shutdown.

    `signum` and `frame` are the arguments the signal module passes to a
    handler; neither is used here.
    """
    global running
    running = False
    print("\nReceived exit signal. Shutting down...")

# Route SIGINT and SIGTERM through the handler above.
signal.signal(signal.SIGINT, graceful_exit)
signal.signal(signal.SIGTERM, graceful_exit)

def parse_line_json(line: str) -> Optional[Dict[str, Any]]:
    """
    Parse one input line into a flat snapshot dict.

    The line may be a JSON object, or text with a JSON object starting at its
    first '{'. Every top-level key except 'Current' is copied as-is. 'Current'
    may be a dict or a JSON-encoded string; its 'Previous' / 'Current' / 'Next'
    sub-dicts are flattened into keys named "<section>_<key>".

    Returns None for blank lines, unparseable lines, and lines whose top-level
    JSON value is not an object (a list, number, string, ...). A 'Current'
    value that is not, or does not decode to, a dict is ignored.
    """
    line = line.strip()
    if not line:
        return None

    # json.JSONDecodeError and str.index's failure are both ValueError.
    try:
        obj = json.loads(line)
    except (ValueError, RecursionError):
        # Not pure JSON: retry from the first '{' (e.g. a log prefix before the object).
        try:
            obj = json.loads(line[line.index('{'):])
        except (ValueError, RecursionError):
            return None

    # Only JSON objects can be flattened; anything else has no .items().
    if not isinstance(obj, dict):
        return None

    # Top level, minus the nested 'Current' block handled below.
    flat = {k: v for k, v in obj.items() if k != "Current"}

    curr = obj.get("Current")
    if isinstance(curr, str):
        try:
            curr = json.loads(curr)
        except (ValueError, RecursionError):
            curr = None
    if not isinstance(curr, dict):
        # Missing, malformed, or decoded to a non-object: nothing to flatten.
        return flat

    for section in ("Previous", "Current", "Next"):
        block = curr.get(section)
        if isinstance(block, dict):
            for key, val in block.items():
                flat[f"{section}_{key}"] = val

    return flat

# ---------------------- TAILING FILE ---------------------- #
def tail_file(path, start_from_end=True):
    """
    Generator that yields each line read from `path`, then keeps polling for
    lines appended later.

    With start_from_end truthy, reading begins at the current end of file;
    otherwise at the beginning. When no line is available the generator sleeps
    REFRESH_SECONDS and retries from the same offset. It stops once the
    module-level `running` flag is cleared, and the file is closed on exit.

    Raises FileNotFoundError if `path` does not exist when iteration starts.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    with open(path, "r", encoding="utf-8", errors="ignore") as stream:
        stream.seek(0, os.SEEK_END if start_from_end else os.SEEK_SET)
        while running:
            position = stream.tell()
            text = stream.readline()
            if text:
                yield text
                continue
            # Nothing new yet: wait, then rewind to where this read started.
            time.sleep(REFRESH_SECONDS)
            stream.seek(position)

# ---------------------- IN-MEMORY STORE ---------------------- #
class SnapshotWindow:
    """
    Sliding window of parsed, flattened snapshots.

    Snapshots are kept in arrival order in a deque of (timestamp, flat_dict)
    pairs. After every append the deque is capped at `max_snapshots` entries
    and entries at the head older than `window_minutes` are dropped. Age is
    measured against the local wall clock (pd.Timestamp.now()), not against
    the newest snapshot.

    All stored timestamps are timezone-naive pd.Timestamp values in local
    time, so they can always be compared with the pruning cutoff.
    """
    def __init__(self, window_minutes: int = WINDOW_MINUTES, max_snapshots: int = MAX_SNAPSHOTS_STORE):
        self.window_minutes = window_minutes
        self.max_snapshots = max_snapshots
        self.store = deque()  # each item: (pd.Timestamp, flat_dict)

    @staticmethod
    def _coerce_timestamp(raw) -> pd.Timestamp:
        """Turn a raw snapshot time into a naive local pd.Timestamp.

        Missing, unparseable, non-scalar or NaT values fall back to the
        current time. Timezone-aware values are converted to the machine's
        current local UTC offset and stripped of tzinfo; left aware they would
        raise TypeError when compared with the naive cutoff in prune_old().
        """
        if raw is None:
            return pd.Timestamp.now()
        try:
            ts = pd.to_datetime(raw)
        except (ValueError, TypeError, OverflowError):
            return pd.Timestamp.now()
        # NaT never compares as "older than the cutoff", so it would sit at
        # the head of the deque forever; list-like input gives a non-scalar.
        if not isinstance(ts, pd.Timestamp) or pd.isna(ts):
            return pd.Timestamp.now()
        if ts.tzinfo is not None:
            local_tz = datetime.now().astimezone().tzinfo
            ts = ts.tz_convert(local_tz).tz_localize(None)
        return ts

    def append(self, flat: Dict[str,Any]):
        """Store `flat` under its timestamp, then enforce the size and age limits.

        The timestamp is the first truthy value among the 'LTT', 'ltt' and
        'time' keys; see _coerce_timestamp for how it is normalised.
        """
        raw = flat.get("LTT") or flat.get("ltt") or flat.get("time") or None
        self.store.append((self._coerce_timestamp(raw), flat))
        # enforce max length
        while len(self.store) > self.max_snapshots:
            self.store.popleft()
        # prune by time window
        self.prune_old()

    def prune_old(self):
        """Drop entries from the head whose timestamp is older than now - window_minutes."""
        if not self.store:
            return
        cutoff = pd.Timestamp.now() - pd.Timedelta(minutes=self.window_minutes)
        while self.store and self.store[0][0] < cutoff:
            self.store.popleft()

    def to_dataframe(self) -> pd.DataFrame:
        """Return the stored snapshots as a DataFrame, one row per snapshot.

        Each row is a copy of the flat dict with 'LTT' set to the stored
        timestamp. Returns an empty DataFrame when nothing is stored.
        """
        if not self.store:
            return pd.DataFrame()
        return pd.DataFrame([{**flat, "LTT": ts} for ts, flat in self.store])

# ---------------------- ANALYSIS / SIGNAL LOGIC ---------------------- #
def build_series_from_df(df: pd.DataFrame, prev_call_col, curr_call_col, next_call_col,
                         prev_put_col, curr_put_col, next_put_col,
                         prev_str="Previous_Strikeprice", curr_str="Current_Strikeprice", next_str="Next_Strikeprice"):
    """
    Collect per-strike premium observations from a snapshot DataFrame.

    Each of the three strike columns is paired with the call and put price
    column of the same position (previous / current / next). For every row and
    every pair whose two columns exist in `df` and hold non-null values that
    convert to int (strike) and float (price), one (timestamp, price) tuple is
    appended under that strike. The timestamp is the row's 'LTT' value, or the
    row index when that is missing or falsy. A price column name that is None
    or empty disables its pair.

    Returns (call_series, put_series), both defaultdict(list) keyed by strike.
    """
    call_series = defaultdict(list)
    put_series = defaultdict(list)
    if df.empty:
        return call_series, put_series

    strike_cols = (prev_str, curr_str, next_str)
    # (destination, ((strike column, price column), ...)) for each option side
    sides = (
        (call_series, tuple(zip(strike_cols, (prev_call_col, curr_call_col, next_call_col)))),
        (put_series, tuple(zip(strike_cols, (prev_put_col, curr_put_col, next_put_col)))),
    )

    for idx, row in df.iterrows():
        stamp = row.get("LTT") or idx
        for target, column_pairs in sides:
            for strike_col, price_col in column_pairs:
                if strike_col not in df.columns or not price_col or price_col not in df.columns:
                    continue
                raw_strike = row.get(strike_col)
                raw_price = row.get(price_col)
                if pd.isna(raw_strike) or pd.isna(raw_price):
                    continue
                try:
                    strike = int(raw_strike)
                    price = float(raw_price)
                except Exception:
                    # Value is present but not numeric: skip this observation.
                    continue
                target[strike].append((stamp, price))
    return call_series, put_series

def summarize_series(pairs):
    """
    Summarise a list of (timestamp, price) pairs.

    The pairs are ordered by timestamp first. Returns None for an empty input,
    otherwise a dict with the first, last, highest ("peak") and lowest
    ("trough") price, the absolute and percentage change from first to last
    (percentage is NaN when the first price is 0), the number of observations,
    and the time-ordered pairs under "series_sorted".
    """
    if not pairs:
        return None

    ordered = sorted(pairs, key=lambda pair: pair[0])
    values = [price for _, price in ordered]
    opening, closing = values[0], values[-1]
    delta = closing - opening

    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = np.nan

    summary = {
        "first": opening,
        "last": closing,
        "peak": max(values),
        "trough": min(values),
        "abs_change": delta,
        "pct_change": pct,
        "n_obs": len(values),
        "series_sorted": ordered,
    }
    return summary

def forecast_from_pct(last, pct):
    """
    Map a percentage change onto two (low, high) premium bands around `last`.

    Returns (band_a, band_b); the caller stores them as the 5-minute and
    10-minute low/high values. The bands are fixed offsets chosen by tier:
    pct >= 25, pct >= 8, pct > 0, and everything else. A missing/NaN pct is
    treated as 0.
    """
    if pd.isna(pct):
        pct = 0.0

    # Offsets are (first low, first high, second low, second high).
    if pct >= 25:
        offsets = (15, 35, 25, 60)
    elif pct >= 8:
        offsets = (6, 18, 12, 30)
    elif pct > 0:
        offsets = (2, 8, 5, 15)
    else:
        offsets = (-5, 2, -8, 5)

    near_lo, near_hi, far_lo, far_hi = offsets
    return (last + near_lo, last + near_hi), (last + far_lo, last + far_hi)

def aggregate_tags_and_mf(df: pd.DataFrame, strikes: List[int]):
    """
    Aggregate strategy tags and money flows for each strike in `strikes`.

    For every row, each strike found in the Previous/Current/Next strike
    columns that is also in `strikes` receives:
      - one count per tag token from all three *_StrategyTag columns
        (a cell is split on ';' and '|', tokens are stripped),
      - the sum of all call money-flow columns and, separately, of all put
        money-flow columns present in `df`.
    Cells that are null or not convertible to float are skipped.

    Returns dict strike -> {"tags": Counter, "call_mf": float, "put_mf": float};
    every requested strike has an entry even if it never appears in `df`.

    NOTE(review): a row's tags and flows are credited to every strike on that
    row regardless of which section (Previous/Current/Next) they came from —
    confirm this coarse attribution is intended.
    """
    out = {int(s): {"tags": Counter(), "call_mf":0.0, "put_mf":0.0} for s in strikes}

    # Column presence does not change per row, so resolve it once up front.
    def _present(names):
        return [name for name in names if name in df.columns]

    strike_cols = _present(["Previous_Strikeprice","Current_Strikeprice","Next_Strikeprice"])
    tag_cols = _present(["Previous_StrategyTag","Current_StrategyTag","Next_StrategyTag"])
    call_mflow_cols = _present(["Previous_CallMoneyFlow","Current_CallMoneyFlow","Next_CallMoneyFlow",
                                "Previous_TotalcallMoneyFlow","Current_TotalcallMoneyFlow","Next_TotalcallMoneyFlow"])
    put_mflow_cols = _present(["Previous_PutMoneyFlow","Current_PutMoneyFlow","Next_PutMoneyFlow",
                               "Previous_TotalputMoneyFlow","Current_TotalputMoneyFlow","Next_TotalputMoneyFlow"])

    def _add_flows(row, columns, bucket, key):
        # Sum the numeric cells of `columns` into bucket[key].
        for col in columns:
            val = row.get(col)
            if pd.isna(val):
                continue
            try:
                bucket[key] += float(val)
            except (TypeError, ValueError, OverflowError):
                # Best effort: a non-numeric flow cell is ignored.
                pass

    for _, row in df.iterrows():
        for sc in strike_cols:
            scv = row.get(sc)
            if pd.isna(scv):
                continue
            try:
                s = int(scv)
            except (TypeError, ValueError, OverflowError):
                continue
            bucket = out.get(s)
            if bucket is None:
                continue
            # tags
            for tc in tag_cols:
                v = row.get(tc)
                if isinstance(v, str) and v.strip():
                    for token in v.replace("|", ";").split(";"):
                        token = token.strip()
                        if token:
                            bucket["tags"][token] += 1
            # moneyflow
            _add_flows(row, call_mflow_cols, bucket, "call_mf")
            _add_flows(row, put_mflow_cols, bucket, "put_mf")
    return out

def decide_action_from_row(row):
    """
    Decide BUY_CALL / BUY_PUT / HOLD from a row's call/put pct changes and tags.

    `row` is any mapping-like object with ``.get`` (dict or pandas Series).
    Missing or NaN percentages count as 0; a `tags` value that is not a string
    (None, NaN, ...) counts as no tags instead of raising on ``.lower()``.

    The put side is checked first: BUY_PUT when the put pct is above 8, or
    above 5 with a bearish tag ("put buying", "call writing", "bear").
    Otherwise BUY_CALL when the call pct is above 8, or above 5 with a bullish
    tag ("call buying", "oi_support_call", "bull"). Otherwise HOLD.
    """
    def _pct(key):
        value = row.get(key)
        return value if pd.notna(value) else 0

    call_pct = _pct("pct_change_call")
    put_pct = _pct("pct_change_put")

    raw_tags = row.get("tags")
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ""
    bull_boost = any(marker in tags for marker in ("call buying", "oi_support_call", "bull"))
    bear_boost = any(marker in tags for marker in ("put buying", "call writing", "bear"))

    if put_pct > 8 or (put_pct > 5 and bear_boost):
        return "BUY_PUT"
    if call_pct > 8 or (call_pct > 5 and bull_boost):
        return "BUY_CALL"
    return "HOLD"

# ---------------------- MAIN ANALYSIS PER TICK ---------------------- #
def analyze_window(window: SnapshotWindow):
    """
    Run the per-tick analysis over the snapshots currently held in `window`.

    Always returns a 4-tuple (merged_rows, signals, reversals, iv_crush):
      - merged_rows: one dict per strike with the premium summary of the side
        (call or put) that has more observations (calls win ties), forecast
        bands, aggregated tags and money flows, reasons, recommended action
        and high-conviction statistics.
      - signals: one dict per strike whose recommended action is BUY_CALL or
        BUY_PUT; "strength" is the pct change of the recommended side.
      - reversals: strikes whose latest price sits at least REVERSAL_DROP_PCT
        percent below the highest price seen in the REVERSAL_WINDOW_MIN
        minutes before the strike's latest observation.
      - iv_crush: rows where the Current_*_IV value is at least IV_CRUSH_DROP
        percent below the Previous_*_IV value.
    All four lists are empty when no strike has usable premium data.

    NOTE(review): the IV-crush check compares the Previous_* and Current_*
    IV fields of the same snapshot row — confirm these mean "earlier vs now"
    rather than neighbouring strikes. It also re-reports an event on every
    call while the row stays inside the window.
    """
    df = window.to_dataframe()

    # Prefer the LTP column, fall back to the Premium column, else None.
    def pick(df, ltp, prem):
        return ltp if ltp in df.columns else (prem if prem in df.columns else None)

    prev_call_col = pick(df, "Previous_Call_ltp", "Previous_Call_Premium")
    curr_call_col = pick(df, "Current_Call_ltp", "Current_Call_Premium")
    next_call_col = pick(df, "Next_Call_ltp", "Next_Call_Premium")

    prev_put_col = pick(df, "Previous_Put_ltp", "Previous_Put_Premium")
    curr_put_col = pick(df, "Current_Put_ltp", "Current_Put_Premium")
    next_put_col = pick(df, "Next_Put_ltp", "Next_Put_Premium")

    # Build per-strike (timestamp, price) series for both sides
    call_series, put_series = build_series_from_df(df, prev_call_col, curr_call_col, next_call_col,
                                                   prev_put_col, curr_put_col, next_put_col)

    all_strikes = sorted(set(call_series) | set(put_series))

    # Summarize per strike
    merged_rows = []
    for s in all_strikes:
        cs = summarize_series(call_series.get(s, []))
        ps = summarize_series(put_series.get(s, []))
        # choose preferred side: more observations wins, calls win ties
        n_call = cs["n_obs"] if cs else 0
        n_put  = ps["n_obs"] if ps else 0
        if n_call >= n_put and cs:
            f5, f10 = forecast_from_pct(cs["last"], cs["pct_change"])
            row = {
                "strike": int(s),
                "n_obs": n_call + n_put,
                "n_obs_call": n_call, "n_obs_put": n_put,
                "first_premium": cs["first"], "last_premium": cs["last"],
                "peak_premium": cs["peak"], "trough_premium": cs["trough"],
                "abs_change": cs["abs_change"], "pct_change": cs["pct_change"],
                "5min_low": f5[0], "5min_high": f5[1], "10min_low": f10[0], "10min_high": f10[1],
                # call-specific columns
                "first_call_ltp": cs["first"], "last_call_ltp": cs["last"],
                "peak_call_ltp": cs["peak"], "trough_call_ltp": cs["trough"],
                "abs_change_call": cs["abs_change"], "pct_change_call": cs["pct_change"],
                "first_put_ltp": ps["first"] if ps else np.nan, "last_put_ltp": ps["last"] if ps else np.nan,
                "pct_change_put": ps["pct_change"] if ps else np.nan
            }
        elif ps:
            f5, f10 = forecast_from_pct(ps["last"], ps["pct_change"])
            row = {
                "strike": int(s),
                "n_obs": n_call + n_put,
                "n_obs_call": n_call, "n_obs_put": n_put,
                "first_premium": ps["first"], "last_premium": ps["last"],
                "peak_premium": ps["peak"], "trough_premium": ps["trough"],
                "abs_change": ps["abs_change"], "pct_change": ps["pct_change"],
                "5min_low": f5[0], "5min_high": f5[1], "10min_low": f10[0], "10min_high": f10[1],
                # put-specific columns
                "first_put_ltp": ps["first"], "last_put_ltp": ps["last"],
                "peak_put_ltp": ps["peak"], "trough_put_ltp": ps["trough"],
                "abs_change_put": ps["abs_change"], "pct_change_put": ps["pct_change"],
                "first_call_ltp": cs["first"] if cs else np.nan, "last_call_ltp": cs["last"] if cs else np.nan,
                "pct_change_call": cs["pct_change"] if cs else np.nan
            }
        else:
            continue

        merged_rows.append(row)

    if not merged_rows:
        # Same arity as the normal return below, so callers can always unpack four values.
        return [], [], [], []

    merged_df = pd.DataFrame(merged_rows)

    # aggregate tags & moneyflow
    strike_info = aggregate_tags_and_mf(df, merged_df["strike"].tolist())
    merged_df["tags"] = merged_df["strike"].apply(
        lambda s: ";".join(f"{k}:{v}" for k, v in strike_info[int(s)]["tags"].items()))
    merged_df["call_moneyflow"] = merged_df["strike"].apply(lambda s: strike_info[int(s)]["call_mf"])
    merged_df["put_moneyflow"]  = merged_df["strike"].apply(lambda s: strike_info[int(s)]["put_mf"])

    # build reasons: human-readable explanation derived from tags, flows and the premium move
    def build_reasons(row):
        reasons = []
        tags_text = str(row.get("tags","")).lower()
        if any(k in tags_text for k in ["rsi","macd"]):
            reasons.append("RSI/MACD momentum")
        if "vwap" in tags_text:
            reasons.append("VWAP divergence")
        if "oi" in tags_text:
            reasons.append("OI support/resistance")
        if row.get("call_moneyflow",0) > 0:
            reasons.append("Call net buying")
        if row.get("put_moneyflow",0) > 0:
            reasons.append("Put net buying")
        pct = row.get("pct_change", 0) or 0
        try:
            if pct > 10:
                reasons.append("Strong premium move")
            elif pct > 3:
                reasons.append("Moderate premium move")
        except TypeError:
            # pct is not comparable to a number: no premium-move reason.
            pass
        # dict.fromkeys de-duplicates while keeping order
        return "; ".join(dict.fromkeys(reasons)) if reasons else "No strong signals"

    merged_df["reasons"] = merged_df.apply(build_reasons, axis=1)

    # recommended action
    merged_df["recommended_action"] = merged_df.apply(decide_action_from_row, axis=1)

    # high-conviction stats (quick approximate): a flagged row counts as a success
    # when the base premium rises above its flagged value within the next 3 minutes
    hc_col = "Current_IsHighConvictionSignal"
    totals, successes, rates = [], [], []
    for _, r in merged_df.iterrows():
        s = int(r["strike"])
        total = 0; success = 0
        if hc_col in df.columns:
            cond = (df.get("Current_Strikeprice")==s) & (df.get(hc_col)==True)
            hc_rows = df[cond]
            total = int(hc_rows.shape[0])

            # base premium column follows the recommended side, defaulting to the current call column
            action = r["recommended_action"]
            if action == "BUY_CALL" and curr_call_col in df.columns:
                base_col = curr_call_col
            elif action == "BUY_PUT" and curr_put_col in df.columns:
                base_col = curr_put_col
            elif curr_call_col in df.columns:
                base_col = curr_call_col
            else:
                base_col = None

            if base_col is not None:
                for _, hr in hc_rows.iterrows():
                    t0 = hr.get("LTT")
                    if pd.isna(t0):
                        continue
                    p0 = hr.get(base_col)
                    if pd.isna(p0):
                        continue
                    follow_up = df[(df["LTT"] >= t0) & (df["LTT"] <= (t0 + pd.Timedelta(minutes=3)))]
                    if follow_up.empty:
                        continue
                    try:
                        if follow_up[base_col].max() > p0:
                            success += 1
                    except (TypeError, ValueError):
                        # Non-numeric premiums cannot be compared: not a success.
                        pass
        rate = (success/total) if total>0 else None
        totals.append(total); successes.append(success); rates.append(rate)
    merged_df["highconv_total"] = totals
    merged_df["highconv_success"] = successes
    merged_df["highconv_hit_rate"] = rates

    # Current_Strikeprice column
    merged_df["Current_Strikeprice"] = merged_df["strike"]

    # signal export
    signals = []
    for _, r in merged_df.iterrows():
        action = r.get("recommended_action")
        if action not in ("BUY_CALL","BUY_PUT"):
            continue
        # Strength is the pct move of the recommended side. NaN is truthy, so
        # an `a or b or 0` chain would pick a NaN or the opposite side's value.
        side_pct = r.get("pct_change_call" if action == "BUY_CALL" else "pct_change_put")
        strength = float(side_pct) if pd.notna(side_pct) else 0.0
        signals.append({
            "strike": int(r["strike"]),
            "action": action,
            "strength": strength,
            "reason": r.get("reasons",""),
            "tags": r.get("tags",""),
            "last_premium": float(r.get("last_premium") or 0),
            "time": str(pd.Timestamp.now())
        })

    # detect reversals (peak->drop in last REVERSAL_WINDOW_MIN)
    reversals = []
    for s in all_strikes:
        # use the call series when it has data, otherwise the put series
        series = call_series.get(s) if call_series.get(s) else put_series.get(s)
        if not series:
            continue
        sr = sorted(series, key=lambda x:x[0])
        t_last = sr[-1][0]
        cutoff = t_last - pd.Timedelta(minutes=REVERSAL_WINDOW_MIN)
        recent = [p for t,p in sr if t >= cutoff]
        if not recent:
            continue
        peak = max(recent); lastp = recent[-1]
        drop = (peak - lastp)/peak*100 if peak>0 else 0
        if drop >= REVERSAL_DROP_PCT:
            reversals.append({"strike":s,"peak":peak,"last":lastp,"drop_pct":drop})

    # detect IV crush events from df rows
    iv_crush = []
    for _, row in df.iterrows():
        for prev_iv, curr_iv, side in [("Previous_Call_IV","Current_Call_IV","CALL"), ("Previous_Put_IV","Current_Put_IV","PUT")]:
            if prev_iv in row and curr_iv in row and pd.notna(row[prev_iv]) and pd.notna(row[curr_iv]) and row[prev_iv] > 0:
                drop = (row[prev_iv] - row[curr_iv]) / row[prev_iv] * 100
                if drop >= IV_CRUSH_DROP:
                    iv_crush.append({"time": str(row.get("LTT")), "strike": row.get("Current_Strikeprice"), "side": side, "drop_pct": drop})

    # merged rows (as dict list), signals, reversals and IV-crush events
    return merged_df.to_dict(orient="records"), signals, reversals, iv_crush

# ---------------------- MAIN LOOP ---------------------- #
def run_realtime(input_path: str):
    """
    Tail `input_path` and analyse the sliding snapshot window as lines arrive.

    Every parsed line is appended to the window. Analysis runs at most about
    once per REFRESH_SECONDS (lines arriving sooner are stored but do not
    trigger a run). After each run the signals and the latest merged snapshot
    are written to OUT_AUTOTRADE / OUT_LATEST when WRITE_OUTPUT_JSON is set,
    and every signal, reversal and IV-crush event is printed and, when
    ALERT_LOG is set, appended to that file.
    """
    def _alert(message):
        # Print an alert and mirror it to the alert log if one is configured.
        print(message)
        if ALERT_LOG:
            with open(ALERT_LOG, "a") as log_fh:
                log_fh.write(message + "\n")

    window = SnapshotWindow(window_minutes=WINDOW_MINUTES)
    tailer = tail_file(input_path, start_from_end=START_FROM_END)

    last_write = 0.0
    print(f"Realtime engine started. Watching: {input_path}")

    for raw_line in tailer:
        if not running:
            break
        flat = parse_line_json(raw_line)
        if flat is None:
            continue
        window.append(flat)

        # simple throttle: analyze roughly every REFRESH_SECONDS seconds
        now = time.time()
        if now - last_write < max(0.01, REFRESH_SECONDS * 0.9):
            continue
        last_write = now

        merged_rows, signals, reversals, iv_crush = analyze_window(window)

        # EXPORT signals and latest merged snapshot
        if WRITE_OUTPUT_JSON:
            try:
                with open(OUT_AUTOTRADE, "w") as fh:
                    json.dump(signals, fh, indent=2, default=str)
                latest = {
                    "timestamp": str(pd.Timestamp.now()),
                    "merged": merged_rows,
                    "reversals": reversals,
                    "iv_crush": iv_crush,
                }
                with open(OUT_LATEST, "w") as fh:
                    json.dump(latest, fh, indent=2, default=str)
            except Exception as e:
                print("Warning: failed write output json:", e)

        # Alerts go to stdout and the optional log file
        for sig in signals:
            _alert(f"[SIGNAL] {sig['time']} Strike {sig['strike']} => {sig['action']} (str={sig['strength']:.2f}) reason={sig['reason']}")
        for rev in reversals:
            _alert(f"[REVERSAL] Strike {rev['strike']} drop {rev['drop_pct']:.1f}% (peak {rev['peak']} -> last {rev['last']})")
        for event in iv_crush:
            _alert(f"[IV_CRUSH] {event['time']} Strike {event['strike']} {event['side']} drop {event['drop_pct']:.1f}%")

    print("Realtime engine stopped.")

# ---------------------- RUN ---------------------- #
if __name__ == "__main__":
    # Start the engine only when the configured input exists; otherwise
    # report the problem and exit with status 1.
    if os.path.exists(INPUT_FILE):
        run_realtime(INPUT_FILE)
    else:
        print("ERROR: input file not found:", INPUT_FILE)
        sys.exit(1)


Realtime engine started. Watching: 14112025_BANK_PNL.txt

Received exit signal. Shutting down...
Realtime engine stopped.


In [None]:
#!/usr/bin/env python3
"""
realtime_ce_pe_engine_improved.py

Improved realtime tailing and CE/PE analysis engine based on the user's original script.
Key improvements:
 - Robust JSON parsing (handles object-per-line, JSON lists, and stringified 'Current').
 - Safer tail implementation (handles rotation/truncation) and immediate processing of appended data.
 - Configurable via top-of-file constants; small CLI wrapper available.
 - Reduced pandas overhead by building DataFrame carefully and avoiding excessive apply() usage.
 - Better type coercion and error handling so single malformed snapshot won't stop the engine.
 - Optional historical dump and optional dry-run (no file writes) flags.
 - Clearer logging, and optional alert logfile support.

Behavior: tail input file, keep a sliding time window of snapshots, compute per-strike summaries,
produce AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json (unless WRITE_OUTPUT_JSON=False),
print alerts for signals/reversals/iv-crush to stdout and ALERT_LOG if enabled.

Note: this file is self-contained and intended to run on a machine that has pandas & numpy installed.
"""

import os
import sys
import time
import json
import signal
import argparse
from collections import defaultdict, Counter, deque
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import pandas as pd
import numpy as np

# ---------------------- CONFIG ---------------------- #
INPUT_FILE = "14112025_BANK_PNL.txt"      # default path to live appended snapshots (text)
REFRESH_SECONDS = 1.0                     # poll interval (seconds)
WINDOW_MINUTES = 15                       # sliding window size for analysis (minutes)
START_FROM_END = False                    # False: read existing lines first, then tail new ones; True: skip history and tail from EOF
WRITE_OUTPUT_JSON = True                  # write AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json"
OUT_LATEST = "LATEST_MERGED.json"
ALERT_LOG = None                          # path to an alert logfile or None to only print to stdout
IV_CRUSH_DROP = 15.0                      # percent IV drop threshold
REVERSAL_DROP_PCT = 12.0                  # reversal detection threshold (percent drop from peak)
REVERSAL_WINDOW_MIN = 5                   # look-back window for reversal check (minutes)
MAX_SNAPSHOTS_STORE = 10000               # maximum snapshots to keep in memory (safety)
HISTORICAL_DUMP = None                    # optional: path to write a periodic historical dump (or None)
DRY_RUN = False                           # if True, won't write output files (useful for testing)

# Loop-control flag: tail_file() keeps polling only while this is True.
running = True

# ---------------------- SIGNAL HANDLING ---------------------- #
def graceful_exit(signum, frame):
    """Signal handler: clear the module-level `running` flag and announce shutdown.

    `signum` and `frame` are the arguments the signal module passes to a
    handler; neither is used here.
    """
    global running
    running = False
    print("\nReceived exit signal. Shutting down...")

# Route SIGINT and SIGTERM through the handler above.
signal.signal(signal.SIGINT, graceful_exit)
signal.signal(signal.SIGTERM, graceful_exit)

# ---------------------- JSON PARSING ---------------------- #

def parse_line_json(line: str) -> List[Dict[str, Any]]:
    """
    Turn one input line into a list of flattened snapshot dicts.

    Accepted inputs:
      - a single JSON object,
      - a JSON list (non-object elements are dropped),
      - text with a JSON document starting at its first '{'.
    For each object, every top-level key except 'Current' is copied; 'Current'
    may be a dict or a JSON-encoded string, and its 'Previous' / 'Current' /
    'Next' sub-dicts are flattened into "<section>_<key>" entries.

    Returns an empty list for empty, blank or unparseable lines.
    """
    def _flatten(obj):
        # Top-level fields first, then the nested sections of 'Current'.
        flat = {key: value for key, value in obj.items() if key != "Current"}

        nested = obj.get("Current")
        if isinstance(nested, str):
            try:
                nested = json.loads(nested)
            except Exception:
                nested = None
        if not isinstance(nested, dict):
            return flat

        for section in ("Previous", "Current", "Next"):
            block = nested.get(section)
            if not isinstance(block, dict):
                continue
            for key, val in block.items():
                flat[f"{section}_{key}"] = val
        return flat

    text = line.strip() if line else ""
    if not text:
        return []

    # Try direct load (object or list); otherwise retry from the first '{'.
    try:
        payload = json.loads(text)
    except Exception:
        try:
            payload = json.loads(text[text.index('{'):])
        except Exception:
            return []

    candidates = payload if isinstance(payload, list) else [payload]
    return [_flatten(item) for item in candidates if isinstance(item, dict)]

# ---------------------- TAIL SUPPORT ---------------------- #

def tail_file(path: str, start_from_end: bool = True):
    """
    Generator yielding lines of `path` as they become available.

    Reading starts at end-of-file when `start_from_end` is True, otherwise at
    the beginning. When no line is available the generator polls:
      - if the path now refers to a different inode (rotation), the new file
        is opened and read from its start;
      - if the file is shorter than the current read offset (truncation),
        reading restarts from offset 0;
      - if the path is missing, it waits for the file to reappear.

    The loop ends when the module-level `running` flag becomes false. The
    currently open handle is closed when the generator finishes or is closed.

    Raises:
        FileNotFoundError: if `path` does not exist when iteration starts.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    fh = open(path, 'r', encoding='utf-8', errors='ignore')
    try:
        if start_from_end:
            fh.seek(0, os.SEEK_END)
        last_inode = os.fstat(fh.fileno()).st_ino
        while running:
            where = fh.tell()
            line = fh.readline()
            if line:
                yield line
                continue

            # Nothing new to read: check whether the file was replaced or cut.
            try:
                st = os.stat(path)
            except FileNotFoundError:
                # file removed — wait for it to reappear
                time.sleep(max(0.1, REFRESH_SECONDS))
                continue

            if getattr(st, 'st_ino', None) != last_inode:
                # Rotated: switch to the new file and release the old handle.
                try:
                    new_fh = open(path, 'r', encoding='utf-8', errors='ignore')
                except FileNotFoundError:
                    time.sleep(max(0.1, REFRESH_SECONDS))
                    continue
                fh.close()
                fh = new_fh
                last_inode = os.fstat(fh.fileno()).st_ino
                continue

            if st.st_size < where:
                # Truncated: restart from the top. Must not fall through to
                # the seek(where) below, which would jump past the new EOF.
                fh.seek(0)
                continue

            time.sleep(max(0.01, REFRESH_SECONDS))
            fh.seek(where)
    finally:
        fh.close()

# ---------------------- IN-MEMORY STORE ---------------------- #
class SnapshotWindow:
    """
    Time-bounded in-memory buffer of flattened snapshots.

    Entries are kept in arrival order as (timestamp, flat_dict) tuples. After
    every append the buffer is cut to at most `max_snapshots` entries, then
    entries at the front whose timestamp is more than `window_minutes` older
    than the current wall-clock time are dropped.

    NOTE(review): pruning is relative to pd.Timestamp.now(), so snapshots
    replayed from an old file (timestamps far in the past) are dropped right
    after being appended — confirm this is intended for historical replays.
    """

    def __init__(self, window_minutes: int = WINDOW_MINUTES, max_snapshots: int = MAX_SNAPSHOTS_STORE):
        self.window_minutes = int(window_minutes)
        self.max_snapshots = int(max_snapshots)
        self.store = deque()  # (timestamp (pd.Timestamp), flat_dict)

    @staticmethod
    def _coerce_timestamp(flat: Dict[str, Any]) -> pd.Timestamp:
        """Return the snapshot's timestamp, falling back to now() when unusable."""
        raw = flat.get('LTT') or flat.get('ltt') or flat.get('time') or flat.get('ts') or None
        if raw is None:
            return pd.Timestamp.now()
        try:
            if isinstance(raw, (pd.Timestamp, datetime)):
                ts = pd.Timestamp(raw)
            else:
                ts = pd.to_datetime(raw)
        except Exception:
            # Best effort: an unparseable timestamp must not drop the snapshot.
            return pd.Timestamp.now()
        # NaT (e.g. LTT is NaN) is not a pd.Timestamp instance. Storing it
        # would block pruning forever because `NaT < cutoff` is always False.
        if not isinstance(ts, pd.Timestamp):
            return pd.Timestamp.now()
        return ts

    def append(self, flat: Dict[str, Any]):
        """Store `flat` with its timestamp, then enforce the size and age limits."""
        self.store.append((self._coerce_timestamp(flat), flat))
        # enforce max length
        while len(self.store) > self.max_snapshots:
            self.store.popleft()
        # prune by time window
        self.prune_old()

    def prune_old(self):
        """Drop entries from the front that are older than the time window."""
        if not self.store:
            return
        span = pd.Timedelta(minutes=self.window_minutes)
        naive_now = pd.Timestamp.now()
        while self.store:
            head = self.store[0][0]
            head_tz = getattr(head, 'tz', None)
            # Compare tz-aware timestamps against a tz-aware "now"; mixing
            # aware and naive values raises TypeError in pandas.
            now = pd.Timestamp.now(tz=head_tz) if head_tz is not None else naive_now
            if not head < now - span:
                break
            self.store.popleft()

    def to_dataframe(self) -> pd.DataFrame:
        """Return the buffered snapshots as a DataFrame; 'LTT' holds the stored timestamp."""
        if not self.store:
            return pd.DataFrame()
        # The coerced timestamp overrides whatever raw 'LTT' the snapshot had.
        rows = [{**flat, 'LTT': t} for t, flat in self.store]
        return pd.DataFrame(rows)

# ---------------------- ANALYSIS HELPERS ---------------------- #

def pick_column(df: pd.DataFrame, ltp: str, prem: str) -> Optional[str]:
    """Return `ltp` if it is a column of `df`, else `prem` if that is, else None."""
    for candidate in (ltp, prem):
        if candidate in df.columns:
            return candidate
    return None


def build_series_from_df(df: pd.DataFrame, prev_call_col, curr_call_col, next_call_col,
                         prev_put_col, curr_put_col, next_put_col,
                         prev_str="Previous_Strikeprice", curr_str="Current_Strikeprice", next_str="Next_Strikeprice"):
    """
    Collect per-strike (timestamp, premium) observations for calls and puts.

    Each row contributes up to three observations per side: the premium in
    the previous/current/next premium column is filed under the strike found
    in the matching strike column. A premium column given as None (or any
    falsy value) is ignored, as are missing columns, NaN values and values
    that cannot be converted with int() (strike) / float() (premium).

    Returns (call_series, put_series), two defaultdict(list) objects mapping
    int strike -> list of (row['LTT'], float premium) in row order.
    """
    call_series = defaultdict(list)
    put_series = defaultdict(list)
    if df.empty:
        return call_series, put_series

    strike_cols = (prev_str, curr_str, next_str)
    sides = (
        (call_series, tuple(zip(strike_cols, (prev_call_col, curr_call_col, next_call_col)))),
        (put_series, tuple(zip(strike_cols, (prev_put_col, curr_put_col, next_put_col)))),
    )

    # single pass over the rows, calls first and then puts for each row
    for _, row in df.iterrows():
        stamp = row.get('LTT')
        for target, column_pairs in sides:
            for strike_col, price_col in column_pairs:
                if not price_col or strike_col not in row or price_col not in row:
                    continue
                strike_raw = row.get(strike_col)
                price_raw = row.get(price_col)
                if pd.isna(strike_raw) or pd.isna(price_raw):
                    continue
                try:
                    strike = int(strike_raw)
                    price = float(price_raw)
                except Exception:
                    continue
                target[strike].append((stamp, price))
    return call_series, put_series


def summarize_series(pairs):
    """
    Summarise a list of (timestamp, price) pairs.

    The pairs are ordered by timestamp. Returns None for empty input,
    otherwise a dict with the first/last/peak/trough price, the absolute and
    percentage change from first to last (NaN when the first price is 0),
    the number of observations and the time-ordered pairs.
    """
    if not pairs:
        return None

    ordered = list(pairs)
    ordered.sort(key=lambda pair: pair[0])
    values = [price for _, price in ordered]

    opening, closing = values[0], values[-1]
    delta = closing - opening
    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = np.nan

    return {
        'first': opening,
        'last': closing,
        'peak': max(values),
        'trough': min(values),
        'abs_change': delta,
        'pct_change': pct,
        'n_obs': len(values),
        'series_sorted': ordered,
    }


def forecast_from_pct(last, pct):
    """
    Map a percentage move to two (low, high) premium ranges around `last`.

    Returns ((low, high), (low, high)); the caller stores the first pair as
    the 5-minute range and the second as the 10-minute range. A missing/NaN
    `pct` is treated as 0. The offsets are fixed per band:
    pct >= 25, pct >= 8, pct > 0, and everything else.
    """
    move = 0.0 if pd.isna(pct) else pct

    if move >= 25:
        offsets = ((15, 35), (25, 60))
    elif move >= 8:
        offsets = ((6, 18), (12, 30))
    elif move > 0:
        offsets = ((2, 8), (5, 15))
    else:
        offsets = ((-5, 2), (-8, 5))

    (near_lo, near_hi), (far_lo, far_hi) = offsets
    return (last + near_lo, last + near_hi), (last + far_lo, last + far_hi)


def aggregate_tags_and_mf(df: pd.DataFrame, strikes: List[int]):
    """
    Accumulate strategy-tag counts and call/put money-flow totals per strike.

    Returns a dict keyed by int strike with, for every requested strike, a
    'tags' Counter plus float 'call_mf' and 'put_mf' sums. Tag cells are
    split on ';' and '|' and each non-empty token is counted once per
    occurrence. Non-numeric or NaN money-flow cells are ignored.

    NOTE(review): every strike found in a row (previous, current or next) is
    credited with the tags and money-flow values of ALL three sections of
    that row, not only its own section — confirm this is intended.
    """
    strike_cols = ("Previous_Strikeprice", "Current_Strikeprice", "Next_Strikeprice")
    tag_cols = ["Previous_StrategyTag", "Current_StrategyTag", "Next_StrategyTag"]
    call_mflow_cols = ["Previous_CallMoneyFlow", "Current_CallMoneyFlow", "Next_CallMoneyFlow",
                       "Previous_TotalcallMoneyFlow", "Current_TotalcallMoneyFlow", "Next_TotalcallMoneyFlow"]
    put_mflow_cols  = ["Previous_PutMoneyFlow", "Current_PutMoneyFlow", "Next_PutMoneyFlow",
                       "Previous_TotalputMoneyFlow", "Current_TotalputMoneyFlow", "Next_TotalputMoneyFlow"]

    out = {int(s): {'tags': Counter(), 'call_mf': 0.0, 'put_mf': 0.0} for s in strikes}
    if df.empty:
        return out

    def add_flows(row, columns, bucket, field):
        # Sum every present, non-NaN, float-convertible cell into bucket[field].
        for col in columns:
            if col not in row:
                continue
            cell = row.get(col)
            if pd.isna(cell):
                continue
            try:
                bucket[field] += float(cell)
            except Exception:
                pass

    for _, row in df.iterrows():
        for strike_col in strike_cols:
            if strike_col not in row:
                continue
            raw_strike = row.get(strike_col)
            if pd.isna(raw_strike):
                continue
            try:
                strike = int(raw_strike)
            except Exception:
                continue
            bucket = out.get(strike)
            if bucket is None:
                # strike was not requested by the caller
                continue

            for tag_col in tag_cols:
                if tag_col not in row:
                    continue
                text = row.get(tag_col)
                if isinstance(text, str) and text.strip():
                    pieces = (piece.strip() for piece in text.replace('|', ';').split(';'))
                    bucket['tags'].update([piece for piece in pieces if piece])

            add_flows(row, call_mflow_cols, bucket, 'call_mf')
            add_flows(row, put_mflow_cols, bucket, 'put_mf')
    return out


def decide_action_from_row(row):
    """
    Return 'BUY_PUT', 'BUY_CALL' or 'HOLD' for one merged per-strike row.

    `row` is any mapping-like object with .get() (dict or pandas Series)
    providing 'pct_change_call', 'pct_change_put' and 'tags'. Missing or NaN
    percentages count as 0; a missing or non-string 'tags' value counts as
    no tags.

    Rules, checked in this order (the put side wins when both qualify):
      - BUY_PUT  if put  change > 8, or > 5 with a bearish tag;
      - BUY_CALL if call change > 8, or > 5 with a bullish tag;
      - otherwise HOLD.
    """
    def pct_or_zero(key):
        value = row.get(key)
        return value if pd.notna(value) else 0

    call_pct = pct_or_zero('pct_change_call')
    put_pct = pct_or_zero('pct_change_put')

    # Guard against NaN/None tags: NaN is truthy and has no .lower().
    raw_tags = row.get('tags')
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ''

    bull_boost = ('call buying' in tags) or ('oi_support_call' in tags) or ('bull' in tags)
    bear_boost = ('put buying' in tags) or ('call writing' in tags) or ('bear' in tags)

    if (put_pct > 8) or (put_pct > 5 and bear_boost):
        return 'BUY_PUT'
    if (call_pct > 8) or (call_pct > 5 and bull_boost):
        return 'BUY_CALL'
    return 'HOLD'

# ---------------------- CORE ANALYSIS ---------------------- #

def analyze_window(window: SnapshotWindow):
    """
    Analyse the snapshots currently held in `window`.

    Returns a 4-tuple (merged_rows, signals, reversals, iv_crush):
      - merged_rows: one dict per strike with premium summary, forecast
        ranges, tags, money flow, reasons, recommended action and
        high-conviction statistics;
      - signals: one dict per strike whose recommended action is BUY_CALL or
        BUY_PUT; 'strength' is the percentage change of the signalled side;
      - reversals: strikes whose last premium has dropped at least
        REVERSAL_DROP_PCT percent from the peak of the last
        REVERSAL_WINDOW_MIN minutes of their series;
      - iv_crush: rows where Current_*_IV is at least IV_CRUSH_DROP percent
        below Previous_*_IV.
    All four are empty lists when no strike has a usable premium observation.
    """
    df = window.to_dataframe()
    # pick columns with fallback (ltp preferred, Premium otherwise)
    prev_call_col = pick_column(df, 'Previous_Call_ltp', 'Previous_Call_Premium')
    curr_call_col = pick_column(df, 'Current_Call_ltp', 'Current_Call_Premium')
    next_call_col = pick_column(df, 'Next_Call_ltp', 'Next_Call_Premium')

    prev_put_col = pick_column(df, 'Previous_Put_ltp', 'Previous_Put_Premium')
    curr_put_col = pick_column(df, 'Current_Put_ltp', 'Current_Put_Premium')
    next_put_col = pick_column(df, 'Next_Put_ltp', 'Next_Put_Premium')

    call_series, put_series = build_series_from_df(df, prev_call_col, curr_call_col, next_call_col,
                                                   prev_put_col, curr_put_col, next_put_col)

    all_strikes = sorted(set(call_series) | set(put_series))

    merged_rows = []
    for s in all_strikes:
        cs = summarize_series(call_series.get(s, []))
        ps = summarize_series(put_series.get(s, []))
        n_call = cs['n_obs'] if cs else 0
        n_put  = ps['n_obs'] if ps else 0
        # prefer side with more observations (calls win ties)
        if n_call >= n_put and cs:
            primary = cs
        elif ps:
            primary = ps
        else:
            continue
        f5, f10 = forecast_from_pct(primary['last'], primary['pct_change'])
        # Key order is kept stable because it determines the column order of
        # merged_df and therefore of the emitted records.
        row = {
            'strike': int(s),
            'n_obs': n_call + n_put,
            'n_obs_call': n_call, 'n_obs_put': n_put,
            'first_premium': primary['first'], 'last_premium': primary['last'],
            'peak_premium': primary['peak'], 'trough_premium': primary['trough'],
            'abs_change': primary['abs_change'], 'pct_change': primary['pct_change'],
            '5min_low': f5[0], '5min_high': f5[1], '10min_low': f10[0], '10min_high': f10[1],
        }
        if primary is cs:
            row.update({
                'first_call_ltp': cs['first'], 'last_call_ltp': cs['last'], 'peak_call_ltp': cs['peak'], 'trough_call_ltp': cs['trough'],
                'abs_change_call': cs['abs_change'], 'pct_change_call': cs['pct_change'],
                'first_put_ltp': ps['first'] if ps else np.nan, 'last_put_ltp': ps['last'] if ps else np.nan,
                'pct_change_put': ps['pct_change'] if ps else np.nan
            })
        else:
            row.update({
                'first_put_ltp': ps['first'], 'last_put_ltp': ps['last'], 'peak_put_ltp': ps['peak'], 'trough_put_ltp': ps['trough'],
                'abs_change_put': ps['abs_change'], 'pct_change_put': ps['pct_change'],
                'first_call_ltp': cs['first'] if cs else np.nan, 'last_call_ltp': cs['last'] if cs else np.nan,
                'pct_change_call': cs['pct_change'] if cs else np.nan
            })
        merged_rows.append(row)

    if not merged_rows:
        return [], [], [], []

    merged_df = pd.DataFrame(merged_rows)
    strike_info = aggregate_tags_and_mf(df, merged_df['strike'].tolist())
    # attach tags (as "tag:count" joined by ';') and moneyflow
    merged_df['tags'] = merged_df['strike'].apply(
        lambda s: ';'.join(f"{k}:{v}" for k, v in strike_info[int(s)]['tags'].items()))
    merged_df['call_moneyflow'] = merged_df['strike'].apply(lambda s: strike_info[int(s)]['call_mf'])
    merged_df['put_moneyflow']  = merged_df['strike'].apply(lambda s: strike_info[int(s)]['put_mf'])

    # build reasons
    def build_reasons(row):
        reasons = []
        tags_text = str(row.get('tags', '')).lower()
        if any(k in tags_text for k in ['rsi', 'macd']):
            reasons.append('RSI/MACD momentum')
        if 'vwap' in tags_text:
            reasons.append('VWAP divergence')
        if 'oi' in tags_text:
            reasons.append('OI support/resistance')
        if row.get('call_moneyflow', 0) > 0:
            reasons.append('Call net buying')
        if row.get('put_moneyflow', 0) > 0:
            reasons.append('Put net buying')
        pct = row.get('pct_change', 0) or 0
        try:
            if pct > 10:
                reasons.append('Strong premium move')
            elif pct > 3:
                reasons.append('Moderate premium move')
        except Exception:
            pass
        # dict.fromkeys de-duplicates while keeping first-seen order
        return '; '.join(dict.fromkeys(reasons)) if reasons else 'No strong signals'

    merged_df['reasons'] = merged_df.apply(build_reasons, axis=1)
    merged_df['recommended_action'] = merged_df.apply(decide_action_from_row, axis=1)

    # quick high-conviction stats (approximation): a flagged row counts as a
    # success when any snapshot in the following 3 minutes shows a higher
    # value in the chosen premium column.
    hc_col = 'Current_IsHighConvictionSignal'
    totals, successes, rates = [], [], []
    for _, r in merged_df.iterrows():
        s = int(r['strike'])
        total = success = 0
        if hc_col in df.columns:
            cond = (df.get('Current_Strikeprice') == s) & (df.get(hc_col) == True)
            hc_rows = df[cond]
            total = int(hc_rows.shape[0])

            # The premium column to track depends only on the strike's action.
            action = r['recommended_action']
            if action == 'BUY_CALL' and curr_call_col in df.columns:
                base_col = curr_call_col
            elif action == 'BUY_PUT' and curr_put_col in df.columns:
                base_col = curr_put_col
            elif curr_call_col in df.columns:
                base_col = curr_call_col
            else:
                base_col = None

            if base_col is not None:
                for _, hr in hc_rows.iterrows():
                    t0 = hr.get('LTT')
                    if pd.isna(t0):
                        continue
                    p0 = hr.get(base_col)
                    if pd.isna(p0):
                        continue
                    lookahead = df[(df['LTT'] >= t0) & (df['LTT'] <= (t0 + pd.Timedelta(minutes=3)))]
                    if lookahead.empty:
                        continue
                    try:
                        if lookahead[base_col].max() > p0:
                            success += 1
                    except Exception:
                        pass
        rate = (success / total) if total > 0 else None
        totals.append(total); successes.append(success); rates.append(rate)
    merged_df['highconv_total'] = totals
    merged_df['highconv_success'] = successes
    merged_df['highconv_hit_rate'] = rates

    merged_df['Current_Strikeprice'] = merged_df['strike']

    # assemble signals
    signals = []
    for _, r in merged_df.iterrows():
        action = r.get('recommended_action')
        if action in ('BUY_CALL', 'BUY_PUT'):
            # Strength is the move on the side being signalled. The previous
            # "call or put" chain reported the call move for BUY_PUT and let
            # NaN (which is truthy) through.
            side_pct = r.get('pct_change_put') if action == 'BUY_PUT' else r.get('pct_change_call')
            strength = float(side_pct) if pd.notna(side_pct) else 0.0
            signals.append({
                'strike': int(r['strike']),
                'action': action,
                'strength': strength,
                'reason': r.get('reasons', ''),
                'tags': r.get('tags', ''),
                'last_premium': float(r.get('last_premium') or 0),
                'time': str(pd.Timestamp.now())
            })

    # detect reversals (call series preferred, put series as fallback)
    reversals = []
    for s in all_strikes:
        series = call_series.get(s) or put_series.get(s)
        if not series:
            continue
        sr = sorted(series, key=lambda x: x[0])
        t_last = sr[-1][0]
        cutoff = t_last - pd.Timedelta(minutes=REVERSAL_WINDOW_MIN)
        recent = [p for t, p in sr if t >= cutoff]
        if not recent:
            continue
        peak = max(recent); lastp = recent[-1]
        drop = (peak - lastp) / peak * 100 if peak > 0 else 0
        if drop >= REVERSAL_DROP_PCT:
            reversals.append({'strike': s, 'peak': peak, 'last': lastp, 'drop_pct': drop})

    # detect IV crush
    # NOTE(review): this compares the Previous_* and Current_* IV columns of
    # the same row; confirm that is the intended "before vs after" pair.
    iv_crush = []
    if not df.empty:
        for _, row in df.iterrows():
            for prev_iv, curr_iv, side in [("Previous_Call_IV", "Current_Call_IV", "CALL"), ("Previous_Put_IV", "Current_Put_IV", "PUT")]:
                if prev_iv in row and curr_iv in row and pd.notna(row[prev_iv]) and pd.notna(row[curr_iv]) and row[prev_iv] > 0:
                    drop = (row[prev_iv] - row[curr_iv]) / row[prev_iv] * 100
                    if drop >= IV_CRUSH_DROP:
                        iv_crush.append({'time': str(row.get('LTT')), 'strike': row.get('Current_Strikeprice'), 'side': side, 'drop_pct': drop})

    return merged_df.to_dict(orient='records'), signals, reversals, iv_crush

# ---------------------- MAIN LOOP ---------------------- #

def _emit_alert(msg: str) -> None:
    """Print an alert and, when ALERT_LOG is set, append it to that file."""
    print(msg)
    if ALERT_LOG:
        try:
            with open(ALERT_LOG, 'a') as log_fh:
                log_fh.write(msg + '\n')
        except OSError as e:
            # A broken log path must not stop the engine.
            print('Warning: failed to write alert log:', e)


def run_realtime(input_path: str, start_from_end: bool = START_FROM_END):
    """
    Tail `input_path`, keep a sliding window of snapshots and analyse it.

    For every line produced by tail_file the parsed snapshots are appended
    to a SnapshotWindow. Analysis runs at most once per
    max(0.01, REFRESH_SECONDS * 0.6) seconds, and only when a new line has
    arrived. After each analysis:
      - the signal list and the merged summary are written to OUT_AUTOTRADE
        and OUT_LATEST (when WRITE_OUTPUT_JSON is set and DRY_RUN is not);
      - one JSON line is appended to HISTORICAL_DUMP when it is set;
      - signals, reversals and IV-crush events are printed and logged.
    Parse, append, analysis and output errors are reported and skipped.
    The loop ends when the module-level `running` flag is cleared.
    """
    window = SnapshotWindow(window_minutes=WINDOW_MINUTES)
    tailer = tail_file(input_path, start_from_end)

    last_analyze = 0.0
    print(f"Realtime engine started. Watching: {input_path}")

    try:
        for raw_line in tailer:
            if not running:
                break
            try:
                parsed_list = parse_line_json(raw_line)
            except Exception as e:
                print('Parse error:', e)
                continue
            if not parsed_list:
                continue
            for flat in parsed_list:
                try:
                    window.append(flat)
                except Exception as e:
                    print('Append error (ignored):', e)
                    continue

            # throttle: skip analysis if the previous one was too recent
            now = time.time()
            if now - last_analyze < max(0.01, REFRESH_SECONDS * 0.6):
                continue
            last_analyze = now

            try:
                merged_rows, signals, reversals, iv_crush = analyze_window(window)
            except Exception as e:
                print('Analysis error (skipping this cycle):', e)
                continue

            # write outputs
            if WRITE_OUTPUT_JSON and not DRY_RUN:
                try:
                    with open(OUT_AUTOTRADE, 'w') as fh:
                        json.dump(signals, fh, indent=2, default=str)
                    with open(OUT_LATEST, 'w') as fh:
                        json.dump({'timestamp': str(pd.Timestamp.now()), 'merged': merged_rows, 'reversals': reversals, 'iv_crush': iv_crush}, fh, indent=2, default=str)
                except Exception as e:
                    print('Warning: failed to write outputs:', e)

            # optional historical dump; default=str matches the other outputs
            # so values json cannot encode natively do not abort the write
            if HISTORICAL_DUMP:
                try:
                    with open(HISTORICAL_DUMP, 'a') as fh:
                        fh.write(json.dumps({'ts': str(pd.Timestamp.now()), 'merged': merged_rows}, default=str) + '\n')
                except Exception as e:
                    print('Warning: failed to write historical dump:', e)

            # print alerts
            for s in signals:
                _emit_alert(f"[SIGNAL] {s['time']} Strike {s['strike']} => {s['action']} (str={s['strength']:.2f}) reason={s['reason']}")
            for r in reversals:
                _emit_alert(f"[REVERSAL] Strike {r['strike']} drop {r['drop_pct']:.1f}% (peak {r['peak']} -> last {r['last']})")
            for e in iv_crush:
                _emit_alert(f"[IV_CRUSH] {e['time']} Strike {e['strike']} {e['side']} drop {e['drop_pct']:.1f}%")
    finally:
        # Finish the generator so it can release its file handle promptly.
        tailer.close()

    print('Realtime engine stopped.')

# ---------------------- CLI ---------------------- #

def cli():
    p = argparse.ArgumentParser()
    p.add_argument('--file', '-f', dest='file', default=INPUT_FILE, help='Input file to tail')
    p.add_argument('--from-start', action='store_true', help='Start reading from start of file instead of EOF')
    p.add_argument('--no-write', action='store_true', help='Do not write output json files (dry run)')
    p.add_argument('--alert-log', dest='alert_log', default=ALERT_LOG, help='Path to append alert log')
    p.add_argument('--window', type=int, default=WINDOW_MINUTES, help='Sliding window minutes')
    args = p.parse_args()

    global WRITE_OUTPUT_JSON, START_FROM_END, ALERT_LOG, WINDOW_MINUTES, DRY_RUN
    WRITE_OUTPUT_JSON = True
    START_FROM_END = not args.from_start
    ALERT_LOG = args.alert_log
    WINDOW_MINUTES = int(args.window)
    DRY_RUN = bool(args.no_write)

    if not os.path.exists(args.file):
        print('ERROR: input file not found:', args.file)
        sys.exit(1)

    run_realtime(args.file, start_from_end=START_FROM_END)

# Start the command-line interface only when this file is run as a script.
if __name__ == '__main__':
    cli()


SyntaxError: invalid syntax (ipython-input-4020145307.py, line 124)

In [None]:
#!/usr/bin/env python3
"""
realtime_ce_pe_engine_improved.py

Improved realtime tailing and CE/PE analysis engine based on the user's original script.
Key improvements:
 - Robust JSON parsing (handles object-per-line, JSON lists, and stringified 'Current').
 - Safer tail implementation (handles rotation/truncation) and immediate processing of appended data.
 - Configurable via top-of-file constants; small CLI wrapper available.
 - Reduced pandas overhead by building DataFrame carefully and avoiding excessive apply() usage.
 - Better type coercion and error handling so single malformed snapshot won't stop the engine.
 - Optional historical dump and optional dry-run (no file writes) flags.
 - Clearer logging, and optional alert logfile support.

Behavior: tail input file, keep a sliding time window of snapshots, compute per-strike summaries,
produce AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json (unless WRITE_OUTPUT_JSON=False),
print alerts for signals/reversals/iv-crush to stdout and ALERT_LOG if enabled.

Note: this file is self-contained and intended to run on a machine that has pandas & numpy installed.
"""

import os
import sys
import time
import json
import signal
import argparse
from collections import defaultdict, Counter, deque
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import pandas as pd
import numpy as np

# ---------------------- CONFIG ---------------------- #
INPUT_FILE = "14112025_BANK_PNL.txt"      # default path to live appended snapshots (text)
REFRESH_SECONDS = 1.0                     # poll interval (seconds)
WINDOW_MINUTES = 15                       # sliding window size for analysis
START_FROM_END = False                    # False: process existing lines first, then tail new ones; True: skip history and tail from EOF
WRITE_OUTPUT_JSON = True                  # write AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json"
OUT_LATEST = "LATEST_MERGED.json"
ALERT_LOG = None                          # path to an alert logfile or None to only print to stdout
IV_CRUSH_DROP = 15.0                      # percent IV drop threshold
REVERSAL_DROP_PCT = 12.0                  # reversal detection threshold (drop from peak)
REVERSAL_WINDOW_MIN = 5                   # look-back window for reversal check (minutes)
MAX_SNAPSHOTS_STORE = 10000               # maximum snapshots to keep in memory (safety)
HISTORICAL_DUMP = None                    # optional: path to write a periodic historical dump (or None)
DRY_RUN = False                           # if True, won't write output files (useful for testing)

# Loop flag: graceful_exit sets it to False and tail_file stops polling once it is cleared.
running = True

# ---------------------- SIGNAL HANDLING ---------------------- #
def graceful_exit(signum, frame):
    """Signal handler: clear the module-level `running` flag and announce shutdown.

    `signum` and `frame` are the standard handler arguments and are not used.
    """
    global running
    running = False
    print("\nReceived exit signal. Shutting down...")

# Install graceful_exit as the handler for both SIGINT and SIGTERM.
signal.signal(signal.SIGINT, graceful_exit)
signal.signal(signal.SIGTERM, graceful_exit)

# ---------------------- JSON PARSING ---------------------- #

def parse_line_json(line: str) -> List[Dict[str, Any]]:
    """
    Parse one input line into a list of flattened snapshot dicts.

    The line may contain:
      - a single JSON object
      - a JSON-encoded list of objects
      - free text with a JSON object embedded in it; text before the first
        '{' and text after the end of that object is ignored

    Flattening rules, applied to every dict item:
      - each top-level key except 'Current' is copied unchanged;
      - 'Current' may be a dict or a JSON string; when it decodes to a dict,
        the entries of its 'Previous' / 'Current' / 'Next' sub-dicts are
        copied under the keys '<Section>_<key>'.

    Returns a list of flattened dicts (empty when nothing could be parsed).
    List items that are not dicts are skipped.
    """
    out: List[Dict[str, Any]] = []
    s = line.strip() if line else ""
    if not s:
        return out

    # Fast path: the whole line is one JSON document (object or list).
    try:
        parsed = json.loads(s)
    except (ValueError, RecursionError):
        # Fallback: decode the JSON value that starts at the first '{'.
        # raw_decode stops at the end of that value, so trailing log text
        # after the object no longer makes the line unparseable.
        start = s.find('{')
        if start < 0:
            return out
        try:
            parsed, _end = json.JSONDecoder().raw_decode(s, start)
        except (ValueError, RecursionError):
            return out

    items = parsed if isinstance(parsed, list) else [parsed]

    for obj in items:
        if not isinstance(obj, dict):
            continue
        # Top-level fields, minus the nested 'Current' payload handled below.
        flat = {k: v for k, v in obj.items() if k != "Current"}

        curr = obj.get("Current")
        if isinstance(curr, str):
            # 'Current' is sometimes a JSON document serialised into a string.
            try:
                curr = json.loads(curr)
            except (ValueError, RecursionError):
                curr = None

        if isinstance(curr, dict):
            for section in ("Previous", "Current", "Next"):
                block = curr.get(section)
                if isinstance(block, dict):
                    for key, val in block.items():
                        flat[f"{section}_{key}"] = val
        out.append(flat)
    return out

# ---------------------- TAIL SUPPORT ---------------------- #

def tail_file(path: str, start_from_end: bool = True):
    """
    Generator yielding lines of `path` as they become available.

    Reading starts at end-of-file when `start_from_end` is True, otherwise at
    the beginning. When no line is available the generator polls:
      - if the path now refers to a different inode (rotation), the new file
        is opened and read from its start;
      - if the file is shorter than the current read offset (truncation),
        reading restarts from offset 0;
      - if the path is missing, it waits for the file to reappear.

    The loop ends when the module-level `running` flag becomes false. The
    currently open handle is closed when the generator finishes or is closed.

    Raises:
        FileNotFoundError: if `path` does not exist when iteration starts.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    fh = open(path, 'r', encoding='utf-8', errors='ignore')
    try:
        if start_from_end:
            fh.seek(0, os.SEEK_END)
        last_inode = os.fstat(fh.fileno()).st_ino
        while running:
            where = fh.tell()
            line = fh.readline()
            if line:
                yield line
                continue

            # Nothing new to read: check whether the file was replaced or cut.
            try:
                st = os.stat(path)
            except FileNotFoundError:
                # file removed — wait for it to reappear
                time.sleep(max(0.1, REFRESH_SECONDS))
                continue

            if getattr(st, 'st_ino', None) != last_inode:
                # Rotated: switch to the new file and release the old handle
                # (the former `with` block only ever closed the first handle).
                try:
                    new_fh = open(path, 'r', encoding='utf-8', errors='ignore')
                except FileNotFoundError:
                    time.sleep(max(0.1, REFRESH_SECONDS))
                    continue
                fh.close()
                fh = new_fh
                last_inode = os.fstat(fh.fileno()).st_ino
                continue

            if st.st_size < where:
                # Truncated: restart from the top. Must not fall through to
                # the seek(where) below, which would jump past the new EOF.
                fh.seek(0)
                continue

            time.sleep(max(0.01, REFRESH_SECONDS))
            fh.seek(where)
    finally:
        fh.close()

# ---------------------- IN-MEMORY STORE ---------------------- #
class SnapshotWindow:
    """
    Time-bounded in-memory buffer of flattened snapshots.

    Entries are kept in arrival order as (timestamp, flat_dict) tuples. After
    every append the buffer is cut to at most `max_snapshots` entries, then
    entries at the front whose timestamp is more than `window_minutes` older
    than the current wall-clock time are dropped.

    NOTE(review): pruning is relative to pd.Timestamp.now(), so snapshots
    replayed from an old file (timestamps far in the past) are dropped right
    after being appended — confirm this is intended for historical replays.
    """

    def __init__(self, window_minutes: int = WINDOW_MINUTES, max_snapshots: int = MAX_SNAPSHOTS_STORE):
        self.window_minutes = int(window_minutes)
        self.max_snapshots = int(max_snapshots)
        self.store = deque()  # (timestamp (pd.Timestamp), flat_dict)

    @staticmethod
    def _coerce_timestamp(flat: Dict[str, Any]) -> pd.Timestamp:
        """Return the snapshot's timestamp, falling back to now() when unusable."""
        raw = flat.get('LTT') or flat.get('ltt') or flat.get('time') or flat.get('ts') or None
        if raw is None:
            return pd.Timestamp.now()
        try:
            if isinstance(raw, (pd.Timestamp, datetime)):
                ts = pd.Timestamp(raw)
            else:
                ts = pd.to_datetime(raw)
        except Exception:
            # Best effort: an unparseable timestamp must not drop the snapshot.
            return pd.Timestamp.now()
        # NaT (e.g. LTT is NaN) is not a pd.Timestamp instance. Storing it
        # would block pruning forever because `NaT < cutoff` is always False.
        if not isinstance(ts, pd.Timestamp):
            return pd.Timestamp.now()
        return ts

    def append(self, flat: Dict[str, Any]):
        """Store `flat` with its timestamp, then enforce the size and age limits."""
        self.store.append((self._coerce_timestamp(flat), flat))
        # enforce max length
        while len(self.store) > self.max_snapshots:
            self.store.popleft()
        # prune by time window
        self.prune_old()

    def prune_old(self):
        """Drop entries from the front that are older than the time window."""
        if not self.store:
            return
        span = pd.Timedelta(minutes=self.window_minutes)
        naive_now = pd.Timestamp.now()
        while self.store:
            head = self.store[0][0]
            head_tz = getattr(head, 'tz', None)
            # Compare tz-aware timestamps against a tz-aware "now"; mixing
            # aware and naive values raises TypeError in pandas.
            now = pd.Timestamp.now(tz=head_tz) if head_tz is not None else naive_now
            if not head < now - span:
                break
            self.store.popleft()

    def to_dataframe(self) -> pd.DataFrame:
        """Return the buffered snapshots as a DataFrame; 'LTT' holds the stored timestamp."""
        if not self.store:
            return pd.DataFrame()
        # The coerced timestamp overrides whatever raw 'LTT' the snapshot had.
        rows = [{**flat, 'LTT': t} for t, flat in self.store]
        return pd.DataFrame(rows)

# ---------------------- ANALYSIS HELPERS ---------------------- #

def pick_column(df: pd.DataFrame, ltp: str, prem: str) -> Optional[str]:
    """Return `ltp` if it is a column of `df`, else `prem` if that is, else None."""
    for candidate in (ltp, prem):
        if candidate in df.columns:
            return candidate
    return None


def build_series_from_df(df: pd.DataFrame, prev_call_col, curr_call_col, next_call_col,
                         prev_put_col, curr_put_col, next_put_col,
                         prev_str="Previous_Strikeprice", curr_str="Current_Strikeprice", next_str="Next_Strikeprice"):
    """
    Collect per-strike (timestamp, premium) observations for calls and puts.

    Each row contributes up to three observations per side: the premium in
    the previous/current/next premium column is filed under the strike found
    in the matching strike column. A premium column given as None (or any
    falsy value) is ignored, as are missing columns, NaN values and values
    that cannot be converted with int() (strike) / float() (premium).

    Returns (call_series, put_series), two defaultdict(list) objects mapping
    int strike -> list of (row['LTT'], float premium) in row order.
    """
    call_series = defaultdict(list)
    put_series = defaultdict(list)
    if df.empty:
        return call_series, put_series

    strike_cols = (prev_str, curr_str, next_str)
    sides = (
        (call_series, tuple(zip(strike_cols, (prev_call_col, curr_call_col, next_call_col)))),
        (put_series, tuple(zip(strike_cols, (prev_put_col, curr_put_col, next_put_col)))),
    )

    # single pass over the rows, calls first and then puts for each row
    for _, row in df.iterrows():
        stamp = row.get('LTT')
        for target, column_pairs in sides:
            for strike_col, price_col in column_pairs:
                if not price_col or strike_col not in row or price_col not in row:
                    continue
                strike_raw = row.get(strike_col)
                price_raw = row.get(price_col)
                if pd.isna(strike_raw) or pd.isna(price_raw):
                    continue
                try:
                    strike = int(strike_raw)
                    price = float(price_raw)
                except Exception:
                    continue
                target[strike].append((stamp, price))
    return call_series, put_series


def summarize_series(pairs):
    """
    Summarise a list of (timestamp, price) pairs.

    The pairs are ordered by timestamp. Returns None for empty input,
    otherwise a dict with the first/last/peak/trough price, the absolute and
    percentage change from first to last (NaN when the first price is 0),
    the number of observations and the time-ordered pairs.
    """
    if not pairs:
        return None

    ordered = list(pairs)
    ordered.sort(key=lambda pair: pair[0])
    values = [price for _, price in ordered]

    opening, closing = values[0], values[-1]
    delta = closing - opening
    if opening != 0:
        pct = delta / opening * 100
    else:
        pct = np.nan

    return {
        'first': opening,
        'last': closing,
        'peak': max(values),
        'trough': min(values),
        'abs_change': delta,
        'pct_change': pct,
        'n_obs': len(values),
        'series_sorted': ordered,
    }


def forecast_from_pct(last, pct):
    """Map a percent change onto fixed 5-minute and 10-minute price bands.

    A missing ``pct`` is treated as 0. Returns ``((low5, high5), (low10, high10))``
    where each bound is ``last`` plus a fixed offset chosen by the pct tier
    (>= 25, >= 8, > 0, otherwise).
    """
    move = 0.0 if pd.isna(pct) else pct

    # (5-minute offsets, 10-minute offsets) for the tier that `move` falls in
    if move >= 25:
        near, far = (15, 35), (25, 60)
    elif move >= 8:
        near, far = (6, 18), (12, 30)
    elif move > 0:
        near, far = (2, 8), (5, 15)
    else:
        near, far = (-5, 2), (-8, 5)

    five_min = (last + near[0], last + near[1])
    ten_min = (last + far[0], last + far[1])
    return five_min, ten_min


def aggregate_tags_and_mf(df: pd.DataFrame, strikes: List[int]):
    """Accumulate strategy-tag counts and call/put money flow per strike.

    Args:
        df: snapshot frame; any of the strike, tag and money-flow columns may be absent.
        strikes: strikes to report on; rows whose strike is not listed are ignored.

    Returns:
        ``{strike: {'tags': Counter, 'call_mf': float, 'put_mf': float}}``.

    NOTE(review): for every strike column that matches on a row, the tags and
    flows of ALL three sections (Previous/Current/Next) are credited to that
    strike, and both the plain and "Total" flow columns are summed. Confirm
    this cross-attribution is intended.
    """
    out = {int(s): {'tags': Counter(), 'call_mf': 0.0, 'put_mf': 0.0} for s in strikes}
    strike_cols = ("Previous_Strikeprice", "Current_Strikeprice", "Next_Strikeprice")
    tag_cols = ["Previous_StrategyTag", "Current_StrategyTag", "Next_StrategyTag"]
    call_mflow_cols = ["Previous_CallMoneyFlow", "Current_CallMoneyFlow", "Next_CallMoneyFlow",
                       "Previous_TotalcallMoneyFlow", "Current_TotalcallMoneyFlow", "Next_TotalcallMoneyFlow"]
    put_mflow_cols  = ["Previous_PutMoneyFlow", "Current_PutMoneyFlow", "Next_PutMoneyFlow",
                       "Previous_TotalputMoneyFlow", "Current_TotalputMoneyFlow", "Next_TotalputMoneyFlow"]

    if df.empty:
        return out

    def add_flows(snap, columns, bucket, key):
        # Sum every present, non-missing, float-convertible value into bucket[key].
        for col in columns:
            if col not in snap:
                continue
            raw = snap.get(col)
            if pd.isna(raw):
                continue
            try:
                bucket[key] += float(raw)
            except Exception:
                pass

    for _, snap in df.iterrows():
        for strike_col in strike_cols:
            if strike_col not in snap:
                continue
            raw_strike = snap.get(strike_col)
            if pd.isna(raw_strike):
                continue
            try:
                strike = int(raw_strike)
            except Exception:
                continue
            bucket = out.get(strike)
            if bucket is None:
                continue

            for tag_col in tag_cols:
                text = snap.get(tag_col) if tag_col in snap else None
                if isinstance(text, str) and text.strip():
                    # tags may be separated by '|' or ';'
                    pieces = text.replace('|', ';').split(';')
                    bucket['tags'].update(tok.strip() for tok in pieces if tok.strip())

            add_flows(snap, call_mflow_cols, bucket, 'call_mf')
            add_flows(snap, put_mflow_cols, bucket, 'put_mf')
    return out


def decide_action_from_row(row):
    """Pick a recommended action for one merged per-strike row.

    Args:
        row: mapping-like object (dict or pandas Series) that may provide
            ``pct_change_call``, ``pct_change_put`` and ``tags``.

    Returns:
        ``'BUY_PUT'``, ``'BUY_CALL'`` or ``'HOLD'``. The put rule is evaluated
        first, so it wins when both sides qualify.
    """
    def pct_or_zero(key):
        # Missing/NaN percent changes count as "no move".
        val = row.get(key)
        return val if pd.notna(val) else 0

    call_pct = pct_or_zero('pct_change_call')
    put_pct = pct_or_zero('pct_change_put')

    # Tags can be NaN/None (or any non-string) when the row was not built from
    # tag data; treat those as "no tags" instead of calling .lower() on them.
    raw_tags = row.get('tags')
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ''

    bull_boost = any(k in tags for k in ('call buying', 'oi_support_call', 'bull'))
    bear_boost = any(k in tags for k in ('put buying', 'call writing', 'bear'))

    # A supportive tag lowers the required move from >8% to >5%.
    if (put_pct > 8) or (put_pct > 5 and bear_boost):
        return 'BUY_PUT'
    if (call_pct > 8) or (call_pct > 5 and bull_boost):
        return 'BUY_CALL'
    return 'HOLD'

# ---------------------- CORE ANALYSIS ---------------------- #

def analyze_window(window: SnapshotWindow):
    """Summarise the snapshots held in ``window`` and derive alerts.

    Steps: build per-strike call/put premium series, summarise every strike
    (headline fields come from the side with more observations, calls winning
    ties), attach tags / money flow / reasons / recommended action, approximate
    a high-conviction hit rate, then collect signals, reversals and IV-crush
    events.

    Args:
        window: object whose ``to_dataframe()`` returns one row per snapshot,
            including an ``LTT`` timestamp column.

    Returns:
        Tuple ``(merged_rows, signals, reversals, iv_crush)``, each a list of
        dicts. All four are empty lists when no strike has a usable premium.
    """
    df = window.to_dataframe()
    # pick columns with fallback (each is None when neither name exists)
    prev_call_col = pick_column(df, 'Previous_Call_ltp', 'Previous_Call_Premium')
    curr_call_col = pick_column(df, 'Current_Call_ltp', 'Current_Call_Premium')
    next_call_col = pick_column(df, 'Next_Call_ltp', 'Next_Call_Premium')

    prev_put_col = pick_column(df, 'Previous_Put_ltp', 'Previous_Put_Premium')
    curr_put_col = pick_column(df, 'Current_Put_ltp', 'Current_Put_Premium')
    next_put_col = pick_column(df, 'Next_Put_ltp', 'Next_Put_Premium')

    call_series, put_series = build_series_from_df(df, prev_call_col, curr_call_col, next_call_col,
                                                   prev_put_col, curr_put_col, next_put_col)

    all_strikes = sorted(set(call_series) | set(put_series))

    merged_rows = []
    for s in all_strikes:
        cs = summarize_series(call_series.get(s, []))
        ps = summarize_series(put_series.get(s, []))
        n_call = cs['n_obs'] if cs else 0
        n_put = ps['n_obs'] if ps else 0
        # prefer side with more observations
        if n_call >= n_put and cs:
            f5, f10 = forecast_from_pct(cs['last'], cs['pct_change'])
            row = {
                'strike': int(s),
                'n_obs': n_call + n_put,
                'n_obs_call': n_call, 'n_obs_put': n_put,
                'first_premium': cs['first'], 'last_premium': cs['last'],
                'peak_premium': cs['peak'], 'trough_premium': cs['trough'],
                'abs_change': cs['abs_change'], 'pct_change': cs['pct_change'],
                '5min_low': f5[0], '5min_high': f5[1], '10min_low': f10[0], '10min_high': f10[1],
                'first_call_ltp': cs['first'], 'last_call_ltp': cs['last'],
                'peak_call_ltp': cs['peak'], 'trough_call_ltp': cs['trough'],
                'abs_change_call': cs['abs_change'], 'pct_change_call': cs['pct_change'],
                'first_put_ltp': ps['first'] if ps else np.nan, 'last_put_ltp': ps['last'] if ps else np.nan,
                'pct_change_put': ps['pct_change'] if ps else np.nan
            }
        elif ps:
            f5, f10 = forecast_from_pct(ps['last'], ps['pct_change'])
            row = {
                'strike': int(s),
                'n_obs': n_call + n_put,
                'n_obs_call': n_call, 'n_obs_put': n_put,
                'first_premium': ps['first'], 'last_premium': ps['last'],
                'peak_premium': ps['peak'], 'trough_premium': ps['trough'],
                'abs_change': ps['abs_change'], 'pct_change': ps['pct_change'],
                '5min_low': f5[0], '5min_high': f5[1], '10min_low': f10[0], '10min_high': f10[1],
                'first_put_ltp': ps['first'], 'last_put_ltp': ps['last'],
                'peak_put_ltp': ps['peak'], 'trough_put_ltp': ps['trough'],
                'abs_change_put': ps['abs_change'], 'pct_change_put': ps['pct_change'],
                'first_call_ltp': cs['first'] if cs else np.nan, 'last_call_ltp': cs['last'] if cs else np.nan,
                'pct_change_call': cs['pct_change'] if cs else np.nan
            }
        else:
            continue
        merged_rows.append(row)

    if not merged_rows:
        return [], [], [], []

    merged_df = pd.DataFrame(merged_rows)
    strike_info = aggregate_tags_and_mf(df, merged_df['strike'].tolist())
    # attach tags ("name:count" joined by ';', empty string when none) and moneyflow
    merged_df['tags'] = merged_df['strike'].apply(
        lambda s: ';'.join(f"{k}:{v}" for k, v in strike_info[int(s)]['tags'].items()))
    merged_df['call_moneyflow'] = merged_df['strike'].apply(lambda s: strike_info[int(s)]['call_mf'])
    merged_df['put_moneyflow'] = merged_df['strike'].apply(lambda s: strike_info[int(s)]['put_mf'])

    # build reasons
    def build_reasons(row):
        reasons = []
        tags_text = str(row.get('tags', '')).lower()
        if any(k in tags_text for k in ['rsi', 'macd']):
            reasons.append('RSI/MACD momentum')
        if 'vwap' in tags_text:
            reasons.append('VWAP divergence')
        if 'oi' in tags_text:
            reasons.append('OI support/resistance')
        if row.get('call_moneyflow', 0) > 0:
            reasons.append('Call net buying')
        if row.get('put_moneyflow', 0) > 0:
            reasons.append('Put net buying')
        pct = row.get('pct_change', 0) or 0
        try:
            if pct > 10:
                reasons.append('Strong premium move')
            elif pct > 3:
                reasons.append('Moderate premium move')
        except TypeError:
            # non-numeric pct: no premium-move reason
            pass
        # dict.fromkeys de-duplicates while keeping first-seen order
        return '; '.join(dict.fromkeys(reasons)) if reasons else 'No strong signals'

    merged_df['reasons'] = merged_df.apply(build_reasons, axis=1)
    merged_df['recommended_action'] = merged_df.apply(decide_action_from_row, axis=1)

    # quick high-conviction stats (approximation): a flagged row counts as a
    # success when the chosen Current-price column prints above the flagged
    # price anywhere in the following 3 minutes. The look-ahead slice is
    # filtered by time only, not by strike.
    hc_col = 'Current_IsHighConvictionSignal'
    totals, successes, rates = [], [], []
    for _, r in merged_df.iterrows():
        s = int(r['strike'])
        total = success = 0
        if hc_col in df.columns:
            cond = (df.get('Current_Strikeprice') == s) & (df.get(hc_col) == True)
            hc_rows = df[cond]
            total = int(hc_rows.shape[0])
            # The price column depends only on the strike's action, so choose it
            # once: put column for BUY_PUT when available, otherwise the call
            # column (which may itself be None).
            action = r['recommended_action']
            if action == 'BUY_PUT' and curr_put_col is not None:
                base_col = curr_put_col
            else:
                base_col = curr_call_col
            if base_col is not None:
                for _, hr in hc_rows.iterrows():
                    t0 = hr.get('LTT')
                    if pd.isna(t0):
                        continue
                    p0 = hr.get(base_col)
                    if pd.isna(p0):
                        continue
                    ahead = df[(df['LTT'] >= t0) & (df['LTT'] <= (t0 + pd.Timedelta(minutes=3)))]
                    if ahead.empty:
                        continue
                    try:
                        if ahead[base_col].max() > p0:
                            success += 1
                    except (TypeError, ValueError):
                        # mixed / non-numeric prices: cannot score this row
                        pass
        rate = (success / total) if total > 0 else None
        totals.append(total); successes.append(success); rates.append(rate)
    merged_df['highconv_total'] = totals
    merged_df['highconv_success'] = successes
    merged_df['highconv_hit_rate'] = rates

    merged_df['Current_Strikeprice'] = merged_df['strike']

    # assemble signals
    signals = []
    for _, r in merged_df.iterrows():
        action = r.get('recommended_action')
        if action not in ('BUY_CALL', 'BUY_PUT'):
            continue
        # Strength is the percent change of the side being recommended; using
        # the other side's move (or NaN) would misreport the signal.
        side_pct = r.get('pct_change_call' if action == 'BUY_CALL' else 'pct_change_put')
        strength = float(side_pct) if pd.notna(side_pct) else 0.0
        signals.append({
            'strike': int(r['strike']),
            'action': action,
            'strength': strength,
            'reason': r.get('reasons', ''),
            'tags': r.get('tags', ''),
            'last_premium': float(r.get('last_premium') or 0),
            'time': str(pd.Timestamp.now())
        })

    # detect reversals: drop from the recent peak to the latest price,
    # using the call series when present and the put series otherwise
    reversals = []
    for s in all_strikes:
        series = call_series.get(s) if call_series.get(s) else put_series.get(s)
        if not series:
            continue
        sr = sorted(series, key=lambda x: x[0])
        t_last = sr[-1][0]
        cutoff = t_last - pd.Timedelta(minutes=REVERSAL_WINDOW_MIN)
        recent = [p for t, p in sr if t >= cutoff]
        if not recent:
            continue
        peak = max(recent); lastp = recent[-1]
        drop = (peak - lastp) / peak * 100 if peak > 0 else 0
        if drop >= REVERSAL_DROP_PCT:
            reversals.append({'strike': s, 'peak': peak, 'last': lastp, 'drop_pct': drop})

    # detect IV crush: percent fall from the Previous_*_IV column to the Current_*_IV column of a row
    iv_crush = []
    if not df.empty:
        for _, row in df.iterrows():
            for prev_iv, curr_iv, side in [("Previous_Call_IV", "Current_Call_IV", "CALL"), ("Previous_Put_IV", "Current_Put_IV", "PUT")]:
                if prev_iv in row and curr_iv in row and pd.notna(row[prev_iv]) and pd.notna(row[curr_iv]) and row[prev_iv] > 0:
                    drop = (row[prev_iv] - row[curr_iv]) / row[prev_iv] * 100
                    if drop >= IV_CRUSH_DROP:
                        iv_crush.append({'time': str(row.get('LTT')), 'strike': row.get('Current_Strikeprice'), 'side': side, 'drop_pct': drop})

    return merged_df.to_dict(orient='records'), signals, reversals, iv_crush

# ---------------------- MAIN LOOP ---------------------- #

def run_realtime(input_path: str, start_from_end: bool = START_FROM_END):
    """Tail ``input_path`` and re-run the window analysis as snapshots arrive.

    Every yielded line is parsed and appended to a sliding window. The analysis
    runs at most once per ``REFRESH_SECONDS * 0.6`` seconds (minimum 0.01 s),
    and only when a new line has arrived. Results are written to the JSON
    output files (unless disabled), optionally appended to the historical
    dump, and alerts are printed and optionally logged.

    Args:
        input_path: file to tail.
        start_from_end: passed to ``tail_file``; when true, existing content is skipped.
    """
    window = SnapshotWindow(window_minutes=WINDOW_MINUTES)
    tailer = tail_file(input_path, start_from_end)

    def emit_alert(msg):
        # Print the alert and, when ALERT_LOG is set, append it there. A failing
        # log write is reported but must not stop the engine.
        print(msg)
        if not ALERT_LOG:
            return
        try:
            with open(ALERT_LOG, 'a') as log_fh:
                log_fh.write(msg + '\n')
        except OSError as e:
            print('Warning: failed to write alert log:', e)

    last_analyze = 0.0
    print(f"Realtime engine started. Watching: {input_path}")

    for raw_line in tailer:
        if not running:
            break
        try:
            parsed_list = parse_line_json(raw_line)
        except Exception as e:
            print('Parse error:', e)
            continue
        if not parsed_list:
            continue
        for flat in parsed_list:
            try:
                window.append(flat)
            except Exception as e:
                print('Append error (ignored):', e)
                continue

        # throttle: skip analysis if the previous one ran too recently
        now = time.time()
        if now - last_analyze < max(0.01, REFRESH_SECONDS * 0.6):
            continue
        last_analyze = now

        try:
            merged_rows, signals, reversals, iv_crush = analyze_window(window)
        except Exception as e:
            print('Analysis error (skipping this cycle):', e)
            continue

        # write outputs
        if WRITE_OUTPUT_JSON and not DRY_RUN:
            try:
                with open(OUT_AUTOTRADE, 'w') as fh:
                    json.dump(signals, fh, indent=2, default=str)
                with open(OUT_LATEST, 'w') as fh:
                    json.dump({'timestamp': str(pd.Timestamp.now()), 'merged': merged_rows, 'reversals': reversals, 'iv_crush': iv_crush}, fh, indent=2, default=str)
            except (OSError, TypeError, ValueError) as e:
                print('Warning: failed to write outputs:', e)

        # optional historical dump (one JSON object per line)
        if HISTORICAL_DUMP:
            try:
                # default=str matches the other outputs so values json cannot
                # serialise natively do not abort the dump
                record = json.dumps({'ts': str(pd.Timestamp.now()), 'merged': merged_rows}, default=str)
                with open(HISTORICAL_DUMP, 'a') as fh:
                    fh.write(record + '\n')
            except (OSError, TypeError, ValueError) as e:
                print('Warning: failed to write historical dump:', e)

        # print alerts
        for s in signals:
            emit_alert(f"[SIGNAL] {s['time']} Strike {s['strike']} => {s['action']} (str={s['strength']:.2f}) reason={s['reason']}")
        for r in reversals:
            emit_alert(f"[REVERSAL] Strike {r['strike']} drop {r['drop_pct']:.1f}% (peak {r['peak']} -> last {r['last']})")
        for e in iv_crush:
            emit_alert(f"[IV_CRUSH] {e['time']} Strike {e['strike']} {e['side']} drop {e['drop_pct']:.1f}%")

    print('Realtime engine stopped.')

# ---------------------- CLI ---------------------- #

def cli():
    """Parse command-line options, update the module configuration and start the engine.

    Exits with status 1 when the input file does not exist.
    """
    # The global statement must come before ANY use of these names in the
    # function. Reading ALERT_LOG / WINDOW_MINUTES as argparse defaults ahead
    # of it is a SyntaxError ("used prior to global declaration").
    global WRITE_OUTPUT_JSON, START_FROM_END, ALERT_LOG, WINDOW_MINUTES, DRY_RUN

    p = argparse.ArgumentParser()
    p.add_argument('--file', '-f', dest='file', default=INPUT_FILE, help='Input file to tail')
    p.add_argument('--from-start', action='store_true', help='Start reading from start of file instead of EOF')
    p.add_argument('--no-write', action='store_true', help='Do not write output json files (dry run)')
    p.add_argument('--alert-log', dest='alert_log', default=ALERT_LOG, help='Path to append alert log')
    p.add_argument('--window', type=int, default=WINDOW_MINUTES, help='Sliding window minutes')
    args = p.parse_args()

    WRITE_OUTPUT_JSON = True
    START_FROM_END = not args.from_start
    ALERT_LOG = args.alert_log
    WINDOW_MINUTES = int(args.window)
    DRY_RUN = bool(args.no_write)

    if not os.path.exists(args.file):
        print('ERROR: input file not found:', args.file)
        sys.exit(1)

    run_realtime(args.file, start_from_end=START_FROM_END)

# Script entry point: run the CLI only when this file is executed directly.
if __name__ == '__main__':
    cli()


SyntaxError: name 'ALERT_LOG' is used prior to global declaration (ipython-input-4211273433.py, line 620)

In [None]:
#!/usr/bin/env python3
"""
realtime_ce_pe_engine_improved.py

Improved realtime tailing and CE/PE analysis engine based on the user's original script.
Key improvements:
 - Robust JSON parsing (handles object-per-line, JSON lists, and stringified 'Current').
 - Safer tail implementation (handles rotation/truncation) and immediate processing of appended data.
 - Configurable via top-of-file constants; small CLI wrapper available.
 - Reduced pandas overhead by building DataFrame carefully and avoiding excessive apply() usage.
 - Better type coercion and error handling so single malformed snapshot won't stop the engine.
 - Optional historical dump and optional dry-run (no file writes) flags.
 - Clearer logging, and optional alert logfile support.

Behavior: tail input file, keep a sliding time window of snapshots, compute per-strike summaries,
produce AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json (unless WRITE_OUTPUT_JSON=False),
print alerts for signals/reversals/iv-crush to stdout and ALERT_LOG if enabled.

Note: this file is self-contained and intended to run on a machine that has pandas & numpy installed.
"""

import os
import sys
import time
import json
import signal
import argparse
from collections import defaultdict, Counter, deque
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import pandas as pd
import numpy as np

# ---------------------- CONFIG ---------------------- #
INPUT_FILE = "14112025_BANK_PNL.txt"      # default path to live appended snapshots (text)
REFRESH_SECONDS = 1.0                     # poll interval (seconds)
WINDOW_MINUTES = 15                       # sliding window size for analysis
START_FROM_END = False  # False: process existing data first, then tail new lines; True: ignore historical lines and start tailing from EOF
WRITE_OUTPUT_JSON = True                  # write AUTO_TRADE_SIGNALS.json and LATEST_MERGED.json
OUT_AUTOTRADE = "AUTO_TRADE_SIGNALS.json"
OUT_LATEST = "LATEST_MERGED.json"
ALERT_LOG = None                          # path to an alert logfile or None to only print to stdout
IV_CRUSH_DROP = 15.0                      # percent IV drop threshold
REVERSAL_DROP_PCT = 12.0                  # reversal detection threshold (drop from peak)
REVERSAL_WINDOW_MIN = 5                   # look-back window for reversal check (minutes)
MAX_SNAPSHOTS_STORE = 10000               # maximum snapshots to keep in memory (safety)
HISTORICAL_DUMP = None                    # optional: path to write a periodic historical dump (or None)
DRY_RUN = False                           # if True, won't write output files (useful for testing)

# Run flag: set to False by graceful_exit; tail_file stops looping once it is cleared.
running = True

# ---------------------- SIGNAL HANDLING ---------------------- #
def graceful_exit(signum, frame):
    """Signal handler that requests a shutdown.

    Clears the module-level ``running`` flag and prints a notice. ``signum``
    and ``frame`` are required by the handler signature but are not used.
    """
    global running
    running = False
    print("\nReceived exit signal. Shutting down...")


# Route both interrupt and termination signals through the same handler.
for _exit_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_exit_signal, graceful_exit)
del _exit_signal

# ---------------------- JSON PARSING ---------------------- #

def parse_line_json(line: str) -> List[Dict[str, Any]]:
    """Parse one input line into a list of flattened snapshot dicts.

    The line may contain:
      - a single JSON object
      - a JSON-encoded list of objects
      - a text line that contains a JSON object somewhere inside (text before
        and after the object is ignored; only the first '{' is tried)

    Flattening: every top-level key except ``"Current"`` is copied as-is.
    ``"Current"`` may be a dict or a JSON string; its ``Previous`` / ``Current``
    / ``Next`` sub-dicts are copied as ``"<Section>_<key>"`` entries.

    Returns:
        A list of flattened dicts (possibly empty). Non-dict items and lines
        that cannot be parsed contribute nothing.
    """
    out: List[Dict[str, Any]] = []
    s = line.strip() if line else ''
    if not s:
        return out

    # Try direct load (object or list)
    try:
        parsed = json.loads(s)
    except ValueError:
        # Fall back to the first '{'. raw_decode stops at the end of the object,
        # so trailing log text no longer makes the whole line unparseable.
        start = s.find('{')
        if start < 0:
            return out
        try:
            parsed, _end = json.JSONDecoder().raw_decode(s, start)
        except ValueError:
            return out

    # normalize to list
    items = parsed if isinstance(parsed, list) else [parsed]

    for obj in items:
        if not isinstance(obj, dict):
            continue
        # flatten top-level except 'Current' (handled below)
        flat = {k: v for k, v in obj.items() if k != "Current"}

        # 'Current' may be a JSON string or a dict holding Previous/Current/Next blocks
        curr = obj.get("Current")
        if isinstance(curr, str):
            try:
                curr = json.loads(curr)
            except ValueError:
                curr = None

        if isinstance(curr, dict):
            for section in ("Previous", "Current", "Next"):
                block = curr.get(section)
                if isinstance(block, dict):
                    for key, val in block.items():
                        flat[f"{section}_{key}"] = val
        out.append(flat)
    return out

# ---------------------- TAIL SUPPORT ---------------------- #

def tail_file(path: str, start_from_end: bool = True):
    """Generator yielding lines of ``path`` as they become available.

    Args:
        path: file to follow.
        start_from_end: when true, skip existing content and yield only lines
            appended afterwards; otherwise start from the beginning.

    Yields:
        Each line as returned by ``readline()``.

    Raises:
        FileNotFoundError: if ``path`` does not exist when the generator starts.

    The loop runs while the module-level ``running`` flag is true. When no data
    is available it checks for rotation (inode change, so the file is reopened)
    and truncation (size below the read position, so reading restarts at 0),
    otherwise it sleeps for ``REFRESH_SECONDS``.

    NOTE(review): a line the writer has not finished yet may be yielded
    without its trailing newline; confirm the producer writes whole lines.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    # The handle is managed manually (not with `with`) because rotation swaps it;
    # the finally block closes whichever handle is current.
    fh = open(path, 'r', encoding='utf-8', errors='ignore')
    try:
        if start_from_end:
            fh.seek(0, os.SEEK_END)
        last_inode = os.fstat(fh.fileno()).st_ino if hasattr(os, 'fstat') else None
        while running:
            where = fh.tell()
            line = fh.readline()
            if line:
                yield line
                continue

            # No data: detect truncation/rotation before sleeping
            try:
                st = os.stat(path)
                if last_inode is not None and getattr(st, 'st_ino', None) != last_inode:
                    # file rotated - reopen and close the stale handle
                    new_fh = open(path, 'r', encoding='utf-8', errors='ignore')
                    fh.close()
                    fh = new_fh
                    last_inode = os.fstat(fh.fileno()).st_ino if hasattr(os, 'fstat') else None
                    continue
            except FileNotFoundError:
                # file removed - wait for it to reappear
                time.sleep(max(0.1, REFRESH_SECONDS))
                continue

            if st.st_size < where:
                # truncated: restart from the top and read immediately, without
                # seeking back to the stale offset
                fh.seek(0)
                continue

            time.sleep(max(0.01, REFRESH_SECONDS))
            # re-seek to the same offset so the next readline() retries from there
            fh.seek(where)
    finally:
        fh.close()

# ---------------------- IN-MEMORY STORE ---------------------- #
class SnapshotWindow:
    """Sliding, bounded in-memory store of flattened snapshot dicts.

    Entries are ``(timestamp, flat_dict)`` tuples kept in arrival order. The
    store is capped at ``max_snapshots`` entries and at ``window_minutes`` of
    data, measured back from the most recently appended snapshot's timestamp.
    Measuring against data time rather than the wall clock means replayed or
    historical files are not discarded the moment they are read.
    """

    def __init__(self, window_minutes: int = WINDOW_MINUTES, max_snapshots: int = MAX_SNAPSHOTS_STORE):
        self.window_minutes = int(window_minutes)
        self.max_snapshots = int(max_snapshots)
        self.store = deque()  # (timestamp (pd.Timestamp), flat_dict)

    @staticmethod
    def _coerce_timestamp(raw) -> pd.Timestamp:
        """Return ``raw`` as a scalar ``pd.Timestamp``, falling back to the current time."""
        if raw is None:
            return pd.Timestamp.now()
        try:
            ts = pd.to_datetime(raw)
        except (ValueError, TypeError, OverflowError):
            return pd.Timestamp.now()
        # List-like input yields an index, and unparseable markers yield NaT;
        # neither can be ordered inside the window, so use "now" instead.
        if not isinstance(ts, pd.Timestamp) or pd.isna(ts):
            return pd.Timestamp.now()
        return ts

    def append(self, flat: Dict[str, Any]):
        """Store one flattened snapshot, then enforce the size and time limits.

        The timestamp is taken from the first truthy value among the keys
        ``LTT``, ``ltt``, ``time`` and ``ts``.
        """
        raw = flat.get('LTT') or flat.get('ltt') or flat.get('time') or flat.get('ts') or None
        self.store.append((self._coerce_timestamp(raw), flat))
        # enforce max length
        while len(self.store) > self.max_snapshots:
            self.store.popleft()
        # prune by time window
        self.prune_old()

    def prune_old(self):
        """Drop leading entries older than ``window_minutes`` before the newest-appended entry."""
        if not self.store:
            return
        cutoff = self.store[-1][0] - pd.Timedelta(minutes=self.window_minutes)
        while self.store and self.store[0][0] < cutoff:
            self.store.popleft()

    def to_dataframe(self) -> pd.DataFrame:
        """Return the stored snapshots as a DataFrame, one row per snapshot.

        Each row is a copy of the flat dict with ``LTT`` set to the stored
        timestamp. An empty store gives an empty DataFrame.
        """
        if not self.store:
            return pd.DataFrame()
        # Create dataframe once
        return pd.DataFrame([{**flat, 'LTT': t} for t, flat in self.store])

# ---------------------- ANALYSIS HELPERS ---------------------- #

def pick_column(df: pd.DataFrame, ltp: str, prem: str) -> Optional[str]:
    """Return ``ltp`` if it is a column of ``df``, else ``prem`` if it is, else ``None``."""
    for candidate in (ltp, prem):
        if candidate in df.columns:
            return candidate
    return None


def build_series_from_df(df: pd.DataFrame, prev_call_col, curr_call_col, next_call_col,
                         prev_put_col, curr_put_col, next_put_col,
                         prev_str="Previous_Strikeprice", curr_str="Current_Strikeprice", next_str="Next_Strikeprice"):
    """Collect per-strike ``(LTT, price)`` observations for calls and puts.

    Each row contributes up to three call and three put observations, one per
    (strike column, price column) pairing. A pairing is skipped when the price
    column name is falsy, either column is absent, either value is missing, or
    the strike/price cannot be converted to ``int``/``float``.

    Returns:
        ``(call_series, put_series)``: two ``defaultdict(list)`` objects keyed
        by integer strike, each value a list of ``(timestamp, price)`` tuples
        in row order.
    """
    call_series = defaultdict(list)
    put_series = defaultdict(list)
    if df.empty:
        return call_series, put_series

    strike_cols = (prev_str, curr_str, next_str)
    # One entry per option side: the target mapping and its (strike, price) column pairs.
    sides = (
        (call_series, tuple(zip(strike_cols, (prev_call_col, curr_call_col, next_call_col)))),
        (put_series, tuple(zip(strike_cols, (prev_put_col, curr_put_col, next_put_col)))),
    )

    # iterate rows once; calls are handled before puts for every row
    for _, snap in df.iterrows():
        stamp = snap.get('LTT')
        for target, col_pairs in sides:
            for strike_col, price_col in col_pairs:
                if not price_col or strike_col not in snap or price_col not in snap:
                    continue
                raw_strike = snap.get(strike_col)
                raw_price = snap.get(price_col)
                if pd.isna(raw_strike) or pd.isna(raw_price):
                    continue
                try:
                    strike = int(raw_strike)
                    price = float(raw_price)
                except Exception:
                    continue
                target[strike].append((stamp, price))
    return call_series, put_series


def summarize_series(pairs):
    """Summarise a list of ``(timestamp, price)`` pairs.

    The pairs are ordered by timestamp before the statistics are taken.
    Returns ``None`` for an empty input, otherwise a dict with the first/last
    price, peak, trough, absolute change, percent change (``np.nan`` when the
    first price is zero), the observation count and the sorted pairs.
    """
    if not pairs:
        return None

    ordered = sorted(pairs, key=lambda item: item[0])
    values = [price for _, price in ordered]
    opening, closing = values[0], values[-1]
    delta = closing - opening
    if opening != 0:
        pct = delta / opening * 100
    else:
        # percent change is undefined against a zero base
        pct = np.nan

    return {
        'first': opening,
        'last': closing,
        'peak': max(values),
        'trough': min(values),
        'abs_change': delta,
        'pct_change': pct,
        'n_obs': len(values),
        'series_sorted': ordered,
    }


def forecast_from_pct(last, pct):
    """Map a percent change onto fixed 5-minute and 10-minute price bands.

    A missing ``pct`` is treated as 0. Returns ``((low5, high5), (low10, high10))``
    where each bound is ``last`` plus a fixed offset chosen by the pct tier
    (>= 25, >= 8, > 0, otherwise).
    """
    move = 0.0 if pd.isna(pct) else pct

    # (5-minute offsets, 10-minute offsets) for the tier that `move` falls in
    if move >= 25:
        near, far = (15, 35), (25, 60)
    elif move >= 8:
        near, far = (6, 18), (12, 30)
    elif move > 0:
        near, far = (2, 8), (5, 15)
    else:
        near, far = (-5, 2), (-8, 5)

    five_min = (last + near[0], last + near[1])
    ten_min = (last + far[0], last + far[1])
    return five_min, ten_min


def aggregate_tags_and_mf(df: pd.DataFrame, strikes: List[int]):
    """Accumulate strategy-tag counts and call/put money flow per strike.

    Args:
        df: snapshot frame; any of the strike, tag and money-flow columns may be absent.
        strikes: strikes to report on; rows whose strike is not listed are ignored.

    Returns:
        ``{strike: {'tags': Counter, 'call_mf': float, 'put_mf': float}}``.

    NOTE(review): for every strike column that matches on a row, the tags and
    flows of ALL three sections (Previous/Current/Next) are credited to that
    strike, and both the plain and "Total" flow columns are summed. Confirm
    this cross-attribution is intended.
    """
    out = {int(s): {'tags': Counter(), 'call_mf': 0.0, 'put_mf': 0.0} for s in strikes}
    strike_cols = ("Previous_Strikeprice", "Current_Strikeprice", "Next_Strikeprice")
    tag_cols = ["Previous_StrategyTag", "Current_StrategyTag", "Next_StrategyTag"]
    call_mflow_cols = ["Previous_CallMoneyFlow", "Current_CallMoneyFlow", "Next_CallMoneyFlow",
                       "Previous_TotalcallMoneyFlow", "Current_TotalcallMoneyFlow", "Next_TotalcallMoneyFlow"]
    put_mflow_cols  = ["Previous_PutMoneyFlow", "Current_PutMoneyFlow", "Next_PutMoneyFlow",
                       "Previous_TotalputMoneyFlow", "Current_TotalputMoneyFlow", "Next_TotalputMoneyFlow"]

    if df.empty:
        return out

    def add_flows(snap, columns, bucket, key):
        # Sum every present, non-missing, float-convertible value into bucket[key].
        for col in columns:
            if col not in snap:
                continue
            raw = snap.get(col)
            if pd.isna(raw):
                continue
            try:
                bucket[key] += float(raw)
            except Exception:
                pass

    for _, snap in df.iterrows():
        for strike_col in strike_cols:
            if strike_col not in snap:
                continue
            raw_strike = snap.get(strike_col)
            if pd.isna(raw_strike):
                continue
            try:
                strike = int(raw_strike)
            except Exception:
                continue
            bucket = out.get(strike)
            if bucket is None:
                continue

            for tag_col in tag_cols:
                text = snap.get(tag_col) if tag_col in snap else None
                if isinstance(text, str) and text.strip():
                    # tags may be separated by '|' or ';'
                    pieces = text.replace('|', ';').split(';')
                    bucket['tags'].update(tok.strip() for tok in pieces if tok.strip())

            add_flows(snap, call_mflow_cols, bucket, 'call_mf')
            add_flows(snap, put_mflow_cols, bucket, 'put_mf')
    return out


def decide_action_from_row(row):
    """Pick a recommended action for one merged per-strike row.

    Args:
        row: mapping-like object (dict or pandas Series) that may provide
            ``pct_change_call``, ``pct_change_put`` and ``tags``.

    Returns:
        ``'BUY_PUT'``, ``'BUY_CALL'`` or ``'HOLD'``. The put rule is evaluated
        first, so it wins when both sides qualify.
    """
    def pct_or_zero(key):
        # Missing/NaN percent changes count as "no move".
        val = row.get(key)
        return val if pd.notna(val) else 0

    call_pct = pct_or_zero('pct_change_call')
    put_pct = pct_or_zero('pct_change_put')

    # Tags can be NaN/None (or any non-string) when the row was not built from
    # tag data; treat those as "no tags" instead of calling .lower() on them.
    raw_tags = row.get('tags')
    tags = raw_tags.lower() if isinstance(raw_tags, str) else ''

    bull_boost = any(k in tags for k in ('call buying', 'oi_support_call', 'bull'))
    bear_boost = any(k in tags for k in ('put buying', 'call writing', 'bear'))

    # A supportive tag lowers the required move from >8% to >5%.
    if (put_pct > 8) or (put_pct > 5 and bear_boost):
        return 'BUY_PUT'
    if (call_pct > 8) or (call_pct > 5 and bull_boost):
        return 'BUY_CALL'
    return 'HOLD'

# ---------------------- CORE ANALYSIS ---------------------- #

def analyze_window(window: SnapshotWindow):
    """Analyse the current snapshot window and derive per-strike outputs.

    The window is converted to a DataFrame, call/put premium series are built
    per strike, and each strike is summarised from whichever side has more
    observations (calls win ties). Tags, money flow, textual reasons, a
    recommended action and an approximate high-conviction hit rate are then
    attached to every strike row.

    Args:
        window: Object exposing ``to_dataframe()``.

    Returns:
        A 4-tuple ``(merged_rows, signals, reversals, iv_crush)``:
        ``merged_rows`` is a list of per-strike dicts, ``signals`` lists the
        strikes whose recommended action is BUY_CALL / BUY_PUT, ``reversals``
        lists strikes whose latest premium dropped at least
        ``REVERSAL_DROP_PCT`` percent from the peak of the last
        ``REVERSAL_WINDOW_MIN`` minutes, and ``iv_crush`` lists snapshot rows
        whose IV fell at least ``IV_CRUSH_DROP`` percent between the
        Previous_* and Current_* columns. Four empty lists are returned when
        no strike could be summarised.
    """
    df = window.to_dataframe()
    # pick columns with fallback
    # NOTE(review): presumably pick_column returns the first candidate present
    # in df (or None) -- confirm against its definition.
    prev_call_col = pick_column(df, 'Previous_Call_ltp', 'Previous_Call_Premium')
    curr_call_col = pick_column(df, 'Current_Call_ltp', 'Current_Call_Premium')
    next_call_col = pick_column(df, 'Next_Call_ltp', 'Next_Call_Premium')

    prev_put_col = pick_column(df, 'Previous_Put_ltp', 'Previous_Put_Premium')
    curr_put_col = pick_column(df, 'Current_Put_ltp', 'Current_Put_Premium')
    next_put_col = pick_column(df, 'Next_Put_ltp', 'Next_Put_Premium')

    call_series, put_series = build_series_from_df(df, prev_call_col, curr_call_col, next_call_col,
                                                   prev_put_col, curr_put_col, next_put_col)

    all_strikes = sorted(set(list(call_series.keys()) + list(put_series.keys())))

    merged_rows = []
    for s in all_strikes:
        cs = summarize_series(call_series.get(s, []))
        ps = summarize_series(put_series.get(s, []))
        n_call = cs['n_obs'] if cs else 0
        n_put  = ps['n_obs'] if ps else 0
        # prefer side with more observations
        if n_call >= n_put and cs:
            first_p = cs['first']; last_p = cs['last']; peak = cs['peak']; trough = cs['trough']
            abs_chg = cs['abs_change']; pct = cs['pct_change']
            f5, f10 = forecast_from_pct(last_p, pct)
            row = {
                'strike': int(s),
                'n_obs': n_call + n_put,
                'n_obs_call': n_call, 'n_obs_put': n_put,
                'first_premium': first_p, 'last_premium': last_p, 'peak_premium': peak, 'trough_premium': trough,
                'abs_change': abs_chg, 'pct_change': pct,
                '5min_low': f5[0], '5min_high': f5[1], '10min_low': f10[0], '10min_high': f10[1],
                'first_call_ltp': cs['first'], 'last_call_ltp': cs['last'], 'peak_call_ltp': cs['peak'], 'trough_call_ltp': cs['trough'],
                'abs_change_call': cs['abs_change'], 'pct_change_call': cs['pct_change'],
                'first_put_ltp': ps['first'] if ps else np.nan, 'last_put_ltp': ps['last'] if ps else np.nan,
                'pct_change_put': ps['pct_change'] if ps else np.nan
            }
        elif ps:
            first_p = ps['first']; last_p = ps['last']; peak = ps['peak']; trough = ps['trough']
            abs_chg = ps['abs_change']; pct = ps['pct_change']
            f5, f10 = forecast_from_pct(last_p, pct)
            row = {
                'strike': int(s),
                'n_obs': n_call + n_put,
                'n_obs_call': n_call, 'n_obs_put': n_put,
                'first_premium': first_p, 'last_premium': last_p, 'peak_premium': peak, 'trough_premium': trough,
                'abs_change': abs_chg, 'pct_change': pct,
                '5min_low': f5[0], '5min_high': f5[1], '10min_low': f10[0], '10min_high': f10[1],
                'first_put_ltp': ps['first'], 'last_put_ltp': ps['last'], 'peak_put_ltp': ps['peak'], 'trough_put_ltp': ps['trough'],
                'abs_change_put': ps['abs_change'], 'pct_change_put': ps['pct_change'],
                'first_call_ltp': cs['first'] if cs else np.nan, 'last_call_ltp': cs['last'] if cs else np.nan,
                'pct_change_call': cs['pct_change'] if cs else np.nan
            }
        else:
            continue
        merged_rows.append(row)

    if not merged_rows:
        return [], [], [], []

    merged_df = pd.DataFrame(merged_rows)
    strike_info = aggregate_tags_and_mf(df, merged_df['strike'].tolist())
    # attach tags and moneyflow
    merged_df['tags'] = merged_df['strike'].apply(lambda s: ';'.join([f"{k}:{v}" for k, v in strike_info[int(s)]['tags'].items()]) if strike_info[int(s)]['tags'] else '')
    merged_df['call_moneyflow'] = merged_df['strike'].apply(lambda s: strike_info[int(s)]['call_mf'])
    merged_df['put_moneyflow']  = merged_df['strike'].apply(lambda s: strike_info[int(s)]['put_mf'])

    # build reasons
    def build_reasons(row):
        """Return a '; '-joined list of human-readable reasons for one strike row."""
        reasons = []
        tags_text = str(row.get('tags', ''))
        if any(k in tags_text.lower() for k in ['rsi', 'macd']):
            reasons.append('RSI/MACD momentum')
        if 'vwap' in tags_text.lower():
            reasons.append('VWAP divergence')
        if 'oi' in tags_text.lower():
            reasons.append('OI support/resistance')
        if row.get('call_moneyflow', 0) > 0:
            reasons.append('Call net buying')
        if row.get('put_moneyflow', 0) > 0:
            reasons.append('Put net buying')
        pct = row.get('pct_change', 0) or 0
        try:
            if pct > 10:
                reasons.append('Strong premium move')
            elif pct > 3:
                reasons.append('Moderate premium move')
        except Exception:
            pass
        # dict.fromkeys drops duplicates while keeping insertion order
        return '; '.join(dict.fromkeys(reasons)) if reasons else 'No strong signals'

    merged_df['reasons'] = merged_df.apply(build_reasons, axis=1)
    merged_df['recommended_action'] = merged_df.apply(decide_action_from_row, axis=1)

    # quick high-conviction stats (approximation)
    hc_col = 'Current_IsHighConvictionSignal'
    totals, successes, rates = [], [], []
    for _, r in merged_df.iterrows():
        s = int(r['strike'])
        total = success = 0
        if hc_col in df.columns:
            cond = (df.get('Current_Strikeprice') == s) & (df.get(hc_col) == True)
            hc_rows = df[cond]
            total = int(hc_rows.shape[0])

            # The premium column to track depends only on the strike's action,
            # so it is chosen once per strike rather than once per snapshot.
            action = r['recommended_action']
            if action == 'BUY_CALL' and curr_call_col in df.columns:
                base_col = curr_call_col
            elif action == 'BUY_PUT' and curr_put_col in df.columns:
                base_col = curr_put_col
            elif curr_call_col in df.columns:
                base_col = curr_call_col
            else:
                base_col = None

            for _, hr in hc_rows.iterrows():
                t0 = hr.get('LTT')
                if pd.isna(t0):
                    continue
                if base_col is None:
                    continue
                p0 = hr.get(base_col)
                if pd.isna(p0):
                    continue
                # Snapshots in the 3 minutes after the signal (all strikes --
                # this is the "approximation" mentioned above). Named
                # fwd_rows so the `window` parameter is not shadowed.
                fwd_rows = df[(df['LTT'] >= t0) & (df['LTT'] <= (t0 + pd.Timedelta(minutes=3)))]
                if fwd_rows.empty:
                    continue
                try:
                    if fwd_rows[base_col].max() > p0:
                        success += 1
                except Exception:
                    pass
        rate = (success / total) if total > 0 else None
        totals.append(total); successes.append(success); rates.append(rate)
    merged_df['highconv_total'] = totals
    merged_df['highconv_success'] = successes
    merged_df['highconv_hit_rate'] = rates

    merged_df['Current_Strikeprice'] = merged_df['strike']

    # assemble signals
    signals = []
    for _, r in merged_df.iterrows():
        action = r.get('recommended_action')
        if action in ('BUY_CALL', 'BUY_PUT'):
            # Strength is the percentage move of the side being recommended.
            # (Previously `call or put` was used, which reported the call move
            # for BUY_PUT signals and NaN for put-only strikes.)
            pct_key = 'pct_change_call' if action == 'BUY_CALL' else 'pct_change_put'
            pct_val = r.get(pct_key)
            strength = float(pct_val) if pd.notna(pct_val) else 0.0
            signals.append({
                'strike': int(r['strike']),
                'action': action,
                'strength': strength,
                'reason': r.get('reasons', ''),
                'tags': r.get('tags', ''),
                'last_premium': float(r.get('last_premium') or 0),
                'time': str(pd.Timestamp.now())
            })

    # detect reversals
    reversals = []
    for s in all_strikes:
        # call series takes precedence; puts are used only when calls are absent/empty
        series = call_series.get(s) if call_series.get(s) else put_series.get(s)
        if not series:
            continue
        sr = sorted(series, key=lambda x: x[0])
        t_last = sr[-1][0]
        cutoff = t_last - pd.Timedelta(minutes=REVERSAL_WINDOW_MIN)
        recent = [p for t, p in sr if t >= cutoff]
        if not recent:
            continue
        peak = max(recent); lastp = recent[-1]
        drop = (peak - lastp) / peak * 100 if peak > 0 else 0
        if drop >= REVERSAL_DROP_PCT:
            reversals.append({'strike': s, 'peak': peak, 'last': lastp, 'drop_pct': drop})

    # detect IV crush
    iv_crush = []
    if not df.empty:
        for _, row in df.iterrows():
            for prev_iv, curr_iv, side in [("Previous_Call_IV", "Current_Call_IV", "CALL"), ("Previous_Put_IV", "Current_Put_IV", "PUT")]:
                if prev_iv in row and curr_iv in row and pd.notna(row[prev_iv]) and pd.notna(row[curr_iv]) and row[prev_iv] > 0:
                    drop = (row[prev_iv] - row[curr_iv]) / row[prev_iv] * 100
                    if drop >= IV_CRUSH_DROP:
                        iv_crush.append({'time': str(row.get('LTT')), 'strike': row.get('Current_Strikeprice'), 'side': side, 'drop_pct': drop})

    return merged_df.to_dict(orient='records'), signals, reversals, iv_crush

# ---------------------- MAIN LOOP ---------------------- #

def run_realtime(input_path: str, start_from_end: bool = START_FROM_END):
    """Tail ``input_path`` and run the window analysis on incoming snapshots.

    Every line yielded by ``tail_file`` is parsed with ``parse_line_json`` and
    the resulting records are appended to a ``SnapshotWindow``. At most once
    per ``max(0.01, REFRESH_SECONDS * 0.6)`` seconds the window is analysed;
    the results are optionally written to ``OUT_AUTOTRADE`` / ``OUT_LATEST``,
    optionally appended to ``HISTORICAL_DUMP``, and signal / reversal /
    IV-crush alerts are printed (and appended to ``ALERT_LOG`` when set).

    The loop ends when the tailer is exhausted or the module-level ``running``
    flag becomes falsy. Parse, append, analysis and file-write failures are
    reported on stdout and never abort the loop.

    Args:
        input_path: Path of the file to tail.
        start_from_end: Forwarded to ``tail_file``.
    """
    def _alert(msg):
        # Print an alert and mirror it to ALERT_LOG. An unwritable log file
        # only produces a warning, so it cannot stop the engine.
        print(msg)
        if ALERT_LOG:
            try:
                with open(ALERT_LOG, 'a') as log_fh:
                    log_fh.write(msg + '\n')
            except OSError as err:
                print('Warning: failed to write alert log:', err)

    window = SnapshotWindow(window_minutes=WINDOW_MINUTES)
    tailer = tail_file(input_path, start_from_end)

    last_analyze = 0.0
    print(f"Realtime engine started. Watching: {input_path}")

    for raw_line in tailer:
        if not running:
            break
        try:
            parsed_list = parse_line_json(raw_line)
        except Exception as e:
            print('Parse error:', e)
            continue
        if not parsed_list:
            continue
        for flat in parsed_list:
            try:
                window.append(flat)
            except Exception as e:
                print('Append error (ignored):', e)
                continue

        # throttle: skip analysis if the previous one ran too recently
        now = time.time()
        if now - last_analyze < max(0.01, REFRESH_SECONDS * 0.6):
            continue
        last_analyze = now

        try:
            merged_rows, signals, reversals, iv_crush = analyze_window(window)
        except Exception as e:
            print('Analysis error (skipping this cycle):', e)
            continue

        # write outputs
        if WRITE_OUTPUT_JSON and not DRY_RUN:
            try:
                with open(OUT_AUTOTRADE, 'w') as fh:
                    json.dump(signals, fh, indent=2, default=str)
                with open(OUT_LATEST, 'w') as fh:
                    json.dump({'timestamp': str(pd.Timestamp.now()), 'merged': merged_rows, 'reversals': reversals, 'iv_crush': iv_crush}, fh, indent=2, default=str)
            except Exception as e:
                print('Warning: failed to write outputs:', e)

        # optional historical dump
        if HISTORICAL_DUMP:
            try:
                # default=str matches the other JSON outputs, so a value json
                # cannot encode natively no longer drops the whole record.
                record = json.dumps({'ts': str(pd.Timestamp.now()), 'merged': merged_rows}, default=str)
                with open(HISTORICAL_DUMP, 'a') as fh:
                    fh.write(record + '\n')
            except Exception as e:
                # still best-effort, but no longer silent
                print('Warning: failed to append historical dump:', e)

        # print alerts
        for s in signals:
            _alert(f"[SIGNAL] {s['time']} Strike {s['strike']} => {s['action']} (str={s['strength']:.2f}) reason={s['reason']}")
        for r in reversals:
            _alert(f"[REVERSAL] Strike {r['strike']} drop {r['drop_pct']:.1f}% (peak {r['peak']} -> last {r['last']})")
        for ev in iv_crush:
            _alert(f"[IV_CRUSH] {ev['time']} Strike {ev['strike']} {ev['side']} drop {ev['drop_pct']:.1f}%")

    print('Realtime engine stopped.')

# ---------------------- CLI ---------------------- #

def cli():
    """Parse command-line options, publish them as module globals and start the engine.

    Exits with status 1 when the input file does not exist.
    """
    # Declared before any use of these names inside the function, so the
    # settings can be assigned directly.
    global WRITE_OUTPUT_JSON, START_FROM_END, DRY_RUN, ALERT_LOG, WINDOW_MINUTES

    parser = argparse.ArgumentParser()
    parser.add_argument('--file', '-f', dest='file', default=INPUT_FILE, help='Input file to tail')
    parser.add_argument('--from-start', action='store_true', help='Start reading from start of file instead of EOF')
    parser.add_argument('--no-write', action='store_true', help='Do not write output json files (dry run)')
    parser.add_argument('--alert-log', dest='alert_log', default=ALERT_LOG, help='Path to append alert log')
    parser.add_argument('--window', type=int, default=WINDOW_MINUTES, help='Sliding window minutes')
    opts = parser.parse_args()

    WRITE_OUTPUT_JSON = True
    START_FROM_END = not opts.from_start
    ALERT_LOG = opts.alert_log
    WINDOW_MINUTES = int(opts.window)
    DRY_RUN = bool(opts.no_write)

    if os.path.exists(opts.file):
        run_realtime(opts.file, start_from_end=START_FROM_END)
        return

    print('ERROR: input file not found:', opts.file)
    sys.exit(1)

# Script entry point: run the command-line interface only when this module is
# executed directly, not when it is imported.
if __name__ == '__main__':
    cli()


Realtime engine started. Watching: /root/.local/share/jupyter/runtime/kernel-3b3ea389-36a4-4fd9-bc51-b08e84c883db.json

Received exit signal. Shutting down...
Realtime engine stopped.


In [None]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import json
import pandas as pd
import numpy as np

# ---------------------------------------------------------
# LOAD DATA
# ---------------------------------------------------------
# Read the JSON file and flatten it into a DataFrame; nested objects become
# dot-separated column names via pd.json_normalize.
path = "last50.json"      # <-- change if needed
with open(path, "r", encoding="utf-8") as f:
    data = json.load(f)

# `df` is the working frame mutated by every step below.
df = pd.json_normalize(data)


# ---------------------------------------------------------
# HELPERS
# ---------------------------------------------------------
def to_bool(x):
    """Coerce a loosely-typed flag value to a Python ``bool``.

    Rules, in order:
      * real booleans (Python or NumPy) are returned as-is;
      * containers (list, tuple, dict, set, ndarray) are never a flag -> False;
      * missing values (None, NaN, NaT, pd.NA) -> False;
      * numbers are True only when equal to 1 (so 1 and 1.0 agree);
      * anything else is stringified and compared, case-insensitively and
        ignoring surrounding whitespace, with "true", "1", "yes", "y", "t".
    """
    if isinstance(x, (bool, np.bool_)):
        return bool(x)
    # pd.isna() returns an array for list-likes, which cannot be truth-tested,
    # so containers are rejected before the missing-value check.
    if isinstance(x, (list, tuple, dict, set, np.ndarray)):
        return False
    if pd.isna(x):
        return False
    if isinstance(x, (int, float, np.integer, np.floating)):
        # Without this, 1.0 would stringify to "1.0" and be read as False.
        return bool(x == 1)
    s = str(x).strip().lower()
    return s in ("true", "1", "yes", "y", "t")


def normalize(series):
    """Min-max scale ``series`` into [0, 1].

    When the maximum equals the minimum the series is multiplied by zero
    instead, which avoids dividing by a zero range.
    """
    lo = series.min()
    hi = series.max()
    if hi == lo:
        return series * 0
    span = hi - lo
    return (series - lo) / span


# ---------------------------------------------------------
# NORMALIZE BOOLEANS
# ---------------------------------------------------------
# Flag columns are selected purely by name: any column whose name contains
# 'Is' or 'HighConviction' is coerced to bool with to_bool.
# NOTE(review): the 'Is' substring test is case-sensitive and would also match
# non-flag columns that merely contain "Is" -- confirm against the schema.
bool_cols = [c for c in df.columns if 'Is' in c or 'HighConviction' in c]
for c in bool_cols:
    df[c] = df[c].apply(to_bool)


# ---------------------------------------------------------
# NORMALIZE NUMERICS
# ---------------------------------------------------------
# Columns that should be numeric; each is converted only if it exists.
numeric_candidates = [
    'Current_CallScore','Current_PutScore','Next_CallScore','Next_PutScore',
    'SpotPrice','AvgPrice','Current_CallMoneyFlow','Current_PutMoneyFlow',
    'Next_CallMoneyFlow','Next_PutMoneyFlow','CallTTQ','PutTTQ',
    'Next_CallTTQ','Next_PutTTQ'
]

for c in numeric_candidates:
    if c in df.columns:
        # unparseable values become NaN, then are replaced by 0.0
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0.0)


# ---------------------------------------------------------
# CALCULATE recalc_strength
# ---------------------------------------------------------
# recalc_strength = 10 for a high-conviction row, plus call score plus put score.
# NOTE(review): unlike the loop above, this indexes the three columns without
# checking that they exist, so a missing column raises KeyError here.
df["recalc_strength"] = (
    df["Current_IsHighConvictionSignal"].astype(int) * 10
    + df["Current_CallScore"]
    + df["Current_PutScore"]
)


# ---------------------------------------------------------
# CALCULATE alt_strength
# ---------------------------------------------------------
def alt_strength(row):
    """Return an alternative strength score for one snapshot row.

    The score is the call score plus the put score, plus half of the score on
    the row's buy side (CALL or PUT, case-insensitive), plus 10 when the row
    is flagged as a high-conviction signal. Missing scores default to 0.
    """
    call_score = row.get("Current_CallScore", 0)
    put_score = row.get("Current_PutScore", 0)
    buy_side = str(row.get("Current_BuySide") or "").upper()

    # extra weight for the side the row recommends buying
    side_bonus = {"CALL": 0.5 * call_score, "PUT": 0.5 * put_score}.get(buy_side, 0)
    conviction_bonus = 10 if row.get("Current_IsHighConvictionSignal") else 0

    return call_score + put_score + side_bonus + conviction_bonus

# Row-wise alternative strength.
df["alt_strength"] = df.apply(alt_strength, axis=1)


# ---------------------------------------------------------
# AGREEMENT COUNT
# Number of boolean "Current_*" columns that are True on each row.
# ---------------------------------------------------------
current_bool_cols = []
for col_name in df.columns:
    if not col_name.startswith("Current_"):
        continue
    if df[col_name].dtype == bool:
        current_bool_cols.append(col_name)

df["agreement_count"] = df[current_bool_cols].sum(axis=1)


# ---------------------------------------------------------
# FORWARD VALIDATION (damn-sure signals)
# ---------------------------------------------------------
# Start with every row unconfirmed.
for flag_col in ("forward_confirm", "forward_confirm_call", "forward_confirm_put"):
    df[flag_col] = False

# Per buy side: (next score, current score, next money flow,
#                current money flow, side-specific confirmation column)
_forward_fields = {
    "CALL": ("Next_CallScore", "Current_CallScore",
             "Next_CallMoneyFlow", "Current_CallMoneyFlow",
             "forward_confirm_call"),
    "PUT": ("Next_PutScore", "Current_PutScore",
            "Next_PutMoneyFlow", "Current_PutMoneyFlow",
            "forward_confirm_put"),
}

for i, row in df.iterrows():
    side = str(row.get("Current_BuySide") or "").upper()
    fields = _forward_fields.get(side)
    if fields is None:
        # rows without a CALL/PUT buy side are never confirmed
        continue
    next_score, curr_score, next_flow, curr_flow, side_flag = fields

    # A row is confirmed when the next score and money flow on its side are
    # at least the current ones and the next high-conviction flag is set.
    cond = True
    cond &= row.get(next_score, 0) >= row.get(curr_score, 0)
    cond &= row.get("Next_IsHighConvictionSignal", False)
    cond &= row.get(next_flow, 0) >= row.get(curr_flow, 0)

    if cond:
        df.at[i, "forward_confirm"] = True
        df.at[i, side_flag] = True


# ---------------------------------------------------------
# CORRELATION SUPPORT (numeric features supporting strength)
# ---------------------------------------------------------
# Candidate features: every numeric column except the two strength scores.
numeric_cols = df.select_dtypes(include=[np.number]).columns
numeric_cols = [c for c in numeric_cols if c not in ("recalc_strength", "alt_strength")]

# Pearson correlation of each candidate with recalc_strength. An undefined
# correlation (NaN, e.g. a constant column) counts as 0. The errstate guard
# silences the divide/invalid RuntimeWarning NumPy emits for such
# zero-variance columns; the resulting NaN is handled explicitly below.
corrs = {}
with np.errstate(divide="ignore", invalid="ignore"):
    for c in numeric_cols:
        try:
            corr_val = df["recalc_strength"].corr(df[c])
        except (TypeError, ValueError):
            # column cannot be correlated -> treat as uncorrelated
            corr_val = np.nan
        corrs[c] = 0 if pd.isna(corr_val) else corr_val

# use only moderately correlated columns
corr_support_cols = [c for c, v in corrs.items() if abs(v) >= 0.25]

# Weighted sum of the min-max normalised supporting columns; each column's
# weight is |corr| and its sign follows the sign of the correlation.
df["corr_support_raw"] = 0.0
for c in corr_support_cols:
    df["corr_support_raw"] += np.sign(corrs[c]) * normalize(df[c]) * abs(corrs[c])


# ---------------------------------------------------------
# FINAL SCORE (WEIGHTED)
# ---------------------------------------------------------
# Min-max normalised components; the forward-confirmation flag is used as 0/1.
for norm_col, raw_col in (
    ("norm_recalc", "recalc_strength"),
    ("norm_alt", "alt_strength"),
    ("norm_agreement", "agreement_count"),
    ("norm_corr", "corr_support_raw"),
):
    df[norm_col] = normalize(df[raw_col])
df["norm_forward"] = df["forward_confirm"].astype(int)

weights = {
    "recalc": 0.20,
    "alt": 0.15,
    "agreement": 0.15,
    "corr": 0.15,
    "forward": 0.35,
}

# final_score is the weighted sum of the five components, accumulated in the
# order listed here.
weighted_terms = [
    df[component] * weights[key]
    for component, key in (
        ("norm_recalc", "recalc"),
        ("norm_alt", "alt"),
        ("norm_agreement", "agreement"),
        ("norm_corr", "corr"),
        ("norm_forward", "forward"),
    )
]
df["final_score"] = sum(weighted_terms)


# ---------------------------------------------------------
# EXTRACT FINAL DAMN–SURE SIGNALS
# ---------------------------------------------------------
# Keep only forward-confirmed rows, best final_score first.
confirmed = (
    df.loc[df["forward_confirm"] == True]
    .copy()
    .sort_values("final_score", ascending=False)
)

# Columns shown in the console report (the CSV export keeps every column).
cols_to_show = [
    "LTT",
    "SpotPrice",
    "Current_BuySide",
    "Current_IsHighConvictionSignal",
    "Current_CallScore",
    "Current_PutScore",
    "agreement_count",
    "forward_confirm",
    "forward_confirm_call",
    "forward_confirm_put",
    "recalc_strength",
    "alt_strength",
    "final_score",
]

report = confirmed[cols_to_show].to_string(index=False)
print("\n\n===== FORWARD-VALIDATED (DAMN-SURE) SIGNALS =====\n")
print(report)


# ---------------------------------------------------------
# EXPORT CSV
# ---------------------------------------------------------
confirmed.to_csv("confirmed_damn_sure_signals.csv", index=False)
print("\nSaved: confirmed_damn_sure_signals.csv\n")


  c /= stddev[:, None]
  c /= stddev[None, :]




===== FORWARD-VALIDATED (DAMN-SURE) SIGNALS =====

             LTT  SpotPrice Current_BuySide  Current_IsHighConvictionSignal  Current_CallScore  Current_PutScore  agreement_count  forward_confirm  forward_confirm_call  forward_confirm_put  recalc_strength  alt_strength  final_score
14-11-2025 14:59   58198.70             PUT                            True                  4                 7                2             True                 False                 True               21          24.5     0.872779
14-11-2025 14:59   58191.10             PUT                           False                  4                 7                1             True                 False                 True               11          14.5     0.547162
14-11-2025 15:00   58282.95             PUT                           False                  4                 6                1             True                 False                 True               10          13.0     0.462823
14-11-2025 