# Pricing vanilla bond 

In [3]:
# --------------------- load libraries ---------------------
from math import exp
import pandas as pd
import numpy as np
from scipy.optimize import minimize, minimize_scalar, newton

In [4]:
df_spot_rate = pd.DataFrame({"SpotRates": [0.025, 0.026, 0.0265,0.028, 0.03,0.033,0.037,0.039,0.043,0.051],
                            "Year": np.arange(1,11,1)}, index=np.arange(1,11,1))
df_spot_rate_constant = pd.DataFrame({"SpotRates": np.repeat(0.05, 10), "Year": np.arange(1,11,1)}, index=np.arange(1,11,1))

def bond_cash_flow(notional, cp, cp_frequency, maturity):
    """
    function to create cash-flow of vanilla bond.

    notional: the bond notional
    cp: the coupon rate as 0.05 for 5% 
    cp_frequency: the yearly frequency of payment? 1 for annual, 2 for semi-annual,...
    maturity: the bond maturity in years
    """
    coupon_cf = np.full(maturity * cp_frequency, cp / cp_frequency * notional)
    principal_cf = np.zeros(maturity * cp_frequency); principal_cf[-1] = notional
    cash_flow = coupon_cf + principal_cf
    time_to_receip_cf = np.arange(1/cp_frequency, maturity + 1 / cp_frequency, 1 / cp_frequency)
    time_cf = pd.Index(np.arange(1,maturity * cp_frequency + 1,1))
    
    cash_flow_df = pd.DataFrame(data = {
        "TimetoReceipt": time_to_receip_cf,
        "Coupon":coupon_cf,
        "Principal":principal_cf,
        "CashFlow":cash_flow
        
    }, index=time_cf)
    return cash_flow_df

def discount_cash_flow(cash_flow, spot_rates):
    """
    Function that takes a dataframe of cash flow and a dataframe of discount factor and returns
    a dataframe of discounted cash flow.
    """
    df_discount_cash_flow = pd.merge(cash_flow[["TimetoReceipt", "CashFlow"]], spot_rates, left_index=True, right_index=True, how="left")
    
    # df_discount_cash_flow["InterpolatedSpotRate"] = df_discount_cash_flow["SpotRate"].interpolate(method="linear",limit_direction="both")
    
    df_discount_cash_flow = cash_flow[["TimetoReceipt", "CashFlow"]].copy()
    df_discount_cash_flow["InterpolatedSpotRates"] = np.interp(
        cash_flow["TimetoReceipt"],
        spot_rates["Year"],
        spot_rates["SpotRates"]
    )
    df_discount_cash_flow["DiscountFactor"] = 1 / (1 + df_discount_cash_flow["InterpolatedSpotRates"])**df_discount_cash_flow["TimetoReceipt"]
    df_discount_cash_flow["DiscountedCF"] = df_discount_cash_flow["CashFlow"] * df_discount_cash_flow["DiscountFactor"]
    return df_discount_cash_flow

def calculate_ytm(df, price=100, frequency=2):
    """
    Calculates the Yield to Maturity (YTM) of a bond.
    
    Parameters
    ----------
    df: pandas.DataFrame
        DataFrame with a "CashFlow" column (bond cash flows)
        and index = timing of cash flows (e.g., years or periods).
    price: float 
        Observed market price of the bond.
    guess: float (default=0.05) 
        initial guess for the numerical solver.
    
    Returns
    -------
    ytm: float
        Yield to Maturity in decimal (e.g., 0.05 = 5%)
    """

    def present_value(ytm):
        periods = df.index.values
        cashflows = df["CashFlow"].values
        return np.sum(cashflows / (1 + ytm/frequency)**periods) - price
    
    # Solve for the YTM that zeroes the function
    ytm = newton(present_value, x0=0.05)
    return ytm

def build_discount_factors(times_to_receipt, df_spot, frequency=1, quotation_type= "BEY"):
    """
    Build discount factors from spot rates and requested times.

    Parameters
    ----------
    times_to_receipt: array-like
        Time points where discount factors are needed.
    df_spot: DataFrame
        Must contain "Year", "SpotRate"
    compounding: str
        "annual" or "continuous"
    quotation_type: str
        EAY: effective annual yield
        BEY: bond equivalent yield
    
    Returns
    -------
    DataFrame with ["TimetoReceipt", "SpotRates", "DiscountFactor"]
    """
    # Interpolate spot rates
    spot_interp = np.interp(times_to_receipt, df_spot["Year"], df_spot["SpotRates"])
    
    # calculate the discount factor for the specified quotation
    if quotation_type == "BEY":
        times = np.arange(1,len(times_to_receipt) + 1,1)
        discount_factors = 1 / (1 + spot_interp/frequency) ** times
    else:
        times = times_to_receipt.copy()
        discount_factors = 1 / (1 + spot_interp) ** times
    
    return pd.DataFrame({
        "TimeToReceipt": times_to_receipt,
        "SpotRate": spot_interp,
        "DiscountFactor": discount_factors
    }, index=np.arange(1, len(times_to_receipt) + 1, 1))
    
def estimated_ytm(price, notional, cp, frequency, maturity):
    """
    Estimate the yield to maturity of a bond

    """
    return (cp * notional + (notional-price)/maturity)/((notional + price)/frequency)

# -----------------------------------------------------------------------------------------
"""
        assess the risk of a fixed income position

"""
# -----------------------------------------------------------------------------------------

def wal(cash_flow_df):
    """
    return the weighted average life of a principal over the life of an investment.
    WAl it measures the average time over which the investor may expect the return of his principal.
    """
    return (cash_flow_df["Principal"] * cash_flow_df.index).sum() / cash_flow_df["Principal"].sum()

def mac_dur(cash_flow_df, spot_price, frequency = 1):
    """
    return the Macaulay Duration Calculation for a cash flow DataFrame

    """
    YtoM =  calculate_ytm(cash_flow_df, spot_price)
    nb_rows = len(cash_flow_df) + 1 
    df_YtoM = pd.DataFrame(
    { "Year": np.arange(1, nb_rows, 1),
     "SpotRates": np.repeat(ytm_b1, nb_rows - 1)
    }, index=np.arange(1, nb_rows, 1))

    # we discount the cash flow with the yield to maturity
    df_discount_factor = build_discount_factors(cash_flow_df["TimetoReceipt"].values,df_YtoM, frequency)
    df_discount_cash_flow = pd.merge(
        cash_flow_df[["TimetoReceipt", "CashFlow"]], 
        df_discount_factor[["SpotRate", "DiscountFactor"]], 
        left_index=True, right_index=True, how="left")
    df_discount_cash_flow["PV"] = df_discount_cash_flow["CashFlow"] * df_discount_cash_flow["DiscountFactor"]

    mac_dur_value =  np.dot(df_discount_cash_flow["PV"], df_discount_cash_flow["TimetoReceipt"]) / np.sum(df_discount_cash_flow["PV"])
    
    return df_discount_cash_flow, mac_dur_value

In [5]:
b1 = bond_cash_flow(100, 0.03, 2, 10)

ytm_b1 = calculate_ytm(b1,100)

df_spot_rate_constant = pd.DataFrame(
    {"SpotRates": np.repeat(ytm_b1, 10), 
     "Year": np.arange(1,11,1)
    }, index=np.arange(1,11,1))

detail, macdur = mac_dur(b1, 100, 2)
print(detail)
print(macdur)

    TimetoReceipt  CashFlow  SpotRate  DiscountFactor         PV
1             0.5       1.5      0.03        0.985222   1.477833
2             1.0       1.5      0.03        0.970662   1.455993
3             1.5       1.5      0.03        0.956317   1.434475
4             2.0       1.5      0.03        0.942184   1.413276
5             2.5       1.5      0.03        0.928260   1.392390
6             3.0       1.5      0.03        0.914542   1.371813
7             3.5       1.5      0.03        0.901027   1.351540
8             4.0       1.5      0.03        0.887711   1.331567
9             4.5       1.5      0.03        0.874592   1.311888
10            5.0       1.5      0.03        0.861667   1.292501
11            5.5       1.5      0.03        0.848933   1.273400
12            6.0       1.5      0.03        0.836387   1.254581
13            6.5       1.5      0.03        0.824027   1.236041
14            7.0       1.5      0.03        0.811849   1.217774
15            7.5       1

# Par curve, spot curve and forward curve

In [6]:
par_rates = [0.025, 0.031680, 0.03704]
df_par_rate = pd.DataFrame({"ParRates": par_rates,
                             "TimetoMaturity": np.arange(1,len(par_rates) +1,1)},
                           index=np.arange(1,len(par_rates)+1,1))

def get_df_factor(par_rates):
    discount_factors = pd.DataFrame(
        data={"TimetoMaturity": par_rates["TimetoMaturity"], 
              "DiscountFactors" : np.zeros(par_rates.shape[0])},
        index = par_rates.index
        )
    discount_factors["DiscountFactors"] = 1 / (1 + par_rates["ParRates"])**(par_rates["TimetoMaturity"])
    return discount_factors


def get_spot_rates(par_rates):
    par_rates_discount_factor = get_df_factor(par_rates)
    spot_rates = []
    for i,n in enumerate(par_rates["ParRates"]):
        if i == 0:
            spot_rates.append(n)
        else:
            rate = 1+n
            prod = par_rates_discount_factor["DiscountFactors"].iloc[:i-1].sum()
            spot_rates.append(
                ((1+n)/(1-n*par_rates_discount_factor["DiscountFactors"].iloc[:i].sum()))**(1/par_rates["TimetoMaturity"].iloc[i]) - 1
            )
    spot_rates_df = pd.DataFrame(
        data = {
            "TimetoMaturity": par_rates_discount_factor["TimetoMaturity"],
            "SpotRates": spot_rates
        },
        index= df_par_rate.index
    )
    return spot_rates_df
        
def get_forward_rates_1y(spot_rates: pd.DataFrame) -> pd.DataFrame:
    """
    Build the 1-year forward rate curve from a spot rate curve.
    The first element corresponds to the 1-year spot rate,
    and the next ones are forward 1y in 1y, 1y in 2y, etc.
    """
    
    df_forward = pd.DataFrame(columns=["TimetoMaturity", "ForwardRates"])
    
    # First rate = spot 1y
    first_rate = spot_rates.iloc[0]["SpotRates"]
    df_forward.loc[0] = [spot_rates.iloc[0]["TimetoMaturity"], first_rate]
    
    # Compute 1y forward rates
    for t in range(1, len(spot_rates)):
        s_t = spot_rates.iloc[t-1]["SpotRates"]
        s_t1 = spot_rates.iloc[t]["SpotRates"]
        tenor_t = spot_rates.iloc[t-1]["TimetoMaturity"]
        tenor_t1 = spot_rates.iloc[t]["TimetoMaturity"]
        
        f_t1 = ((1 + s_t1)**tenor_t1 / (1 + s_t)**tenor_t) - 1
        df_forward.loc[t] = [tenor_t1, f_t1]
    return df_forward

spot_rates= get_spot_rates(df_par_rate)
forward_rates = get_forward_rates_1y(spot_rates)

# Interest Rate Volatility - Binomial Tree Pricing 

A callable bond is a bond that includes an embedded call option. The call option is an issuer option; that is, the right to exercise the option is at the discretion of the bond’s issuer. Early redemption usually happens when the issuer has the opportunity to replace a high-coupon bond with another bond that has more favorable terms, typically when interest rates have fallen or when the issuer’s credit quality has improved. Most callable bonds include a call protection period during which the issuer cannot call the bond.

* The issuer of a European-style callable bond can exercise the call option only once on the call date.
* An American-style callable bond is continuously callable at any time starting on the first call date.
* A Bermudan-style call option can be exercised only on a predetermined schedule on specified dates following the call protection period.

$$
\text{Value of callable bond} = 
\text{Value of straight bond} - 
\text{Value of issuer call option}
$$

## Build forward rates tree


In [7]:

par_rates = [0.025, 0.031680, 0.03704]
df_par_rate = pd.DataFrame({"ParRates": par_rates,
                             "TimetoMaturity": np.arange(1,len(par_rates) +1,1)},
                           index=np.arange(1,len(par_rates)+1,1))

# ----------------------------------------------------------------------------------------------------
#                               Payoff functions for binomial tree
# ----------------------------------------------------------------------------------------------------
def vanilla_payoff(value_at_node: float) -> float:
    return value_at_node

def callable_payoff(value_at_node: float, strike: float) -> float:
    return min(value_at_node, strike)

def puttable_payoff(value_at_node: float, strike: float) -> float:
    return max(value_at_node, strike)

# ----------------------------------------------------------------------------------------------------
#                               build binomial tree under log normal law
# ----------------------------------------------------------------------------------------------------

# === 1. Construction des nœuds lognormaux ===
def log_normal_node(base_rate: float, sigma: float, node_nb: int) -> list:
    node = [base_rate]
    for i in range(1, node_nb + 1):
        node.append(node[i-1]*exp(2*sigma))
    return node

# === 2. Construction des cash flows d’un bond ===
def bond_tree_cf(coupon: float, maturity: int, notional: int = 100) -> list:
    tree = []
    # Build tree iteratively
    for t in range(maturity):
        tree.append(list(np.repeat(coupon*notional, t+1)))
    tree.append(list(np.repeat(notional*(1+coupon), maturity+1)))
    return tree

# === 3. Discounting backward ===
def discount_binomial(
    cash_flow_tree: list, 
    forward_rates_tree: list, 
    payoff_fn: callable,
    event_calendar: list = [], 
    strike: float = 100, 
    risk_neutral_prob: float = 0.5
    )-> list:
    
    # error management - to do
    n_periods_rates = len(forward_rates_tree)
    n_periods_cf = len(cash_flow_tree)
    if event_calendar is None:
        event_calendar = []
        
    # Initialize value tree as a deep copy of cash flows
    value_tree = [list(row) for row in cash_flow_tree]
    value_at_node: float
    
    # build internal function to manage calendar event
    def apply_payoff_event(t, value):
        if t in event_calendar:
            return payoff_fn(value, strike)
        return value
    
    # calculate the last value of the tree
    for i in range(len(forward_rates_tree[n_periods_rates-1])):
        value_at_node = value_tree[n_periods_rates-1][i]/(1+forward_rates_tree[n_periods_rates-1][i])
        value_tree[n_periods_rates-1][i] = apply_payoff_event(n_periods_rates-1, value_at_node)
    
    # backward induction
    for t in range(n_periods_rates-2, -1, -1): 
        for i in range(len(forward_rates_tree[t])):
            r = forward_rates_tree[t][i]
            V_up = value_tree[t+1][i+1] 
            V_down = value_tree[t+1][i]
            expected_next = risk_neutral_prob * V_up + (1 - risk_neutral_prob) * V_down
            value_at_node = (cash_flow_tree[t][i] + expected_next) / (1 + r)
            value_tree[t][i] =  apply_payoff_event(t, value_at_node)
    
    # Return price and full tree
    return value_tree

# === 4. get the present value of the discounted tree ===
def present_value_binomial_tree(cash_flow_tree: list, forward_rates_tree: list, risk_neutral_prob: float)-> float:
    discounted_tree = discount_binomial(cash_flow_tree, forward_rates_tree, vanilla_payoff, [], 100, risk_neutral_prob)
    return discounted_tree[0][0]

# === 5. build next node ===
def build_next_node(next_lower_rate: float, rates_tree: list, sigma: float) -> list:
    if isinstance(rates_tree, (float, int)):
        value_tree = [[float(rates_tree)]]
    elif isinstance(rates_tree, list):
        if all(isinstance(x, (float, int)) for x in rates_tree):
            # Cas: [0.025, 0.03]
            value_tree = [list(map(float, rates_tree))]
        elif all(isinstance(x, list) for x in rates_tree):
            # Cas: [[0.025], [0.03, 0.0366]]
            value_tree = [list(node) for node in rates_tree]
        
    nb_period = len(value_tree)
    node = log_normal_node(next_lower_rate, sigma, nb_period)
    value_tree.append(node)
    return value_tree

# === 6. Objective function ===
def objective_binomial(next_rate: float, rates_tree: list, par_rate: float, sigma: float, risk_neutral_prob: float) -> float:
    new_tree = build_next_node(next_rate, rates_tree, sigma)
    nb_period = len(new_tree)
    cf_bond = bond_tree_cf(par_rate, nb_period-1)
    pv_bond = present_value_binomial_tree(cf_bond, new_tree, risk_neutral_prob)
    return abs(100 - pv_bond)

# === 7. Tree calibration ===
def calibrate_forward_tree(par_rates: list, sigma: float, notional: int = 100):
    """
    Calibrate forward rates tree node-by-node by minimizing abs(100 - PV) for each maturity.
    Returns the forward_rates_tree (list of lists) and list of absolute pricing errors.
    """
    n = len(par_rates)
    # initialize with first par rate as the root short rate
    forward_tree = [[float(par_rates[0])]]
    errors = [0.0]  # error at t=1 should be zero (par1 == spot1)
    
    # iterate maturities 2..n
    for t in range(1, n):
        par_rate_t = par_rates[t]  # par_rates indexed from 0 for 1..n
        # objective wrapper for minimize_scalar
        def obj(x):
            return objective_binomial(x, forward_tree, par_rate_t, sigma, 0.5)
        # optimize next lower rate (search in bounds)
        res = minimize_scalar(obj, bounds=(0.0001, 0.2), method='bounded', options={'xatol':1e-12})
        next_rate = float(res.x)
        # append new node to forward_tree
        forward_tree = build_next_node(next_rate, forward_tree, sigma)
        # compute error and store
        err = obj(next_rate)
        errors.append(err)
    return forward_tree, errors

## Test forward rates tree 

In [8]:
# calibrate a binomial tree under log noraml law
par_rates = [0.04, 0.04, 0.04, 0.04, 0.04]  # 1y,2y,3y par rates
sigma = 0.15
tree, errors = calibrate_forward_tree(par_rates, sigma)
print("Calibrated forward rate tree (levels):")
for t, level in enumerate(tree):
    print(f"t={t}: ", ["{:.6f}".format(x) for x in level])
print("Pricing abs errors per maturity:", errors)

# set up a straight bond
coupon = 0.045
cf_bond = bond_tree_cf(coupon, len(tree)-1)
bond_tree_discounted = discount_binomial(cf_bond, tree, vanilla_payoff )
print("Straight bond cash flow")
for t, level in enumerate(bond_tree_discounted):
    print(f"t={t}: ", ["{:.6f}".format(x) for x in level])

# create a callable bond from the above straight bond
call_calendar = [1,2,3,4,5]
callable_bond_tree_discounted = discount_binomial(cf_bond, tree, callable_payoff, call_calendar)
print("callable bond PV per node")
for t, level in enumerate(callable_bond_tree_discounted):
    print(f"t={t}: ", ["{:.6f}".format(x) for x in level])
    
# create a puttable bond from the above straight bond
call_calendar = [1,2,3,4,5]
callable_bond_tree_discounted = discount_binomial(cf_bond, tree, puttable_payoff, call_calendar)
print("callable bond PV per node")
for t, level in enumerate(callable_bond_tree_discounted):
    print(f"t={t}: ", ["{:.6f}".format(x) for x in level])

Calibrated forward rate tree (levels):
t=0:  ['0.040000']
t=1:  ['0.034074', '0.045995']
t=2:  ['0.029051', '0.039214', '0.052934']
t=3:  ['0.024790', '0.033463', '0.045171', '0.060974']
t=4:  ['0.021174', '0.028582', '0.038581', '0.052079', '0.070299']
Pricing abs errors per maturity: [0.0, 0.6096108986059647, 0.5820835814389795, 0.555715583698202, 0.5304681641696902]
Straight bond cash flow
t=0:  ['102.225911']
t=1:  ['104.001955', '99.627940']
t=2:  ['104.502575', '101.588792', '97.831792']
t=3:  ['103.889289', '102.187607', '99.957453', '97.063352']
t=4:  ['102.333226', '101.596226', '100.618054', '99.327150', '97.636252']
callable bond PV per node
t=0:  ['99.904500']
t=1:  ['100.000000', '98.801360']
t=2:  ['100.000000', '100.000000', '97.691388']
t=3:  ['100.000000', '100.000000', '99.661781', '97.063352']
t=4:  ['100.000000', '100.000000', '100.000000', '99.327150', '97.636252']
callable bond PV per node
t=0:  ['102.795789']
t=1:  ['104.076838', '100.738404']
t=2:  ['104.502575'

## Build OAS Calculator

The OAS is the constant spread that when added to all the one-period forward rates on the interest rate tree, makes the arbitrage-free value of the bond equal to its market price. Note that the Z-spread for an option-free bond is simply its OAS at zero volatility.

In [9]:
# ----------------------------------------------------------------------------------------------------
#                               OAS Calculation
# ----------------------------------------------------------------------------------------------------
from scipy.optimize import minimize_scalar
from typing import Callable, Optional, List, Tuple

def compute_oas(
    market_value: float,
    cash_flow_tree: List[List[float]], 
    forward_rates_tree: List[List[float]], 
    payoff_fn: Callable[[float, float], float],
    event_calendar: Optional[List[int]] = None, 
    strike: float = 100, 
    risk_neutral_prob: float = 0.5,
    bounds: tuple = (-0.05, 0.05),
    tol: float = 1e-8
    )-> list:
    
    if event_calendar is None:
        event_calendar = []
    
    def obj(x: float)-> float:
        parallel_shifted_tree = [[node + x for node in level] for level in forward_rates_tree]
        discounted_tree = discount_binomial(
            cash_flow_tree=cash_flow_tree,
            forward_rates_tree=parallel_shifted_tree,
            payoff_fn=payoff_fn,
            event_calendar=event_calendar,
            strike=strike,
            risk_neutral_prob=risk_neutral_prob
        )
        model_price = discounted_tree[0][0]
        return abs(market_value - model_price)
        
    res = minimize_scalar(obj, bounds=bounds, method='bounded', options={'xatol': tol})
    
    # compute model price with optimal OAS
    oas = res.x
    shifted_tree = [[node + oas for node in level] for level in forward_rates_tree]
    discounted_tree = discount_binomial(
        cash_flow_tree=cash_flow_tree,
        forward_rates_tree=shifted_tree,
        payoff_fn=payoff_fn,
        event_calendar=event_calendar,
        strike=strike,
        risk_neutral_prob=risk_neutral_prob
    )
    model_price = discounted_tree[0][0]
    
    return oas

## Test OAS Calculation for straight, callable and puttable bonds

In [10]:
# calibrate a binomial tree under log noraml law
par_rates = [0.025, 0.03, 0.035]  # 1y,2y,3y par rates
sigma = 0.10
tree, errors = calibrate_forward_tree(par_rates, sigma)
print("Calibrated forward rate tree (levels):")
for t, level in enumerate(tree):
    print(f"t={t}: ", ["{:.6f}".format(x) for x in level])
print("Pricing abs errors per maturity:", errors)

# set up a straight bond
coupon = 0.0425
cf_bond = bond_tree_cf(coupon, len(tree)-1)
market_value = 101
call_calendar = [1,2]

# compute oas
call_oas = compute_oas(market_value, cf_bond, tree, callable_payoff, call_calendar, 100, 0.5)
print(call_oas)

Calibrated forward rate tree (levels):
t=0:  ['0.025000']
t=1:  ['0.031681', '0.038695']
t=2:  ['0.037041', '0.045242', '0.055258']
Pricing abs errors per maturity: [0.0, 0.8200233550223146, 1.3399734259797356]
0.0028545356542266664


## Effective Duration - Pricing bond with binomial 

Effective duration indicates the sensitivity of the bond’s price to a 100 bps parallel shift of the benchmark yield curve—in particular, the government par curve—assuming no change in the bond’s credit spread:

$$
\text{EffDur} = 
\frac{\left( \text{PV}_{-}\right) - \left(\text{PV}_{+} \right)}
{2 \times \left( \Delta \text{Curve}  \right) \times (\text{PV}_0)}
$$

How is this formula applied in practice? Without a market price, we would need an issuer-specific yield curve to compute $PV_{0}$, $PV_{–}$, and $PV_{+}$. But practitioners usually have access to the bond’s current price and thus use the following procedure:

* Given a price ($PV_{0}$), calculate the implied OAS to the benchmark yield curve at an appropriate interest rate volatility. 
* Shift the benchmark yield curve down, generate a new interest rate tree, and then revalue the bond using the OAS calculated in Step 1. This value is $PV_{-}$
* Shift the benchmark yield curve up by the same magnitude as in Step 2, generate a new interest rate tree, and then revalue the bond using the OAS calculated in Step 1. This value is $PV_{+}$.
* Calculate the bond’s effective duration using Equation 20.

The effective duration of a callable bond cannot exceed that of the straight bond. When interest rates are high relative to the bond’s coupon, the call option is out of the money so the bond is unlikely to be called. In contrast, when interest rates fall, the call option moves into the money.

In [11]:
def effective_duration_calc(
    shift: float,
    bond_cash_flow_tree: List[List[float]], 
    par_rates: List[List[float]], 
    rates_vol: float,
    payoff_fn: Callable[[float, float], float],
    event_calendar: Optional[List[int]] = None, 
    strike: float = 100, 
    market_value: float = None,
    risk_neutral_prob: float = 0.5,
) -> float:
    
    # 1. from par rates determine 
    forward_tree_base, errors_base = calibrate_forward_tree(par_rates, rates_vol)
    
    # 2. given the cash flow of the bond, his maket value and the implied forward rates tree we determine the OAS spread
    oas_base = compute_oas(
        market_value = market_value, 
        cash_flow_tree= bond_cash_flow_tree, 
        forward_rates_tree= forward_tree_base,
        payoff_fn=payoff_fn, 
        event_calendar=event_calendar, 
        strike=strike, 
        risk_neutral_prob=risk_neutral_prob
        )
    
    # 3. given the OAS spread we shift the forward rates tree 
    parallel_shifted_tree_base = [[node + oas_base for node in level] for level in forward_tree_base]
    
    # 4. we calculate PV0
    discounted_tree_base = discount_binomial(
        cash_flow_tree= bond_cash_flow_tree, 
        forward_rates_tree= parallel_shifted_tree_base, 
        payoff_fn=payoff_fn, 
        event_calendar=event_calendar, 
        strike=strike, 
        risk_neutral_prob=risk_neutral_prob
        )
    PV_base = discounted_tree_base[0][0]
    
    par_rates_up = [rates + shift for rates in par_rates]
    forward_tree_up, errors_base = calibrate_forward_tree(par_rates_up, rates_vol)
    parallel_shifted_tree_up = [[node + oas_base for node in level] for level in forward_tree_up]
    discounted_tree_up = discount_binomial(
        cash_flow_tree= bond_cash_flow_tree, 
        forward_rates_tree= parallel_shifted_tree_up, 
        payoff_fn=payoff_fn, 
        event_calendar=event_calendar, 
        strike=strike, 
        risk_neutral_prob=risk_neutral_prob
        )
    PV_up = discounted_tree_up[0][0]
    
    par_rates_down = [rates - shift for rates in par_rates]
    forward_tree_down, errors_base = calibrate_forward_tree(par_rates_down, rates_vol)
    parallel_shifted_tree_down = [[node + oas_base for node in level] for level in forward_tree_down]
    discounted_tree_down = discount_binomial(
        cash_flow_tree= bond_cash_flow_tree, 
        forward_rates_tree= parallel_shifted_tree_down, 
        payoff_fn=payoff_fn, 
        event_calendar=event_calendar, 
        strike=strike, 
        risk_neutral_prob=risk_neutral_prob
        )
    PV_down = discounted_tree_down[0][0]
    
    return (PV_down - PV_up)/(2*shift*PV_base)
    

## determine effective duration for straight, callable and puttable bonds

In [15]:
par_rates = [0.025, 0.03, 0.035]  # 1y, 2y, 3y par rates
sigma = 0.10

# set up a straight bond
coupon = 0.0425
cf_bond = bond_tree_cf(coupon, len(tree)-1)
market_price = 101

# define a call and put calendar where call date are 2 and 3 years (iindex 1 and 2 because 1 y is index 0)
call_calendar = [1,2]

# define a shift
shift = 30/10000

# compute effective duration for straight bond
eff_dur_straight =  effective_duration_calc(
    shift = shift,
    bond_cash_flow_tree= cf_bond,
    par_rates = par_rates,
    rates_vol= sigma,
    payoff_fn= vanilla_payoff,
    market_value= market_price
)

# compute effective duration for callable bond
eff_dur_callable =  effective_duration_calc(
    shift = shift,
    bond_cash_flow_tree= cf_bond,
    par_rates = par_rates,
    rates_vol= sigma,
    payoff_fn= callable_payoff,
    event_calendar=call_calendar,
    market_value= market_price
)

# compute effective duration for puttable bond
eff_dur_puttable =  effective_duration_calc(
    shift = shift,
    bond_cash_flow_tree= cf_bond,
    par_rates = par_rates,
    rates_vol= sigma,
    payoff_fn= puttable_payoff,
    event_calendar= call_calendar,
    market_value= market_price
)

print(f"Effective Duration for straight bond with a shift of {shift*10000:.0f} bps : {eff_dur_straight:,.3f}")
print(f"Effective Duration for callable bond with a shift of {shift*10000:.0f} bps : {eff_dur_callable:,.3f}")
print(f"Effective Duration for puttable bond with a shift of {shift*10000:.0f} bps : {eff_dur_puttable:,.3f}")

Effective Duration for straight bond with a shift of 30 bps : 2.790
Effective Duration for callable bond with a shift of 30 bps : 1.966
Effective Duration for puttable bond with a shift of 30 bps : 1.367
