In [None]:
"""hw1f_jamshidian_pricer_quantlib.py
-------------------------------------------------------------------
Industrial‑grade SOFR swaption pricer using **QuantLib‑Python**
• Bootstrapped SOFR discount curve (DF csv) → QuantLib DiscountCurve
• ATM SOFR Black vol matrix (csv)      → Hull‑White(1F) calibration
• Exact European swaption PV via Jamshidian decomposition
-------------------------------------------------------------------

1. FILE INPUT REQUIREMENTS
──────────────────────────
(a) df.csv              columns = ['date','df'] – valuation‑date row first
(b) sofr_black_vol.csv  columns = ['expiry','tenor','black_vol_%']
    expiry  : "1M","3M","6M","1Y",…  (string parsable by Tenor())
    tenor   : "1Y","2Y","5Y",…        (fixed‑leg tenor = underlying swap)
    black_vol_% : e.g. 5.74  (meaning 5.74%)

2. PUBLIC API
─────────────
>>> curve = build_discount_curve('2025-07-25', 'df.csv')
>>> model = calibrate_hull_white(curve, 'sofr_black_vol.csv')
>>> pv = price_swaption(model, curve,
                        option_tenor=ql.Period('1Y'),
                        swap_tenor  =ql.Period('5Y'),
                        pay_fixed   =True,
                        strike      ='ATM')

3. DEPENDENCIES
───────────────
$ pip install QuantLib-Python pandas numpy
"""
from __future__ import annotations
import math
import pandas as pd
import QuantLib as ql
from typing import List, Tuple

# ──────────────────────────────────────────────────────────────────
# 1. DISCOUNT CURVE BUILDER
# ──────────────────────────────────────────────────────────────────

def build_discount_curve(valuation_date: str | ql.Date,
                         df_csv: str,
                         calendar: ql.Calendar = ql.UnitedStates(ql.UnitedStates.FederalReserve)) -> ql.YieldTermStructureHandle:
    """Return QuantLib Handle<YieldTermStructure> from DF csv."""
    if isinstance(valuation_date, str):
        valuation_date = ql.DateParser.parseISO(valuation_date)
    ql.Settings.instance().evaluationDate = valuation_date

    df = pd.read_csv(df_csv)
    dates = [valuation_date] + [ql.DateParser.parseISO(d) for d in df['date'].iloc[1:]]
    dfs = df['df'].values.tolist()

    curve = ql.DiscountCurve(dates, dfs, ql.Actual360(), calendar)
    return ql.YieldTermStructureHandle(curve)

# ──────────────────────────────────────────────────────────────────
# 2. HULL‑WHITE CALIBRATION (Black → Normal conversion)
# ──────────────────────────────────────────────────────────────────

_DEF_OPT_EXPIRIES = {
    '1M':ql.Period('1M'),'3M':ql.Period('3M'),'6M':ql.Period('6M'),
    '9M':ql.Period('9M'),'1Y':ql.Period('1Y'),'18M':ql.Period('18M'),
    '2Y':ql.Period('2Y'),'3Y':ql.Period('3Y'),'5Y':ql.Period('5Y'),
    '7Y':ql.Period('7Y'),'10Y':ql.Period('10Y'),'15Y':ql.Period('15Y'),
    '20Y':ql.Period('20Y'),'30Y':ql.Period('30Y')}

def _black_to_normal(vol_b: float, fwd: float, strike: float) -> float:
    """ATM approximation (if strike==fwd) else exact Jäckel formula."""
    if abs(fwd - strike) < 1e-8:
        return vol_b * fwd
    num = math.log(fwd/strike)
    return vol_b * (fwd - strike) / num


def calibrate_hull_white(curve: ql.YieldTermStructureHandle,
                         vol_csv: str,
                         mean_reversion: float = 0.05) -> ql.HullWhite:
    """Calibrate σ (vol of HW) assuming constant a (mean_reversion)."""
    vols = pd.read_csv(vol_csv)

    calendar = ql.UnitedStates(ql.UnitedStates.FederalReserve)
    dc       = ql.Actual360()

    engine_black = ql.BlackSwaptionEngine(curve, ql.AverageStrike)

    helpers: List[ql.SwaptionHelper] = []
    for _, row in vols.iterrows():
        exp_str, ten_str, black_pct = row['expiry'], row['tenor'], row['black_vol_%']
        opt_tenor = _DEF_OPT_EXPIRIES[exp_str]
        swap_tenor= _DEF_OPT_EXPIRIES[ten_str]
        black_vol = black_pct / 100.0

        # need strike = ATM forward => pass 0, engine will use ATM.
        helper = ql.SwaptionHelper(opt_tenor, swap_tenor,
                                   ql.QuoteHandle(ql.SimpleQuote(black_vol)),
                                   curve,
                                   ql.Annual,
                                   ql.ModifiedFollowing,
                                   dc,
                                   ql.BlackNormal, True)
        helper.setPricingEngine(engine_black)
        helpers.append(helper)

    # HW model with constant σ to calibrate
    model = ql.HullWhite(curve, mean_reversion, 0.01)
    opt_method = ql.LevenbergMarquardt()
    model.calibrate(helpers, opt_method,
                    ql.EndCriteria(500, 100, 1e-8, 1e-8, 1e-8))

    return model

# ──────────────────────────────────────────────────────────────────
# 3. JAMSHIDIAN SWAPTION PRICER
# ──────────────────────────────────────────────────────────────────

def price_swaption(model: ql.HullWhite,
                   curve: ql.YieldTermStructureHandle,
                   option_tenor: ql.Period,
                   swap_tenor: ql.Period,
                   pay_fixed: bool = True,
                   strike: str | float = 'ATM') -> Tuple[float, dict]:
    """Exact Jamshidian PV + Greeks."""
    calendar = ql.UnitedStates(ql.UnitedStates.FederalReserve)
    dc = ql.Actual360()
    fixed_sched = ql.Schedule(ql.Settings.instance().evaluationDate + option_tenor,
                              ql.Settings.instance().evaluationDate + option_tenor + swap_tenor,
                              ql.Period('6M'), calendar,
                              ql.ModifiedFollowing, ql.ModifiedFollowing,
                              ql.DateGeneration.Forward, False)
    vanilla_swap = ql.VanillaSwap(
        ql.VanillaSwap.Payer if pay_fixed else ql.VanillaSwap.Receiver,
        1.0,
        fixed_sched, 0.0, dc,  # dummy fixed rate; engine will set ATM if strike='ATM'
        ql.SofrIndex(curve), dc
    )
    if strike != 'ATM':
        vanilla_swap.setFixedRate(strike)

    exercise = ql.EuropeanExercise(vanilla_swap.startDate())
    swaption  = ql.Swaption(vanilla_swap, exercise)
    engine    = ql.JamshidianSwaptionEngine(model)
    swaption.setPricingEngine(engine)

    pv = swaption.NPV()

    # Greeks via bump‑and‑reprice (Δ, Γ) quick demo
    bump = 1e-4
    curve_bump = ql.ZeroSpreadedTermStructure(curve, ql.QuoteHandle(ql.SimpleQuote(bump)))
    model_bump = ql.HullWhite(ql.YieldTermStructureHandle(curve_bump), model.a(), model.sigma())
    swaption.setPricingEngine(ql.JamshidianSwaptionEngine(model_bump))
    pv_up = swaption.NPV()
    delta = (pv_up - pv) / bump

    # gamma approximate: bump both sides
    curve_bump_dn = ql.ZeroSpreadedTermStructure(curve, ql.QuoteHandle(ql.SimpleQuote(-bump)))
    model_dn = ql.HullWhite(ql.YieldTermStructureHandle(curve_bump_dn), model.a(), model.sigma())
    swaption.setPricingEngine(ql.JamshidianSwaptionEngine(model_dn))
    pv_dn = swaption.NPV()
    gamma = (pv_up - 2 * pv + pv_dn) / (bump**2)

    greeks = {"delta": delta, "gamma": gamma}
    return pv, greeks

# ──────────────────────────────────────────────────────────────────
# 4. DEMO when run standalone
# ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    curve = build_discount_curve("2025-07-25", "df.csv")
    model = calibrate_hull_white(curve, "sofr_black_vol.csv")
    pv, gk = price_swaption(model, curve, ql.Period("1Y"), ql.Period("5Y"))
    print(f"PV 1Yx5Y payer ATM: {pv:.6f}")
    print("Greeks:", gk)