<a href="https://colab.research.google.com/github/aderdouri/ql_web_app/blob/master/ql_notebooks/capflooredcoupon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install QuantLib-Python

In [None]:
import QuantLib as ql
import unittest
import math

# Helper for io::rate like formatting
def format_rate(r):
    return f"{r * 100:.4f}%"

# Helper for io::volatility like formatting
def format_vol(v):
    return f"{v * 100:.4f}%"

# Helper for checkAbsError
def check_abs_error(x1, x2, tolerance):
    return abs(x1 - x2) < tolerance

# Helper for typeToString
def type_to_string(capfloor_type):
    if capfloor_type == ql.CapFloor.Cap:
        return "cap"
    elif capfloor_type == ql.CapFloor.Floor:
        return "floor"
    elif capfloor_type == ql.CapFloor.Collar:
        return "collar"
    else:
        raise ValueError(f"unknown cap/floor type: {capfloor_type}")

class CommonVars:
    # common data setup
    def __init__(self):
        self.length = 20           # years
        self.volatility = 0.20
        self.nominal = 100.0
        # Python bindings often expect lists for vector arguments
        self.nominals = [self.nominal] * self.length # Replicate nominal for each year
        self.frequency = ql.Annual # Changed from Semiannual in CapFloor test to Annual here
        self.termStructure = ql.RelinkableYieldTermStructureHandle()
        # Changed to Euribor1Y to match Annual frequency
        self.index = ql.Euribor1Y(self.termStructure)
        self.calendar = self.index.fixingCalendar()
        self.convention = ql.ModifiedFollowing
        # Use a fixed date for reproducibility
        self.today = ql.Date(15, ql.May, 2024) # Example fixed date
        # Evaluation date must be set globally for some calculations
        # ql.Settings.instance().evaluationDate = self.today # Done in test's setUp
        self.settlementDays = 2
        self.fixingDays = 2
        self.settlement = self.calendar.advance(self.today, self.settlementDays, ql.Days)
        # Link the term structure AFTER settlement date is known
        self.termStructure.linkTo(ql.FlatForward(self.settlement, 0.05,
                                                 ql.ActualActual(ql.ActualActual.ISDA)))
        # Caps and floors are typically set per test case if needed
        self.caps = []
        self.floors = []


    # utilities
    def makeFixedLeg(self, startDate, length_years):
        endDate = self.calendar.advance(startDate, length_years, ql.Years,
                                        self.convention)
        schedule = ql.Schedule(startDate, endDate, ql.Period(self.frequency), self.calendar,
                               self.convention, self.convention,
                               ql.DateGeneration.Forward, False)
        # Fixed rate leg with 0 coupons needs Rate=0.0
        coupons = [0.0] * length_years # One coupon rate per period
        # Use FixedRateLeg convenience class
        leg = ql.FixedRateLeg(schedule) \
                .withNotionals(self.nominals) \
                .withCouponRates(coupons, ql.Thirty360(ql.Thirty360.BondBasis)) \
                .withPaymentAdjustment(self.convention) # Added payment adjustment
        return leg

    def makeFloatingLeg(self, startDate, length_years,
                        gearing=1.0, spread=0.0):
        endDate = self.calendar.advance(startDate, length_years, ql.Years, self.convention)
        schedule = ql.Schedule(startDate, endDate, ql.Period(self.frequency), self.calendar,
                               self.convention, self.convention,
                               ql.DateGeneration.Forward, False)
        # Ensure vectors match the number of periods (length_years for Annual freq)
        gearingVector = [gearing] * length_years
        spreadVector = [spread] * length_years
        # Use IborLeg convenience class
        leg = ql.IborLeg(schedule, self.index) \
                .withNotionals(self.nominals) \
                .withPaymentDayCounter(self.index.dayCounter()) \
                .withPaymentAdjustment(self.convention) \
                .withFixingDays([self.fixingDays]) \
                .withGearings(gearingVector) \
                .withSpreads(spreadVector)
        return leg

    def makeCapFlooredLeg(self, startDate, length_years,
                          caps, floors, volatility,
                          gearing=1.0, spread=0.0):
        endDate = self.calendar.advance(startDate, length_years, ql.Years, self.convention)
        schedule = ql.Schedule(startDate, endDate, ql.Period(self.frequency), self.calendar,
                               self.convention, self.convention,
                               ql.DateGeneration.Forward, False)

        # Volatility structure setup
        vol_handle = ql.OptionletVolatilityStructureHandle(
            ql.ConstantOptionletVolatility(0, self.calendar, ql.Following,
                                           volatility, ql.Actual365Fixed())
        )
        # Coupon pricer
        pricer = ql.BlackIborCouponPricer(vol_handle)

        # Ensure vectors match the number of periods
        num_periods = length_years # For annual frequency
        gearingVector = [gearing] * num_periods
        spreadVector = [spread] * num_periods

        # Ensure caps/floors lists are either empty or match num_periods
        caps_adj = caps if len(caps) == num_periods else ([caps[0]] * num_periods if caps else [])
        floors_adj = floors if len(floors) == num_periods else ([floors[0]] * num_periods if floors else [])

        iborLeg = ql.IborLeg(schedule, self.index) \
                    .withNotionals(self.nominals) \
                    .withPaymentDayCounter(self.index.dayCounter()) \
                    .withPaymentAdjustment(self.convention) \
                    .withFixingDays([self.fixingDays]) \
                    .withGearings(gearingVector) \
                    .withSpreads(spreadVector) \
                    .withCaps(caps_adj) \
                    .withFloors(floors_adj)

        # Apply the pricer to the leg's coupons
        ql.setCouponPricer(iborLeg, pricer)
        return iborLeg

    def makeEngine(self, volatility):
        # Pricing engine for CapFloor instruments (e.g., Cap, Floor, Collar)
        vol_quote = ql.QuoteHandle(ql.SimpleQuote(volatility))
        if not self.termStructure.empty():
            return ql.BlackCapFloorEngine(self.termStructure, vol_quote)
        else:
            raise RuntimeError("Term structure handle is not linked.")


    def makeCapFloor(self, capfloor_type, leg, capStrike, floorStrike, volatility):
        # Note: Cap/Floor/Collar constructors take lists of strikes
        cap_strikes = [capStrike]
        floor_strikes = [floorStrike]

        result = None
        if capfloor_type == ql.CapFloor.Cap:
            result = ql.Cap(leg, cap_strikes)
        elif capfloor_type == ql.CapFloor.Floor:
            result = ql.Floor(leg, floor_strikes)
        elif capfloor_type == ql.CapFloor.Collar:
            result = ql.Collar(leg, cap_strikes, floor_strikes)
        else:
            raise ValueError(f"unknown cap/floor type: {capfloor_type}")

        engine = self.makeEngine(volatility)
        result.setPricingEngine(engine)
        return result


class CapFlooredCouponTests(unittest.TestCase):

    def setUp(self):
        self.original_eval_date = ql.Settings.instance().evaluationDate
        self.vars = CommonVars()
        # Set the global eval date to the one from CommonVars for this test
        ql.Settings.instance().evaluationDate = self.vars.today

    def tearDown(self):
        ql.Settings.instance().evaluationDate = self.original_eval_date

    def testLargeRates(self):
        """Testing degenerate collared coupon."""
        print("Testing degenerate collared coupon...")
        vars_ = self.vars

        # A vanilla floating leg should equal a leg Capped at 10000% and Floored at 0%
        large_cap_rate = 100.0 # 10000%
        zero_floor_rate = 0.0

        # Ensure caps/floors lists match the length (number of coupons)
        num_periods = vars_.length # Annual frequency
        caps = [large_cap_rate] * num_periods
        floors = [zero_floor_rate] * num_periods

        tolerance = 1e-10

        # fixed leg with zero rate (for swap pricing)
        fixedLeg = vars_.makeFixedLeg(vars_.startDate, vars_.length)
        # Vanilla floating leg
        floatLeg = vars_.makeFloatingLeg(vars_.startDate, vars_.length)
        # Collared leg with extreme strikes
        collaredLeg = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length,
                                              caps, floors, vars_.volatility)

        engine = ql.DiscountingSwapEngine(vars_.termStructure)

        vanillaSwap = ql.Swap(fixedLeg, floatLeg)
        collaredSwap = ql.Swap(fixedLeg, collaredLeg)

        vanillaSwap.setPricingEngine(engine)
        collaredSwap.setPricingEngine(engine)

        vanilla_npv = vanillaSwap.NPV()
        collared_npv = collaredSwap.NPV()

        self.assertAlmostEqual(vanilla_npv, collared_npv, delta=tolerance,
                               msg=(f"Leg NPVs should be equal for extreme collar:\n"
                                    f"Length: {vars_.length} y\n"
                                    f"Volatility: {vars_.volatility*100:.2f}%\n"
                                    f"Notional: {vars_.nominal}\n"
                                    f"Vanilla floating leg NPV: {vanilla_npv}\n"
                                    f"Collared floating leg NPV (strikes 0 and 10000%): {collared_npv}\n"
                                    f"Diff: {abs(vanilla_npv - collared_npv)}"))

    def testDecomposition(self):
        """Testing collared coupon against its decomposition."""
        print("Testing collared coupon against its decomposition...")
        vars_ = self.vars

        tolerance = 1e-12
        floorstrike = 0.05
        capstrike = 0.10

        num_periods = vars_.length # Annual frequency
        caps = [capstrike] * num_periods
        floors = [floorstrike] * num_periods
        caps0 = [] # Represents no cap
        floors0 = [] # Represents no floor

        gearing_p = 0.5
        spread_p = 0.002
        gearing_n = -1.5
        spread_n = 0.12

        # Common legs and swaps
        fixedLeg = vars_.makeFixedLeg(vars_.startDate, vars_.length)
        floatLeg = vars_.makeFloatingLeg(vars_.startDate, vars_.length) # gearing=1, spread=0
        floatLeg_p = vars_.makeFloatingLeg(vars_.startDate, vars_.length, gearing_p, spread_p)
        floatLeg_n = vars_.makeFloatingLeg(vars_.startDate, vars_.length, gearing_n, spread_n)

        swapEngine = ql.DiscountingSwapEngine(vars_.termStructure)

        vanillaSwap = ql.Swap(fixedLeg, floatLeg); vanillaSwap.setPricingEngine(swapEngine)
        vanillaSwap_p = ql.Swap(fixedLeg, floatLeg_p); vanillaSwap_p.setPricingEngine(swapEngine)
        vanillaSwap_n = ql.Swap(fixedLeg, floatLeg_n); vanillaSwap_n.setPricingEngine(swapEngine)

        npvVanilla = vanillaSwap.NPV()
        npvVanilla_p = vanillaSwap_p.NPV()
        npvVanilla_n = vanillaSwap_n.NPV()

        # === Capped coupon decomposition ===
        # Capped Leg = Vanilla Leg - Cap

        # Gearing = 1, Spread = 0
        cappedLeg = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors0, vars_.volatility)
        capSwap = ql.Swap(fixedLeg, cappedLeg); capSwap.setPricingEngine(swapEngine)
        capInstrument = ql.Cap(floatLeg, [capstrike]); capInstrument.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCappedLeg = capSwap.NPV()
        npvCap = capInstrument.NPV()
        self.assertAlmostEqual(npvCappedLeg, npvVanilla - npvCap, delta=tolerance,
                               msg=f"Capped Leg decomp failed (g=1, s=0): leg={npvCappedLeg}, V-C={npvVanilla - npvCap}")

        # Gearing > 0
        cappedLeg_p = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors0, vars_.volatility, gearing_p, spread_p)
        capSwap_p = ql.Swap(fixedLeg, cappedLeg_p); capSwap_p.setPricingEngine(swapEngine)
        # Cap on the geared leg
        capInstrument_p = ql.Cap(floatLeg_p, [capstrike]); capInstrument_p.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCappedLeg_p = capSwap_p.NPV()
        npvCap_p = capInstrument_p.NPV()
        self.assertAlmostEqual(npvCappedLeg_p, npvVanilla_p - npvCap_p, delta=tolerance,
                               msg=f"Capped Leg decomp failed (g>0): leg={npvCappedLeg_p}, V_p-C_p={npvVanilla_p - npvCap_p}")

        # Gearing < 0: Capped Leg = Vanilla Leg + |g| * Put( Keff = (K-s)/g )
        cappedLeg_n = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors0, vars_.volatility, gearing_n, spread_n)
        capSwap_n = ql.Swap(fixedLeg, cappedLeg_n); capSwap_n.setPricingEngine(swapEngine)
        # Floor on the vanilla leg with effective strike
        effective_strike_cap_n = (capstrike - spread_n) / gearing_n
        floorInstrument_eff_n = ql.Floor(floatLeg, [effective_strike_cap_n]); floorInstrument_eff_n.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCappedLeg_n = capSwap_n.NPV()
        npvFloor_eff_n = floorInstrument_eff_n.NPV()
        # C++ test uses: npvVanilla_n + gearing_n * npvFloor_eff_n
        # Let's re-derive: Payoff = Min(g*R+s, K) = g*R+s + Min(0, K-(g*R+s))
        # = g*R+s - Max(0, g*R+s - K)
        # If g < 0: Payoff = g*R+s - Max(0, s-K - |g|R)
        # = g*R+s - Max(0, -( |g|R - (s-K) ) ) --- This doesn't look right.
        # Let's use the alternative: Min(a*R+b, K) = a*R+b when a*R+b < K => R > (K-b)/a (since a<0)
        # Min(a*R+b, K) = K when a*R+b >= K => R <= (K-b)/a
        # Payoff = (g*R+s) * I(R > Keff) + K * I(R <= Keff)
        # = g*R+s + (K - (g*R+s)) * I(R <= Keff)
        # = g*R+s + (K-s - g*R) * I(R <= Keff)
        # = g*R+s + |g| * ( (K-s)/|g| - R/|g|*|g| ) * I(R <= Keff) ??? Still messy.
        # C++ test compares against npvVanilla_n + gearing_n * npvFloor. Let's stick to it.
        # NOTE: The C++ comparison `+ gearing_n*npvFloor` seems odd. Maybe it should be `- gearing_n*npvFloor`?
        # Let's re-check the C++ test logic: `error = std::abs(npvCappedLeg - (npvVanilla+ gearing_n*npvFloor));` - Uses '+'.
        # Let's verify: Floor payoff = Max(0, Keff - R). NPV(Floor) = E[ D(t) * Max(0, Keff-R) ]
        # Value(Capped Leg) = E[ D(t) * Min(gR+s, K) ]
        # If g<0, Min(gR+s, K) = gR+s + Min(0, K-gR-s) = gR+s - Max(0, gR+s-K) = gR+s - Max(0, s-K -|g|R)
        # = gR+s - |g| Max(0, (s-K)/|g| - R). Let Keff_put = (s-K)/|g|. This is Put(R, Keff_put).
        # So, Value(Capped Leg) = NPV(Vanilla_n) - |g| * NPV(Put(Keff_put))
        # = NPV(Vanilla_n) + g * NPV(Put(Keff_put)). This matches C++ test if Keff_put = (k-s)/g used in C++ test floor.
        self.assertAlmostEqual(npvCappedLeg_n, npvVanilla_n + gearing_n * npvFloor_eff_n, delta=tolerance,
                               msg=f"Capped Leg decomp failed (g<0): leg={npvCappedLeg_n}, V_n + g*F_eff={npvVanilla_n + gearing_n * npvFloor_eff_n}")

        # === Floored coupon decomposition ===
        # Floored Leg = Vanilla Leg + Floor

        # Gearing = 1, Spread = 0
        flooredLeg = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps0, floors, vars_.volatility)
        floorSwap = ql.Swap(fixedLeg, flooredLeg); floorSwap.setPricingEngine(swapEngine)
        floorInstrument = ql.Floor(floatLeg, [floorstrike]); floorInstrument.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvFlooredLeg = floorSwap.NPV()
        npvFloor = floorInstrument.NPV()
        self.assertAlmostEqual(npvFlooredLeg, npvVanilla + npvFloor, delta=tolerance,
                               msg=f"Floored Leg decomp failed (g=1, s=0): leg={npvFlooredLeg}, V+F={npvVanilla + npvFloor}")

        # Gearing > 0
        flooredLeg_p = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps0, floors, vars_.volatility, gearing_p, spread_p)
        floorSwap_p = ql.Swap(fixedLeg, flooredLeg_p); floorSwap_p.setPricingEngine(swapEngine)
        # Floor on the geared leg
        floorInstrument_p = ql.Floor(floatLeg_p, [floorstrike]); floorInstrument_p.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvFlooredLeg_p = floorSwap_p.NPV()
        npvFloor_p = floorInstrument_p.NPV()
        self.assertAlmostEqual(npvFlooredLeg_p, npvVanilla_p + npvFloor_p, delta=tolerance,
                               msg=f"Floored Leg decomp failed (g>0): leg={npvFlooredLeg_p}, V_p+F_p={npvVanilla_p + npvFloor_p}")

        # Gearing < 0: Floored Leg = Vanilla Leg - |g| * Cap( Keff = (K-s)/g )
        flooredLeg_n = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps0, floors, vars_.volatility, gearing_n, spread_n)
        floorSwap_n = ql.Swap(fixedLeg, flooredLeg_n); floorSwap_n.setPricingEngine(swapEngine)
        # Cap on the vanilla leg with effective strike
        effective_strike_floor_n = (floorstrike - spread_n) / gearing_n
        capInstrument_eff_n = ql.Cap(floatLeg, [effective_strike_floor_n]); capInstrument_eff_n.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvFlooredLeg_n = floorSwap_n.NPV()
        npvCap_eff_n = capInstrument_eff_n.NPV()
        # C++ test: error = std::abs(npvFlooredLeg - (npvVanilla - gearing_n*npvCap)); Uses '-'
        self.assertAlmostEqual(npvFlooredLeg_n, npvVanilla_n - gearing_n * npvCap_eff_n, delta=tolerance,
                               msg=f"Floored Leg decomp failed (g<0): leg={npvFlooredLeg_n}, V_n - g*C_eff={npvVanilla_n - gearing_n * npvCap_eff_n}")


        # === Collared coupon decomposition ===
        # Collared Leg = Vanilla Leg - Collar

        # Gearing = 1, Spread = 0
        collaredLeg = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors, vars_.volatility)
        collarSwap = ql.Swap(fixedLeg, collaredLeg); collarSwap.setPricingEngine(swapEngine)
        collarInstrument = ql.Collar(floatLeg, [capstrike], [floorstrike]); collarInstrument.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCollaredLeg = collarSwap.NPV()
        npvCollar = collarInstrument.NPV()
        self.assertAlmostEqual(npvCollaredLeg, npvVanilla - npvCollar, delta=tolerance,
                               msg=f"Collared Leg decomp failed (g=1, s=0): leg={npvCollaredLeg}, V-Col={npvVanilla - npvCollar}")

        # Gearing > 0
        collaredLeg_p = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors, vars_.volatility, gearing_p, spread_p)
        collarSwap_p = ql.Swap(fixedLeg, collaredLeg_p); collarSwap_p.setPricingEngine(swapEngine)
        # Collar on the geared leg
        collarInstrument_p = ql.Collar(floatLeg_p, [capstrike], [floorstrike]); collarInstrument_p.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCollaredLeg_p = collarSwap_p.NPV()
        npvCollar_p = collarInstrument_p.NPV()
        self.assertAlmostEqual(npvCollaredLeg_p, npvVanilla_p - npvCollar_p, delta=tolerance,
                               msg=f"Collared Leg decomp failed (g>0): leg={npvCollaredLeg_p}, V_p-Col_p={npvVanilla_p - npvCollar_p}")

        # Gearing < 0: Collared Leg = Vanilla Leg - g * Collar( R, Kfloor_eff, Kcap_eff)
        # Note the negative sign flips the collar relation.
        collaredLeg_n = vars_.makeCapFlooredLeg(vars_.startDate, vars_.length, caps, floors, vars_.volatility, gearing_n, spread_n)
        collarSwap_n = ql.Swap(fixedLeg, collaredLeg_n); collarSwap_n.setPricingEngine(swapEngine)
        # Collar on the vanilla leg with effective strikes
        effective_floor_n = (floorstrike - spread_n) / gearing_n
        effective_cap_n = (capstrike - spread_n) / gearing_n
        # IMPORTANT: Since g<0, floorstrike eff > capstrike eff. This is an inverse collar (buy floor, sell cap).
        # QL Collar takes cap strikes first, then floor strikes.
        # The decomposition should involve Collar(R, Kcap_eff, Kfloor_eff)
        collarInstrument_eff_n = ql.Collar(floatLeg, [effective_cap_n], [effective_floor_n]);
        collarInstrument_eff_n.setPricingEngine(vars_.makeEngine(vars_.volatility))
        npvCollaredLeg_n = collarSwap_n.NPV()
        npvCollar_eff_n = collarInstrument_eff_n.NPV()
        # C++ test: error = std::abs(npvCollaredLeg - (npvVanilla - gearing_n*npvCollar));
        self.assertAlmostEqual(npvCollaredLeg_n, npvVanilla_n - gearing_n * npvCollar_eff_n, delta=tolerance,
                               msg=f"Collared Leg decomp failed (g<0): leg={npvCollaredLeg_n}, V_n - g*Col_eff={npvVanilla_n - gearing_n * npvCollar_eff_n}")


if __name__ == '__main__':
    print("Presolve testQuantLib.py ...")
    unittest.main(argv=['first-arg-is-ignored'], exit=False)