In [9]:
import sys, platform
print("Python:", sys.version)
print("Arch:", platform.machine(), "| OS:", platform.system())

Python: 3.12.4 | packaged by Anaconda, Inc. | (main, Jun 18 2024, 15:03:56) [MSC v.1929 64 bit (AMD64)]
Arch: AMD64 | OS: Windows


In [1]:
pip install blpapi

Note: you may need to restart the kernel to use updated packages.


In [5]:
%pip install pandas numpy scipy matplotlib

Collecting pandas
  Downloading pandas-2.3.2-cp310-cp310-win_amd64.whl.metadata (19 kB)
Collecting numpy
  Downloading numpy-2.2.6-cp310-cp310-win_amd64.whl.metadata (60 kB)
Collecting scipy
  Downloading scipy-1.15.3-cp310-cp310-win_amd64.whl.metadata (60 kB)
Collecting matplotlib
  Downloading matplotlib-3.10.6-cp310-cp310-win_amd64.whl.metadata (11 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.3.2-cp310-cp310-win_amd64.whl.metadata (5.5 kB)
Collecting cycler>=0.10 (from matplotlib)
  Downloading cycler-0.12.1-py3-none-any.whl.metadata (3.8 kB)
Collecting fonttools>=4.22.0 (from matplotlib)
  Downloading fonttools-4.60.0-cp310-cp310-win_amd64.whl.metadata (113 kB)
Collecting kiwisolver>=1.3.1 (from matplotlib)
  Downloading kiwisolver-1.4.9-cp310-cp



In [7]:
# --- Bloomberg Options + Stock Data Loader (fully functional with blpapi) ---
# Requirements: Bloomberg Terminal running on this machine (Desktop API) and `pip install blpapi`
# Optional: pip install pandas numpy scipy

import blpapi
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from scipy.stats import norm

# ------------------------
# Session helpers
# ------------------------
def _start_bbg_session(host="localhost", port=8194):
    """Start a Bloomberg API session (desktop API defaults)."""
    session_options = blpapi.SessionOptions()
    session_options.setServerHost(host)
    session_options.setServerPort(port)

    session = blpapi.Session(session_options)
    if not session.start():
        raise RuntimeError("Failed to start Bloomberg session.")
    if not session.openService("//blp/refdata"):
        raise RuntimeError("Failed to open //blp/refdata.")
    return session, session.getService("//blp/refdata")

def _send_request_sync(session, request, timeout_ms=120_000):
    """Send a request and synchronously collect all response messages."""
    session.sendRequest(request)
    out_msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for msg in ev:
            out_msgs.append(msg)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return out_msgs

# ------------------------
# Stock (BDH – HistoricalDataRequest)
# ------------------------
def get_equity_history(symbol_bbg: str,
                       start_date: str,
                       end_date: str,
                       fields=("PX_LAST",),
                       periodicity="DAILY") -> pd.DataFrame:
    """
    Pull historical data for an equity.
    symbol_bbg: e.g., 'AAPL US Equity'
    dates as 'YYYYMMDD'
    """
    session, ref = _start_bbg_session()
    try:
        req = ref.createRequest("HistoricalDataRequest")
        req.getElement("securities").appendValue(symbol_bbg)
        for f in fields:
            req.getElement("fields").appendValue(f)
        req.set("startDate", start_date)
        req.set("endDate", end_date)
        req.set("periodicitySelection", periodicity)

        msgs = _send_request_sync(session, req)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): 
                continue
            sd = msg.getElement("securityData")
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row = fd.getValueAsElement(i)
                d = row.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    if row.hasElement(f):
                        # numbers might be NaN; handle gracefully
                        try:
                            out[f] = row.getElementAsFloat64(f)
                        except Exception:
                            out[f] = np.nan
                rows.append(out)

        df = pd.DataFrame(rows).sort_values("date").set_index("date")
        return df
    finally:
        session.stop()

# ------------------------
# Options chain tickers (BDS – bulk field CHAIN_TICKERS)
# Then detail for each option (BDP – ReferenceDataRequest)
# ------------------------
def get_option_chain(symbol_bbg: str,
                     expiry: str | None = None,
                     moneyness: str | None = None,
                     put_call: str | None = None,
                     max_points: int | None = None) -> pd.DataFrame:
    """
    Retrieve an option chain and key fields for each option using Bloomberg API.

    Parameters
    ----------
    symbol_bbg : str
        Underlying, e.g. 'AAPL US Equity' or 'SPX Index'
    expiry : str | None
        Override expiry to a specific date, format 'YYYYMMDD' (e.g., '20251219').
        (Override field: CHAIN_EXP_DT_OVRD)
    moneyness : str | None
        Moneyness band like '90%-110%' around spot (Override: CHAIN_STRIKE_PX_OVRD).
        You can also pass absolute strikes like '100,105,110'.
    put_call : str | None
        'C' for calls, 'P' for puts (Override: CHAIN_PUT_CALL_TYPE_OVRD).
        Leave None for both.
    max_points : int | None
        Limit number of chain points returned (Override: CHAIN_POINTS_LIMIT)

    Returns
    -------
    DataFrame with columns:
        - security
        - OPT_STRIKE_PX, OPT_EXPIRE_DT, OPT_PUT_CALL
        - PX_BID, PX_ASK, MID, IMPLIED_VOLATILITY, DELTA, GAMMA, THETA, VEGA
        - OPT_UNDL_PX (underlying), TICKER / UNIQUE_ID if available
    """
    session, ref = _start_bbg_session()
    try:
        # 1) BDS bulk request for chain tickers
        bds_req = ref.createRequest("ReferenceDataRequest")
        bds_req.getElement("securities").appendValue(symbol_bbg)
        bds_req.getElement("fields").appendValue("CHAIN_TICKERS")  # bulk field

        # Overrides (see FLDS <GO> on CHAIN_TICKERS for choices)
        overrides = bds_req.getElement("overrides")
        if expiry:
            ov = overrides.appendElement()
            ov.setElement("fieldId", "CHAIN_EXP_DT_OVRD")
            ov.setElement("value", expiry)
        if moneyness:
            ov = overrides.appendElement()
            ov.setElement("fieldId", "CHAIN_STRIKE_PX_OVRD")
            ov.setElement("value", moneyness)
        if put_call in {"C", "P"}:
            ov = overrides.appendElement()
            ov.setElement("fieldId", "CHAIN_PUT_CALL_TYPE_OVRD")
            ov.setElement("value", put_call)
        if max_points:
            ov = overrides.appendElement()
            ov.setElement("fieldId", "CHAIN_POINTS_LIMIT")
            ov.setElement("value", str(max_points))

        msgs = _send_request_sync(session, bds_req)

        # Parse bulk field CHAIN_TICKERS
        option_securities = []
        for msg in msgs:
            if not msg.hasElement("securityData"):
                continue
            sdata_array = msg.getElement("securityData")
            for i in range(sdata_array.numValues()):
                sdata = sdata_array.getValueAsElement(i)
                if not sdata.hasElement("fieldData"):
                    continue
                fdata = sdata.getElement("fieldData")
                if fdata.hasElement("CHAIN_TICKERS"):
                    bulk = fdata.getElement("CHAIN_TICKERS")
                    for j in range(bulk.numValues()):
                        row = bulk.getValueAsElement(j)
                        # rows typically contain a 'Ticker' element
                        if row.hasElement("Ticker"):
                            option_securities.append(row.getElementAsString("Ticker"))

        if not option_securities:
            return pd.DataFrame(columns=["security"])

        # 2) BDP reference request for each option ticker (bid/ask/iv/greeks/etc.)
        fields = [
            "PX_BID", "PX_ASK",
            "IMPLIED_VOLATILITY",
            "DELTA", "GAMMA", "THETA", "VEGA",
            "OPT_STRIKE_PX", "OPT_EXPIRE_DT", "OPT_PUT_CALL",
            "OPT_UNDL_PX"
        ]
        bdp_req = ref.createRequest("ReferenceDataRequest")
        secs_el = bdp_req.getElement("securities")
        for sec in option_securities:
            secs_el.appendValue(sec)
        flds_el = bdp_req.getElement("fields")
        for f in fields:
            flds_el.appendValue(f)

        msgs2 = _send_request_sync(session, bdp_req)

        # Parse into DataFrame
        out = []
        for msg in msgs2:
            if not msg.hasElement("securityData"):
                continue
            sdata_array = msg.getElement("securityData")
            for i in range(sdata_array.numValues()):
                sdata = sdata_array.getValueAsElement(i)
                sec = sdata.getElementAsString("security")
                fdata = sdata.getElement("fieldData")
                row = {"security": sec}
                for f in fields:
                    if fdata.hasElement(f):
                        try:
                            row[f] = fdata.getElementAsFloat64(f)
                        except Exception:
                            try:
                                # some fields are strings/dates
                                row[f] = fdata.getElementAsString(f)
                            except Exception:
                                row[f] = np.nan
                # nice to have: mid
                bid = row.get("PX_BID", np.nan)
                ask = row.get("PX_ASK", np.nan)
                row["MID"] = np.nan if (pd.isna(bid) or pd.isna(ask)) else (bid + ask) / 2.0
                out.append(row)

        df = pd.DataFrame(out)
        # standardize types
        if "OPT_EXPIRE_DT" in df.columns:
            df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
        if "OPT_PUT_CALL" in df.columns:
            df["OPT_PUT_CALL"] = df["OPT_PUT_CALL"].astype(str).str.upper()
        if "OPT_STRIKE_PX" in df.columns:
            df["OPT_STRIKE_PX"] = pd.to_numeric(df["OPT_STRIKE_PX"], errors="coerce")
        return df.sort_values(["OPT_EXPIRE_DT", "OPT_STRIKE_PX", "OPT_PUT_CALL", "security"]).reset_index(drop=True)
    finally:
        session.stop()

# ------------------------
# Black–Scholes (for your mispricing logic)
# ------------------------
def black_scholes(S, K, T, r, sigma, option_type="call"):
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    if option_type.lower() == "call":
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    else:
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

# ------------------------
# Example usage (uncomment to run on a terminal PC)
# ------------------------
# underlying = "AAPL US Equity"
# end = datetime.today()
# start = end - timedelta(days=120)
# hist = get_equity_history(underlying, start.strftime("%Y%m%d"), end.strftime("%Y%m%d"),
#                           fields=("PX_LAST",))
# chain = get_option_chain(
#     underlying,
#     expiry=None,             # e.g., '20251219' to target a specific expiry
#     moneyness="90%-110%",    # or '100,105,110' for absolute strikes
#     put_call=None,           # 'C' for calls only, 'P' for puts only, None for both
#     max_points=500           # optional cap
# )
# print(hist.tail())
# print(chain.head())

In [9]:
# ===================== Add a publication-ready plot to the Bloomberg backtest =====================
# Requirements:
#   - matplotlib (pip install matplotlib)
#   - Works with the res dict returned by backtest_parity(...) defined earlier

import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path
from datetime import datetime

def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    """
    Create a clean equity curve plot from the backtest results and save to PNG.
    - res: dict returned by backtest_parity(...)
    - title: chart title
    - outfile: path to save PNG; if None, saves to ./backtest_<timestamp>.png

    The curve is daily **cumulative realized PnL** (units), so it’s ready for reporting.
    """
    # Build a daily cumulative realized PnL series from trade exits
    uh = res["underlying_history"].copy()
    tr = res["trades"].copy() if not res["trades"].empty else pd.DataFrame(columns=["exit_date","pnl"])

    # daily index over backtest window
    daily = pd.Series(0.0, index=uh.index)

    # add realized PnL on exit dates
    if not tr.empty:
        tr = tr.set_index("exit_date").sort_index()
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])

    cum_pnl = daily.cumsum()  # cumulative realized pnl (units)

    # set output file
    if outfile is None:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        outfile = Path(f"./backtest_{stamp}.png")
    else:
        outfile = Path(outfile)

    # ---- Plot (single figure, no custom colors/styles) ----
    plt.figure(figsize=(10, 5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title)
    plt.xlabel("Date")
    plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6)
    plt.tight_layout()
    plt.savefig(outfile, dpi=200)
    plt.close()

    # Small summary for the report header if you need it
    summary = res["summary"]
    print("Saved chart to:", outfile)
    print(
        f"Summary → Total PnL: {summary.get('total_pnl_units', 0):.4f} | "
        f"Trades: {summary.get('num_trades', 0)} | "
        f"Hit Ratio: {summary.get('hit_ratio', float('nan'))} | "
        f"Avg Hold (days): {summary.get('avg_hold_days', float('nan'))}"
    )

# -------------------- Example: run backtest + plot --------------------
# NOTE: Uncomment the two lines below and run on a Bloomberg-enabled machine (with the earlier backtester code loaded).

# res = backtest_parity(
#     underlying="AAPL US Equity", start="20240101", end="20240930",
#     min_dte=25, max_dte=45, gap_threshold=0.10
# )
# plot_backtest(res, title="AAPL Parity Arbitrage Backtest (2024YTD)", outfile="aapl_parity_backtest.png")

### 🇧🇷 Brazilian Equities — Bloomberg Tickers

| Company               | B3 Ticker | Bloomberg Ticker        |
|------------------------|------------|----------------------------|
| Vale                   | VALE3      | `VALE3 BZ Equity`          |
| Petrobras (PN)         | PETR4      | `PETR4 BZ Equity`          |
| Embraer                 | EMBR3      | `EMBR3 BZ Equity`          |
| Banco do Brasil         | BBAS3      | `BBAS3 BZ Equity`          |
| Bradesco (PN)            | BBDC4      | `BBDC4 BZ Equity`          |
| Itaú Unibanco (PN)       | ITUB4      | `ITUB4 BZ Equity`          |
| Ambev                    | ABEV3      | `ABEV3 BZ Equity`          |
| Magazine Luiza           | MGLU3      | `MGLU3 BZ Equity`          |
| Suzano                    | SUZB3      | `SUZB3 BZ Equity`          |
| Gerdau                     | GGBR4      | `GGBR4 BZ Equity`          |
| Eletrobras                   | ELET3      | `ELET3 BZ Equity`          |
| Weg                           | WEGE3      | `WEGE3 BZ Equity`          |
| Localiza                       | RENT3      | `RENT3 BZ Equity`          |
| B3 (stock exchange)                | B3SA3      | `B3SA3 BZ Equity`          |
| Lojas Renner                                   | LREN3      | `LREN3 BZ Equity`          |


## New Code

In [19]:
# ======================= ALL-IN-ONE: Bloomberg loader + backtest + plot + example run =======================
# Requirements in this kernel: blpapi, pandas, numpy, scipy, matplotlib
# Run on a Bloomberg Terminal PC (Desktop API; localhost:8194)

import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session helpers --------------------
def _start_bbg_session(host="localhost", port=8194):
    so = blpapi.SessionOptions()
    so.setServerHost(host); so.setServerPort(port)
    s = blpapi.Session(so)
    if not s.start(): raise RuntimeError("Failed to start Bloomberg session.")
    if not s.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
    return s, s.getService("//blp/refdata")

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

# -------------------- BDH: historical for any security (e.g., equity, rates) --------------------
def get_bdh_equity(symbol_bbg, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    s, ref = _start_bbg_session()
    try:
        r = ref.createRequest("HistoricalDataRequest")
        r.getElement("securities").appendValue(symbol_bbg)
        fe = r.getElement("fields")
        for f in fields: fe.appendValue(f)
        r.set("startDate", start_yyyymmdd)
        r.set("endDate", end_yyyymmdd)
        r.set("periodicitySelection", periodicity)
        msgs = _send_req(s, r)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sd = msg.getElement("securityData")
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row = fd.getValueAsElement(i)
                d = row.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    out[f] = row.getElementAsFloat64(f) if row.hasElement(f) else np.nan
                rows.append(out)
        df = pd.DataFrame(rows).sort_values("date").set_index("date")
        return df
    finally:
        s.stop()

def get_riskfree_series(start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    """Risk-free proxy. For Brazil you can try 'BZSELIC Index' if available on your terminal."""
    rf = get_bdh_equity(ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST": "RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0  # simple daily approx
    return rf[["rf_daily"]]

# -------------------- Option chain (BDS CHAIN_TICKERS + BDP per-option fields) --------------------
def get_chain_for_day(underlying_bbg, spot_px, trade_date, target_min_dte=25, target_max_dte=45, max_points=600):
    s, ref = _start_bbg_session()
    try:
        expiries = []
        base = trade_date + timedelta(days=(target_min_dte + target_max_dte)//2)
        for k in range(-21, 22, 7):  # probe +/- 3 weeks around the target month
            expiries.append((base + timedelta(days=k)).strftime("%Y%m%d"))
        expiries = list(dict.fromkeys(expiries))

        best_rows = []
        for exp in expiries:
            bds = ref.createRequest("ReferenceDataRequest")
            bds.getElement("securities").appendValue(underlying_bbg)
            bds.getElement("fields").appendValue("CHAIN_TICKERS")
            ov = bds.getElement("overrides")
            o1 = ov.appendElement(); o1.setElement("fieldId","CHAIN_EXP_DT_OVRD");   o1.setElement("value",exp)
            o2 = ov.appendElement(); o2.setElement("fieldId","CHAIN_POINTS_LIMIT"); o2.setElement("value",str(max_points))
            msgs = _send_req(s, bds)

            tickers = []
            for msg in msgs:
                if not msg.hasElement("securityData"): continue
                sdata = msg.getElement("securityData").getValueAsElement(0)
                fdata = sdata.getElement("fieldData") if sdata.hasElement("fieldData") else None
                if fdata is None or not fdata.hasElement("CHAIN_TICKERS"): continue
                bulk = fdata.getElement("CHAIN_TICKERS")
                for i in range(bulk.numValues()):
                    e = bulk.getValueAsElement(i)
                    if e.hasElement("Ticker"):
                        tickers.append(e.getElementAsString("Ticker"))
            if not tickers:
                continue

            # BDP on chain
            fields = ["PX_BID","PX_ASK","IMPLIED_VOLATILITY","DELTA","GAMMA","THETA","VEGA",
                      "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","OPT_UNDL_PX"]
            bdp = ref.createRequest("ReferenceDataRequest")
            se = bdp.getElement("securities"); [se.appendValue(t) for t in tickers]
            fe = bdp.getElement("fields");     [fe.appendValue(f) for f in fields]
            msgs2 = _send_req(s, bdp)

            rows = []
            for msg in msgs2:
                if not msg.hasElement("securityData"): continue
                arr = msg.getElement("securityData")
                for i in range(arr.numValues()):
                    e = arr.getValueAsElement(i)
                    sec = e.getElementAsString("security")
                    fdata = e.getElement("fieldData")
                    row = {"security": sec}
                    for f in fields:
                        if fdata.hasElement(f):
                            try: row[f] = fdata.getElementAsFloat64(f)
                            except Exception:
                                try: row[f] = fdata.getElementAsString(f)
                                except Exception: row[f] = np.nan
                    rows.append(row)
            df = pd.DataFrame(rows)
            if df.empty:
                continue
            df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
            df["OPT_STRIKE_PX"] = pd.to_numeric(df["OPT_STRIKE_PX"], errors="coerce")
            df["OPT_PUT_CALL"]  = df["OPT_PUT_CALL"].astype(str).str.upper()
            df["DTE"] = (df["OPT_EXPIRE_DT"] - pd.Timestamp(trade_date)).dt.days
            df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)].copy()
            if df.empty:
                continue
            df["MID"] = (pd.to_numeric(df["PX_BID"], errors="coerce") + pd.to_numeric(df["PX_ASK"], errors="coerce"))/2.0
            df["spot_for_day"] = spot_px
            best_rows.append(df)

        if not best_rows:
            return pd.DataFrame()
        chain = pd.concat(best_rows, ignore_index=True).dropna(subset=["OPT_STRIKE_PX","MID","OPT_PUT_CALL","OPT_EXPIRE_DT"])
        # choose ATM call+put (closest strike to spot) among the kept expiries
        chain["atm_dist"] = (chain["OPT_STRIKE_PX"] - spot_px).abs()
        atm_calls = chain[chain["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atm_puts  = chain[chain["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pairs = atm_calls.merge(atm_puts, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pairs.empty:
            return pd.DataFrame()
        pairs["combo_dist"] = pairs["atm_dist_C"] + pairs["atm_dist_P"]
        best = pairs.sort_values(["combo_dist","DTE_C"]).head(1)
        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()

# -------------------- Quant helpers --------------------
def black_scholes_price(S, K, T, r, sigma, cp):
    d1 = (np.log(S/K) + (r + 0.5*sigma*sigma)*T)/(sigma*np.sqrt(T))
    d2 = d1 - sigma*np.sqrt(T)
    return (S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)) if cp.upper()=="C" else (K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1))

def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

# -------------------- The backtest --------------------
def backtest_parity(
    underlying="AAPL US Equity",
    start="20240101",
    end  ="20240930",
    min_dte=25, max_dte=45,
    gap_threshold=0.10,
    max_hold_days=60,
    riskfree_ticker="USGG3M Index"  # for BRL try "BZSELIC Index" if available
):
    px = get_bdh_equity(underlying, start, end, fields=("PX_LAST",))
    rf = get_riskfree_series(start, end, ticker=riskfree_ticker)
    df = px.join(rf, how="left").fillna(method="ffill")
    df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

    trades, equity_curve = [], []
    open_pos = None

    for dt, row in df.iterrows():
        S = row["PX_LAST"]
        rf_day = float(row["rf_daily"]) if pd.notna(row.get("rf_daily", np.nan)) else 0.0

        if open_pos:
            open_pos["days_held"] += 1
            open_pos["cash"] *= (1.0 + rf_day)
            equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
        else:
            equity_curve.append({"date": dt, "equity": 0.0})

        if open_pos:
            if dt >= open_pos["expiry"] or open_pos["days_held"] >= max_hold_days:
                pnl = open_pos["gap_signed"]  # simple convergence realization
                trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                open_pos = None
            continue

        chain = get_chain_for_day(underlying, S, dt, min_dte, max_dte)
        if chain.empty:
            continue

        K    = chain["K"].iloc[0]
        Cmid = chain["C_mid"].iloc[0]
        Pmid = chain["P_mid"].iloc[0]
        expiry = chain["expiry"].iloc[0]
        dte   = int(chain["DTE"].iloc[0])
        T     = dte/365.0

        r_ann = df.loc[:dt].tail(1)["rf_daily"].iloc[0]*365.0 if "rf_daily" in df.columns else 0.0
        dfac  = np.exp(-r_ann*T)
        gap   = (Cmid - Pmid) - (S - K*dfac)

        if abs(gap) < gap_threshold:
            continue

        direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
        open_pos = {
            "entry_date": dt, "expiry": expiry, "days_held": 0,
            "underlying": underlying,
            "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
            "r_ann": r_ann, "T": T, "dte": dte,
            "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
            "direction": direction,
            "cash": K*dfac if gap>0 else -K*dfac,
            "equity_mark": float(np.sign(gap)*abs(gap))
        }

    eq = pd.DataFrame(equity_curve).set_index("date")
    tr = pd.DataFrame(trades)

    if not tr.empty:
        total_pnl = tr["pnl"].sum()
        hitrate   = (tr["pnl"]>0).mean()
        avg_hold  = tr["days_held"].mean()
    else:
        total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

    return {
        "underlying_history": df,
        "equity_curve": eq,
        "trades": tr,
        "summary": {
            "total_pnl_units": float(total_pnl),
            "num_trades": int(len(tr)),
            "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
            "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
        }
    }

# -------------------- Plot helper (saves PNG) --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    uh = res["underlying_history"].copy()
    tr = res["trades"].copy() if not res["trades"].empty else pd.DataFrame(columns=["exit_date","pnl"])
    daily = pd.Series(0.0, index=uh.index)
    if not tr.empty:
        tr = tr.set_index("exit_date").sortindex() if hasattr(tr, "sortindex") else tr.set_index("exit_date").sort_index()
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- Example run (edit ticker/dates and run the cell) --------------------
underlying = "VALE3 BZ Equity"   # ex: "PETR4 BZ Equity", "AAPL US Equity"
start      = "20240101"
end        = datetime.today().strftime("%Y%m%d")
min_dte, max_dte = 25, 45
gap_threshold    = 0.10

res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="BZSELIC Index"  # for BRL, try "BZSELIC Index" if available
)
print("Summary:", res["summary"])
print("\nFirst trades:\n", res["trades"].head())

outfile = f"{underlying.replace(' ','_')}_parity_backtest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest", outfile=outfile)
print(f"Chart saved to: {outfile}")
# ===========================================================================================================#

AttributeError: 'Element' object has no attribute 'getElementAsFloat64'

In [25]:
# ======================= ALL-IN-ONE (FIXED): Bloomberg loader + backtest + plot + example run =======================
# Requirements in this kernel: blpapi, pandas, numpy, scipy, matplotlib
# Run on a Bloomberg Terminal PC (Desktop API; localhost:8194)

import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session helpers --------------------
def _start_bbg_session(host="localhost", port=8194):
    so = blpapi.SessionOptions()
    so.setServerHost(host); so.setServerPort(port)
    s = blpapi.Session(so)
    if not s.start(): raise RuntimeError("Failed to start Bloomberg session.")
    if not s.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
    return s, s.getService("//blp/refdata")

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

# -------------------- Safe extractor for blpapi.Element --------------------
def _blp_get(parent_el: blpapi.Element, field_name: str):
    """
    Safely extract a field value from a blpapi Element (parent_el).
    Returns np.nan if absent. Converts blpapi.Datetime to pandas.Timestamp.
    """
    if not parent_el.hasElement(field_name):
        return np.nan
    sub = parent_el.getElement(field_name)

    # Try numeric first
    for meth in ("getValueAsFloat64", "getValueAsInt64", "getValueAsInteger"):
        try:
            return getattr(sub, meth)()
        except Exception:
            pass

    # Bool / Datetime / String
    for meth in ("getValueAsBool", "getValueAsDatetime", "getValueAsString"):
        try:
            val = getattr(sub, meth)()
            if isinstance(val, blpapi.Datetime):
                # Convert Bloomberg Datetime to pandas Timestamp (date-only)
                return pd.to_datetime(val.strftime("%Y-%m-%d"))
            return val
        except Exception:
            pass

    # Final fallback
    try:
        return sub.getValue()
    except Exception:
        return np.nan

# -------------------- BDH: historical for any security (e.g., equity, rates) --------------------
def get_bdh_equity(symbol_bbg, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    s, ref = _start_bbg_session()
    try:
        r = ref.createRequest("HistoricalDataRequest")
        r.getElement("securities").appendValue(symbol_bbg)
        fe = r.getElement("fields")
        for f in fields: fe.appendValue(f)
        r.set("startDate", start_yyyymmdd)
        r.set("endDate", end_yyyymmdd)
        r.set("periodicitySelection", periodicity)
        msgs = _send_req(s, r)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sd = msg.getElement("securityData")
            if not sd.hasElement("fieldData"): continue
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row_el = fd.getValueAsElement(i)
                d = row_el.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    val = _blp_get(row_el, f)
                    out[f] = val if val is not None else np.nan
                rows.append(out)

        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame(columns=list(fields)).astype(float)
        df = df.sort_values("date").set_index("date")
        # best effort: coerce numerics
        for f in fields:
            df[f] = pd.to_numeric(df[f], errors="ignore")
        return df
    finally:
        s.stop()

# -------------------- Helper: parse BDP messages to DataFrame --------------------
def _parse_bdp_messages_to_df(msgs, fields):
    rows = []
    for msg in msgs:
        if not msg.hasElement("securityData"):
            continue
        sdata = msg.getElement("securityData")
        for i in range(sdata.numValues()):
            e = sdata.getValueAsElement(i)
            sec = e.getElementAsString("security")
            if not e.hasElement("fieldData"):
                continue
            fdata = e.getElement("fieldData")
            row = {"security": sec}
            for f in fields:
                val = _blp_get(fdata, f)
                row[f] = val if val is not None else np.nan
            rows.append(row)
    return pd.DataFrame(rows)

# -------------------- Option chain (BDS CHAIN_TICKERS + BDP per-option fields) --------------------
def get_chain_for_day(underlying_bbg, spot_px, trade_date, target_min_dte=25, target_max_dte=45, max_points=600):
    s, ref = _start_bbg_session()
    try:
        # Probe expiries around ~1 month
        expiries = []
        base = trade_date + timedelta(days=(target_min_dte + target_max_dte)//2)
        for k in range(-21, 22, 7):  # ±3 weeks
            expiries.append((base + timedelta(days=k)).strftime("%Y%m%d"))
        expiries = list(dict.fromkeys(expiries))

        best_rows = []
        for exp in expiries:
            # BDS: CHAIN_TICKERS
            bds = ref.createRequest("ReferenceDataRequest")
            bds.getElement("securities").appendValue(underlying_bbg)
            bds.getElement("fields").appendValue("CHAIN_TICKERS")
            ov = bds.getElement("overrides")
            o1 = ov.appendElement(); o1.setElement("fieldId","CHAIN_EXP_DT_OVRD");   o1.setElement("value",exp)
            o2 = ov.appendElement(); o2.setElement("fieldId","CHAIN_POINTS_LIMIT"); o2.setElement("value",str(max_points))
            msgs = _send_req(s, bds)

            tickers = []
            for msg in msgs:
                if not msg.hasElement("securityData"): continue
                arr = msg.getElement("securityData")
                for i in range(arr.numValues()):
                    sd = arr.getValueAsElement(i)
                    if not sd.hasElement("fieldData"): continue
                    fdata = sd.getElement("fieldData")
                    if not fdata.hasElement("CHAIN_TICKERS"): continue
                    bulk = fdata.getElement("CHAIN_TICKERS")
                    for j in range(bulk.numValues()):
                        e = bulk.getValueAsElement(j)
                        if e.hasElement("Ticker"):
                            tickers.append(e.getElementAsString("Ticker"))

            if not tickers:
                continue

            # BDP: per-option fields
            fields = ["PX_BID","PX_ASK","IMPLIED_VOLATILITY","DELTA","GAMMA","THETA","VEGA",
                      "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","OPT_UNDL_PX"]
            bdp = ref.createRequest("ReferenceDataRequest")
            se = bdp.getElement("securities"); [se.appendValue(t) for t in tickers]
            fe = bdp.getElement("fields");     [fe.appendValue(f) for f in fields]
            msgs2 = _send_req(s, bdp)

            df = _parse_bdp_messages_to_df(msgs2, fields)
            if df.empty:
                continue

            # Clean up & filter
            df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
            df["OPT_STRIKE_PX"] = pd.to_numeric(df["OPT_STRIKE_PX"], errors="coerce")
            df["OPT_PUT_CALL"]  = df["OPT_PUT_CALL"].astype(str).str.upper()
            df["DTE"] = (df["OPT_EXPIRE_DT"] - pd.Timestamp(trade_date)).dt.days
            df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)].copy()
            if df.empty:
                continue
            df["PX_BID"] = pd.to_numeric(df["PX_BID"], errors="coerce")
            df["PX_ASK"] = pd.to_numeric(df["PX_ASK"], errors="coerce")
            df["MID"] = (df["PX_BID"] + df["PX_ASK"])/2.0
            df["spot_for_day"] = spot_px
            best_rows.append(df)

        if not best_rows:
            return pd.DataFrame()
        chain = pd.concat(best_rows, ignore_index=True).dropna(subset=["OPT_STRIKE_PX","MID","OPT_PUT_CALL","OPT_EXPIRE_DT"])
        # choose ATM call+put (closest strike to spot) among the kept expiries
        chain["atm_dist"] = (chain["OPT_STRIKE_PX"] - spot_px).abs()
        atm_calls = chain[chain["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atm_puts  = chain[chain["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pairs = atm_calls.merge(atm_puts, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pairs.empty:
            return pd.DataFrame()
        pairs["combo_dist"] = pairs["atm_dist_C"] + pairs["atm_dist_P"]
        best = pairs.sort_values(["combo_dist","DTE_C"]).head(1)
        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()

# -------------------- Quant helpers --------------------
def black_scholes_price(S, K, T, r, sigma, cp):
    d1 = (np.log(S/K) + (r + 0.5*sigma*sigma)*T)/(sigma*np.sqrt(T))
    d2 = d1 - sigma*np.sqrt(T)
    return (S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)) if cp.upper()=="C" else (K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1))

def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

def get_riskfree_series(start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    rf = get_bdh_equity(ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST": "RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0
    return rf[["rf_daily"]]

# -------------------- The backtest --------------------
def backtest_parity(
    underlying="AAPL US Equity",
    start="20240101",
    end  ="20250918",
    min_dte=25, max_dte=45,
    gap_threshold=0.10,
    max_hold_days=60,
    riskfree_ticker="USGG3M Index"  # for BRL, try "BZSELIC Index" if available on your terminal
):
    px = get_bdh_equity(underlying, start, end, fields=("PX_LAST",))
    rf = get_riskfree_series(start, end, ticker=riskfree_ticker)
    df = px.join(rf, how="left").fillna(method="ffill")
    df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

    trades, equity_curve = [], []
    open_pos = None

    for dt, row in df.iterrows():
        S = row["PX_LAST"]
        rf_day = float(row["rf_daily"]) if pd.notna(row.get("rf_daily", np.nan)) else 0.0

        if open_pos:
            open_pos["days_held"] += 1
            open_pos["cash"] *= (1.0 + rf_day)
            equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
        else:
            equity_curve.append({"date": dt, "equity": 0.0})

        if open_pos:
            if dt >= open_pos["expiry"] or open_pos["days_held"] >= max_hold_days:
                pnl = open_pos["gap_signed"]  # simple convergence realization
                trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                open_pos = None
            continue

        chain = get_chain_for_day(underlying, S, dt, min_dte, max_dte)
        if chain.empty:
            continue

        K    = chain["K"].iloc[0]
        Cmid = chain["C_mid"].iloc[0]
        Pmid = chain["P_mid"].iloc[0]
        expiry = chain["expiry"].iloc[0]
        dte   = int(chain["DTE"].iloc[0])
        T     = dte/365.0

        r_ann = df.loc[:dt].tail(1)["rf_daily"].iloc[0]*365.0 if "rf_daily" in df.columns else 0.0
        dfac  = np.exp(-r_ann*T)
        gap   = (Cmid - Pmid) - (S - K*dfac)

        if abs(gap) < gap_threshold:
            continue

        direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
        open_pos = {
            "entry_date": dt, "expiry": expiry, "days_held": 0,
            "underlying": underlying,
            "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
            "r_ann": r_ann, "T": T, "dte": dte,
            "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
            "direction": direction,
            "cash": K*dfac if gap>0 else -K*dfac,
            "equity_mark": float(np.sign(gap)*abs(gap))
        }

    eq = pd.DataFrame(equity_curve).set_index("date")
    tr = pd.DataFrame(trades)

    if not tr.empty:
        total_pnl = tr["pnl"].sum()
        hitrate   = (tr["pnl"]>0).mean()
        avg_hold  = tr["days_held"].mean()
    else:
        total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

    return {
        "underlying_history": df,
        "equity_curve": eq,
        "trades": tr,
        "summary": {
            "total_pnl_units": float(total_pnl),
            "num_trades": int(len(tr)),
            "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
            "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
        }
    }

# -------------------- Plot helper (saves PNG) --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    uh = res["underlying_history"].copy()
    tr = res["trades"].copy() if not res["trades"].empty else pd.DataFrame(columns=["exit_date","pnl"])
    daily = pd.Series(0.0, index=uh.index)
    if not tr.empty:
        tr = tr.set_index("exit_date").sort_index()
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- Example run (edit ticker/dates and run the cell) --------------------
underlying = "VALE3 BZ Equity"   # e.g., "PETR4 BZ Equity", "ITUB4 BZ Equity", "AAPL US Equity"
start      = "20240101"
end        = datetime.today().strftime("%Y%m%d")
min_dte, max_dte = 25, 45
gap_threshold    = 0.10

res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="BZSELIC Index"  # BRL proxy; or "USGG3M Index"
)
print("Summary:", res["summary"])
print("\nFirst trades:\n", res["trades"].head())

outfile = f"{underlying.replace(' ','_')}_parity_backtest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest", outfile=outfile)
print(f"Chart saved to: {outfile}")
# ===============================================================================================================

  df[f] = pd.to_numeric(df[f], errors="ignore")
  df = px.join(rf, how="left").fillna(method="ffill")


Summary: {'total_pnl_units': 0.0, 'num_trades': 0, 'hit_ratio': None, 'avg_hold_days': None}

First trades:
 Empty DataFrame
Columns: []
Index: []
Saved chart to: VALE3_BZ_Equity_parity_backtest.png
Chart saved to: VALE3_BZ_Equity_parity_backtest.png


In [26]:
# ⚡ FAST TEST (short window, wider filters)
underlying = "VALE3 BZ Equity"      # or any other like "PETR4 BZ Equity"
start      = "20240701"              # ~2.5 months back
end        = "20240815"              # ~6 weeks range → runs very fast

min_dte    = 10                       # allow shorter-dated options
max_dte    = 60                       # also allow slightly longer ones
gap_threshold = 0.05                  # easier to trigger trades

res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="BZSELIC Index"
)

print("Summary:", res["summary"])
print("\nFirst trades:\n", res["trades"].head())

outfile = f"{underlying.replace(' ','_')}_fasttest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest (Fast Test)", outfile=outfile)
print("Chart saved to:", outfile)

  df[f] = pd.to_numeric(df[f], errors="ignore")
  df = px.join(rf, how="left").fillna(method="ffill")


Summary: {'total_pnl_units': 0.0, 'num_trades': 0, 'hit_ratio': None, 'avg_hold_days': None}

First trades:
 Empty DataFrame
Columns: []
Index: []
Saved chart to: VALE3_BZ_Equity_fasttest.png
Chart saved to: VALE3_BZ_Equity_fasttest.png


In [31]:
# ======================= FAST BACKTEST (persistent session, caching, sampling, progress) =======================
import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session (persistent) --------------------
class BBGSession:
    def __init__(self, host="localhost", port=8194):
        self.host, self.port = host, port
        self.session = None
        self.ref = None
    def __enter__(self):
        so = blpapi.SessionOptions()
        so.setServerHost(self.host); so.setServerPort(self.port)
        self.session = blpapi.Session(so)
        if not self.session.start(): raise RuntimeError("Failed to start session.")
        if not self.session.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
        self.ref = self.session.getService("//blp/refdata")
        return self
    def __exit__(self, exc_type, exc, tb):
        if self.session: self.session.stop()

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

def _blp_get(parent_el: blpapi.Element, field_name: str):
    if not parent_el.hasElement(field_name): return np.nan
    sub = parent_el.getElement(field_name)
    for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
        try: return getattr(sub, meth)()
        except Exception: pass
    for meth in ("getValueAsBool","getValueAsDatetime","getValueAsString"):
        try:
            val = getattr(sub, meth)()
            if isinstance(val, blpapi.Datetime):
                return pd.to_datetime(val.strftime("%Y-%m-%d"))
            return val
        except Exception: pass
    try: return sub.getValue()
    except Exception: return np.nan

# -------------------- Data pulls (reuse same session) --------------------
def get_bdh_equity(bbg: BBGSession, symbol, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    r = bbg.ref.createRequest("HistoricalDataRequest")
    r.getElement("securities").appendValue(symbol)
    fe = r.getElement("fields")
    for f in fields: fe.appendValue(f)
    r.set("startDate", start_yyyymmdd); r.set("endDate", end_yyyymmdd)
    r.set("periodicitySelection", periodicity)
    msgs = _send_req(bbg.session, r)

    rows = []
    for msg in msgs:
        if not msg.hasElement("securityData"): continue
        sd = msg.getElement("securityData")
        if not sd.hasElement("fieldData"): continue
        fd = sd.getElement("fieldData")
        for i in range(fd.numValues()):
            row_el = fd.getValueAsElement(i)
            d = row_el.getElementAsDatetime("date")
            out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
            for f in fields: out[f] = _blp_get(row_el, f)
            rows.append(out)
    df = pd.DataFrame(rows)
    if df.empty:
        return pd.DataFrame(columns=list(fields)).astype(float)
    df = df.sort_values("date").set_index("date")
    for f in fields: df[f] = pd.to_numeric(df[f], errors="ignore")
    return df

def get_riskfree_series(bbg: BBGSession, start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    rf = get_bdh_equity(bbg, ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST":"RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0
    return rf[["rf_daily"]]

def _parse_bdp_messages_to_df(msgs, fields):
    rows = []
    for msg in msgs:
        if not msg.hasElement("securityData"): continue
        sdata = msg.getElement("securityData")
        for i in range(sdata.numValues()):
            e = sdata.getValueAsElement(i)
            sec = e.getElementAsString("security")
            if not e.hasElement("fieldData"): continue
            fdata = e.getElement("fieldData")
            row = {"security": sec}
            for f in fields: row[f] = _blp_get(fdata, f)
            rows.append(row)
    return pd.DataFrame(rows)

# Cache: (expiry_str -> list of option tickers) within a short TTL window
class ChainCache:
    def __init__(self): self.store = {}
    def get(self, key): return self.store.get(key)
    def put(self, key, val): self.store[key] = val

def get_chain_snapshot(bbg: BBGSession, underlying, expiry_yyyymmdd, max_points=600, chain_cache: ChainCache|None=None):
    cache_key = (underlying, expiry_yyyymmdd, max_points)
    if chain_cache:
        cached = chain_cache.get(cache_key)
        if cached is not None:
            return cached[:]  # copy

    # BDS: CHAIN_TICKERS
    bds = bbg.ref.createRequest("ReferenceDataRequest")
    bds.getElement("securities").appendValue(underlying)
    bds.getElement("fields").appendValue("CHAIN_TICKERS")
    ov = bds.getElement("overrides")
    o1 = ov.appendElement(); o1.setElement("fieldId","CHAIN_EXP_DT_OVRD");   o1.setElement("value",expiry_yyyymmdd)
    o2 = ov.appendElement(); o2.setElement("fieldId","CHAIN_POINTS_LIMIT"); o2.setElement("value",str(max_points))
    msgs = _send_req(bbg.session, bds)

    tickers = []
    for msg in msgs:
        if not msg.hasElement("securityData"): continue
        arr = msg.getElement("securityData")
        for i in range(arr.numValues()):
            sd = arr.getValueAsElement(i)
            if not sd.hasElement("fieldData"): continue
            fdata = sd.getElement("fieldData")
            if not fdata.hasElement("CHAIN_TICKERS"): continue
            bulk = fdata.getElement("CHAIN_TICKERS")
            for j in range(bulk.numValues()):
                e = bulk.getValueAsElement(j)
                if e.hasElement("Ticker"):
                    tickers.append(e.getElementAsString("Ticker"))

    if chain_cache:
        chain_cache.put(cache_key, tickers)
    return tickers

def get_option_fields(bbg: BBGSession, option_tickers, fields):
    if not option_tickers: return pd.DataFrame()
    bdp = bbg.ref.createRequest("ReferenceDataRequest")
    se = bdp.getElement("securities"); [se.appendValue(t) for t in option_tickers]
    fe = bdp.getElement("fields");     [fe.appendValue(f) for f in fields]
    msgs2 = _send_req(bbg.session, bdp)
    df = _parse_bdp_messages_to_df(msgs2, fields)
    return df

# -------------------- Quant helpers --------------------
def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

# -------------------- Faster backtest --------------------
def backtest_parity_fast(
    underlying="VALE3 BZ Equity",
    start="20240701",
    end  ="20240815",
    min_dte=10, max_dte=60,
    gap_threshold=0.05,
    riskfree_ticker="BZSELIC Index",
    step_days=2,                  # <— sample every N days for speed
    probe_offsets=(-14,-7,0,7,14),# <— fewer expiry probes around ~1 month
    max_points=400,               # <— smaller chain size
    progress_every=5              # print progress every N iterations
):
    with BBGSession() as bbg:
        px = get_bdh_equity(bbg, underlying, start, end, fields=("PX_LAST",))
        rf = get_riskfree_series(bbg, start, end, ticker=riskfree_ticker)
        df = px.join(rf, how="left").fillna(method="ffill")
        df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

        dates = df.index.to_list()
        trades, equity_curve = [], []
        open_pos = None
        chain_cache = ChainCache()

        for idx, dt in enumerate(dates):
            if idx % step_days != 0:
                equity_curve.append({"date": dt, "equity": 0.0 if not open_pos else open_pos["equity_mark"]})
                continue

            S = float(df.loc[dt, "PX_LAST"])
            rf_day = float(df.loc[dt, "rf_daily"]) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0

            if open_pos:
                open_pos["days_held"] += step_days
                open_pos["cash"] *= (1.0 + rf_day)**step_days
                equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
            else:
                equity_curve.append({"date": dt, "equity": 0.0})

            if open_pos:
                if dt >= open_pos["expiry"]:
                    pnl = open_pos["gap_signed"]
                    trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                    open_pos = None
                continue

            # expiry candidates near ~1 month from dt
            base = dt + timedelta(days=(min_dte+max_dte)//2)
            expiries = [(base + timedelta(days=o)).strftime("%Y%m%d") for o in probe_offsets]

            picked = None
            fields = ["PX_BID","PX_ASK","IMPLIED_VOLATILITY",
                      "DELTA","GAMMA","THETA","VEGA",
                      "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","OPT_UNDL_PX"]

            for exp in expiries:
                tickers = get_chain_snapshot(bbg, underlying, exp, max_points=max_points, chain_cache=chain_cache)
                if not tickers: continue
                dfopt = get_option_fields(bbg, tickers, fields)
                if dfopt.empty: continue

                dfopt["OPT_EXPIRE_DT"] = pd.to_datetime(dfopt["OPT_EXPIRE_DT"], errors="coerce")
                dfopt["OPT_STRIKE_PX"] = pd.to_numeric(dfopt["OPT_STRIKE_PX"], errors="coerce")
                dfopt["OPT_PUT_CALL"]  = dfopt["OPT_PUT_CALL"].astype(str).str.upper()
                dfopt["PX_BID"] = pd.to_numeric(dfopt["PX_BID"], errors="coerce")
                dfopt["PX_ASK"] = pd.to_numeric(dfopt["PX_ASK"], errors="coerce")
                dfopt["MID"]    = (dfopt["PX_BID"] + dfopt["PX_ASK"])/2.0

                dfopt["DTE"] = (dfopt["OPT_EXPIRE_DT"] - pd.Timestamp(dt)).dt.days
                dfopt = dfopt[(dfopt["DTE"]>=min_dte) & (dfopt["DTE"]<=max_dte)].dropna(subset=["MID","OPT_STRIKE_PX","OPT_EXPIRE_DT"])
                if dfopt.empty: continue

                dfopt["atm_dist"] = (dfopt["OPT_STRIKE_PX"] - S).abs()
                atmC = dfopt[dfopt["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
                atmP = dfopt[dfopt["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
                if atmC.empty or atmP.empty: continue
                pair = atmC.merge(atmP, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
                if pair.empty: continue
                pair["combo_dist"] = pair["atm_dist_C"] + pair["atm_dist_P"]
                picked = pair.sort_values(["combo_dist","DTE_C"]).head(1)
                if not picked.empty: break

            if picked is None or picked.empty:
                if idx % progress_every == 0:
                    print(f"[{dt.date()}] No options found (S={S:.2f})")
                continue

            K    = float(picked["OPT_STRIKE_PX_C"].iloc[0])
            Cmid = float(picked["MID_C"].iloc[0])
            Pmid = float(picked["MID_P"].iloc[0])
            expiry = picked["OPT_EXPIRE_DT"].iloc[0]
            dte    = int(picked["DTE_C"].iloc[0])
            T      = dte/365.0

            r_ann = (df.loc[dt, "rf_daily"]*365.0) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0
            dfac  = np.exp(-r_ann*T)
            gap   = (Cmid - Pmid) - (S - K*dfac)

            if abs(gap) < gap_threshold:
                if idx % progress_every == 0:
                    print(f"[{dt.date()}] Gap {gap:.3f} < thr {gap_threshold}")
                continue

            direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
            open_pos = {
                "entry_date": dt, "expiry": expiry, "days_held": 0,
                "underlying": underlying,
                "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
                "r_ann": r_ann, "T": T, "dte": dte,
                "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
                "direction": direction,
                "cash": K*dfac if gap>0 else -K*dfac,
                "equity_mark": float(np.sign(gap)*abs(gap))
            }

            if idx % progress_every == 0:
                print(f"[{dt.date()}] ENTER {direction} | DTE={dte} K={K:.2f} S={S:.2f} gap={gap:.3f}")

        eq = pd.DataFrame(equity_curve).set_index("date")
        tr = pd.DataFrame(trades)
        if not tr.empty:
            total_pnl = tr["pnl"].sum()
            hitrate   = (tr["pnl"]>0).mean()
            avg_hold  = tr["days_held"].mean()
        else:
            total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

        return {
            "underlying_history": df,
            "equity_curve": eq,
            "trades": tr,
            "summary": {
                "total_pnl_units": float(total_pnl),
                "num_trades": int(len(tr)),
                "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
                "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
            }
        }

# -------------------- Plot helper (same as before) --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    uh = res["underlying_history"].copy()
    tr = res["trades"].copy() if not res["trades"].empty else pd.DataFrame(columns=["exit_date","pnl"])
    daily = pd.Series(0.0, index=uh.index)
    if not tr.empty:
        tr = tr.set_index("exit_date").sort_index()
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- FAST RUN EXAMPLE --------------------
res = backtest_parity_fast(
    underlying="PETR4 BZ Equity",
    start="20240701",
    end="20240815",
    min_dte=10, max_dte=60,
    gap_threshold=0.05,
    riskfree_ticker="BZSELIC Index",
    step_days=2,           # sample every 2 days
    probe_offsets=(-7,0,7) # fewer expiry probes for speed
)
print("Summary:", res["summary"])
print("\nFirst trades:\n", res["trades"].head())

outfile = "VALE3_fasttest.png"
plot_backtest(res, title="VALE3 Parity Arbitrage Backtest (Fast)", outfile=outfile)
print("Chart:", outfile)


  for f in fields: df[f] = pd.to_numeric(df[f], errors="ignore")
  df = px.join(rf, how="left").fillna(method="ffill")


[2024-07-01] No options found (S=38.00)
[2024-07-15] No options found (S=38.00)
[2024-07-29] No options found (S=36.00)
[2024-08-12] No options found (S=37.00)
Summary: {'total_pnl_units': 0.0, 'num_trades': 0, 'hit_ratio': None, 'avg_hold_days': None}

First trades:
 Empty DataFrame
Columns: []
Index: []
Saved chart to: VALE3_fasttest.png
Chart: VALE3_fasttest.png


In [35]:
res = backtest_parity_fast(
    underlying="AAPL US Equity",
    start="20240701", end="20240815",
    fast=True, min_dte=10, max_dte=60, gap_threshold=0.05,
    riskfree_ticker="USGG3M Index",
    step_days=2, probe_offsets=(-7,0,7)
)

TypeError: backtest_parity_fast() got an unexpected keyword argument 'fast'

In [44]:
# ======================= ALL-IN-ONE (KEEPS ORIGINAL API, WITH ROBUST FALLBACKS) =======================
# Works on a Bloomberg Terminal PC (Desktop API; localhost:8194)
# Public functions (same names/signatures as before):
#   get_bdh_equity, get_chain_for_day, backtest_parity, plot_backtest

import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session helpers --------------------
def _start_bbg_session(host="localhost", port=8194):
    so = blpapi.SessionOptions()
    so.setServerHost(host); so.setServerPort(port)
    s = blpapi.Session(so)
    if not s.start(): raise RuntimeError("Failed to start Bloomberg session.")
    if not s.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
    return s, s.getService("//blp/refdata")

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

# -------------------- Safe extractor for blpapi.Element --------------------
def _blp_get(parent_el: blpapi.Element, field_name: str):
    if not parent_el.hasElement(field_name):
        return np.nan
    sub = parent_el.getElement(field_name)
    # numeric
    for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
        try: return getattr(sub, meth)()
        except Exception: pass
    # other common types
    for meth in ("getValueAsBool","getValueAsDatetime","getValueAsString"):
        try:
            val = getattr(sub, meth)()
            if isinstance(val, blpapi.Datetime):
                return pd.to_datetime(val.strftime("%Y-%m-%d"))
            return val
        except Exception: pass
    # fallback
    try: return sub.getValue()
    except Exception: return np.nan

# -------------------- BDH: historical for any security (e.g., equity, rates) --------------------
def get_bdh_equity(symbol_bbg, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    s, ref = _start_bbg_session()
    try:
        r = ref.createRequest("HistoricalDataRequest")
        r.getElement("securities").appendValue(symbol_bbg)
        fe = r.getElement("fields")
        for f in fields: fe.appendValue(f)
        r.set("startDate", start_yyyymmdd)
        r.set("endDate", end_yyyymmdd)
        r.set("periodicitySelection", periodicity)
        msgs = _send_req(s, r)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sd = msg.getElement("securityData")
            if not sd.hasElement("fieldData"): continue
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row_el = fd.getValueAsElement(i)
                d = row_el.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    out[f] = _blp_get(row_el, f)
                rows.append(out)

        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame(columns=list(fields)).astype(float)
        df = df.sort_values("date").set_index("date")
        for f in fields:
            df[f] = pd.to_numeric(df[f], errors="ignore")
        return df
    finally:
        s.stop()

def get_riskfree_series(start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    rf = get_bdh_equity(ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST":"RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0
    return rf[["rf_daily"]]

# -------------------- Helper: parse BDP messages to DataFrame --------------------
def _parse_bdp_messages_to_df(msgs, fields):
    rows = []
    for msg in msgs:
        if not msg.hasElement("securityData"):
            continue
        sdata = msg.getElement("securityData")
        for i in range(sdata.numValues()):
            e = sdata.getValueAsElement(i)
            sec = e.getElementAsString("security")
            if not e.hasElement("fieldData"):
                continue
            fdata = e.getElement("fieldData")
            row = {"security": sec}
            for f in fields:
                row[f] = _blp_get(fdata, f)
            rows.append(row)
    return pd.DataFrame(rows)

# -------------------- Option chain (with robust fallbacks) --------------------
# -------------------- Option chain (BDP-based synthetic scan; no CHAIN_TICKERS needed) --------------------
def get_chain_for_day(
    underlying_bbg,
    spot_px,
    trade_date,
    target_min_dte=25,
    target_max_dte=45,
    strike_band=0.20,       # ±20% around spot
    strike_step=1.0,        # $1 increments
    max_strikes=101,        # safety cap (kept by band/step anyway)
    verbose=False
):
    """
    Build an options snapshot WITHOUT using CHAIN_TICKERS (works even when chain bulk field isn't entitled).
    1) Create expiry candidates (nearest Fridays within DTE window).
    2) Build strike ladder around spot.
    3) Query BDP in bulk for both Calls and Puts, then pick ATM pair.

    Returns a 1-row DataFrame with chosen ATM call/put or empty DF if none.
    """
    # --- helpers ---
    def _bbg_base(under):
        # "AAPL US Equity" -> "AAPL US"
        parts = under.split()
        if len(parts) < 3:
            raise ValueError(f"Unexpected underlying format: {under}")
        return f"{parts[0]} {parts[1]}"

    def _nearest_friday(d):
        # Move to Friday of the same week (or next) depending on day
        wd = d.weekday()  # Mon=0 ... Sun=6
        if wd <= 4:
            return d + timedelta(days=(4 - wd))
        else:
            # Sat/Sun -> next Friday
            return d + timedelta(days=(11 - wd))

    def _expiry_candidates(trade_dt, min_dte, max_dte):
        # build a small set of Friday expiries (unique) inside [min_dte, max_dte]
        exps = set()
        for days_ahead in range(min_dte, max_dte + 1, 7):
            exp = _nearest_friday(trade_dt + timedelta(days=days_ahead))
            exps.add(exp)
        # keep a handful (sorted)
        exps = sorted(list(exps))
        return exps[:8]  # limit to 8 expiries for speed

    def _format_bbg_opt(base, exp_dt, cp, strike):
        # Bloomberg std: "<BASE> <MM/DD/YY> C123 Equity"
        # base is "AAPL US", cp in {"C","P"}, strike as int or rounded price
        mmddyy = exp_dt.strftime("%m/%d/%y")
        # strikes must be integers if step=1.0; format like 200, 205, etc.
        if abs(strike - round(strike)) < 1e-6:
            strike_txt = str(int(round(strike)))
        else:
            strike_txt = f"{strike:.2f}".rstrip("0").rstrip(".")
        return f"{base} {mmddyy} {cp}{strike_txt} Equity"

    # --- start session ---
    s, ref = _start_bbg_session()
    try:
        base = _bbg_base(underlying_bbg)
        trade_dt = pd.Timestamp(trade_date)
        exps = _expiry_candidates(trade_dt, target_min_dte, target_max_dte)
        if not exps:
            return pd.DataFrame()

        # strike ladder around spot
        lo = max(0.01, spot_px * (1.0 - strike_band))
        hi = spot_px * (1.0 + strike_band)
        # ensure step not zero, protect from insane values
        step = max(0.01, float(strike_step))
        strikes = np.arange(np.floor(lo), np.ceil(hi) + 1e-9, step)
        if len(strikes) > max_strikes:
            # thin them evenly if extremely dense
            idx = np.linspace(0, len(strikes) - 1, max_strikes).round().astype(int)
            strikes = strikes[idx]
        strikes = np.unique(np.round(strikes, 2))

        # Build bulk list of option tickers to request via BDP
        sec_list = []
        meta = []  # to remember CP/strike/expiry per security
        for exp in exps:
            for cp in ("C", "P"):
                for K in strikes:
                    sec = _format_bbg_opt(base, exp, cp, K)
                    sec_list.append(sec)
                    meta.append({"sec": sec, "cp": cp, "K": float(K), "expiry": exp})

        if not sec_list:
            return pd.DataFrame()

        # BDP request
        fields = [
            "PX_BID","PX_ASK",
            "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL",
            "IVOL_MID",  # preferred
            "IMPLIED_VOLATILITY"  # fallback (some sources)
        ]
        bdp = ref.createRequest("ReferenceDataRequest")
        se = bdp.getElement("securities")
        for sec in sec_list:
            se.appendValue(sec)
        fe = bdp.getElement("fields")
        for f in fields:
            fe.appendValue(f)

        msgs = _send_req(s, bdp)

        # parse
        rows = []
        meta_map = {m["sec"]: m for m in meta}
        for msg in msgs:
            if not msg.hasElement("securityData"):
                continue
            sdata = msg.getElement("securityData")
            for i in range(sdata.numValues()):
                e = sdata.getValueAsElement(i)
                sec = e.getElementAsString("security")
                if not e.hasElement("fieldData"):
                    continue
                fd = e.getElement("fieldData")

                def _get(fd, name):
                    if not fd.hasElement(name): return np.nan
                    sub = fd.getElement(name)
                    # try float
                    for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
                        try: return getattr(sub, meth)()
                        except Exception: pass
                    # then string/datetime
                    for meth in ("getValueAsDatetime","getValueAsString"):
                        try:
                            val = getattr(sub, meth)()
                            if isinstance(val, blpapi.Datetime):
                                return pd.to_datetime(val.strftime("%Y-%m-%d"))
                            return val
                        except Exception: pass
                    try: return sub.getValue()
                    except Exception: return np.nan

                bid = pd.to_numeric(_get(fd, "PX_BID"), errors="coerce")
                ask = pd.to_numeric(_get(fd, "PX_ASK"), errors="coerce")
                if pd.isna(bid) and pd.isna(ask):
                    continue  # skip dead securities

                Kfd  = _get(fd, "OPT_STRIKE_PX")
                Efd  = _get(fd, "OPT_EXPIRE_DT")
                PCfd = _get(fd, "OPT_PUT_CALL")
                iv1  = _get(fd, "IVOL_MID")
                iv2  = _get(fd, "IMPLIED_VOLATILITY")

                m = meta_map.get(sec, None)
                K = float(Kfd) if pd.notna(Kfd) else (m["K"] if m else np.nan)
                E = pd.to_datetime(Efd) if not isinstance(Efd, pd.Timestamp) else Efd
                if pd.isna(E) and m:
                    E = m["expiry"]
                cp = (str(PCfd).upper() if isinstance(PCfd, str) else (m["cp"] if m else None))

                row = {
                    "security": sec,
                    "OPT_STRIKE_PX": K,
                    "OPT_EXPIRE_DT": E,
                    "OPT_PUT_CALL": cp,
                    "PX_BID": float(bid) if pd.notna(bid) else np.nan,
                    "PX_ASK": float(ask) if pd.notna(ask) else np.nan,
                    "MID": np.nan if (pd.isna(bid) or pd.isna(ask)) else (float(bid)+float(ask))/2.0,
                    "IMPLIED_VOLATILITY": pd.to_numeric(iv1, errors="coerce") if pd.notna(iv1) else pd.to_numeric(iv2, errors="coerce")
                }
                rows.append(row)

        df = pd.DataFrame(rows)
        if df.empty:
            if verbose: print("[synthetic] No options returned via BDP.")
            return pd.DataFrame()

        # Clean and filter by DTE
        df = df.dropna(subset=["OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","MID"])
        df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
        df["DTE"] = (df["OPT_EXPIRE_DT"] - trade_dt).dt.days
        df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)].copy()
        if df.empty:
            if verbose: print("[synthetic] All candidates filtered out by DTE window.")
            return pd.DataFrame()

        # Pick ATM C & P from each expiry; then best pair overall
        df["atm_dist"] = (df["OPT_STRIKE_PX"] - spot_px).abs()
        df["OPT_PUT_CALL"] = df["OPT_PUT_CALL"].astype(str).str.upper()
        atmC = df[df["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atmP = df[df["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pair = atmC.merge(atmP, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pair.empty:
            if verbose: print("[synthetic] Could not pair ATM call/put.")
            return pd.DataFrame()
        pair["combo_dist"] = pair["atm_dist_C"] + pair["atm_dist_P"]
        best = pair.sort_values(["combo_dist","DTE_C"]).head(1)

        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()

# -------------------- Quant helpers --------------------
def black_scholes_price(S, K, T, r, sigma, cp):
    d1 = (np.log(S/K) + (r + 0.5*sigma*sigma)*T)/(sigma*np.sqrt(T))
    d2 = d1 - sigma*np.sqrt(T)
    return (S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)) if cp.upper()=="C" else (K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1))

def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

# -------------------- Backtest (unchanged interface, optional progress/sampling) --------------------
def backtest_parity(
    underlying="AAPL US Equity",
    start="20240101",
    end  ="20250918",
    min_dte=25, max_dte=45,
    gap_threshold=0.10,
    max_hold_days=60,
    riskfree_ticker="USGG3M Index",
    step_days=1,                # NEW: sample every N business days (1 = daily, 2 = every other day)
    progress_every=10           # NEW: print progress every N steps (0 to silence)
):
    px = get_bdh_equity(underlying, start, end, fields=("PX_LAST",))
    rf = get_riskfree_series(start, end, ticker=riskfree_ticker)
    df = px.join(rf, how="left").fillna(method="ffill")
    df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

    trades, equity_curve = [], []
    open_pos = None
    dates = df.index.to_list()

    for idx, dt in enumerate(dates):
        if step_days > 1 and idx % step_days != 0:
            equity_curve.append({"date": dt, "equity": 0.0 if not open_pos else open_pos["equity_mark"]})
            continue

        S = float(df.loc[dt, "PX_LAST"])
        rf_day = float(df.loc[dt, "rf_daily"]) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0

        # carry open pos
        if open_pos:
            open_pos["days_held"] += step_days
            open_pos["cash"] *= (1.0 + rf_day) ** step_days
            equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
        else:
            equity_curve.append({"date": dt, "equity": 0.0})

        # exit rules
        if open_pos:
            if dt >= open_pos["expiry"] or open_pos["days_held"] >= max_hold_days:
                pnl = open_pos["gap_signed"]
                trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                open_pos = None
            continue

        # chain lookup (now with built-in fallback)
        chain = get_chain_for_day(underlying, S, dt, min_dte, max_dte, max_points=600, verbose=False)
        if chain.empty:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] No options found in DTE window ({min_dte}-{max_dte}).")
            continue

        K    = chain["K"].iloc[0]
        Cmid = chain["C_mid"].iloc[0]
        Pmid = chain["P_mid"].iloc[0]
        expiry = chain["expiry"].iloc[0]
        dte   = int(chain["DTE"].iloc[0])
        T     = dte/365.0

        r_ann = (df.loc[dt, "rf_daily"]*365.0) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0
        dfac  = np.exp(-r_ann*T)
        gap   = (Cmid - Pmid) - (S - K*dfac)

        if abs(gap) < gap_threshold:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] Gap {gap:.3f} < thr {gap_threshold}")
            continue

        direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
        open_pos = {
            "entry_date": dt, "expiry": expiry, "days_held": 0,
            "underlying": underlying,
            "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
            "r_ann": r_ann, "T": T, "dte": dte,
            "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
            "direction": direction,
            "cash": K*dfac if gap>0 else -K*dfac,
            "equity_mark": float(np.sign(gap)*abs(gap))
        }

        if progress_every and idx % progress_every == 0:
            print(f"[{dt.date()}] ENTER {direction} | DTE={dte} K={K:.2f} S={S:.2f} gap={gap:.3f}")

    eq = pd.DataFrame(equity_curve).set_index("date")
    tr = pd.DataFrame(trades)

    if not tr.empty:
        total_pnl = tr["pnl"].sum()
        hitrate   = (tr["pnl"]>0).mean()
        avg_hold  = tr["days_held"].mean()
    else:
        total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

    return {
        "underlying_history": df,
        "equity_curve": eq,
        "trades": tr,
        "summary": {
            "total_pnl_units": float(total_pnl),
            "num_trades": int(len(tr)),
            "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
            "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
        }
    }

# -------------------- Plot helper (skips flat plots) --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    # no-trade guard
    if not isinstance(res, dict) or "summary" not in res or res["summary"].get("num_trades", 0) == 0:
        print("No trades — skipping plot.")
        return

    uh = res["underlying_history"].copy()
    tr = res["trades"].copy().set_index("exit_date").sort_index()
    daily = pd.Series(0.0, index=uh.index)
    for dt, row in tr.iterrows():
        if dt in daily.index:
            daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- Example run (same usage as before) --------------------
underlying = "PETR4 BZ Equity"      # try "VALE3 BZ Equity", "ITUB4 BZ Equity", or a US name like "AAPL US Equity"
start      = "20240701"
end        = "20240815"
min_dte, max_dte = 10, 60           # wider window helps on B3
gap_threshold    = 0.05
res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="BZSELIC Index",
    step_days=2,                     # sample every 2 business days for speed
    progress_every=5                 # print heartbeat
)
print("Summary:", res["summary"])
outfile = f"{underlying.replace(' ','_')}_parity_backtest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest", outfile=outfile)
print(f"Chart path: {outfile}")
# ===============================================================================================================

  df[f] = pd.to_numeric(df[f], errors="ignore")
  df = px.join(rf, how="left").fillna(method="ffill")


TypeError: get_chain_for_day() got an unexpected keyword argument 'max_points'

In [46]:
# ======================= ALL-IN-ONE (Bloomberg parity backtest; BDP chain; cleaned warnings) =======================
# Works on a Bloomberg Terminal PC (Desktop API; localhost:8194)
# Public functions:
#   get_bdh_equity, get_chain_for_day, backtest_parity, plot_backtest

import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session helpers --------------------
def _start_bbg_session(host="localhost", port=8194):
    so = blpapi.SessionOptions()
    so.setServerHost(host); so.setServerPort(port)
    s = blpapi.Session(so)
    if not s.start(): raise RuntimeError("Failed to start Bloomberg session.")
    if not s.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
    return s, s.getService("//blp/refdata")

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

# -------------------- Safe extractor for blpapi.Element --------------------
def _blp_get(parent_el: blpapi.Element, field_name: str):
    if not parent_el.hasElement(field_name):
        return np.nan
    sub = parent_el.getElement(field_name)
    # numeric
    for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
        try: return getattr(sub, meth)()
        except Exception: pass
    # other common types
    for meth in ("getValueAsBool","getValueAsDatetime","getValueAsString"):
        try:
            val = getattr(sub, meth)()
            if isinstance(val, blpapi.Datetime):
                return pd.to_datetime(val.strftime("%Y-%m-%d"))
            return val
        except Exception: pass
    # fallback
    try: return sub.getValue()
    except Exception: return np.nan

# -------------------- BDH: historical for any security (e.g., equity, rates) --------------------
def get_bdh_equity(symbol_bbg, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    s, ref = _start_bbg_session()
    try:
        r = ref.createRequest("HistoricalDataRequest")
        r.getElement("securities").appendValue(symbol_bbg)
        fe = r.getElement("fields")
        for f in fields: fe.appendValue(f)
        r.set("startDate", start_yyyymmdd)
        r.set("endDate", end_yyyymmdd)
        r.set("periodicitySelection", periodicity)
        msgs = _send_req(s, r)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sd = msg.getElement("securityData")
            if not sd.hasElement("fieldData"): continue
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row_el = fd.getValueAsElement(i)
                d = row_el.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    val = _blp_get(row_el, f)
                    out[f] = val if val is not None else np.nan
                rows.append(out)

        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame(columns=list(fields)).astype(float)
        df = df.sort_values("date").set_index("date")
        # coerce numerics without deprecated errors="ignore"
        for f in fields:
            try:
                df[f] = pd.to_numeric(df[f])
            except Exception:
                pass
        return df
    finally:
        s.stop()

def get_riskfree_series(start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    rf = get_bdh_equity(ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST":"RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0
    return rf[["rf_daily"]]

# -------------------- Helper: parse BDP messages to DataFrame --------------------
def _parse_bdp_messages_to_df(msgs, fields):
    rows = []
    for msg in msgs:
        if not msg.hasElement("securityData"):
            continue
        sdata = msg.getElement("securityData")
        for i in range(sdata.numValues()):
            e = sdata.getValueAsElement(i)
            sec = e.getElementAsString("security")
            if not e.hasElement("fieldData"):
                continue
            fdata = e.getElement("fieldData")
            row = {"security": sec}
            for f in fields:
                row[f] = _blp_get(fdata, f)
            rows.append(row)
    return pd.DataFrame(rows)

# -------------------- Option chain (BDP-based synthetic scan; no CHAIN_TICKERS needed) --------------------
def get_chain_for_day(
    underlying_bbg,
    spot_px,
    trade_date,
    target_min_dte=25,
    target_max_dte=45,
    max_points=None,         # accepted for compatibility; not used here
    strike_band=0.20,        # ±20% around spot
    strike_step=1.0,         # $1 increments
    max_strikes=101,         # safety cap (kept by band/step anyway)
    verbose=False
):
    """
    Build an options snapshot WITHOUT using CHAIN_TICKERS (works even when chain bulk field isn't entitled).
    1) Create expiry candidates (nearest Fridays within DTE window).
    2) Build strike ladder around spot.
    3) Query BDP in bulk for both Calls and Puts, then pick ATM pair.

    Returns a 1-row DataFrame with chosen ATM call/put or empty DF if none.
    """
    # --- helpers ---
    def _bbg_base(under):
        # "AAPL US Equity" -> "AAPL US"
        parts = under.split()
        if len(parts) < 3:
            raise ValueError(f"Unexpected underlying format: {under}")
        return f"{parts[0]} {parts[1]}"

    def _nearest_friday(d):
        # Move to Friday of the same week (or next) depending on day
        wd = d.weekday()  # Mon=0 ... Sun=6
        if wd <= 4:
            return d + timedelta(days=(4 - wd))
        else:
            # Sat/Sun -> next Friday
            return d + timedelta(days=(11 - wd))

    def _expiry_candidates(trade_dt, min_dte, max_dte):
        # build a small set of Friday expiries (unique) inside [min_dte, max_dte]
        exps = set()
        for days_ahead in range(min_dte, max_dte + 1, 7):
            exp = _nearest_friday(trade_dt + timedelta(days=days_ahead))
            exps.add(exp)
        # keep a handful (sorted)
        exps = sorted(list(exps))
        return exps[:8]  # limit to 8 expiries for speed

    def _format_bbg_opt(base, exp_dt, cp, strike):
        # Bloomberg std: "<BASE> <MM/DD/YY> C123 Equity"
        mmddyy = exp_dt.strftime("%m/%d/%y")
        if abs(strike - round(strike)) < 1e-6:
            strike_txt = str(int(round(strike)))
        else:
            strike_txt = f"{strike:.2f}".rstrip("0").rstrip(".")
        return f"{base} {mmddyy} {cp}{strike_txt} Equity"

    # --- start session ---
    s, ref = _start_bbg_session()
    try:
        base = _bbg_base(underlying_bbg)
        trade_dt = pd.Timestamp(trade_date)
        exps = _expiry_candidates(trade_dt, target_min_dte, target_max_dte)
        if not exps:
            return pd.DataFrame()

        # strike ladder around spot
        lo = max(0.01, spot_px * (1.0 - strike_band))
        hi = spot_px * (1.0 + strike_band)
        step = max(0.01, float(strike_step))
        strikes = np.arange(np.floor(lo), np.ceil(hi) + 1e-9, step)
        if len(strikes) > max_strikes:
            # thin evenly if dense
            idx = np.linspace(0, len(strikes) - 1, max_strikes).round().astype(int)
            strikes = strikes[idx]
        strikes = np.unique(np.round(strikes, 2))

        # Build bulk list of option tickers (Calls & Puts across expiries/strikes)
        sec_list, meta = [], []
        for exp in exps:
            for cp in ("C", "P"):
                for K in strikes:
                    sec = _format_bbg_opt(base, exp, cp, K)
                    sec_list.append(sec)
                    meta.append({"sec": sec, "cp": cp, "K": float(K), "expiry": exp})
        if not sec_list:
            return pd.DataFrame()

        # BDP request
        fields = [
            "PX_BID","PX_ASK",
            "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL",
            "IVOL_MID",                  # preferred
            "IMPLIED_VOLATILITY"         # fallback
        ]
        bdp = ref.createRequest("ReferenceDataRequest")
        se = bdp.getElement("securities"); [se.appendValue(sec) for sec in sec_list]
        fe = bdp.getElement("fields");     [fe.appendValue(f)   for f   in fields]
        msgs = _send_req(s, bdp)

        # parse
        rows = []
        meta_map = {m["sec"]: m for m in meta}

        def _get(fd, name):
            if not fd.hasElement(name): return np.nan
            sub = fd.getElement(name)
            for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
                try: return getattr(sub, meth)()
                except Exception: pass
            for meth in ("getValueAsDatetime","getValueAsString"):
                try:
                    val = getattr(sub, meth)()
                    if isinstance(val, blpapi.Datetime):
                        return pd.to_datetime(val.strftime("%Y-%m-%d"))
                    return val
                except Exception: pass
            try: return sub.getValue()
            except Exception: return np.nan

        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sdata = msg.getElement("securityData")
            for i in range(sdata.numValues()):
                e = sdata.getValueAsElement(i)
                sec = e.getElementAsString("security")
                if not e.hasElement("fieldData"): continue
                fd = e.getElement("fieldData")

                bid = pd.to_numeric(_get(fd, "PX_BID"), errors="coerce")
                ask = pd.to_numeric(_get(fd, "PX_ASK"), errors="coerce")
                if pd.isna(bid) and pd.isna(ask):
                    continue  # dead quote

                Kfd  = _get(fd, "OPT_STRIKE_PX")
                Efd  = _get(fd, "OPT_EXPIRE_DT")
                PCfd = _get(fd, "OPT_PUT_CALL")
                iv1  = _get(fd, "IVOL_MID")
                iv2  = _get(fd, "IMPLIED_VOLATILITY")

                meta_row = meta_map.get(sec, {})
                K = float(Kfd) if pd.notna(Kfd) else meta_row.get("K", np.nan)
                E = (Efd if isinstance(Efd, pd.Timestamp) else
                     (pd.to_datetime(Efd) if pd.notna(Efd) else meta_row.get("expiry", np.nan)))
                cp = (str(PCfd).upper() if isinstance(PCfd, str) else meta_row.get("cp", None))

                rows.append({
                    "security": sec,
                    "OPT_STRIKE_PX": K,
                    "OPT_EXPIRE_DT": E,
                    "OPT_PUT_CALL": cp,
                    "PX_BID": float(bid) if pd.notna(bid) else np.nan,
                    "PX_ASK": float(ask) if pd.notna(ask) else np.nan,
                    "MID": np.nan if (pd.isna(bid) or pd.isna(ask)) else (float(bid)+float(ask))/2.0,
                    "IMPLIED_VOLATILITY": (pd.to_numeric(iv1, errors="coerce")
                                           if pd.notna(iv1) else pd.to_numeric(iv2, errors="coerce"))
                })

        df = pd.DataFrame(rows)
        if df.empty:
            if verbose: print("[synthetic] No options returned via BDP.")
            return pd.DataFrame()

        # Clean & filter by DTE
        df = df.dropna(subset=["OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","MID"]).copy()
        df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
        trade_dt = pd.Timestamp(trade_date)
        df["DTE"] = (df["OPT_EXPIRE_DT"] - trade_dt).dt.days
        df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)]
        if df.empty:
            if verbose: print("[synthetic] All candidates filtered out by DTE window.")
            return pd.DataFrame()

        # Pick ATM C & P from each expiry; then best pair overall
        df["atm_dist"] = (df["OPT_STRIKE_PX"] - spot_px).abs()
        df["OPT_PUT_CALL"] = df["OPT_PUT_CALL"].astype(str).str.upper()
        atmC = df[df["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atmP = df[df["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pair = atmC.merge(atmP, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pair.empty:
            if verbose: print("[synthetic] Could not pair ATM call/put.")
            return pd.DataFrame()
        pair["combo_dist"] = pair["atm_dist_C"] + pair["atm_dist_P"]
        best = pair.sort_values(["combo_dist","DTE_C"]).head(1)

        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()

# -------------------- Quant helpers --------------------
def black_scholes_price(S, K, T, r, sigma, cp):
    d1 = (np.log(S/K) + (r + 0.5*sigma*sigma)*T)/(sigma*np.sqrt(T))
    d2 = d1 - sigma*np.sqrt(T)
    return (S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)) if cp.upper()=="C" else (K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1))

def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

# -------------------- Backtest (unchanged interface; progress/sampling) --------------------
def backtest_parity(
    underlying="AAPL US Equity",
    start="20240101",
    end  ="20250918",
    min_dte=25, max_dte=45,
    gap_threshold=0.10,
    max_hold_days=60,
    riskfree_ticker="USGG3M Index",
    step_days=1,                # sample every N business days (1 = daily)
    progress_every=10           # print progress every N steps (0 to silence)
):
    px = get_bdh_equity(underlying, start, end, fields=("PX_LAST",))
    rf = get_riskfree_series(start, end, ticker=riskfree_ticker)
    # forward-fill with new API
    df = px.join(rf, how="left").ffill()
    df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

    trades, equity_curve = [], []
    open_pos = None
    dates = df.index.to_list()

    for idx, dt in enumerate(dates):
        if step_days > 1 and idx % step_days != 0:
            equity_curve.append({"date": dt, "equity": 0.0 if not open_pos else open_pos["equity_mark"]})
            continue

        S = float(df.loc[dt, "PX_LAST"])
        rf_day = float(df.loc[dt, "rf_daily"]) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0

        # carry open pos
        if open_pos:
            open_pos["days_held"] += step_days
            open_pos["cash"] *= (1.0 + rf_day) ** step_days
            equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
        else:
            equity_curve.append({"date": dt, "equity": 0.0})

        # exit rules
        if open_pos:
            if dt >= open_pos["expiry"] or open_pos["days_held"] >= max_hold_days:
                pnl = open_pos["gap_signed"]
                trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                open_pos = None
            continue

        # chain lookup (BDP synthetic; max_points accepted but ignored)
        chain = get_chain_for_day(underlying, S, dt, min_dte, max_dte, max_points=600, verbose=False)
        if chain.empty:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] No options found in DTE window ({min_dte}-{max_dte}).")
            continue

        K    = chain["K"].iloc[0]
        Cmid = chain["C_mid"].iloc[0]
        Pmid = chain["P_mid"].iloc[0]
        expiry = chain["expiry"].iloc[0]
        dte   = int(chain["DTE"].iloc[0])
        T     = dte/365.0

        r_ann = (df.loc[dt, "rf_daily"]*365.0) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0
        dfac  = np.exp(-r_ann*T)
        gap   = (Cmid - Pmid) - (S - K*dfac)

        if abs(gap) < gap_threshold:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] Gap {gap:.3f} < thr {gap_threshold}")
            continue

        direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
        open_pos = {
            "entry_date": dt, "expiry": expiry, "days_held": 0,
            "underlying": underlying,
            "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
            "r_ann": r_ann, "T": T, "dte": dte,
            "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
            "direction": direction,
            "cash": K*dfac if gap>0 else -K*dfac,
            "equity_mark": float(np.sign(gap)*abs(gap))
        }

        if progress_every and idx % progress_every == 0:
            print(f"[{dt.date()}] ENTER {direction} | DTE={dte} K={K:.2f} S={S:.2f} gap={gap:.3f}")

    eq = pd.DataFrame(equity_curve).set_index("date")
    tr = pd.DataFrame(trades)

    if not tr.empty:
        total_pnl = tr["pnl"].sum()
        hitrate   = (tr["pnl"]>0).mean()
        avg_hold  = tr["days_held"].mean()
    else:
        total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

    return {
        "underlying_history": df,
        "equity_curve": eq,
        "trades": tr,
        "summary": {
            "total_pnl_units": float(total_pnl),
            "num_trades": int(len(tr)),
            "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
            "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
        }
    }

# -------------------- Plot helper (skips flat plots) --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    # no-trade guard
    if not isinstance(res, dict) or "summary" not in res or res["summary"].get("num_trades", 0) == 0:
        print("No trades — skipping plot.")
        return

    uh = res["underlying_history"].copy()
    tr = res["trades"].copy().set_index("exit_date").sort_index() if not res["trades"].empty else pd.DataFrame()
    daily = pd.Series(0.0, index=uh.index)
    if not tr.empty:
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- Example run --------------------
underlying = "AAPL US Equity"        # e.g., "MSFT US Equity", "SPY US Equity"; BZ names may require local entitlements
start      = "20240701"
end        = "20240815"
min_dte, max_dte = 10, 60
gap_threshold    = 0.05

res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="USGG3M Index",
    step_days=2,
    progress_every=5
)
print("Summary:", res["summary"])
outfile = f"{underlying.replace(' ','_')}_parity_backtest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest", outfile=outfile)
print(f"Chart path: {outfile}")
# ===============================================================================================================

[2024-07-01] No options found in DTE window (10-60).
[2024-07-16] No options found in DTE window (10-60).
[2024-07-30] No options found in DTE window (10-60).
[2024-08-13] No options found in DTE window (10-60).
Summary: {'total_pnl_units': 0.0, 'num_trades': 0, 'hit_ratio': None, 'avg_hold_days': None}
No trades — skipping plot.
Chart path: AAPL_US_Equity_parity_backtest.png


In [50]:
# ======================= ALL-IN-ONE (Bloomberg parity backtest; robust BDP chain scan) =======================
# Works on a Bloomberg Terminal PC (Desktop API; localhost:8194)
# Public functions:
#   get_bdh_equity, get_chain_for_day, backtest_parity, plot_backtest

import blpapi, pandas as pd, numpy as np, matplotlib.pyplot as plt
from scipy.stats import norm
from datetime import datetime, timedelta
from pathlib import Path

# -------------------- Bloomberg session helpers --------------------
def _start_bbg_session(host="localhost", port=8194):
    so = blpapi.SessionOptions()
    so.setServerHost(host); so.setServerPort(port)
    s = blpapi.Session(so)
    if not s.start(): raise RuntimeError("Failed to start Bloomberg session.")
    if not s.openService("//blp/refdata"): raise RuntimeError("Failed to open //blp/refdata.")
    return s, s.getService("//blp/refdata")

def _send_req(session, req, timeout_ms=120_000):
    session.sendRequest(req)
    msgs = []
    while True:
        ev = session.nextEvent(timeout_ms)
        for m in ev: msgs.append(m)
        if ev.eventType() == blpapi.Event.RESPONSE:
            break
    return msgs

# -------------------- Safe extractor for blpapi.Element --------------------
def _blp_get(parent_el: blpapi.Element, field_name: str):
    if not parent_el.hasElement(field_name):
        return np.nan
    sub = parent_el.getElement(field_name)
    for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
        try: return getattr(sub, meth)()
        except Exception: pass
    for meth in ("getValueAsBool","getValueAsDatetime","getValueAsString"):
        try:
            val = getattr(sub, meth)()
            if isinstance(val, blpapi.Datetime):
                return pd.to_datetime(val.strftime("%Y-%m-%d"))
            return val
        except Exception: pass
    try: return sub.getValue()
    except Exception: return np.nan

# -------------------- BDH: historical for any security (e.g., equity, rates) --------------------
def get_bdh_equity(symbol_bbg, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",), periodicity="DAILY"):
    s, ref = _start_bbg_session()
    try:
        r = ref.createRequest("HistoricalDataRequest")
        r.getElement("securities").appendValue(symbol_bbg)
        fe = r.getElement("fields")
        for f in fields: fe.appendValue(f)
        r.set("startDate", start_yyyymmdd)
        r.set("endDate", end_yyyymmdd)
        r.set("periodicitySelection", periodicity)
        msgs = _send_req(s, r)

        rows = []
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sd = msg.getElement("securityData")
            if not sd.hasElement("fieldData"): continue
            fd = sd.getElement("fieldData")
            for i in range(fd.numValues()):
                row_el = fd.getValueAsElement(i)
                d = row_el.getElementAsDatetime("date")
                out = {"date": pd.to_datetime(d.strftime("%Y-%m-%d"))}
                for f in fields:
                    val = _blp_get(row_el, f)
                    out[f] = val if val is not None else np.nan
                rows.append(out)

        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame(columns=list(fields)).astype(float)
        df = df.sort_values("date").set_index("date")
        for f in fields:
            try: df[f] = pd.to_numeric(df[f])
            except Exception: pass
        return df
    finally:
        s.stop()

def get_riskfree_series(start_yyyymmdd, end_yyyymmdd, ticker="USGG3M Index"):
    rf = get_bdh_equity(ticker, start_yyyymmdd, end_yyyymmdd, fields=("PX_LAST",))
    rf = rf.rename(columns={"PX_LAST":"RF_YLD_PCT"})
    rf["rf_daily"] = (rf["RF_YLD_PCT"]/100.0)/365.0
    return rf[["rf_daily"]]

# -------------------- Option chain (heavy-duty BDP scan; no CHAIN_TICKERS needed) --------------------
def get_chain_for_day(
    underlying_bbg,
    spot_px,
    trade_date,
    target_min_dte=25,
    target_max_dte=45,
    max_points=None,         # accepted for compatibility; not used
    verbose=True
):
    """
    Robust BDP scanner (no CHAIN_TICKERS):
      • Expiries: all Fridays in [min_dte, max_dte] + next 6 third-Fridays (monthlies)
      • Strikes: price-aware ATM ladder (0.5/1/2.5/5/10 increments), a few steps each side
      • Pricing: prefer BID/ASK -> MID; fallback to LAST_PRICE when quotes are unavailable

    Returns a 1-row DataFrame with the chosen ATM call/put, or empty DF if none.
    """
    import pandas as pd, numpy as np, blpapi
    from datetime import timedelta

    # ---- helpers ----
    def _bbg_base(under):
        parts = under.split()
        if len(parts) < 3:
            raise ValueError(f"Unexpected underlying format: {under}")
        return f"{parts[0]} {parts[1]}"

    def _fridays_between(start_dt, min_dte, max_dte):
        fr = []
        for d in range(min_dte, max_dte + 1):
            dt = start_dt + timedelta(days=d)
            if dt.weekday() == 4:  # Friday
                fr.append(dt)
        return fr

    def _third_fridays(start_dt, count=6):
        exps, dt = [], start_dt
        while len(exps) < count:
            y, m = dt.year + (dt.month // 12), (dt.month % 12) + 1
            first_next = pd.Timestamp(year=y, month=m, day=1)
            wd = first_next.weekday()
            first_friday = first_next + timedelta(days=(4 - wd) % 7)
            third_friday = first_friday + timedelta(days=14)
            exps.append(third_friday)
            dt = third_friday
        return exps

    def _strike_increment(S):
        if S < 25:   return 0.5
        if S < 200:  return 1.0
        if S < 500:  return 2.5
        if S < 1000: return 5.0
        return 10.0

    def _atm_ladder(S, steps=5):
        inc = _strike_increment(S)
        k0  = round(S / inc) * inc
        ks  = [k0 + i*inc for i in range(-steps, steps+1)]
        coarse = [k0 + i*(5*inc) for i in range(-2, 3)]
        ks = sorted(set([round(k, 2) for k in ks + coarse]))
        return ks

    def _format_bbg_opt(base, exp_dt, cp, strike):
        mmddyy = pd.Timestamp(exp_dt).strftime("%m/%d/%y")
        if abs(strike - round(strike)) < 1e-6:
            strike_txt = str(int(round(strike)))
        else:
            strike_txt = f"{strike:.2f}".rstrip("0").rstrip(".")
        return f"{base} {mmddyy} {cp}{strike_txt} Equity"

    # ---- assemble candidates ----
    base = _bbg_base(underlying_bbg)
    trade_dt = pd.Timestamp(trade_date)

    expiry_fridays   = _fridays_between(trade_dt, target_min_dte, target_max_dte)
    expiry_monthlies = _third_fridays(trade_dt, count=6)
    expiries = sorted(set(expiry_fridays + expiry_monthlies))[:24]  # cap

    if verbose:
        print(f"[chain] expiries candidates: {len(expiries)} (min_dte={target_min_dte}, max_dte={target_max_dte})")

    if not expiries:
        return pd.DataFrame()

    strikes = _atm_ladder(spot_px, steps=5)
    if verbose:
        print(f"[chain] strike candidates around spot {spot_px:.2f}: {len(strikes)}")

    sec_list, meta = [], []
    for exp in expiries:
        for cp in ("C", "P"):
            for K in strikes:
                sec = _format_bbg_opt(base, exp, cp, K)
                sec_list.append(sec)
                meta.append({"sec": sec, "cp": cp, "K": float(K), "expiry": pd.Timestamp(exp)})

    if verbose:
        print(f"[chain] requesting {len(sec_list)} option securities via BDP")

    # ---- BDP request (use BID/ASK + LAST_PRICE) ----
    s, ref = _start_bbg_session()
    try:
        fields = [
            "BID","ASK","LAST_PRICE",
            "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL",
            "IVOL_MID","IMPLIED_VOLATILITY"
        ]
        bdp = ref.createRequest("ReferenceDataRequest")
        se = bdp.getElement("securities"); [se.appendValue(sec) for sec in sec_list]
        fe = bdp.getElement("fields");     [fe.appendValue(f)   for f   in fields]
        msgs = _send_req(s, bdp)

        rows = []
        meta_map = {m["sec"]: m for m in meta}

        def _get(fd, name):
            if not fd.hasElement(name): return np.nan
            sub = fd.getElement(name)
            for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
                try: return getattr(sub, meth)()
                except Exception: pass
            for meth in ("getValueAsDatetime","getValueAsString"):
                try:
                    val = getattr(sub, meth)()
                    if isinstance(val, blpapi.Datetime):
                        return pd.to_datetime(val.strftime("%Y-%m-%d"))
                    return val
                except Exception: pass
            try: return sub.getValue()
            except Exception: return np.nan

        returned = 0
        with_quotes = 0
        with_last   = 0

        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sdata = msg.getElement("securityData")
            for i in range(sdata.numValues()):
                e = sdata.getValueAsElement(i)
                sec = e.getElementAsString("security")
                if not e.hasElement("fieldData"): continue
                fd = e.getElement("fieldData")
                returned += 1

                bid = pd.to_numeric(_get(fd, "BID"), errors="coerce")
                ask = pd.to_numeric(_get(fd, "ASK"), errors="coerce")
                last= pd.to_numeric(_get(fd, "LAST_PRICE"), errors="coerce")

                mid = np.nan
                if pd.notna(bid) and pd.notna(ask):
                    with_quotes += 1
                    mid = (float(bid) + float(ask)) / 2.0
                elif pd.notna(last):
                    with_last += 1
                    mid = float(last)

                if pd.isna(mid):
                    continue  # still nothing usable

                Kfd  = _get(fd, "OPT_STRIKE_PX")
                Efd  = _get(fd, "OPT_EXPIRE_DT")
                PCfd = _get(fd, "OPT_PUT_CALL")
                iv1  = _get(fd, "IVOL_MID")
                iv2  = _get(fd, "IMPLIED_VOLATILITY")

                meta_row = meta_map.get(sec, {})
                K = float(Kfd) if pd.notna(Kfd) else meta_row.get("K", np.nan)
                E = (Efd if isinstance(Efd, pd.Timestamp) else
                     (pd.to_datetime(Efd) if pd.notna(Efd) else meta_row.get("expiry", np.nan)))
                cp = (str(PCfd).upper() if isinstance(PCfd, str) else meta_row.get("cp", None))

                rows.append({
                    "security": sec,
                    "OPT_STRIKE_PX": K,
                    "OPT_EXPIRE_DT": E,
                    "OPT_PUT_CALL": cp,
                    "BID": float(bid) if pd.notna(bid) else np.nan,
                    "ASK": float(ask) if pd.notna(ask) else np.nan,
                    "LAST_PRICE": float(last) if pd.notna(last) else np.nan,
                    "MID": mid,
                    "IMPLIED_VOLATILITY": (pd.to_numeric(iv1, errors="coerce")
                                           if pd.notna(iv1) else pd.to_numeric(iv2, errors="coerce"))
                })

        if verbose:
            print(f"[chain] returned: {returned}, with quotes: {with_quotes}, with last-only: {with_last}")

        df = pd.DataFrame(rows)
        if df.empty:
            if verbose: print("[chain] No valid options returned via BDP.")
            return pd.DataFrame()

        # Clean & filter by DTE
        df = df.dropna(subset=["OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","MID"]).copy()
        df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
        df["DTE"] = (df["OPT_EXPIRE_DT"] - pd.Timestamp(trade_date)).dt.days
        df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)]
        if df.empty:
            if verbose: print("[chain] All candidates filtered out by DTE window.")
            return pd.DataFrame()

        # Pair ATM per expiry
        df["atm_dist"] = (df["OPT_STRIKE_PX"] - spot_px).abs()
        df["OPT_PUT_CALL"] = df["OPT_PUT_CALL"].astype(str).str.upper()
        atmC = df[df["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atmP = df[df["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pair = atmC.merge(atmP, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pair.empty:
            if verbose: print("[chain] Could not pair ATM call/put on any expiry.")
            return pd.DataFrame()
        pair["combo_dist"] = pair["atm_dist_C"] + pair["atm_dist_P"]
        best = pair.sort_values(["combo_dist","DTE_C"]).head(1)

        if verbose:
            print(f"[chain] picked expiry {best['OPT_EXPIRE_DT'].iloc[0].date()}, K≈{best['OPT_STRIKE_PX_C'].iloc[0]}")

        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()
    # ---- helpers ----
    def _bbg_base(under):
        # "AAPL US Equity" -> "AAPL US"
        parts = under.split()
        if len(parts) < 3:
            raise ValueError(f"Unexpected underlying format: {under}")
        return f"{parts[0]} {parts[1]}"

    def _fridays_between(start_dt, min_dte, max_dte):
        fr = []
        for d in range(min_dte, max_dte + 1):
            dt = start_dt + timedelta(days=d)
            if dt.weekday() == 4:  # Friday
                fr.append(dt)
        return fr

    def _third_fridays(start_dt, count=6):
        # next N standard monthly expiries (third Friday)
        exps, dt = [], start_dt
        while len(exps) < count:
            # move to first day next month
            y, m = dt.year + (dt.month // 12), (dt.month % 12) + 1
            first_next = pd.Timestamp(year=y, month=m, day=1)
            # third Friday of that month
            wd = first_next.weekday()
            # Friday index in that month: find first Friday then add 14 days
            first_friday = first_next + timedelta(days=(4 - wd) % 7)
            third_friday = first_friday + timedelta(days=14)
            exps.append(third_friday)
            dt = third_friday
        return exps

    def _strike_increment(S):
        # heuristic for US equity option grids
        if S < 25:   return 0.5
        if S < 200:  return 1.0
        if S < 500:  return 2.5
        if S < 1000: return 5.0
        return 10.0

    def _atm_ladder(S, steps=4):
        inc = _strike_increment(S)
        k0  = round(S / inc) * inc
        ks  = [k0 + i*inc for i in range(-steps, steps+1)]
        # also add a coarser grid around spot to catch odd OCC ladders
        coarse = [k0 + i*(5*inc) for i in range(-2, 3)]
        ks = sorted(set([round(k, 2) for k in ks + coarse]))
        return ks

    def _format_bbg_opt(base, exp_dt, cp, strike):
        mmddyy = pd.Timestamp(exp_dt).strftime("%m/%d/%y")
        if abs(strike - round(strike)) < 1e-6:
            strike_txt = str(int(round(strike)))
        else:
            strike_txt = f"{strike:.2f}".rstrip("0").rstrip(".")
        return f"{base} {mmddyy} {cp}{strike_txt} Equity"

    # ---- assemble candidates ----
    base = _bbg_base(underlying_bbg)
    trade_dt = pd.Timestamp(trade_date)

    expiry_fridays = _fridays_between(trade_dt, target_min_dte, target_max_dte)
    expiry_monthlies = _third_fridays(trade_dt, count=6)
    expiries = sorted(set(expiry_fridays + expiry_monthlies))[:24]  # cap to 24 expiries

    if verbose:
        print(f"[chain] expiries candidates: {len(expiries)} (min_dte={target_min_dte}, max_dte={target_max_dte})")

    if not expiries:
        return pd.DataFrame()

    strikes = _atm_ladder(spot_px, steps=5)  # 5 steps each side + coarse grid
    if verbose:
        print(f"[chain] strike candidates around spot {spot_px:.2f}: {len(strikes)}")

    # Build bulk list
    sec_list, meta = [], []
    for exp in expiries:
        for cp in ("C", "P"):
            for K in strikes:
                sec = _format_bbg_opt(base, exp, cp, K)
                sec_list.append(sec)
                meta.append({"sec": sec, "cp": cp, "K": float(K), "expiry": pd.Timestamp(exp)})

    if verbose:
        print(f"[chain] requesting {len(sec_list)} option securities via BDP")

    # ---- BDP request ----
    s, ref = _start_bbg_session()
    try:
        fields = [
            "PX_BID","PX_ASK",
            "OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL",
            "IVOL_MID","IMPLIED_VOLATILITY"
        ]
        bdp = ref.createRequest("ReferenceDataRequest")
        se = bdp.getElement("securities"); [se.appendValue(sec) for sec in sec_list]
        fe = bdp.getElement("fields");     [fe.appendValue(f)   for f   in fields]
        msgs = _send_req(s, bdp)

        rows = []
        meta_map = {m["sec"]: m for m in meta}

        def _get(fd, name):
            if not fd.hasElement(name): return np.nan
            sub = fd.getElement(name)
            for meth in ("getValueAsFloat64","getValueAsInt64","getValueAsInteger"):
                try: return getattr(sub, meth)()
                except Exception: pass
            for meth in ("getValueAsDatetime","getValueAsString"):
                try:
                    val = getattr(sub, meth)()
                    if isinstance(val, blpapi.Datetime):
                        return pd.to_datetime(val.strftime("%Y-%m-%d"))
                    return val
                except Exception: pass
            try: return sub.getValue()
            except Exception: return np.nan

        returned = 0
        with_quotes = 0
        for msg in msgs:
            if not msg.hasElement("securityData"): continue
            sdata = msg.getElement("securityData")
            for i in range(sdata.numValues()):
                e = sdata.getValueAsElement(i)
                sec = e.getElementAsString("security")
                if not e.hasElement("fieldData"): continue
                fd = e.getElement("fieldData")
                returned += 1

                bid = pd.to_numeric(_get(fd, "PX_BID"), errors="coerce")
                ask = pd.to_numeric(_get(fd, "PX_ASK"), errors="coerce")
                if pd.isna(bid) and pd.isna(ask):
                    continue  # skip dead securities
                with_quotes += 1

                Kfd  = _get(fd, "OPT_STRIKE_PX")
                Efd  = _get(fd, "OPT_EXPIRE_DT")
                PCfd = _get(fd, "OPT_PUT_CALL")
                iv1  = _get(fd, "IVOL_MID")
                iv2  = _get(fd, "IMPLIED_VOLATILITY")

                meta_row = meta_map.get(sec, {})
                K = float(Kfd) if pd.notna(Kfd) else meta_row.get("K", np.nan)
                E = (Efd if isinstance(Efd, pd.Timestamp) else
                     (pd.to_datetime(Efd) if pd.notna(Efd) else meta_row.get("expiry", np.nan)))
                cp = (str(PCfd).upper() if isinstance(PCfd, str) else meta_row.get("cp", None))

                rows.append({
                    "security": sec,
                    "OPT_STRIKE_PX": K,
                    "OPT_EXPIRE_DT": E,
                    "OPT_PUT_CALL": cp,
                    "PX_BID": float(bid) if pd.notna(bid) else np.nan,
                    "PX_ASK": float(ask) if pd.notna(ask) else np.nan,
                    "MID": np.nan if (pd.isna(bid) or pd.isna(ask)) else (float(bid)+float(ask))/2.0,
                    "IMPLIED_VOLATILITY": (pd.to_numeric(iv1, errors="coerce")
                                           if pd.notna(iv1) else pd.to_numeric(iv2, errors="coerce"))
                })

        if verbose:
            print(f"[chain] returned: {returned}, with quotes: {with_quotes}")

        df = pd.DataFrame(rows)
        if df.empty:
            if verbose: print("[chain] No valid options returned via BDP.")
            return pd.DataFrame()

        # Clean & filter by DTE
        df = df.dropna(subset=["OPT_STRIKE_PX","OPT_EXPIRE_DT","OPT_PUT_CALL","MID"]).copy()
        df["OPT_EXPIRE_DT"] = pd.to_datetime(df["OPT_EXPIRE_DT"], errors="coerce")
        trade_dt = pd.Timestamp(trade_date)
        df["DTE"] = (df["OPT_EXPIRE_DT"] - trade_dt).dt.days
        df = df[(df["DTE"]>=target_min_dte) & (df["DTE"]<=target_max_dte)]
        if df.empty:
            if verbose: print("[chain] All candidates filtered out by DTE window.")
            return pd.DataFrame()

        # Pair ATM per expiry
        df["atm_dist"] = (df["OPT_STRIKE_PX"] - spot_px).abs()
        df["OPT_PUT_CALL"] = df["OPT_PUT_CALL"].astype(str).str.upper()
        atmC = df[df["OPT_PUT_CALL"]=="C"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        atmP = df[df["OPT_PUT_CALL"]=="P"].sort_values(["atm_dist","DTE"]).groupby("OPT_EXPIRE_DT").head(1)
        pair = atmC.merge(atmP, on="OPT_EXPIRE_DT", suffixes=("_C","_P"))
        if pair.empty:
            if verbose: print("[chain] Could not pair ATM call/put on any expiry.")
            return pd.DataFrame()
        pair["combo_dist"] = pair["atm_dist_C"] + pair["atm_dist_P"]
        best = pair.sort_values(["combo_dist","DTE_C"]).head(1)

        if verbose:
            print(f"[chain] picked expiry {best['OPT_EXPIRE_DT'].iloc[0].date()}, K≈{best['OPT_STRIKE_PX_C'].iloc[0]}")

        return pd.DataFrame([{
            "expiry":   best["OPT_EXPIRE_DT"].iloc[0],
            "DTE":      int(best["DTE_C"].iloc[0]),
            "call_tkr": best["security_C"].iloc[0],
            "put_tkr":  best["security_P"].iloc[0],
            "K":        float(best["OPT_STRIKE_PX_C"].iloc[0]),
            "C_mid":    float(best["MID_C"].iloc[0]),
            "P_mid":    float(best["MID_P"].iloc[0]),
            "IV_call":  float(best["IMPLIED_VOLATILITY_C"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_C"].iloc[0]) else np.nan,
            "IV_put":   float(best["IMPLIED_VOLATILITY_P"].iloc[0]) if pd.notna(best["IMPLIED_VOLATILITY_P"].iloc[0]) else np.nan
        }])
    finally:
        s.stop()

# -------------------- Quant helpers --------------------
def black_scholes_price(S, K, T, r, sigma, cp):
    d1 = (np.log(S/K) + (r + 0.5*sigma*sigma)*T)/(sigma*np.sqrt(T))
    d2 = d1 - sigma*np.sqrt(T)
    return (S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)) if cp.upper()=="C" else (K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1))

def realized_vol_from_close(close):
    ret = np.log(close/close.shift(1)).dropna()
    return ret.rolling(21).std()*np.sqrt(252)

# -------------------- Backtest --------------------
def backtest_parity(
    underlying="AAPL US Equity",
    start="20240101",
    end  ="20250918",
    min_dte=25, max_dte=45,
    gap_threshold=0.10,
    max_hold_days=60,
    riskfree_ticker="USGG3M Index",
    step_days=1,
    progress_every=10
):
    px = get_bdh_equity(underlying, start, end, fields=("PX_LAST",))
    rf = get_riskfree_series(start, end, ticker=riskfree_ticker)
    df = px.join(rf, how="left").ffill()
    df["rv_21"] = realized_vol_from_close(df["PX_LAST"])

    trades, equity_curve = [], []
    open_pos = None
    dates = df.index.to_list()

    for idx, dt in enumerate(dates):
        if step_days > 1 and idx % step_days != 0:
            equity_curve.append({"date": dt, "equity": 0.0 if not open_pos else open_pos["equity_mark"]})
            continue

        S = float(df.loc[dt, "PX_LAST"])
        rf_day = float(df.loc[dt, "rf_daily"]) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0

        if open_pos:
            open_pos["days_held"] += step_days
            open_pos["cash"] *= (1.0 + rf_day) ** step_days
            equity_curve.append({"date": dt, "equity": open_pos["equity_mark"]})
        else:
            equity_curve.append({"date": dt, "equity": 0.0})

        if open_pos:
            if dt >= open_pos["expiry"] or open_pos["days_held"] >= max_hold_days:
                pnl = open_pos["gap_signed"]
                trades.append({**open_pos, "exit_date": dt, "pnl": pnl})
                open_pos = None
            continue

        chain = get_chain_for_day(underlying, S, dt, min_dte, max_dte, max_points=600, verbose=True)
        if chain.empty:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] No options found in DTE window ({min_dte}-{max_dte}).")
            continue

        K    = chain["K"].iloc[0]
        Cmid = chain["C_mid"].iloc[0]
        Pmid = chain["P_mid"].iloc[0]
        expiry = chain["expiry"].iloc[0]
        dte   = int(chain["DTE"].iloc[0])
        T     = dte/365.0

        r_ann = (df.loc[dt, "rf_daily"]*365.0) if "rf_daily" in df.columns and pd.notna(df.loc[dt, "rf_daily"]) else 0.0
        dfac  = np.exp(-r_ann*T)
        gap   = (Cmid - Pmid) - (S - K*dfac)

        if abs(gap) < gap_threshold:
            if progress_every and idx % progress_every == 0:
                print(f"[{dt.date()}] Gap {gap:.3f} < thr {gap_threshold}")
            continue

        direction = "short_call_long_put_shortS_longB" if gap > 0 else "long_call_short_put_longS_shortB"
        open_pos = {
            "entry_date": dt, "expiry": expiry, "days_held": 0,
            "underlying": underlying,
            "S0": S, "K": K, "C_mid0": Cmid, "P_mid0": Pmid,
            "r_ann": r_ann, "T": T, "dte": dte,
            "gap": float(gap), "gap_signed": float(np.sign(gap)*abs(gap)),
            "direction": direction,
            "cash": K*dfac if gap>0 else -K*dfac,
            "equity_mark": float(np.sign(gap)*abs(gap))
        }

        if progress_every and idx % progress_every == 0:
            print(f"[{dt.date()}] ENTER {direction} | DTE={dte} K={K:.2f} S={S:.2f} gap={gap:.3f}")

    eq = pd.DataFrame(equity_curve).set_index("date")
    tr = pd.DataFrame(trades)

    if not tr.empty:
        total_pnl = tr["pnl"].sum()
        hitrate   = (tr["pnl"]>0).mean()
        avg_hold  = tr["days_held"].mean()
    else:
        total_pnl = 0.0; hitrate = np.nan; avg_hold = np.nan

    return {
        "underlying_history": df,
        "equity_curve": eq,
        "trades": tr,
        "summary": {
            "total_pnl_units": float(total_pnl),
            "num_trades": int(len(tr)),
            "hit_ratio": float(hitrate) if pd.notna(hitrate) else None,
            "avg_hold_days": float(avg_hold) if pd.notna(avg_hold) else None
        }
    }

# -------------------- Plot helper --------------------
def plot_backtest(res, title="Parity Arbitrage Backtest", outfile=None):
    if not isinstance(res, dict) or "summary" not in res or res["summary"].get("num_trades", 0) == 0:
        print("No trades — skipping plot.")
        return

    uh = res["underlying_history"].copy()
    tr = res["trades"].copy().set_index("exit_date").sort_index() if not res["trades"].empty else pd.DataFrame()
    daily = pd.Series(0.0, index=uh.index)
    if not tr.empty:
        for dt, row in tr.iterrows():
            if dt in daily.index:
                daily.loc[dt] += float(row["pnl"])
    cum_pnl = daily.cumsum()
    if outfile is None:
        outfile = Path(f"./backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

    plt.figure(figsize=(10,5))
    plt.plot(cum_pnl.index, cum_pnl.values, linewidth=2)
    plt.title(title); plt.xlabel("Date"); plt.ylabel("Cumulative Realized PnL (units)")
    plt.grid(True, linewidth=0.5, alpha=0.6); plt.tight_layout()
    plt.savefig(outfile, dpi=200); plt.close()
    print("Saved chart to:", outfile)

# -------------------- Example run --------------------
underlying = "AAPL US Equity"        # try other US names once AAPL works
start      = "20240701"
end        = "20240815"
min_dte, max_dte = 10, 60
gap_threshold    = 0.05

res = backtest_parity(
    underlying=underlying,
    start=start,
    end=end,
    min_dte=min_dte,
    max_dte=max_dte,
    gap_threshold=gap_threshold,
    riskfree_ticker="USGG3M Index",
    step_days=2,
    progress_every=5
)
print("Summary:", res["summary"])
outfile = f"{underlying.replace(' ','_')}_parity_backtest.png"
plot_backtest(res, title=f"{underlying} Parity Arbitrage Backtest", outfile=outfile)
print(f"Chart path: {outfile}")
# ===============================================================================================================

[chain] expiries candidates: 13 (min_dte=10, max_dte=60)
[chain] strike candidates around spot 216.00: 13
[chain] requesting 338 option securities via BDP
[chain] returned: 338, with quotes: 0, with last-only: 0
[chain] No valid options returned via BDP.
[2024-07-01] No options found in DTE window (10-60).
[chain] expiries candidates: 12 (min_dte=10, max_dte=60)
[chain] strike candidates around spot 221.00: 13
[chain] requesting 312 option securities via BDP
[chain] returned: 312, with quotes: 0, with last-only: 0
[chain] No valid options returned via BDP.
[chain] expiries candidates: 13 (min_dte=10, max_dte=60)
[chain] strike candidates around spot 227.00: 13
[chain] requesting 338 option securities via BDP
[chain] returned: 338, with quotes: 0, with last-only: 0
[chain] No valid options returned via BDP.
[chain] expiries candidates: 12 (min_dte=10, max_dte=60)
[chain] strike candidates around spot 232.00: 13
[chain] requesting 312 option securities via BDP
[chain] returned: 312, with