In [None]:
import numpy as np
from datetime import date
from dateutil.relativedelta import relativedelta
import abc
import scipy

# QuantLib imports
import QuantLib as ql

class G2Calibrator:
    """
    Calibrates a G2++ model to market cap/floor quotes.
    """
    def __init__(self, ts_handle, index):
        self.ts_handle = ts_handle
        self.index = index
        self.model = ql.G2(ts_handle)

    def calibrate(
        self,
        periods: list,
        quotes: list,
        optimization_method=None,
        end_criteria=None,
        engine_steps: int = 50
    ) -> tuple:
        """
        Calibrate the G2 model to cap vols.

        Args:
            periods: list of ql.Period objects (e.g. [ql.Period('1Y'),...])
            quotes:  list of QuoteHandle objects matching periods
            optimization_method: ql.LevenbergMarquardt instance
            end_criteria: ql.EndCriteria instance
            engine_steps: steps for TreeCapFloorEngine

        Returns:
            tuple of calibrated (a, sigma, b, eta, rho)
        """
        # build helpers
        helpers = []
        for p, q in zip(periods, quotes):
            # CapHelper with explicit errorType, volType, shift
            helper = ql.CapHelper(
                p,
                q,
                self.index,
                ql.Quarterly,
                ql.Actual360(),
                False,
                self.ts_handle,
                ql.BlackCalibrationHelper.RelativePriceError,
                ql.Normal,
                0.0
            )
            engine = ql.TreeCapFloorEngine(self.model, engine_steps)
            helper.setPricingEngine(engine)
            helpers.append(helper)

        # default optimization and end criteria
        opt = optimization_method or ql.LevenbergMarquardt(1e-8,1e-8,1e-8)
        criteria = end_criteria or ql.EndCriteria(1000,500,1e-8,1e-8,1e-8)

        # calibrate
        self.model.calibrate(helpers, opt, criteria)
        return self.model.params()


class QuantLibBondStaticBase:
    """
    Encapsulates static bond definition in QuantLib: schedule, instrument, and evaluation-date setup.
    """
    def __init__(
        self,
        valuation_date: date,
        maturity_date: date,
        coupon_rate: float,
        face_value: float = 100.0,
        freq: int = 2,
        calendar=None,
        day_count=None,
        business_convention=None,
    ):
        # Store Python-side
        self.valuation_date_py = valuation_date
        self.maturity_date_py = maturity_date
        self.coupon_rate = coupon_rate
        self.face_value = face_value
        self.freq = freq

        # QuantLib settings (defaults if None)
        self.calendar = calendar or ql.NullCalendar()
        self.day_count = day_count or ql.Actual365Fixed()
        self.business_convention = (
            business_convention if business_convention is not None else ql.Unadjusted
        )

        # Convert to QL Dates and set global evaluation date
        self.ql_valuation_date = ql.Date(
            valuation_date.day, valuation_date.month, valuation_date.year
        )
        ql.Settings.instance().evaluationDate = self.ql_valuation_date
        self.ql_maturity_date = ql.Date(
            maturity_date.day, maturity_date.month, maturity_date.year
        )

        # Build payment schedule
        months = int(12 / self.freq)
        self.schedule = ql.Schedule(
            self.ql_valuation_date,
            self.ql_maturity_date,
            ql.Period(months, ql.Months),
            self.calendar,
            self.business_convention,
            self.business_convention,
            ql.DateGeneration.Forward,
            False,
        )

        # Build the FixedRateBond instrument
        self.bond = ql.FixedRateBond(
            0,
            self.face_value,
            self.schedule,
            [self.coupon_rate],
            self.day_count,
        )

class CallableBondStaticBase(QuantLibBondStaticBase):
    """
    Extends QuantLibBondStaticBase to add a call schedule and build
    a CallableFixedRateBond.
    """
    def __init__(
        self,
        valuation_date: date,
        maturity_date: date,
        coupon_rate: float,
        call_dates: list[date],
        call_prices: list[float],
        face_value: float = 100.0,
        freq: int = 2,
        calendar=None,
        day_count=None,
        business_convention=None,
    ):
        # initialize the plain vanilla bond
        super().__init__(
            valuation_date, maturity_date,
            coupon_rate, face_value, freq,
            calendar, day_count, business_convention
        )

        # build a callability schedule
        self.call_schedule = ql.CallabilitySchedule()
        for cd, cp in zip(call_dates, call_prices):
            ql_cd = ql.Date(cd.day, cd.month, cd.year)
            callability_price  = ql.BondPrice(cp, ql.BondPrice.Clean)
            call = ql.Callability(
                callability_price,
                ql.Callability.Call,
                ql_cd
            )
            self.call_schedule.push_back(call)

        # rebuild bond as CallableFixedRateBond
        settlement_days = 0
        self.bond = ql.CallableFixedRateBond(
            settlement_days,
            self.face_value,
            self.schedule,
            [self.coupon_rate],
            self.day_count,
            self.business_convention,
            self.face_value,
            self.ql_valuation_date,
            self.call_schedule
        )

class BondPricerBase(abc.ABC):
    """
    Abstract pricer interface. Operates on a static bond definition.
    """
    def __init__(self, bond_static: QuantLibBondStaticBase):
        self.bond_static = bond_static

    @abc.abstractmethod
    def price(
        self,
        pillar_times: np.ndarray,
        zero_rates: np.ndarray,
    ) -> np.ndarray:
        pass

class FastBondPricer(BondPricerBase):
    """
    Fast numpy-based pricer (Actual/365Fixed) with interpolation.
    """
    def __init__(self, bond_static: QuantLibBondStaticBase):
        super().__init__(bond_static)
        self._gen_cashflows()

    def _gen_cashflows(self):
        ql_sched = self.bond_static.schedule
        dc       = self.bond_static.day_count
        ql_val   = self.bond_static.ql_valuation_date

        cf_dates   = []
        cf_times   = []
        cf_amts    = []
        coupon_amt = self.bond_static.face_value * self.bond_static.coupon_rate / self.bond_static.freq

        for d in ql_sched:
            if d == ql_val:
                continue
            cf_dates.append(d.to_date())
            t = dc.yearFraction(ql_val, d)
            cf_times.append(t)
            cf_amts.append(coupon_amt)

        cf_amts[-1] += self.bond_static.face_value

        self.cf_dates   = cf_dates
        self.cf_times   = np.array(cf_times, dtype=float)
        self.cf_amounts = np.array(cf_amts,  dtype=float)

    def price(self, pillar_times: np.ndarray, zero_rates: np.ndarray) -> np.ndarray:
        if zero_rates.ndim == 1:
            r_cf = np.interp(self.cf_times, pillar_times, zero_rates)
            dfs  = np.exp(-r_cf * self.cf_times)
            return float(self.cf_amounts.dot(dfs))
        r_cf_matrix = np.vstack([
            np.interp(self.cf_times, pillar_times, sc)
            for sc in zero_rates
        ])
        dfs = np.exp(-r_cf_matrix * self.cf_times[None, :])
        return dfs.dot(self.cf_amounts)

class QuantLibBondPricer(BondPricerBase):
    """
    Pricer using QuantLib's time-based ZeroCurve + appropriate engine.
    - Uses DiscountingBondEngine for vanilla bonds.
    - Uses TreeCallableFixedRateBondEngine with G2 model for callable bonds (method='g2').

    Args:
        bond_static: static bond definition (vanilla or callable)
        method: 'discount' or 'g2'
        grid_steps: time steps for tree engine (if callable)
    """
    def __init__(self, bond_static: QuantLibBondStaticBase, method: str = 'discount', grid_steps: int = 100):
        super().__init__(bond_static)
        self.method = method.lower()
        self.grid_steps = grid_steps

    def _make_term_structure(self, pillar_times, rates_vec):
        """
        Build a date-based zero curve from float year-times and rates.
        """
        # anchor at valuation date
        base = self.bond_static.ql_valuation_date
        # build date vector: time zero plus each pillar
        dates = ql.DateVector()
        dates.push_back(base)
        for t in pillar_times:
            days = int(round(t * 365))
            dates.push_back(base + ql.Period(days, ql.Days))
        # prepend zero-time rate
        rates = [rates_vec[0]] + list(rates_vec)
        # build zero curve
        zc = ql.ZeroCurve(
            dates,
            rates,
            self.bond_static.day_count,
            self.bond_static.calendar,
            ql.Linear(),
            ql.Continuous,
            ql.Annual
        )
        zc.enableExtrapolation()
        return ql.YieldTermStructureHandle(zc)

    @staticmethod
    def _price_vanilla(bond, ts_handle):
        engine = ql.DiscountingBondEngine(ts_handle)
        bond.setPricingEngine(engine)
        return bond.NPV()

    @staticmethod
    def _price_callable(bond, ts_handle, model_params, grid_steps):
        # model_params: tuple (a, sigma, b, eta, rho)
        a, sigma, b, eta, rho = model_params
        model = ql.G2(ts_handle, a, sigma, b, eta, rho)
        engine = ql.TreeCallableFixedRateBondEngine(model, grid_steps)
        bond.setPricingEngine(engine)
        return bond.cleanPrice()

    def price(
        self,
        pillar_times: np.ndarray,
        zero_rates: np.ndarray,
        g2_params=None
    ) -> np.ndarray:
        """
        For method='g2', supply g2_params:
          - single tuple (a,sigma,b,eta,rho) to apply to all curves,
          - or iterable of tuples matching zero_rates.shape[0]
        """
        dc  = self.bond_static.day_count
        cal = self.bond_static.calendar
        is_callable = hasattr(self.bond_static, 'call_schedule') and self.method == 'g2'

        def price_curve(rates_vec, params=None):
            ts = self._make_term_structure(pillar_times, rates_vec)
            if is_callable:
                return self._price_callable(
                    self.bond_static.bond,
                    ts,
                    params if params is not None else g2_params,
                    self.grid_steps
                )
            else:
                return self._price_vanilla(
                    self.bond_static.bond,
                    ts
                )

        # single curve
        if zero_rates.ndim == 1:
            return float(price_curve(zero_rates, g2_params))

        # multi-scenario
        prices = []
        # if single params tuple, broadcast
        single_params = g2_params and not hasattr(g2_params[0], '__iter__')
        for idx, scen in enumerate(zero_rates):
            params = g2_params if single_params else (g2_params[idx] if g2_params else None)
            prices.append(price_curve(scen, params))
        return np.array(prices)


import numpy as np

class TensorFunctionalForm:
    """
    Represents a multivariate quadratic function f(x) = x^T A x + b^T x + c,
    where A is diagonal matrix of second-order coefficients.
    """
    def __init__(self, a: np.ndarray, b: np.ndarray, c: float):
        # a and b are vectors of length D
        self.a = a  # second-order (diag) coefficients
        self.b = b  # first-order coefficients
        self.c = c  # constant term

    def __call__(self, x: np.ndarray) -> float:
        # x is vector of length D
        return float(np.dot(self.a, x**2) + np.dot(self.b, x) + self.c)

    def params(self):
        return self.a, self.b, self.c

class TensorFunctionalFormCalibrate:
    """
    Builds and evaluates a TensorFunctionalForm approximation of bond prices
    over multi-dimensional zero-rate shocks, using Sobol sampling.
    """
    def __init__(
        self,
        pricer: BondPricerBase,
        pillar_times: np.ndarray,
        # full set of zero-curve scenarios to define domain
        full_scenarios: np.ndarray  # shape (N_full, D)
    ):
        self.pricer = pricer
        self.pillar_times = pillar_times
        self.full_scenarios = full_scenarios
        self.domain_min = np.min(full_scenarios, axis=0)
        self.domain_max = np.max(full_scenarios, axis=0)

    def sample_and_fit(
        self,
        n_train: int = 200,
        n_test: int = 20,
        random_seed: int = 0
    ) -> tuple[TensorFunctionalForm, np.ndarray, np.ndarray, float]:
        """
        Samples training points via Sobol in the scenario domain,
        fits a TensorFunctionalForm, then tests on n_test actual scenarios.

        Returns:
            model:       fitted TensorFunctionalForm
            test_scen:   array of test scenarios (n_test, D)
            test_prices: actual prices on test scenarios
            rmse:        root mean squared error of approximation
        """
        from scipy.stats import qmc

        D = self.full_scenarios.shape[1]
        # 1) Sobol sampling on unit cube, then scale to domain
        sampler = qmc.Sobol(d=D, scramble=True, seed=random_seed)
        sobol_u = sampler.random_base2(m=int(np.log2(n_train))) if (n_train & (n_train-1))==0 else sampler.random(n_train)
        train_scen = qmc.scale(sobol_u, self.domain_min, self.domain_max)

        # 2) Price training scenarios
        train_prices = self.pricer.price(self.pillar_times, train_scen)

        # 3) Fit tensor quadratic: model f(x) = sum a_j x_j^2 + b_j x_j + c
        # Design matrix X: columns [x1^2,...,xD^2, x1,...,xD, 1]
        X_train = np.hstack([train_scen**2, train_scen, np.ones((n_train,1))])
        coeffs, *_ = np.linalg.lstsq(X_train, train_prices, rcond=None)
        a = coeffs[:D]
        b = coeffs[D:2*D]
        c = coeffs[-1]
        model = TensorFunctionalForm(a, b, c)

        # 4) Test on random subset of full scenarios
        rng = np.random.default_rng(random_seed)
        idx = rng.choice(self.full_scenarios.shape[0], size=n_test, replace=False)
        test_scen = self.full_scenarios[idx]
        test_true = self.pricer.price(self.pillar_times, test_scen)
        test_pred = np.array([model(x) for x in test_scen])

        # 5) compute RMSE
        rmse = np.sqrt(np.mean((test_true - test_pred)**2))
        return model, test_scen, test_true, rmse

# -------------------------------------------------------------------:
    """
    Fits and evaluates a quadratic approximation of bond prices
    as a function of a scalar scenario shock.
    """
    def __init__(
        self,
        pricer,                # instance of BondPricerBase
        pillar_times: np.ndarray,
        base_curve: np.ndarray  # base zero curve
    ):
        self.pricer = pricer
        self.pillar_times = pillar_times
        self.base_curve = base_curve

    def generate_prices(self, shocks: np.ndarray) -> np.ndarray:
        """
        Given an array of scalar shocks, computes pricer prices
        at zero_curves = base_curve + shock.
        """
        # Build scenario curves by adding shock to each pillar
        scenarios = self.base_curve[None, :] + shocks[:, None]
        return self.pricer.price(self.pillar_times, scenarios)

    def fit_quadratic(self, shocks: np.ndarray, prices: np.ndarray) -> TensorFunctionalForm:
        """
        Fit a quadratic f(x)=ax^2+bx+c to the given (shocks, prices).
        Returns a QuadraticFunctionalForm instance.
        """
        # Design matrix: [x^2, x, 1]
        X = np.vstack([shocks**2, shocks, np.ones_like(shocks)]).T
        # Solve least squares for [a,b,c]
        coeffs, *_ = np.linalg.lstsq(X, prices, rcond=None)
        return TensorFunctionalForm(*coeffs)

# -------------------------------------------------------------------
# Example usage in test harness
# -------------------------------------------------------------------
if __name__ == "__main__":
    val_date  = date(2025, 5, 18)
    mat_date  = date(2030, 5, 18)
    coupon    = 0.04

    # Plain vanilla
    bond_static = QuantLibBondStaticBase(val_date, mat_date, coupon)
    fast_pricer = FastBondPricer(bond_static)
    ql_pricer   = QuantLibBondPricer(bond_static)

    # Callable static
    call_dates  = [date(2027,5,18), date(2028,5,18), date(2029,5,18)]
    call_prices = [100.0, 101.0, 102.0]
    cb_static   = CallableBondStaticBase(
        val_date, mat_date, coupon,
        call_dates, call_prices
    )

    ql_callable_pricer = QuantLibBondPricer(cb_static, method='g2', grid_steps=10)

    # Market curve - single curve and simulated scenarios
    pillar_times = np.array([1,2,3,5,7,10], dtype=float)
    zero_curve   = np.array([0.02,0.021,0.022,0.025,0.027,0.03], dtype=float)

    # Generate multiple simulated zero-curve scenarios
    np.random.seed(42)
    n_scenarios = 2000
    simulated_curves = zero_curve[None, :] + 0.01 * np.random.randn(n_scenarios, zero_curve.size)

    print("Plain vanilla single-curve price:")
    print(" Fast  :", fast_pricer.price(pillar_times, zero_curve))
    print(" QL    :", ql_pricer.price(pillar_times, zero_curve))

        # --------------- Model-based callable pricer ---------------
    print("Callable bond pricing via calibrated G2++ Model:")
    # Use the pricer's term-structure builder to avoid manual ZeroCurve call
    pricer = QuantLibBondPricer(bond_static, method='g2', grid_steps=10)
    ts_handle = pricer._make_term_structure(pillar_times, zero_curve)

    # Build Libor index for calibration
    index = ql.Sofr(ts_handle)

    # Market cap vol quotes for G2 calibration
    periods = [ql.Period('1Y'), ql.Period('2Y'), ql.Period('5Y'), ql.Period('7Y'), ql.Period('10Y'), ql.Period('15Y')]
    vols    = [0.0185,        0.0155,         0.0145,         0.0143,         0.0135,          0.0125]
    quotes  = [ql.QuoteHandle(ql.SimpleQuote(v)) for v in vols]

    # Calibrate G2 model
    calibrator = G2Calibrator(ts_handle, index)
    #g2_params = calibrator.calibrate(periods, quotes, engine_steps=10)
    g2_params = [ 0.0937371, 0.0201141, 1.14135, 0.046775, -0.886926 ]
    print(" Calibrated G2 parameters:", g2_params)

    # Price callable bond using calibrated parameters
    model_price = ql_callable_pricer.price(pillar_times, simulated_curves, g2_params=g2_params)
    print(" G2++ Callable Price (calibrated):", model_price)
  
    # --------------- Tensor approximation example for callable bond ---------------
    print("\nFitting tensor approximation for callable bond price:")
    # Use callable pricer with hard-coded G2 params
    hardcoded_g2 = (0.0937371, 0.0201141, 1.14135, 0.046775, -0.886926)
    cb_pricer_tf = QuantLibBondPricer(cb_static, method='g2', grid_steps=10)
    
    # Build a full scenario set around the base zero curve (e.g. simulated_curves above)
    tf_calibrator = TensorFunctionalFormCalibrate(cb_pricer_tf, pillar_times, simulated_curves)
    
    model_c, test_scen_c, true_prices_c, rmse_c = tf_calibrator.sample_and_fit(
        n_train=64,
        n_test=20,
        random_seed=42
    )
    print(" Callable RMSE:", rmse_c)
    print(" Test true vs approx (5 samples):")
    for tp, ts in zip(true_prices_c[:5], [model_c(x) for x in test_scen_c[:5]]):
        print(f"  true={tp:.4f}, approx={ts:.4f}")


In [49]:
import time

# Generate scenarios
np.random.seed(0)
scenarios_1m = zero_curve + 0.001 * np.random.randn(100_000, pillar_times.size)

# Measure time for FastBondPricer
start_time = time.time()
fp_scen_1m = fast_pricer.price(pillar_times, scenarios_1m)
fast_pricer_time_1m = time.time() - start_time
print(f"FastBondPricer time for 1M scenarios: {fast_pricer_time_1m:.4f} seconds")

# Measure time for QuantLibBondPricer
start_time = time.time()
qp_scen_1m = ql_pricer.price(pillar_times, scenarios_1m)
quantlib_pricer_time_1m = time.time() - start_time
print(f"QuantLibBondPricer time for 1M scenarios: {quantlib_pricer_time_1m:.4f} seconds")

# Measure time for CallableBondPricer
start_time = time.time()
cb_scen_1m = ql_callable_pricer.price(pillar_times, scenarios_1m, g2_params=g2_params)
callable_pricer_time_1m = time.time() - start_time
print(f"CallableBondPricer time for 1M scenarios: {callable_pricer_time_1m:.4f} seconds")


FastBondPricer time for 1M scenarios: 0.1194 seconds
QuantLibBondPricer time for 1M scenarios: 2.3319 seconds
CallableBondPricer time for 1M scenarios: 12.1400 seconds


In [None]:
# --------------- Tensor approximation example for callable bond ---------------
print("\nFitting tensor approximation for callable bond price:")
# Use callable pricer with hard-coded G2 params
hardcoded_g2 = (0.0937371, 0.0201141, 1.14135, 0.046775, -0.886926)
cb_pricer_tf = QuantLibBondPricer(cb_static, method='g2', grid_steps=10)

# Build a full scenario set around the base zero curve (e.g. simulated_curves above)
tf_calibrator = TensorFunctionalFormCalibrate(cb_pricer_tf, pillar_times, simulated_curves)

model_c, test_scen_c, true_prices_c, rmse_c = tf_calibrator.sample_and_fit(
    n_train=64,
    n_test=20,
    random_seed=42
)
print(" Callable RMSE:", rmse_c)
print(" Test true vs approx (5 samples):")
for tp, ts in zip(true_prices_c[:5], [model_c(x) for x in test_scen_c[:5]]):
    print(f"  true={tp:.4f}, approx={ts:.4f}")



Fitting tensor approximation for callable bond price:


ModuleNotFoundError: No module named 'scipy'