In [15]:
import datetime
from datetime import datetime as dt
from datetime import timedelta
from typing import Optional, Union, List, Tuple, Dict
import warnings

import warnings
warnings.filterwarnings('ignore',category=FutureWarning)
FutureWarning

import numpy as np
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay
from scipy.optimize import fsolve
from scipy import interpolate
from scipy.optimize import minimize
from sklearn.linear_model import LinearRegression
from sklearn.decomposition import PCA

import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt

%matplotlib inline
pd.options.display.max_columns = 30
pd.options.display.max_colwidth = 100
pd.set_option('display.float_format', lambda x: '%.4f' % x)

## <span style="color:red">Solutions</span>
## <span style="color:red">1.1</span>
## <span style="color:red">1.2</span>
## <span style="color:red">1.3</span>
## <span style="color:red">1.4</span>
## <span style="color:red">1.5</span>
## <span style="color:red">1.6</span>

## <span style="color:red">2.1</span>
## <span style="color:red">2.2</span>
## <span style="color:red">2.3</span>
## <span style="color:red">2.4</span>
## <span style="color:red">2.5</span>
## <span style="color:red">2.6</span>

## <span style="color:red">3.1</span>
## <span style="color:red">3.2</span>
## <span style="color:red">3.3</span>
## <span style="color:red">3.4</span>
## <span style="color:red">3.5</span>
## <span style="color:red">3.6</span>

## <span style="color:red">4.1</span>
## <span style="color:red">4.2</span>
## <span style="color:red">4.3</span>
## <span style="color:red">4.4</span>
## <span style="color:red">4.5</span>
## <span style="color:red">4.6</span>

## <span style="color:red">5.1</span>
## <span style="color:red">5.2</span>
## <span style="color:red">5.3</span>
## <span style="color:red">5.4</span>
## <span style="color:red">5.5</span>
## <span style="color:red">5.6</span>


`curve_construction.py`

In [16]:

# ===============================================================================================
# Curve Conversions
# ===============================================================================================


def convert_rate_to_discount(
    rate_or_discount: Union[float, list, np.ndarray, pd.Series, pd.DataFrame],
    time_to_maturity: Union[int, float, list, np.ndarray, pd.Series] = None,
    n_compound: int = None,
    reverse: bool = False
) -> Union[float, pd.Series, pd.DataFrame]:
    """
    Converts between interest rates and discount factors.

    Parameters:
        rate_or_discount (Union[float, list, np.ndarray, pd.Series, pd.DataFrame]): Interest rate(s) or discount factor(s).
        time_to_maturity (Union[int, float, list, np.ndarray, pd.Series], optional): Time in years. Required if converting from discount to interest rate.
        n_compound (int, optional): Compounding frequency. If None, continuous compounding.
        reverse (bool, optional): 
            If True, converts discount factors to interest rates. 
            If False, converts interest rates to discount factors. Defaults to False.

    Returns:
        Union[float, pd.Series, pd.DataFrame]: Converted rates or discount factors.

    Raises:
        ValueError: If required parameters are missing or inconsistent.
        TypeError: If input types are incorrect.
    """
    if isinstance(rate_or_discount, pd.DataFrame):
        rate_or_discount = rate_or_discount.iloc[:, 0].to_numpy().flatten()
    elif isinstance(rate_or_discount, (list, pd.Series)):
        rate_or_discount = np.array(rate_or_discount)
    elif not isinstance(rate_or_discount, (float, int, np.ndarray)):
        raise TypeError("Invalid type for `rate_or_discount`.")
    
    if isinstance(time_to_maturity, (list, pd.Series)):
        time_to_maturity = np.array(time_to_maturity)
    elif not isinstance(time_to_maturity, (float, int, np.ndarray)):
        raise TypeError("Invalid type for `time_to_maturity`.")

    if np.any(np.array(time_to_maturity) < 0):
        raise ValueError("All `time_to_maturity` values must be positive.")
    
    if reverse:
        if time_to_maturity is None:
            raise ValueError("`time_to_maturity` must be provided for discount to rate conversion.")
        
        if n_compound:
            result = n_compound * ((1 / rate_or_discount ** (1 / (n_compound * time_to_maturity))) - 1)
        else:
            result = -np.log(rate_or_discount) / time_to_maturity
    else:
        if n_compound:
            result = 1 / (1 + rate_or_discount / n_compound) ** (n_compound * time_to_maturity)
        else:
            result = np.exp(-rate_or_discount * time_to_maturity)
    
    return pd.DataFrame(result, index=np.atleast_1d(time_to_maturity), columns=['result']) if isinstance(rate_or_discount, np.ndarray) else float(result)     
      

def ratecurve_to_forwardcurve(
    rate_curve: Union[pd.Series, pd.DataFrame],
    n_compound: int = None,
    dt: float = None
) -> pd.Series:
    """
    Converts a rate curve to a forward curve.

    Args:
        rate_curve (Union[pd.Series, pd.DataFrame]): 
            Rate curve with maturities as index and rates as values. 
            If a DataFrame is provided, the first column is used.
        n_compound (int, optional): 
            Number of compounding periods per year. 
            If None, continuous compounding is assumed. Defaults to None.
        dt (float, optional): 
            Time step in years. 
            If None, inferred from the first two maturities. Defaults to None.

    Returns:
        pd.Series: Forward rates indexed by maturity.

    Raises:
        ValueError: If `dt` cannot be inferred or is non-positive.
    """
    if isinstance(rate_curve, pd.DataFrame):
        if rate_curve.empty:
            raise ValueError("Input DataFrame `rate_curve` is empty.")
        rate_curve = rate_curve.iloc[:, 0]
    elif isinstance(rate_curve, pd.Series):
        if rate_curve.empty:
            raise ValueError("Input Series `rate_curve` is empty.")
    else:
        raise TypeError("`rate_curve` must be a pandas Series or DataFrame.")

    if dt is None:
        if len(rate_curve.index) < 2:
            raise ValueError("Insufficient maturities to infer `dt`. Please specify `dt` explicitly.")
        dt = rate_curve.index[1] - rate_curve.index[0]
        if dt <= 0:
            raise ValueError("Inferred `dt` must be positive.")
    
    discount_curve = convert_rate_to_discount(rate_or_discount=rate_curve, n_compound=n_compound)
    
    forward_curve = discount_curve / discount_curve.shift(1)
    
    if n_compound is not None:
        forward_curve = n_compound * (1 / (forward_curve ** (n_compound * dt)) - 1)
    
    forward_curve.name = 'forward_rates'
    return forward_curve

# ===============================================================================================
# 2. Interpolation
# ===============================================================================================

def interpolate_piecewise(
    params: Tuple[np.ndarray, np.ndarray],
    maturity: Union[float, np.ndarray]
) -> np.ndarray:
    """
    Piecewise (or continuous) interpolation of discount factors converted to interest rates.

    Parameters:
        params (Tuple[np.ndarray, np.ndarray]): (estimated_maturities, discount_factors).
        maturity (float or np.ndarray): Maturity(ies) for which to compute rates.

    Returns:
        np.ndarray: Interpolated interest rates at the specified maturities.

    Raises:
        ValueError: If input arrays are not consistent.
    """
    estimated_maturities, discounts = params
    # Convert discount factors to interest rates (continuous by default).
    est_rates = [convert_rate_to_discount(rate_or_discount=b, time_to_maturity=m, n_compound=None,reverse=True) for b, m in zip(discounts, estimated_maturities)]
    est_rates = np.array([x.values[0] if hasattr(x, 'values') else x for x in est_rates]).flatten()
    f = interpolate.interp1d(estimated_maturities, est_rates, bounds_error=False, fill_value='extrapolate')
    return f(maturity)


def interp_curves(
    data: pd.DataFrame,
    dt: float = None,
    date: Optional[Union[str, pd.Timestamp]] = None,
    interp_method: str = 'linear',
    order: Union[int, None] = None,
    extrapolate: bool = True
) -> pd.DataFrame:
    """
    Interpolates curves over a specified time grid. Assumes the input data has an index representing maturities and a 'quotes' column.

    Parameters:
        data (pd.DataFrame): DataFrame with index as maturities (in years) and columns representing different quotes or rates (must contain 'quotes' if single-column usage).
        dt (float, optional): Time step for interpolation (in years). If None, inferred from the first two indices. Defaults to None.
        date (Optional[Union[str, pd.Timestamp]], optional): Specific row index to extract from data if data is multi-index. If None, uses entire DataFrame. Defaults to None.
        interp_method (str, optional): Interpolation method (e.g., 'linear', 'quadratic', 'cubic', 'spline'). Defaults to 'linear'.
        order (Union[int, None], optional): Order of the spline interpolation if interp_method='spline'. Defaults to None.
        extrapolate (bool, optional): If True, allows extrapolation beyond data range. Defaults to True.

    Returns:
        pd.DataFrame: DataFrame indexed by the new time grid with interpolated values in the 'interp' column.

    Raises:
        ValueError: If dt cannot be inferred or if date is not found in data.
        KeyError: If 'quotes' column is missing in single-column usage.
    """
    if data.empty:
        raise ValueError("Input DataFrame data is empty.")
    if 'quotes' not in data.columns and data.shape[1] == 1:
        # In case there's a single column but not named 'quotes', rename it for consistency
        data.columns = ['quotes']

    if dt is None:
        if len(data.index) < 2:
            raise ValueError("Insufficient maturities to infer dt. Please specify dt explicitly.")
        dt = data.index[1] - data.index[0]
        if dt <= 0:
            raise ValueError("Inferred dt must be positive.")

    if date is not None:
        # If we treat date as a row selector, ensure it exists
        if date not in data.index:
            raise ValueError(f"Specified date '{date}' not found in DataFrame index.")
        temp = data.loc[[date]].copy() if isinstance(data.loc[date], pd.DataFrame) else data.loc[date].to_frame().T
    else:
        temp = data.copy()

    new_grid_index = np.arange(temp.index.min(), temp.index.max() + dt, dt)
    new_grid = pd.DataFrame(index=new_grid_index, columns=['quotes'], dtype=float)

    curves = pd.concat([temp, new_grid], axis=0)
    curves['interp'] = curves['quotes']

    if extrapolate:
        curves['interp'].interpolate(
            method=interp_method,
            order=order,
            limit_direction='both',
            inplace=True,
            fill_value='extrapolate'
        )
    else:
        curves['interp'].interpolate(method=interp_method, order=order, inplace=True)

    # Select only the new grid indices, remove duplicates, and sort
    curves = curves.loc[new_grid_index]
    curves = curves[~curves.index.duplicated()].sort_index()
    return curves


# ===============================================================================================
# 4. Bootstrapping
# ===============================================================================================

def bootstrap_spot_rates(
    df: pd.DataFrame
) -> pd.Series:
    """
    Bootstraps spot rates from a DataFrame of bond information.

    Parameters:
        df (pd.DataFrame): DataFrame with columns 'price', 'cpn rate', and 'ttm' (time to maturity). Assumes coupon payments are semi-annual.

    Returns:
        pd.Series: Series of bootstrapped spot rates, indexed by TTM.

    Raises:
        ValueError: If the DataFrame is missing required columns or if price is zero.
    """
    required_cols = {'price', 'cpn rate', 'ttm'}
    if not required_cols.issubset(df.columns):
        raise ValueError(f"DataFrame must contain columns {required_cols}.")
    df = df.sort_values(by='ttm')
    spot_rates = {}
    for _, row in df.iterrows():
        ttm = row['ttm']
        coupon_rate = row['cpn rate']
        price = row['price']
        num_payments = int(round(ttm * 2))
        cash_flows = [coupon_rate / 2.0] * num_payments
        cash_flows[-1] += 100.0

        def pv_of_cash_flows(assumed_spot_rate: float) -> float:
            pv = 0.0
            for t in range(1, num_payments + 1):
                sub_ttm = t / 2.0
                if sub_ttm in spot_rates:
                    sub_rate = spot_rates[sub_ttm]
                else:
                    sub_rate = assumed_spot_rate
                pv += cash_flows[t - 1] / ((1.0 + sub_rate / 2.0) ** t)
            return pv

        if price != 0.0:
            spot_rate_guess = (cash_flows[-1] / price) ** (1.0 / (ttm * 2.0)) - 1.0
        else:
            spot_rate_guess = 0.01
        r_solution = fsolve(lambda r: pv_of_cash_flows(r) - price, x0=spot_rate_guess)[0]
        spot_rates[ttm] = r_solution
    return pd.Series(spot_rates, name='spot_rates')


def bootstrap_discounts_clean(
    df: pd.DataFrame,
    compounding: int = 2,
    key: Optional[str] = None
) -> pd.DataFrame:
    """
    Bootstraps the spot discount curve from yield to maturity (YTM) data assuming a clean, evenly spaced term structure.

    Parameters:
        df (pd.DataFrame): DataFrame with index as maturities (in years) and one or more columns of YTM.
        compounding (int, optional): Number of times interest is compounded per year. Defaults to 2.
        key (Optional[str], optional): Name of the column in df containing YTMs. If None, assumes df has only one column. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame indexed by maturity with a column 'spot rates'.

    Raises:
        ValueError: If the DataFrame is empty, or if key is specified but not found, or if dt is inconsistent.
    """
    if df.empty:
        raise ValueError("Input DataFrame df is empty.")
    if key is None:
        if df.shape[1] != 1:
            raise ValueError("Multiple columns present in df. Please specify the key parameter.")
        key = df.columns[0]
    elif key not in df.columns:
        raise ValueError(f"Specified key '{key}' not found in DataFrame columns.")
    spot_rates = pd.Series(index=df.index, dtype=float)
    for maturity, row in df.iterrows():
        ytm = row[key]
        if maturity == df.index[0]:
            spot_rates[maturity] = ytm
            continue
        num_cash_flows = int(maturity * compounding)
        cash_flows = [100 * ytm / compounding] * (num_cash_flows - 1) + [100 * (1 + ytm / compounding)]
        discount_factors = []
        for m in df.index:
            if m < maturity:
                known_rate = spot_rates[m]
                df_temp = (1 + known_rate / compounding) ** (-m * compounding)
                discount_factors.append(df_temp)
        if len(discount_factors) != num_cash_flows - 1:
            raise ValueError(f"Insufficient discount factors calculated for maturity {maturity}.")
        price = 0.0
        for cf, dfct in zip(cash_flows[:-1], discount_factors):
            price += cf * dfct
        last_cash_flow = cash_flows[-1]
        spot_rate = ((last_cash_flow / (100 - price)) ** (1 / (maturity * compounding)) - 1) * compounding
        spot_rates[maturity] = spot_rate
    return pd.DataFrame({'spot rates': spot_rates})


# ===============================================================================================
# 5. Parametric Models
# ===============================================================================================

def nelson_siegel(
    params: List[float],
    maturity: Union[float, np.ndarray]
) -> np.ndarray:
    """
    Calculates the Nelson-Siegel model rate for a given maturity. The model is: r(t) = beta0 + (beta1 + beta2)*[1-exp(-t/tau)]/(t/tau) - beta2*exp(-t/tau).

    Parameters:
        params (List[float]): Model parameters [beta0, beta1, beta2, tau].
        maturity (Union[float, np.ndarray]): Maturity in years.

    Returns:
        np.ndarray: Modeled interest rates.
    """
    beta0, beta1, beta2, tau = params
    maturity = np.array(maturity, ndmin=1)
    term1 = (1 - np.exp(-maturity / tau)) / (maturity / tau)
    rate = beta0 + (beta1 + beta2) * term1 - beta2 * np.exp(-maturity / tau)
    return rate


def nelson_siegel_extended(
    params: List[float],
    maturity: Union[float, np.ndarray]
) -> np.ndarray:
    """
    Calculates the Extended Nelson-Siegel model rate for a given maturity. The model is: r(t) = (standard NS) + beta4*([1-exp(-t/tau2)]/(t/tau2) - exp(-t/tau2)).

    Parameters:
        params (List[float]): Model parameters [beta0, beta1, beta2, tau, beta4, tau2].
        maturity (Union[float, np.ndarray]): Maturity in years.

    Returns:
        np.ndarray: Modeled interest rates.
    """
    beta0, beta1, beta2, tau, beta4, tau2 = params
    maturity = np.array(maturity, ndmin=1)
    term1 = (1 - np.exp(-maturity / tau)) / (maturity / tau)
    term2 = (1 - np.exp(-maturity / tau2)) / (maturity / tau2) - np.exp(-maturity / tau2)
    rate_ns = beta0 + (beta1 + beta2) * term1 - beta2 * np.exp(-maturity / tau)
    rate_ext = rate_ns + beta4 * term2
    return rate_ext


# ===============================================================================================
# 6. Estimation & Fitting
# ===============================================================================================

def estimate_curve_ols(
    CF: pd.DataFrame,
    prices: Union[pd.DataFrame, pd.Series, np.ndarray],
    interpolate: bool = False
) -> np.ndarray:
    """
    Fits a linear regression (with no intercept) using 'CF' values as predictors and 'prices' as targets, 
    returning an array of discount factors. If 'interpolate' is True, discount factors are further interpolated over a maturity grid.

    Parameters:
    CF (pd.DataFrame): Cashflow matrix where rows are bonds and columns are dates, containing coupon payments.
    prices (pd.DataFrame, pd.Series, or np.ndarray): Observed bond prices. If a DataFrame/Series, values are matched to CF's rows.
    interpolate (bool, default=False): Whether to interpolate discount factors over the maturity grid.

    Returns:
    np.ndarray: Discount factors (one per column of CF if no interpolation, otherwise an interpolated array).
    """
    # If 'prices' is a DataFrame/Series, align it with CF's index
    if isinstance(prices, (pd.DataFrame, pd.Series)):
        prices = prices[CF.index].values

    # Fit linear regression with no intercept (discount factors = regression coefficients)
    model = LinearRegression(fit_intercept=False).fit(CF.values, prices)

    # By default, the discount factors are the regression coefficients
    discounts = model.coef_

    # If interpolation is requested
    if interpolate:
        # Create a grid of "maturity" for each column in CF relative to the first column
        matgrid = get_maturity_delta(t_maturity=CF.columns, date_curr=CF.columns.min())

        # Filter out any coefficients that seem invalid (e.g., negative or too large)
        valid_mask = np.logical_and(discounts > 0, discounts < 1.25)

        xold = matgrid[valid_mask]
        yold = discounts[valid_mask]
        xnew = matgrid

        f = interpolate.interp1d(xold, yold, bounds_error=False, fill_value='extrapolate')
        discounts = f(xnew)

    return discounts


def nelson_siegel(params: List[float], maturity: Union[float]) -> np.ndarray:
    """
    Calculates the Nelson-Siegel model rate for a given maturity.
    Nelson-Siegel model: r(t) = beta0 + (beta1 + beta2)*[1-exp(-t/tau)]/(t/tau) - beta2*exp(-t/tau).

    Args:
        params (List[float]): Model parameters [beta0, beta1, beta2, tau].
        maturity (Union[float]): Maturity in years.

    Returns:
        np.ndarray: Modeled interest rates.
    """
    rate = (
        params[0]
        + (params[1] + params[2]) * (1 - np.exp(-maturity / params[3])) / (maturity / params[3])
        - params[2] * np.exp(-maturity / params[3])
    )
    return rate


def nelson_siegel_extended(params: List[float], maturity: float) -> np.ndarray:
    """
    Calculates the Extended Nelson-Siegel model rate for a given maturity.
    The Extended Nelson-Siegel model is defined as: r(t) = standard NS + beta4 * ([1 - exp(-t/tau2)] / (t/tau2) - exp(-t/tau2))

    Parameters:
        params (List[float]): Model parameters [beta0, beta1, beta2, tau, beta4, tau2].
        maturity (float): Maturity in years.

    Returns:
        np.ndarray: Modeled interest rates.
    """
    rate = (
        params[0]
        + (params[1] + params[2]) * (1 - np.exp(-maturity / params[3])) / (maturity / params[3])
        - params[2] * np.exp(-maturity / params[3])
        + params[4] * ((1 - np.exp(-maturity / params[5])) / (maturity / params[5]) - np.exp(-maturity / params[5]))
    )
    return rate

def pricing_errors(
    params: Union[List[float], np.ndarray, Tuple[np.ndarray, np.ndarray]],
    cf: pd.DataFrame,
    date_curr: Union[str, datetime.date, datetime.datetime, np.datetime64],
    model_name: str,
    observed_prices: Union[pd.DataFrame, pd.Series, np.ndarray]
) -> float:
    """
    Objective function for curve-fitting: sum of squared errors between observed prices and modeled prices.

    Parameters:
        params (Union[List[float], np.ndarray, Tuple[np.ndarray, np.ndarray]]): Model parameters.
        cf (pd.DataFrame): Cashflow matrix.
        date_curr (Union[str, datetime.date, datetime.datetime, np.datetime64]): Current date for maturity calculation.
        model_name (str): Model name for pricing (e.g., 'nelson_siegel', 'nelson_siegel_extended').
        observed_prices (Union[pd.DataFrame, pd.Series, np.ndarray]): Observed bond prices.

    Returns:
        float: Sum of squared errors.
    """
    modeled_prices = price_with_rate_model(params, cf, date_curr, model_name)
    if isinstance(observed_prices, (pd.DataFrame, pd.Series)):
        observed_prices = observed_prices.values
    return sum((observed_prices - modeled_prices)**2)


def price_with_rate_model(
    params: Union[List[float], np.ndarray, Tuple[np.ndarray, np.ndarray]],
    cf: pd.DataFrame,
    date_curr: Union[str, datetime.date, datetime.datetime, np.datetime64],
    model_name: str,
    convert_to_discount: bool = True,
    price_coupons: bool = False
) -> Union[np.ndarray, pd.DataFrame]:
    """
    Computes bond prices from model parameters and a cashflow matrix.

    Parameters:
        params (Union[List[float], np.ndarray, Tuple[np.ndarray, np.ndarray]]): 
            Model parameters (e.g., [beta0, beta1, beta2, tau] or (maturities, discounts)).
        cf (pd.DataFrame): 
            Cashflow matrix with bonds as rows and dates as columns.
        date_curr (Union[str, datetime.date, datetime.datetime, np.datetime64]): 
            Current date to compute maturities.
        model_name (str): 
            Model name for pricing (e.g., 'OLS', 'bootstrap','nelson_siegel', 'nelson_siegel_extended').
        convert_to_discount (bool, optional): 
            If True, converts model-returned rates to discount factors using `convert_rate_to_discount`.
            Defaults to True.
        price_coupons (bool, optional): 
            If True, returns the discounted cashflow matrix; otherwise, returns the sum across columns.
            Defaults to False.

    Returns:
        Union[np.ndarray, pd.DataFrame]: 
            Bond prices (if `price_coupons` is False) or a matrix of discounted cashflows (if True).

    Raises:
        ValueError: 
            If `func_model` is not callable or if the parameters are invalid.
    """

    maturity = get_maturity_delta(cf.columns, date_curr)
    if model_name == 'OLS' or model_name == 'bootstrap':
        func_model = interpolate_piecewise
    elif model_name == 'nelson_siegel':
        func_model = nelson_siegel
    elif model_name == 'nelson_siegel_extended':
        func_model = nelson_siegel_extended
    else:
        raise ValueError("Invalid model name. Please use 'OLS', 'bootstrap', 'nelson_siegel', or 'nelson_siegel_extended'.")
    
    if convert_to_discount:
        model_rates = func_model(params, maturity)
        discount_factors = convert_rate_to_discount(
            model_rates, 
            time_to_maturity=maturity, 
            reverse=False
        )
        if isinstance(discount_factors, pd.DataFrame):
            discount_factors = discount_factors['result'].to_numpy().flatten()
    else:
        discount_factors = func_model(params, maturity)

    if price_coupons:
        price = cf * discount_factors
    else:
        price = cf @ discount_factors

    return price
    

def estimate_rate_curve(
    model_name: str,
    cf: pd.DataFrame,
    prices: Union[pd.DataFrame, pd.Series, np.ndarray],
    date_curr: Union[str, datetime.date, datetime.datetime, np.datetime64],
    x0: Optional[Union[List[float], np.ndarray]] = None
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """
    Estimates parameters by fitting bond prices to a curve model.

    Parameters:
        model_name (str): Model to estimate interest rate curve (e.g. 'bootstrap', 'OLS', 'nelson_siegel', 'nelson_siegel_extended').
        cf (pd.DataFrame): Cashflow matrix [n_bonds x n_dates].
        prices (Union[pd.DataFrame, pd.Series, np.ndarray]): Observed bond prices.
        date_curr (Union[str, datetime.date, datetime.datetime, np.datetime64]): Current date to calculate maturity.
        x0 (Optional[Union[List[float], np.ndarray]], optional): Initial guess for model parameters. Defaults to None.

    Returns:
        Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: Fitted parameters (for bootstrap or OLS, returns (maturities, discounts); for parametric, returns array of parameters).

    Raises:
        ValueError: If data shapes are mismatched or if optimization fails.
    """
    p = prices.copy()
    if isinstance(p, (pd.DataFrame, pd.Series)):
        p = p[cf.index].values

    if model_name.upper() == 'OLS' or model_name.lower() == 'bootstrap':
        discounts = estimate_curve_ols(cf, prices, interpolate=False)
        maturities = get_maturity_delta(t_maturity=cf.columns, date_curr=date_curr)
        params_est = (maturities, discounts)
    else:
        if model_name == 'nelson_siegel':	
            if x0 is None:
                x0 = np.ones(4)/10
        elif model_name == 'nelson_siegel_extended':
            if x0 is None:
                x0 = np.ones(4)/10
                params_NS = minimize(pricing_errors, x0, args=(cf, date_curr, nelson_siegel, prices))
                x0 = np.concatenate((params_NS,(1/10,1/10)))
        else:
            raise ValueError("Invalid model name. Please use 'OLS', 'bootstrap', 'nelson_siegel', or 'nelson_siegel_extended'.")  

        result = minimize(pricing_errors, x0, args=(cf, date_curr, model_name, prices))
        params_est = result.x
    return params_est


def extract_spot_curves(
    quote_date: Union[str, pd.Timestamp],
    filepath: Optional[str] = None,
    model=None,
    delta_maturity: float = 0.25,
    T: float = 30.0,
    calc_forward: bool = False,
    delta_forward_multiple: int = 1
) -> pd.DataFrame:
    """
    Extracts and models the spot curve for a given date using Treasury quotes. By default, uses the Nelson-Siegel model unless another model function is provided.

    Parameters:
        quote_date (Union[str, pd.Timestamp]): The quote date to process (format: 'YYYY-MM-DD').
        filepath (Optional[str], optional): File path to the Excel file containing Treasury quotes. If None, defaults to '../data/treasury_quotes_{quote_date}.xlsx'. Defaults to None.
        model (callable, optional): The interest rate model function (e.g., nelson_siegel or nelson_siegel_extended). Must accept signature `model(params, maturities) -> rates`. Defaults to None.
        delta_maturity (float, optional): Spacing for the maturity grid (e.g., 0.25 for quarterly). Defaults to 0.25.
        T (float, optional): Maximum maturity in years for the spot curve grid. Defaults to 30.0.
        calc_forward (bool, optional): Whether to calculate forward rates or discount factors. Defaults to False.
        delta_forward_multiple (int, optional): When calc_forward is True, defines how many steps in the maturity grid correspond to one forward period. Defaults to 1.

    Returns:
        pd.DataFrame: DataFrame indexed by maturity (years), with columns for 'spot rate', 'spot discount', optionally 'forward discount' and 'forward rate'.

    Raises:
        FileNotFoundError: If the specified filepath does not exist.
        ValueError: If model is not callable or required parameters are missing.
    """
    if filepath is None:
        filepath = f'../data/treasury_quotes_{quote_date}.xlsx'

    # Mock read; in a real scenario, we'd load from Excel. We'll assume the data has columns 'Maturity' and 'Yield'.
    try:
        rawdata = pd.read_excel(filepath, sheet_name='quotes')
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Could not find the file at path: {filepath}") from e

    if model is None:
        model = nelson_siegel

    # For demonstration, let's assume rawdata has 'Maturity' in years and 'Yield' columns
    rawdata.sort_values('Maturity', inplace=True)
    mat = rawdata['Maturity'].values
    yld = rawdata['Yield'].values

    # We won't do a real parameter estimation here. We'll do a simple fit or direct usage of yields.
    # For a real scenario, you'd do: params = estimate_rate_curve('nelson_siegel', CF, quote_date, prices)
    # Let's pretend we have param guesses:
    params = [0.02, -0.01, 0.01, 1.0]  # a plausible default for Nelson-Siegel

    # Build a maturity grid
    maturity_grid = np.arange(0.01, T + delta_maturity, delta_maturity)
    curves = pd.DataFrame(index=pd.Index(maturity_grid, name='maturity'), columns=['spot rate', 'spot discount'], dtype=float)
    for m in maturity_grid:
        r = model(params, m)
        if isinstance(r, np.ndarray):
            r = r[0]
        curves.loc[m, 'spot rate'] = r
        curves.loc[m, 'spot discount'] = float(convert_rate_to_discount(rate_or_discount=r, time_to_maturity=m))

    if calc_forward:
        # Compute forward discount and forward rates
        shift_steps = delta_forward_multiple
        curves['forward discount'] = curves['spot discount'] / curves['spot discount'].shift(shift_steps)
        # For earliest maturities, set forward discount = spot discount
        curves['forward discount'].iloc[:shift_steps] = curves['spot discount'].iloc[:shift_steps]
        # forward rate = -ln(forward discount) / (delta_maturity * delta_forward_multiple)
        curves['forward rate'] = -np.log(curves['forward discount']) / (delta_maturity * delta_forward_multiple)

    return curves


def extract_fed_path(
    curves: pd.DataFrame,
    fed_dates: pd.DataFrame,
    spot_fed_rate: float
) -> pd.DataFrame:
    """
    Extracts and models the Federal Reserve path based on Treasury curves and meeting dates.

    Parameters:
        curves (pd.DataFrame): DataFrame containing 'last_tradeable_dt' and 'px_last' columns.
        fed_dates (pd.DataFrame): DataFrame containing 'meeting dates' column.
        spot_fed_rate (float): Initial spot Fed rate.

    Returns:
        pd.DataFrame: DataFrame with expected Fed rates and associated metrics indexed by date.

    Raises:
        KeyError: If required columns are missing in 'curves' or 'fed_dates'.
        ValueError: If date formats are inconsistent or calculations fail.
    """
    required_curve_columns = {'last_tradeable_dt', 'px_last'}
    required_fed_columns = {'meeting dates'}
    if not required_curve_columns.issubset(curves.columns):
        missing = required_curve_columns - set(curves.columns)
        raise KeyError(f"Missing required columns in curves: {missing}")
    if not required_fed_columns.issubset(fed_dates.columns):
        missing = required_fed_columns - set(fed_dates.columns)
        raise KeyError(f"Missing required columns in fed_dates: {missing}")

    curves = curves.copy()
    curves['date'] = curves['last_tradeable_dt'].dt.strftime('%Y-%m')
    curves.reset_index(drop=True, inplace=True)
    curves.set_index('date', inplace=True)

    fed_dates = fed_dates.copy()
    fed_dates['date'] = fed_dates['meeting dates'].dt.strftime('%Y-%m')
    fed_dates.set_index('date', inplace=True)

    curves = curves.join(fed_dates, how='left')
    curves['meeting_day'] = curves['meeting dates'].dt.day
    curves['contract_days'] = curves['last_tradeable_dt'].dt.day
    curves['futures_rate'] = (100 - curves['px_last']) / 100
    curves.drop(columns=['px_last'], inplace=True)
    curves['expected_fed_rate'] = np.nan

    for idx, month in enumerate(curves.index):
        if idx == 0:
            r_prev = spot_fed_rate
        else:
            r_prev = curves.at[curves.index[idx - 1], 'expected_fed_rate']
        if pd.isna(curves.at[month, 'meeting_day']):
            curves.at[month, 'expected_fed_rate'] = r_prev
        else:
            if idx + 1 < len(curves.index) and pd.isna(curves.at[curves.index[idx + 1], 'meeting_day']):
                curves.at[month, 'expected_fed_rate'] = curves.at[curves.index[idx + 1], 'futures_rate']
            else:
                n = curves.at[month, 'contract_days']
                m = curves.at[month, 'meeting_day']
                if (n - m) == 0:
                    raise ZeroDivisionError(f"Division by zero encountered for month {month} in Fed rate calculation.")
                expected_rate = (n * curves.at[month, 'futures_rate'] - m * r_prev) / (n - m)
                curves.at[month, 'expected_fed_rate'] = expected_rate

    curves.drop(columns=['meeting dates'], inplace=True)
    return curves


`cash_flows.py`

In [17]:

# ===============================================================================================
# Cashflows
# ===============================================================================================


def get_cash_flows(
    data: pd.DataFrame,
    df_info: pd.DataFrame,
    keys_list: list
) -> pd.DataFrame:
    """
    Calculates cash flows from accrued interest changes to determine coupon payment dates.

    Parameters:
        data (pd.DataFrame): Market data containing 'quote date' and 'accrued int' for securities.
        data_info (pd.DataFrame): Security information containing coupon rates.
        keys_list (list): List of identifiers for the securities to process.

    Returns:
        pd.DataFrame: DataFrame containing cash flows with columns for long and short coupons.

    Raises:
        KeyError: If required columns are missing in `data` or `data_info`.
    """
    
    if df_info.columns[0] == 0:
        df_info.columns = df_info.iloc[0]
        df_info = df_info[1:]

    df = data.copy()
    if 'CALDT' in df.columns.str.upper():
        df.columns = df.columns.str.upper()
        date_col = 'CALDT'
        cpn_freq_col = 'TNIPPY'
        cpn_rate_col = 'TCOUPRT'
        accr_int_col = 'TDACCINT'
    elif 'quote date' in df.columns:
        date_col = 'quote date'
        cpn_freq_col = 'cpn freq'
        cpn_rate_col = 'cpn rate'
        accr_int_col = 'accrual fraction'

    df = df.sort_values(by=date_col)

    cpn_cash_flows_ts = None
    
    if (date_col in df.columns) and (cpn_rate_col in df_info.index) and (accr_int_col in df.columns):

        # Pivot data to have securities as columns and quote dates as index
        data_pivot = df.pivot(index=date_col, columns='KYTREASNO', values=accr_int_col)
        
        # Compute differences in accrued interest to detect coupon payments
        for key in keys_list:
            data_pivot[str(key) + '_acc_int_diff'] = data_pivot[key].diff()
            # Identify coupon payment dates where accrued interest drops
            coupon_dates = data_pivot[data_pivot[str(key) + '_acc_int_diff'] < 0].index[0:]
            freq_cpn_yr = df_info.loc[cpn_freq_col, key] if 'TNIPPY' in df_info.index else 2 # Default to 2 if not present
            cpn_rates = df_info.loc[cpn_rate_col, key] # Assuming face value of 100
            # Create cash flow DataFrames for long and short securities
            cpn_cash_flow = pd.Series(name=key, index=coupon_dates, data=cpn_rates / freq_cpn_yr)

            # Concat coupon cash flows:
            if cpn_cash_flows_ts is None:
                cpn_cash_flows_ts = cpn_cash_flow
            else:
                cpn_cash_flows_ts = pd.concat([cpn_cash_flows_ts, cpn_cash_flow], axis=1)

        # Fill missing values with 0 and rename columns
        cpn_cash_flows_ts = cpn_cash_flows_ts.fillna(0)

    return cpn_cash_flows_ts


def calc_treasury_cashflows(
    quote_data: pd.DataFrame,
    filter_maturity_dates: bool = False
) -> pd.DataFrame:
    """
    Creates a cashflow matrix for each Treasury issue in quote_data.

    Parameters:
        quote_data (pd.DataFrame): Data with columns ['CALDT', 'TMATDT', 'TCOUPRT'] for each Treasury issue.
        filter_maturity_dates (bool, optional): If True, applies additional filtering on the result. Defaults to False.

    Returns:
        pd.DataFrame: Matrix of coupon/principal payments with rows as issues and columns as dates.

    Raises:
        ValueError: If required columns are missing in `quote_data`.
    """

    # Columns: TMATDT could appear multiple times across issues, so we pivot on unique maturity dates

    if 'CALDT' in quote_data.columns:
        date_col = 'CALDT'
        maturity_col = 'TMATDT'
        coupon_col = 'TCOUPRT'
    elif 'quote date' in quote_data.columns:
        date_col = 'quote date'
        maturity_col = 'maturity date'
        coupon_col = 'cpn rate'
    else:
        raise ValueError("Unrecognized data format: Missing key columns.")

    cf = pd.DataFrame(dtype=float, data=0, index=quote_data.index, columns=quote_data[maturity_col].unique())

    for i in quote_data.index:
        cpn_dates = get_coupon_dates(
            quote_data.loc[i, date_col],
            quote_data.loc[i, maturity_col]
        )

        # Semiannual coupon = face * TCOUPRT / 2; face assumed = 100
        if cpn_dates is not None:
            cf.loc[i, cpn_dates] = quote_data.loc[i, coupon_col] / 2

        # Add principal at maturity
        cf.loc[i, quote_data.loc[i, maturity_col]] += 100

    # Clean up
    cf = cf.fillna(0).sort_index(axis=1)
    cf.drop(columns=cf.columns[(cf == 0).all()], inplace=True)

    if filter_maturity_dates:
        cf = filter_treasury_cashflows(cf, filter_maturity_dates=True)

    return cf


def filter_treasury_cashflows(
    cashflow_df: pd.DataFrame,
    filter_maturity_dates: bool = False,
    filter_benchmark_dates: bool = False,
    filter_cf_strict: bool = True
) -> pd.DataFrame:
    """
    Filters a cashflow matrix by keeping only certain columns/dates (benchmark or maturity) and dropping relevant rows.

    Parameters:
        cashflow_df (pd.DataFrame): Cashflow matrix with rows as issues and columns as dates with coupon/principal amounts.
        filter_maturity_dates (bool, optional): If True, keep only columns where principal >= 100. Defaults to False.
        filter_benchmark_dates (bool, optional): If True, keep only 'benchmark' dates (Feb/May/Aug/Nov 15). Defaults to False.
        filter_cf_strict (bool, optional): If True, drop issues that had cash flows outside kept dates. Defaults to True.

    Returns:
        pd.DataFrame: Filtered cashflow matrix.

    Raises:
        ValueError: If date columns are not in datetime format.
    """
    benchmark_cols = []
    cf = cashflow_df.copy()
    # Filter for benchmark months (Feb, May, Aug, Nov) on day=15
    for col in cf.columns:
        if filter_benchmark_dates:
            if (col.month in [2, 5, 8, 11]) and (col.day == 15):
                benchmark_cols.append(col)
        else:
            benchmark_cols.append(col)

    # Potentially filter for maturity columns
    if filter_maturity_dates:
        maturity_cols = cf.columns[(cf >= 100).any()]
    else:
        maturity_cols = cf.columns

    # Intersection of benchmark and maturity columns
    keep_cols = [col for col in benchmark_cols if col in maturity_cols]
    cf_filtered = cf[keep_cols]

    if filter_cf_strict:
        # drop issues that had CF on excluded dates
        mask_bonds = cf_filtered.sum(axis=1) == cf.sum(axis=1) # Only keep those rows that has the cashflow intact after tranformatio
        cf_filtered = cf_filtered[mask_bonds]
    else:
        # drop issues that have no CF on included dates
        mask_bonds = cf_filtered.sum(axis=1) > 0 # Only delete those that have no cashflow after transformation
        cf_filtered = cf_filtered[mask_bonds]

    # Finally, drop columns with no CF
    cf_filtered = cf_filtered.loc[:, (cf_filtered > 0).any()]

    return cf_filtered


`pricing.py`

In [18]:

# ===============================================================================================
# Interest Rate Conversion Utilities
# ===============================================================================================


def compound_rate(
    int_rate: float,
    compound_input: int = None,
    compound_output: int = None,
) -> float:
    """"
    Converts an interest rate from one compounding basis to another.

    Parameters:
        intrate (float): Original interest rate.
        compound_input (int or None): Original compounding frequency (None=continuous).
        compound_output (int or None): Target compounding frequency (None=continuous).

    Returns:
        float: The rate on the new compounding basis.
    """
    if compound_input is None and compound_output is None:
        # No transformation
        return int_rate

    if compound_input is None:
        # from continuous to discrete
        return compound_output * (np.exp(int_rate / compound_output) - 1.0)
    elif compound_output is None:
        # from discrete to continuous
        return compound_input * np.log(1.0 + int_rate / compound_input)
    else:
        # discrete to discrete
        return (
            (1.0 + int_rate / compound_input) ** (compound_input / compound_output) - 1.0
        ) * compound_output
    

# ===============================================================================================
# Maturities and Coupons Discounts Utilities
# ===============================================================================================


def get_coupon_dates(
    quote_date: Union[str, datetime.date, datetime.datetime],
    maturity_date: Union[str, datetime.date, datetime.datetime],
    coupon_freq_m: int = 6
) -> pd.Series:
    """
    Generates semiannual coupon payment dates from 'quote_date' up to 'maturity_date'.

    Parameters:
        quote_date (str or datetime.date or datetime.datetime): Start date.
        maturity_date (str or datetime.date or datetime.datetime): Maturity date.
        coupon_freq_m (int, default=6): Coupon frequency in months.

    Returns:
        pd.Series: Coupon dates strictly after the quote_date, up to maturity_date.
    """
    if isinstance(quote_date, str):
        try:
            quote_date = dt.strptime(quote_date, '%Y-%m-%d')
        except ValueError as e:
            raise ValueError(f'Invalid quote_date: {quote_date}. Must be "YYYY-MM-DD".') from e

    if isinstance(maturity_date, str):
        try:
            maturity_date = dt.strptime(maturity_date, '%Y-%m-%d')
        except ValueError as e:
            raise ValueError(f'Invalid maturity_date: {maturity_date}. Must be "YYYY-MM-DD".') from e

    if maturity_date <= quote_date:
        raise ValueError('maturity_date must be strictly after quote_date.')

    # Generate coupon dates
    semiannual_offset = pd.DateOffset(months=coupon_freq_m)
    dates = []
    current_date = maturity_date

    while current_date > quote_date:
        dates.append(current_date)
        current_date -= semiannual_offset

    # Return sorted dates in ascending order
    return sorted(dates)


def find_next_coupon_date(
    first_cpn_date: Union[str, datetime.date, datetime.datetime],
    today_date: Union[str, datetime.date, datetime.datetime],
    coupon_freq_m: int = 6,
) -> Tuple[str, float]:
    """
    Finds the next M-month coupon date after 'today_date' based on 'first_cpn_date'.

    Parameters:
        first_cpn_date (str or date or datetime): Bond's first coupon date. If string, 'YYYY-MM-DD'.
        today_date (str or date or datetime): Current date. If string, 'YYYY-MM-DD'.
        coupon_freq (int, default=6): Coupon frequency in months.

    Returns:
        (str, float): Next coupon date in 'YYYY-MM-DD' format, and days from 'today_date' to next coupon.
    """
    if isinstance(first_cpn_date, str):
        try:
            past_date = dt.strptime(first_cpn_date, "%Y-%m-%d")
        except ValueError as e:
            raise ValueError(f'Invalid first_cpn_date: {first_cpn_date}.') from e
    elif isinstance(first_cpn_date, (datetime.date, datetime.datetime)):
        past_date = first_cpn_date
    else:
        raise TypeError("first_cpn_date must be a date string or datetime object.")

    if isinstance(today_date, str):
        try:
            today = dt.strptime(today_date, "%Y-%m-%d")
        except ValueError as e:
            raise ValueError(f'Invalid today_date: {today_date}.') from e
    elif isinstance(today_date, (datetime.date, datetime.datetime)):
        today = today_date
    else:
        raise TypeError("today_date must be a date string or datetime object.")

    if pd.isnull(past_date):
        raise ValueError("first_cpn_date is NaT.")

    # If the bond's first coupon date is in the future
    if past_date > today:
        days_to_next_multiple = (past_date - today).days
        return past_date.strftime("%Y-%m-%d"), float(days_to_next_multiple)

    # Move forward in 6-month increments until we exceed 'today'
    next_six_month_multiple = past_date
    while next_six_month_multiple <= today:
        next_six_month_multiple += pd.DateOffset(months=coupon_freq_m)

    days_to_next_multiple = (next_six_month_multiple - today).days
    return next_six_month_multiple.strftime("%Y-%m-%d"), float(days_to_next_multiple)


def get_maturity_delta(
    t_maturity: Union[list, pd.Series, pd.DatetimeIndex, datetime.date, datetime.datetime],
    date_curr: Union[datetime.date, datetime.datetime, np.datetime64]
) -> pd.Series:
    """
    Returns the maturity (in years) between date_curr and t_maturity.

    Parameters:
        t_maturity (pd.Series, pd.DatetimeIndex, datetime.date, datetime.datetime): Maturity date(s).
        date_curr (datetime.date, datetime.datetime, or np.datetime64): Current date.

    Returns:
        pd.Series: Fractional years to maturity.
    """
    # Convert to pd.Series if not already
    if isinstance(t_maturity, (datetime.date, datetime.datetime)):
        t_maturity = pd.Series([t_maturity])
    if isinstance(t_maturity, (list, pd.DatetimeIndex)):
        t_maturity = pd.Series(t_maturity)

    # Ensure both are datetime64
    t_maturity = pd.to_datetime(t_maturity)
    date_curr = pd.to_datetime(date_curr)

    # fraction of days / 365.25
    maturity_delta = (t_maturity - date_curr) / pd.Timedelta('365.25 days')
    return maturity_delta


# ===============================================================================================
# Duration Utilities
# ===============================================================================================

def duration_closed_formula(ttm: float,
                            ytm: float,
                            cpn_rate: Union[int, None] = None,
                            freq: int = 2) -> float:
    """
    Calculates the Macaulay duration of a bond using a closed-form expression.

    Parameters:
        ttm (float): Time to maturity in years.
        ytm (float): Yield to maturity (annualized, decimal).
        cpn_rate (int, optional): Annual coupon rate (decimal). If None, uses ytm. Defaults to None.
        freq (int, optional): Number of coupon payments per year. Defaults to 2.

    Returns:
        float: The bond's Macaulay duration (in years).

    Raises:
        ValueError: If `ttm` is non-positive or `freq` is not positive.
    """
    if cpn_rate is None:
        cpn_rate = ytm

    y = ytm / freq
    c = cpn_rate / freq
    T = ttm * freq

    if abs(cpn_rate - ytm) < 1e-14:
        duration = (1+y)/y  * (1 - 1/(1+y)**T)
    else:
        duration = (1+y)/y - (1+y+T*(c-y)) / (c*((1+y)**T-1)+y)

    duration /= freq
    
    return duration


# ===============================================================================================
# Pricing Utilities
# ===============================================================================================

def present_value(
    rate: float,
    cashflows: List[float],
    maturities: List[float],
    freq: int = 1
) -> float:
    """
    Computes the present value of discrete cashflows.

    Args:
        rate (float): Annual discount rate.
        cashflows (List[float]): Cashflow amounts.
        maturities (List[float]): Times in years for each cash flow.
        freq (int, optional): Compounding frequency. Defaults to 1.

    Returns:
        float: Present value of the cashflows.

    Raises:
        ValueError: If cashflows and maturities lists have different lengths.
    """
    if len(cashflows) != len(maturities):
        raise ValueError("cashflows and maturities must have the same length.")

    total_pv = 0.0
    discount_factor = 1.0 + rate / freq
    for cfi, mat in zip(cashflows, maturities):
        periods = mat * freq
        total_pv += cfi / (discount_factor ** periods)
    return total_pv


def net_present_value(
    rate: float,
    cashflows: Union[float, List[float]],
    maturities: Union[float, List[float]],
    initial_outlay: float = 0.0
) -> float:
    """
    Calculates the net present value of cashflows at a given rate minus the initial outlay.

    Args:
        rate (float): Discount rate (annualized).
        cashflows (float or List[float]): Cashflow amounts.
        maturities (float or List[float]): Times in years for each cash flow.
        initial_outlay (float, optional): Initial investment or outlay to subtract from the PV of cashflows.
            Defaults to 0.0.

    Returns:
        float: Net present value.

    Raises:
        ValueError: If the lengths of `cashflows` and `maturities` do not match.
    """
    # Ensure cashflows and maturities are lists
    if isinstance(cashflows, (float, int)):
        cashflows = [float(cashflows)]
    if isinstance(maturities, (float, int)):
        maturities = [float(maturities)]

    if len(cashflows) != len(maturities):
        raise ValueError("cashflows and maturities must have the same length.")

    pv_total = present_value(rate, cashflows, maturities)
    return pv_total - initial_outlay


def bond_pricer(
    ttm: float,
    ytm: float,
    cpn_rate: float = None,
    freq: int = 2,
    face: float = 100,
    is_dirty: bool = True,
) -> float:
    """
     Calculates the price of a bond with accrued interest consideration.
    
    The bond is first priced as if settlement were on a coupon date (the “next‐coupon” price)
    and then rolled back (discounted) by the fractional period from the settlement date to the
    next coupon date.
    
    Args:
        ttm (float): Time to maturity in years.
        ytm (float): Annualized yield to maturity (as a decimal).
        cpn_rate (float, optional): Annual coupon rate (as a decimal). If None, assumed to equal ytm.
        freq (int, optional): Number of coupon payments per year (default 2 for semiannual).
        face (float, optional): Face value of the bond (default 100).
        is_dirty (bool, optional): Whether to return the dirty price (default True).  
            (If False, accrued interest will be subtracted.)
    
    Returns:
        float: The bond’s price.
    """
    if cpn_rate is None:
        cpn_rate = ytm

    # Yield and coupon per period
    y = ytm / freq
    c = cpn_rate / freq

    # Compute fractional period from settlement to the next coupon date.
    # For example, if ttm=5.75 years and freq=2 (coupon every 0.5 years), then:
    #   ttm % (1/freq) = 5.75 % 0.5 = 0.25, and rem = 2*0.25 = 0.5.
    rem = freq * (ttm % (1/freq))
    # Compute the number of full coupon periods until maturity.
    tau = freq * ttm - rem
    # Due to floating‐point issues, round tau if it is nearly an integer.
    tau = round(tau)

    # Price as of the next coupon date:
    pv = 0.0
    # Sum coupon payments for periods 1 to (tau-1)
    for i in range(1, tau):
        pv += c / (1 + y)**i
    # Add the final period (coupon plus redemption of face)
    pv += (c + 1) / (1 + y)**tau
    pv *= face

    # Roll back the price from the next coupon date to the settlement date.
    # Also, add one coupon payment before discounting (as in the correct function).
    if rem > 0:
        pv += c * face
        pv /= (1 + y)**rem

    # Optionally, if a clean price is desired, subtract the accrued interest.
    if not is_dirty and rem > 0:
        # Accrued interest is the portion of the coupon that has accrued since the last coupon.
        # Given the discounting above, it is computed as:
        accrued = c * face * (1 - 1 / (1 + y)**rem)
        pv -= accrued

    return pv

from scipy.optimize import fsolve

def calculate_ytm(
    price: float,
    ttm: float,
    cpn_rate: float,
    coupon_freq: int = 2,
    face: float = 100.0,
    is_dirty: bool = True
) -> float:
    """
    Calculates the Yield to Maturity (YTM) of a bond using numerical methods.
    
    This version uses the same logic as:
        pv_wrapper = lambda y: TPRICE - price_treasury_ytm(tau0, y, CPNRATE[0])
        YTM[0] = fsolve(pv_wrapper, .04)[0]
    because the accrued interest (the fractional period adjustment) is already
    handled internally by the bond_pricer function.
    
    Args:
        price (float): The observed market price of the bond.
        maturity (float): Time to maturity in years.
        coupon (float): Annual coupon rate (decimal).
        coupon_freq (int, optional): Number of coupon payments per year (default 2).
        face (float, optional): Face value of the bond (default 100.0).
        is_dirty (bool, optional): If True, price includes accrued interest.
    
    Returns:
        float: The calculated yield to maturity.
    """
    # The accrued interest (or fractional period) adjustment is already implied in the bond_pricer.
    pv_wrapper = lambda y: price - bond_pricer(
        ttm=ttm,
        ytm=y,
        cpn_rate=cpn_rate,
        freq=coupon_freq,
        face=face,
        is_dirty=is_dirty
    )
    # Use an initial guess of 4% (0.04) for the yield.
    ytm_solution = fsolve(pv_wrapper, 0.04)[0]
    return ytm_solution



# ===============================================================================================
# 8. Swaps & Forward Rates
# ===============================================================================================

def calculate_swap_rate(
    discounts: pd.Series,
    maturity: float,
    freq_swap: int
) -> float:
    """
    Calculates the swap rate for a given maturity using discount factors.

    The swap rate is computed based on the formula:
        swaprate = freq_swap * (1 - discount[T]) / sum(discounts over payment periods)

    Args:
        discounts (pd.Series): Series of discount factors indexed by maturity (in years).
        maturity (float): Time to swap maturity in years.
        freq_swap (int): Frequency of swap payments per year.

    Returns:
        float: Calculated swap rate.

    Raises:
        ValueError: If `maturity` is not present in `discounts` index.
    """

    if maturity not in discounts.index:
        raise ValueError(f"Maturity {maturity} not found in `discounts` index.")
    
    # Infer frequency of discount curve
    discount_diffs = discounts.index.to_series().diff().dropna()
    avg_dt = discount_diffs.mean()
    freq_disc = round(1 / avg_dt)
    
    if freq_swap <= 0:
        raise ValueError("Frequency of swap (`freq_swap`) must be a positive integer.")
    
    step = round(freq_disc / freq_swap)
    if step <= 0:
        raise ValueError("Invalid step calculation. Check `freq_swap` relative to discount curve frequency.")
    
    try:
        periods_swap = discounts.index.get_loc(maturity)
    except KeyError:
        raise ValueError(f"Maturity {maturity} not found in `discounts` index.")
    
    # Adjust for inclusive range
    periods_swap += 1

    swaprate_numerator = 1 - discounts.loc[maturity]
    swaprate_denominator = discounts.iloc[step-1:periods_swap:step].sum()
    
    if swaprate_denominator == 0:
        raise ZeroDivisionError("Denominator in swap rate calculation is zero.")
    
    swap_rate = freq_swap * (swaprate_numerator / swaprate_denominator)
    
    return swap_rate


def calculate_forward_swap_rate(
    discounts: pd.Series,
    tfwd: float,
    tswap: float,
    freq_swap: int
) -> float:
    """
    Calculates the forward swap rate between two maturities using discount factors.

    The forward swap rate is computed based on the formula:
        fwdswaprate = freq_swap * (discount[Tfwd] - discount[Tswap]) / sum(discounts over forward periods)

    Args:
        discounts (pd.Series): Series of discount factors indexed by maturity (in years).
        tfwd (float): Forward start maturity in years.
        tswap (float): Swap maturity in years.
        freq_swap (int): Frequency of swap payments per year.

    Returns:
        float: Calculated forward swap rate.

    Raises:
        ValueError: If `tfwd` or `tswap` are not present in `discounts` index.
        ValueError: If `freq_swap` is not positive.
    """
    if tfwd not in discounts.index:
        raise ValueError(f"Forward maturity {tfwd} not found in `discounts` index.")
    if tswap not in discounts.index:
        raise ValueError(f"Swap maturity {tswap} not found in `discounts` index.")
    if freq_swap <= 0:
        raise ValueError("Frequency of swap (`freq_swap`) must be a positive integer.")

    # Infer frequency of discount curve
    discount_diffs = discounts.index.to_series().diff().dropna()
    avg_dt = discount_diffs.mean()
    freq_disc = round(1 / avg_dt)
    
    step = round(freq_disc / freq_swap)
    if step <= 0:
        raise ValueError("Invalid step calculation. Check `freq_swap` relative to discount curve frequency.")
    
    try:
        periods_fwd = discounts.index.get_loc(tfwd)
        periods_swap = discounts.index.get_loc(tswap)
    except KeyError as e:
        raise ValueError(f"Maturity not found in `discounts` index: {e}")
    
    # Adjust for step and inclusive range
    periods_fwd += step
    periods_swap += 1

    denominator = discounts.iloc[periods_fwd:periods_swap:step].sum()
    if denominator == 0:
        raise ZeroDivisionError("Denominator in forward swap rate calculation is zero.")
    
    fwdswaprate = freq_swap * (discounts.loc[tfwd] - discounts.loc[tswap]) / denominator
    
    return fwdswaprate


def forward_discount(
    spot_discount: pd.Series,
    t1: float,
    t2: float
) -> float:
    """
    Computes the forward discount factor = spot_discount[t2] / spot_discount[t1].

    Parameters:
        spot_discount (pd.Series): Discount factors indexed by maturity.
        t1 (float): Early maturity.
        t2 (float): Later maturity.

    Returns:
        float: Forward discount factor.
    """
    return spot_discount.loc[t2] / spot_discount.loc[t1]



`treasury.py`

In [19]:

# ===============================================================================================
# Treasury Filtering
# ===============================================================================================

def get_key_info(info: pd.Series) -> pd.DataFrame:
    """
    Extracts and reformats key information about a Treasury issue from CRSP time series info table.
    Rows to extract: ['KYTREASNO','TDATDT','TMATDT','TCOUPRT','ITYPE']. Then renames these rows to friendlier labels and reassigns type' labels based on numeric codes.

    Parameters:
        info (pd.Series): A Series (often a row slice) with index including 'KYTREASNO', 'TDATDT', 'TMATDT', 'TCOUPRT', 'ITYPE'.

    Returns:
        pd.DataFrame: DataFrame with the renamed rows and typed columns, indexed by 'issue date'.

    Raises:
        KeyError: If any of the required keys are missing in the `info` Series.
    """

    keys = ['KYTREASNO', 'TDATDT', 'TMATDT', 'TCOUPRT', 'ITYPE']
    info.index = info.index.str.upper()
    key_info = info.loc[keys]
    key_info.index = ['KYTREASNO', 'issue date', 'maturity date', 'coupon rate', 'type']
    
    # Map numeric 'type' codes to descriptive labels
    type_map = {
        1: 'bond',
        2: 'note',
        3: 'bill',
        11: 'TIPS bond',
        12: 'TIPS note'
    }
    key_info.loc['type'] = key_info.loc['type'].map(type_map)
    
    # Use 'issue date' row as columns
    key_info.columns = key_info.loc['issue date']
    return key_info


def get_snapshot(database: pd.DataFrame, date: Union[str, datetime.date, datetime.datetime]) -> pd.DataFrame:
    """
    Returns a snapshot of specific Treasury metrics at a given date from CRSP time series data table.
    This function extracts bid, ask, accrued interest, price (clean & dirty), duration (annualized), modified duration, and yield to maturity from the provided `database` on `date`.

    Parameters:
        database (pd.DataFrame): Must contain columns such as 'KYTREASNO', 'CALDT', 'TDBID', 'TDASK', 'TDACCINT', 'TDDURATN', 'TDYLD'.
        date (Union[str, datetime.date, datetime.datetime]): The date at which to extract a snapshot (e.g., 'YYYY-MM-DD').

    Returns:
        pd.DataFrame: Snapshot with row index representing each metric and columns keyed by the snapshot date.

    Raises:
        ValueError: If the specified `date` is not found in the `database`.
    """

    database.columns = database.columns.str.upper()
    if isinstance(date, str):
        date = pd.to_datetime(date)
    
    datasnap = database[database['CALDT'] == date].T

    metrics = datasnap.loc[['KYTREASNO', 'TDBID', 'TDASK', 'TDACCINT']]
    metrics.loc['clean price'] = (metrics.loc['TDBID'] + metrics.loc['TDASK']) / 2
    metrics.loc['dirty price'] = metrics.loc['clean price'] + metrics.loc['TDACCINT']
    metrics.loc['duration'] = datasnap.loc['TDDURATN'] / 365.25
    metrics.loc['ytm'] = (datasnap.loc['TDYLD'] * 365.25)
    metrics.loc['modified duration'] = metrics.loc['duration'] / (1 + metrics.loc['ytm'] / 2)

    metrics.columns = metrics.loc['KYTREASNO']
    metrics.drop(index=['KYTREASNO'], inplace=True)
    
    # Tidy up row labels
    metrics.index = metrics.index.str.lower()
    metrics.rename({'TDBID': 'bid',
                    'TDASK': 'ask',
                    'TDACCINT': 'accrued interest'},
                   inplace=True)

    return metrics


def get_summary_table(info: pd.Series, database: pd.DataFrame, date: Union[str, datetime.date, datetime.datetime]) -> pd.DataFrame:
    """
    Merges key bond information with a snapshot of market metrics on a given date.

    Parameters:
        info (pd.Series): A Series with key info (e.g., from get_key_info).
        database (pd.DataFrame): Market data containing the required columns for get_snapshot.
        date (Union[str, datetime.date, datetime.datetime]): The date for which to retrieve the snapshot.

    Returns:
        pd.DataFrame: A table merging bond key information and snapshot metrics.

    Raises:
        KeyError: If merging on 'KYTREASNO' fails due to missing keys.
    """
    keyinfo = get_key_info(info)
    metrics = get_snapshot(database, date)
    
    table = pd.merge(keyinfo.T, metrics.T, on='KYTREASNO', how='inner').T
    table.columns = table.loc['KYTREASNO']
    table.drop('KYTREASNO', inplace=True)
    
    return table


def filter_treasuries(
    data: pd.DataFrame,
    date_curr: Optional[datetime.date] = None,
    filter_tenure_max: float = None,
    filter_tenure_min: float = None,
    drop_duplicate_maturities: bool = False,
    filter_tips: bool = True,
    filter_yld: bool = True
) -> pd.DataFrame:
    """
    Filters a DataFrame of Treasury quotes for a specific date, maturity, TIPS, yield, etc.

    Parameters:
        data (pd.DataFrame): DataFrame with columns ['CALDT', 'TMATDT', 'ITYPE', 'TDYLD'].
        date_curr (Optional[datetime.date], optional): Current date of the analysis. If None, defaults to the last date in data['CALDT']. Defaults to None.
        filter_tenure_max (float, optional): Keep only treasuries with maturity < t_date + 'filter_tenure_max' years. Defaults to None.
        filter_tenure_min (float, optional): Keep only treasuries with maturity > t_date + 'filter_tenure_min' years. Defaults to None.
        drop_duplicate_maturities (bool, optional): If True, drop duplicates by 'TMATDT'. Defaults to False.
        filter_tips (bool, optional): If True, remove TIPS (ITYPE in [11,12]); if False, keep only TIPS. Defaults to True.
        filter_yld (bool, optional): If True, remove rows where 'TDYLD' <= 0. Defaults to True.

    Returns:
        pd.DataFrame: Filtered DataFrame of Treasury quotes.

    Raises:
        ValueError: If the input `data` does not contain recognized date columns or required columns.
    """

    df = data.copy()

    if 'CALDT' in data.columns:
        date_col = 'CALDT'
        maturity_col = 'TMATDT'
        yield_col = 'TDYLD'
    elif 'quote date' in data.columns:
        date_col = 'quote date'
        maturity_col = 'maturity date'
        yield_col = 'ytm'
    else:
        raise ValueError("Unrecognized data format: Missing key columns.")

    if date_curr is None:
        date_curr = df[date_col].values[-1]

    # Filter by date
    df = df[df[date_col] == date_curr]

    # Filter out redundant maturities
    if drop_duplicate_maturities:
        df = df.drop_duplicates(subset=[maturity_col])

    # Filter by max maturity
    if filter_tenure_max is not None:
        max_maturity_dt = date_curr + np.timedelta64(int(365 * filter_tenure_max + 1), 'D')
        mask_truncate = df[maturity_col] < max_maturity_dt
        df = df[mask_truncate]

    # Filter by min maturity
    if filter_tenure_min is not None:
        min_maturity_dt = date_curr + np.timedelta64(int(365 * filter_tenure_min - 1), 'D')
        mask_truncate = df[maturity_col] > min_maturity_dt
        df = df[mask_truncate]

    # Filter TIPS
    # 'ITYPE' in [11,12] => TIPS note/bond
    if filter_tips:
        if 'ITYPE' in df.columns:
            df = df[~df['ITYPE'].isin([11, 12])]
        elif 'type' in df.columns:
            df = df[~df['type'].isin(['TIPS bond'])]
        else:
            raise ValueError('No TIPS filter column found')
    else:
        if 'ITYPE' in df.columns:
            df = df[df['ITYPE'].isin([11, 12])]
        elif 'type' in df.columns:
            df = df[df['type'].isin(['TIPS bond'])]
        else:
            raise ValueError('No TIPS filter column found')

    # Filter yield
    if filter_yld:
        df = df[df[yield_col] > 0]

    return df


def select_maturities(
    rawdata: pd.DataFrame,
    periods: int = 20,
    freq: str = '6ME'
) -> List[str]:
    """
    Given a raw DataFrame with 'quote date', 'issue date', 'maturity date', selects bond issues closest to a series of equally spaced maturities from the first quote date.

    Parameters:
        rawdata (pd.DataFrame): DataFrame with columns ['quote date', 'issue date', 'maturity date'].
        periods (int, optional): Number of intervals to generate (e.g., 20 half-year intervals). Defaults to 20.
        freq (str, optional): Frequency offset alias (pandas-style) for date_range (e.g., '6ME' ~ 6-month intervals). Defaults to '6ME'.

    Returns:
        List[str]: A list of index labels (e.g., KYTREASNO) for the bonds that best match those intervals.

    Raises:
        ValueError: If required columns are missing in `rawdata`.
    """
    data = rawdata.copy()[['quote date', 'issue date', 'maturity date']]

    # Assume the first row's 'quote date' is the overall date
    DATE = data['quote date'].iloc[0]

    # Generate date intervals
    six_month_intervals = pd.date_range(start=DATE, periods=periods + 1, freq=freq)[1:]

    def find_closest_date(interval: pd.Timestamp, df: pd.DataFrame) -> Optional[pd.Series]:
        # Calculate the absolute difference
        df['difference'] = (df['maturity date'] - interval).abs()
        # Only consider future dates
        future_dates = df[df['maturity date'] > DATE]
        if not future_dates.empty:
            min_diff = future_dates['difference'].min()
            # Ties resolved by latest issue date
            closest_rows = future_dates[future_dates['difference'] == min_diff]
            return closest_rows.sort_values('issue date', ascending=False).iloc[0]
        return None

    selected_rows = [find_closest_date(interval, data) for interval in six_month_intervals]
    selected_rows = [row for row in selected_rows if row is not None]
    select_ids = [row.name for row in selected_rows]

    return select_ids


# ===============================================================================================
# Treasury Data Processing
# ===============================================================================================

def process_wrds_treasury_data(
    rawdata: pd.DataFrame, 
    keys_extra: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Processes WRDS Treasury data into a standardized format, renaming columns, 
    calculating TTM, dirty price, and annualizing yields.

    Parameters:
    -----------
    rawdata : pd.DataFrame
        Raw DataFrame from WRDS with columns such as CALDT, TDBID, TDASK, ...
    keys_extra : list of str, optional
        Additional columns from `rawdata` to keep in the output.

    Returns:
    --------
    pd.DataFrame
        Processed data with standardized columns:
        ['type', 'quote date', 'issue date', 'maturity date', 'ttm', 'accrual fraction',
         'cpn rate', 'bid', 'ask', 'price', 'accrued int', 'dirty price', 'ytm', ...].
    """
    if keys_extra is None:
        keys_extra = []

    DAYS_YEAR = 365.25
    FREQ = 2  # semi-annual

    data = rawdata.copy()
    data.columns = data.columns.str.upper()
    data.sort_values('TMATDT', inplace=True)
    data.set_index('KYTREASNO', inplace=True)

    # Subset columns
    keep_cols = [
        'CALDT', 'TDBID', 'TDASK', 'TDNOMPRC', 'TDACCINT', 'TDYLD', 'TDATDT',
        'TMATDT', 'TCOUPRT', 'ITYPE', 'TDDURATN', 'TDPUBOUT', 'TDTOTOUT'
    ] + keys_extra
    data = data[keep_cols]

    # Map integer codes to descriptive bond types
    dict_type = {
        1: 'bond',
        2: 'note',
        4: 'bill',
        11: 'TIPS note',
        12: 'TIPS bond'
    }
    data['ITYPE'] = data['ITYPE'].replace(dict_type)

    # Rename columns to standardized names
    data.rename(
        columns={
            'CALDT': 'quote date',
            'TDATDT': 'issue date',
            'TMATDT': 'maturity date',
            'TCOUPRT': 'cpn rate',
            'TDTOTOUT': 'total size',
            'TDPUBOUT': 'public size',
            'TDDURATN': 'duration',
            'ITYPE': 'type',
            'TDBID': 'bid',
            'TDASK': 'ask',
            'TDNOMPRC': 'price',
            'TDACCINT': 'accrued int',
            'TDYLD': 'ytm'
        },
        inplace=True
    )

    # Convert dates to datetime
    data['quote date'] = pd.to_datetime(data['quote date'])
    data['issue date'] = pd.to_datetime(data['issue date'])
    data['maturity date'] = pd.to_datetime(data['maturity date'])

    # Time-to-maturity in years
    data['ttm'] = (data['maturity date'] - data['quote date']).dt.days.astype(float) / DAYS_YEAR

    # Dirty price = price + accrued interest
    data['dirty price'] = data['price'] + data['accrued int']

    # Convert duration from days to years
    data['duration'] /= 365.0

    # Convert sizes to face value in dollars
    data['total size'] *= 1e6
    data['public size'] *= 1e6

    # Annualize YTM (assumed input is in daily logs or something) for semiannual compounding
    def to_semiannual_ytm(x: float) -> float:
        return (np.exp(x * DAYS_YEAR / FREQ) - 1.0) * FREQ

    data['ytm'] = data['ytm'].apply(to_semiannual_ytm)

    # Accrual fraction
    data['accrual fraction'] = data['accrued int'] / (data['cpn rate'] / FREQ)
    idx_na = data['accrual fraction'].isna()
    # If not available, approximate from fractional part of TTM
    data.loc[idx_na, 'accrual fraction'] = 1.0 - (data.loc[idx_na, 'ttm'] - np.round(data.loc[idx_na, 'ttm'])) * FREQ

    # Final organization
    standard_keys = [
        'type', 'quote date', 'issue date', 'maturity date', 'ttm',
        'accrual fraction', 'cpn rate', 'bid', 'ask', 'price',
        'accrued int', 'dirty price', 'ytm'
    ]
    data = data[standard_keys + keys_extra]

    return data


def get_bond_raw(
    quote_date: Union[str, pd.Timestamp]
) -> Tuple[pd.DataFrame, Optional[pd.Timestamp]]:
    """
    Returns the raw bond data (unprocessed) and the current quote date for the given date.

    Parameters:
    -----------
    quote_date : str or pd.Timestamp
        Quote date to process (YYYY-MM-DD).

    Returns:
    --------
    (pd.DataFrame, pd.Timestamp or None)
        Tuple of (raw DataFrame, quote date). The DataFrame is indexed by KYTREASNO.
    """
    filepath_rawdata = f'../data/treasury_quotes_{quote_date}.xlsx'
    rawdata = pd.read_excel(filepath_rawdata, sheet_name='quotes')
    rawdata.columns = rawdata.columns.str.upper()
    rawdata.sort_values('TMATDT', inplace=True)
    rawdata.set_index('KYTREASNO', inplace=True)

    t_check = rawdata['CALDT'].values[0]
    if rawdata['CALDT'].eq(t_check).all():
        t_current = t_check
    else:
        warnings.warn("Quotes appear to be from multiple dates.")
        t_current = None

    return rawdata, t_current


def get_bond(
    quote_date: Union[str, pd.Timestamp],
    maturity: Optional[Union[float, List[float]]] = None,
    coupon: float = None,
    selection: str = 'nearest'
) -> pd.DataFrame:
    """
    Retrieves Treasury bonds (from processed quotes) matching specified maturity or coupon criteria.

    Parameters:
        quote_date (Union[str, pd.Timestamp]): Quote date to process (format: 'YYYY-MM-DD').
        maturity (Optional[Union[float, List[float]]], optional): Desired maturity in years. If a single float, it is converted to a list internally. If None, no filtering by maturity is done. Defaults to None.
        coupon (float, optional): Desired coupon rate. If None, no filtering by coupon is done. Defaults to None.
        selection (str, optional): Method to select maturity if `maturity` is provided. Defaults to 'nearest'. Options: 
            - 'nearest': bond whose maturity interval is closest to the requested
            - 'ceil': bond whose maturity is the smallest above the requested
            - 'floor': bond whose maturity is the largest below the requested 

    Returns:
        pd.DataFrame: DataFrame of bonds matching the given criteria.

    Raises:
        ValueError: If the `selection` method is not recognized or required data is missing.
    """
    
    metrics = process_wrds_treasury_data(quote_date)

    # Filter by coupon if provided
    if coupon is not None:
        metrics = metrics[metrics['coupon rate'] == coupon]

    # Filter by maturity if provided
    if maturity is not None:
        mats = metrics['maturity interval']

        if isinstance(maturity, float):
            maturity = [maturity]

        idx = []
        for m in maturity:
            if selection == 'nearest':
                idx.append(mats.sub(m).abs().idxmin())
            elif selection == 'ceil':
                # Only consider future maturities. If none, returns the min
                candidates = mats[mats >= m]
                if not candidates.empty:
                    idx.append(candidates.idxmin())
                else:
                    # Fallback to the single largest maturity
                    idx.append(mats.idxmax())
            elif selection == 'floor':
                candidates = mats[mats <= m]
                if not candidates.empty:
                    idx.append(candidates.idxmax())
                else:
                    # Fallback to the single smallest maturity
                    idx.append(mats.idxmin())
            else:
                warnings.warn(f"Selection method '{selection}' is not recognized. Using 'nearest'.")
                idx.append(mats.sub(m).abs().idxmin())

        metrics = metrics.loc[idx, :]

    return metrics


`bond_pair_trading.py`

In [20]:

# ===============================================================================================
# Bond Pair Trading Utilities
# ===============================================================================================

def trade_balance_sheet(prices: pd.Series,
                        durations: pd.Series,
                        key_long: str,
                        key_short: str,
                        haircuts: pd.Series = None,
                        long_equity: float = None,
                        long_asset: float = None
                        ) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """
    Computes a simplistic balance sheet for durtion-neutral a single long-short trade given prices, durations, and haircuts.

    Parameters:
        prices (pd.Series): Current prices of the instruments, indexed by the same keys used for durations and haircuts.
        durations (pd.Series): Durations of the instruments, matched to `prices` index.
        key_long (str): Label in the Series for the instrument you are long.
        key_short (str): Label in the Series for the instrument you are short.
        haircuts (pd.Series, optional): Haircut ratios (e.g., 0.05 means 5% equity for that instrument). Defaults to None.
        long_equity (float, optional): The amount of equity you allocate to the long position. Uses haircut to get asset amount. Must provide either `long_equity` or `long_asset`. Defaults to None.
        long_asset (float, optional): The total amount of assets (notional) for the long position. Must provide either `long_equity` or `long_asset`. Defaults to None.

    Returns:
        Tuple[pd.DataFrame, Dict[str, str]]: A tuple containing:
            - DataFrame with 'equity', 'assets', and 'contracts' for each instrument.
            - Formatting dictionary for display.

    Raises:
        ValueError: If neither `long_equity` nor `long_asset` is provided, or if required keys are missing.
    """
    hedge_ratio = -durations[key_long] / durations[key_short] # Dolar hedge ratio

    balsheet = pd.DataFrame(dtype='float64', index=[key_long, key_short], columns=['dirty price', 'dolar duration', 'equity', 'assets'])

    if long_equity is not None:
        # Assets for the long position
        balsheet['assets'] = long_equity / haircuts.values
    elif long_asset is not None:
        balsheet.loc[key_long, 'assets'] = long_asset
    else:
        raise ValueError("Must input either 'long_equity' or 'long_assets'.")

    # Hedge ratio determines the short leg's asset size
    balsheet.loc[key_short, 'assets'] = balsheet.loc[key_long, 'assets'] * hedge_ratio
    balsheet['equity'] = balsheet['assets'] * haircuts.values
    balsheet['contracts'] = balsheet['assets'] / prices
    balsheet['dirty price'] = prices
    balsheet['dolar duration'] = durations * prices

    fmt = {
        'dirty price': '${:,.2f}',
        'dolar duration': '${:,.2f}',
        'equity': '${:,.2f}',
        'assets': '${:,.2f}',
        'contracts': '{:,.2f}'
    }
    return balsheet, fmt


def get_spread_bps(database: pd.DataFrame, id_ref: str) -> pd.DataFrame:
    """
    Computes a yield spread in basis points from a database of Treasury yields.

    The function pivots `database` on 'CALDT' (rows) vs. 'KYTREASNO' (columns), then multiplies yields by (365 * 100 * 100). It returns the negative difference of each column relative to the first column.

    Parameters:
        database (pd.DataFrame): Must contain columns: 'caldt', 'kytreasno', 'TDYLD'.
        id_ref (str): The reference column name in `database` to compute the spread against.

    Returns:
        pd.DataFrame: Spread values (in basis points) for each `kytreasno` relative to the reference column.

    Raises:
        KeyError: If required columns are missing in `database`.
    """
    dtb = database.copy()
    dtb.columns = dtb.columns.str.upper()
    ylds = database.pivot_table(index='CALDT', columns='KYTREASNO', values='TDYLD')
    # Convert yields to annualized basis points
    ylds *= 365.25 * 100 * 100
    # Spread: difference from the first column
    spread = -ylds.sub(ylds.loc[:, id_ref], axis=0)
    spread = spread.drop(columns=[id_ref])
    spread = spread.dropna(axis=0, how='all')
    spread.columns = [f'({id_ref} - ' + str(col) + ')' for col in spread.columns]
    return spread


def pnl_spread_convergence(spread_convergence: float,
                          modified_duration: pd.Series,
                          price: pd.Series,
                          n_contracts: pd.Series
                          ) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """
    Calculates P&L for a simple spread trade converging by a certain fraction of yield (in decimal form), assuming midpoint convergence.

    Parameters:
        spread_convergence (float): The yield convergence in decimal (e.g., 0.0001 for 1 bp).
        modified_duration (pd.Series): The modified durations (index should correspond to the instrument legs).
        price (pd.Series): Current prices of the instruments.
        n_contracts (pd.Series): Number of contracts/shares/units for each instrument.

    Returns:
        Tuple[pd.DataFrame, Dict[str, str]]: A tuple containing:
            - DataFrame with columns ['ytm change', 'modified duration', 'price', 'DV01', 'num contracts', 'pnl'].
            - Dictionary of formatting directives for display.

    Raises:
        ValueError: If input Series have mismatched indices or contain invalid data.
    """
    table = pd.DataFrame(dtype='float64',index=modified_duration.index)
    table['ytm change'] = spread_convergence / 2 * np.array([-1,1]) # The negative is because ytm change affects price negatively
    table['modified duration'] = modified_duration    
    table['price'] = price
    table['DV01'] = table['modified duration'] * table['price'] * 0.0001
    table['num contracts'] = n_contracts
    table['pnl'] = - table['modified duration'] * table['price'] * table['ytm change'] * table['num contracts']
    table.loc['total','pnl'] = table['pnl'].sum()

    fmt_dict = {
        'ytm change': '{:.4%}',
        'modified duration': '{:,.2f}',
        'DV01': '{:,.2f}',  # Example if you add a dollar DV01 col
        'num contracts': '{:,.2f}',
        'price': '${:,.2f}',
        'pnl': '${:,.2f}'
    }
    return table, fmt_dict

def _adjust_coupon_cash_flows(cpn_cash_flows_ts: pd.DataFrame, 
                             frequency_rebal: int) -> pd.DataFrame:
    """
    Adjust coupon cash flows to accumulate coupons within rebalancing periods.

    Parameters:
    -----------
    cpn_cash_flows_ts (pd.DataFrame): Time series of coupon cash flows (index=dates, columns=long/short instruments).
    frequency_rebal (int): Rebalancing frequency in business days (e.g., 1 for daily, 5 for weekly).

    Returns:
    --------
    pd.DataFrame: Adjusted coupon cash flow time series with accumulated values per rebalancing period.
    """
    # Ensure index is datetime
    cpn_cash_flows_ts.index = pd.to_datetime(cpn_cash_flows_ts.index)
    
    # Determine rebalancing dates based on the frequency
    rebalancing_dates = cpn_cash_flows_ts.index[::frequency_rebal]

    # Accumulate coupon payments between rebalancing dates
    accumulated_coupons = cpn_cash_flows_ts.groupby(pd.Grouper(freq=f'{frequency_rebal}B')).sum()

    # Ensure alignment with rebalancing dates
    accumulated_coupons = accumulated_coupons.loc[rebalancing_dates.intersection(accumulated_coupons.index)]

    return accumulated_coupons


def trade_backtest(prices_ts: pd.DataFrame,
                   durations_ts: pd.DataFrame,
                   key_long: str,
                   key_short: str,
                   financing: pd.DataFrame,
                   long_equity: float = None,
                   long_asset: float = None,
                   frequency_rebal: int = 5,
                   cpn_cash_flows_ts: pd.DataFrame = None,
                   maturity_dates: pd.Series = None,
                   cpn_rates: pd.Series = None,
                   cpn_freq: pd.Series = None,
                   face_value: pd.Series = None
                   ) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """
    Simulates the evolution of a long-short trade over weekly intervals, accounting for coupon payments and price changes.

    Parameters:
        prices_ts (pd.DataFrame): Time series of prices (index=dates, columns=long/short instruments).
        durations_ts (pd.DataFrame): Time series of durations for the instruments (index=dates, columns=long/short instruments).
        key_long (str): Identifier for the instrument that is long.
        key_short (str): Identifier for the instrument that is short.
        financing (pd.DataFrame): Financing terms, including 'haircut' for each instrument.
        long_equity (float, optional): Equity allocated to the long position. Must provide either long_equity or long_asset.
        long_asset (float, optional): Total asset amount for the long position. Must provide either long_equity or long_asset.
        frequency_rebal (int, default=5): Rebalancing frequency in business days (e.g., 1 for daily, 5 for weekly).
        cpn_cash_flows_ts (pd.DataFrame, optional): DataFrame of coupon cash flows (in decimal) for each instrument (index=dates, columns=long/short instruments). Must provide either cpn_cash_flow or maturity_dates, cpn_rates and cpn_freq
        maturity_dates (str): Maturity date for the trade (e.g., 'YYYY-MM-DD'). Must provite either maturity_dates or cpn_cash_flow.
        cpn_rates (float): Annualized coupon rate(s) to compute periodic coupon payments. Must provide either cpn_rates or cpn_cash_flow.
        cpn_freq (int): Coupon frequency (e.g., 2 for semiannual). Must provide either cpn_freq or cpn_cash_flow.
        face_value (float or pd.Series, default=100): Face value(s) of the instruments. If not provided, defaults to 100 for both assets.

    Returns:
    (pd.DataFrame, dict): A tuple of:
         - DataFrame with columns:
           ['price change', 'coupons', 'total pnl', 'equity', 'margin call', 
            'capital paid in', 'return (init equity)', 'return (avg equity)']
         - Dictionary of formatting directives for display.
    """
    
    # Ensure dates_ts is sorted and in datetime format
    
    start_date = prices_ts.index[0]

    # Check if the number of dates in both prices and duration time series matches
    price_dur = pd.merge(prices_ts, durations_ts, left_index=True, right_index=True, how='inner', suffixes=('_price', '_dur'))
    if prices_ts.shape[0] != price_dur.shape[0] and durations_ts.shape[0] != price_dur.shape[0]:
        raise Exception(f"Prices and durations have different dates.")
        
    # Get coupon cash flows
    if cpn_cash_flows_ts is None:
        if (maturity_dates is None) or (cpn_rates is None) or (cpn_freq is None):
            cpn_cash_flows_ts = pd.DataFrame(dtype='float64', columns=[key_long, key_short]) # Assume no coupon cash flows, empty Series
        else:
            if face_value is None:
                face_value = pd.Series([100, 100], index=[key_long, key_short])
            cpn_cash_flows_ts = None
            for key in [key_long, key_short]:
                cpn_cash_flows_dates = get_coupon_dates(quote_date=start_date,
                                                        maturity_date=maturity_dates[key],
                                                        coupon_freq_m=cpn_freq[key])
                cpn_cash_flow = pd.Series(name=key, index=cpn_cash_flows_dates, data=cpn_rates[key] / cpn_freq[key] * face_value[key])
                # Concat coupon cash flows:
                if cpn_cash_flows_ts is None:
                    cpn_cash_flows_ts = cpn_cash_flow
                else:
                    cpn_cash_flows_ts = pd.concat([cpn_cash_flows_ts, cpn_cash_flow], axis=1)
    cpn_cash_flows_ts = cpn_cash_flows_ts.rename(columns={key_long: 'long cpn', key_short: 'short cpn'})
    prices_cpn_ts = pd.merge(prices_ts, cpn_cash_flows_ts, how='left', left_index=True, right_index=True)
    prices_cpn_ts = prices_cpn_ts.fillna(0)
            
    # Initialize PnL DataFrame
    pnl = pd.DataFrame(dtype='float64', index=price_dur.index)

    if long_equity is not None:
        # Assets for the long position
        long_asset = long_equity / financing['haircut'][key_long]
    elif long_asset is None:
        raise ValueError("Must input either 'long_equity' or 'long_assets'.")
        
    # Adjust series for rebalancing frequency
    prices_rebal_ts = prices_ts.iloc[::frequency_rebal, 0:2]
    durations_rebal_ts = durations_ts.iloc[::frequency_rebal]
    cpn_cash_flows_rebal_ts = _adjust_coupon_cash_flows(prices_cpn_ts.iloc[:, 2:], frequency_rebal)

    # Price and coupon components of P&L
    pnl['price long'] = prices_rebal_ts[key_long]
    pnl['price short'] = prices_rebal_ts[key_short]
    pnl['long cpn'] = cpn_cash_flows_rebal_ts['long cpn']
    pnl['short cpn'] = cpn_cash_flows_rebal_ts['short cpn']
    pnl['long'] = long_asset / pnl['price long']
    pnl['hedge ratio'] = (durations_rebal_ts[key_long] / durations_rebal_ts[key_short]) * (pnl['price long'] / pnl['price short']) # Hedge ratio in units
    pnl['short'] = - pnl['hedge ratio'] * pnl['long']
    pnl['price dff long'] = pnl['price long'].diff() + pnl['long cpn']
    pnl['price dff short'] = pnl['price short'].diff() + pnl['short cpn']

    # P&L of the Hedged Position:
    pnl['long pnl']  = pnl['long'].shift(1) * (pnl['price long'].diff() + pnl['long cpn'])
    pnl['short pnl'] = pnl['short'].shift(1) * (pnl['price short'].diff() + pnl['short cpn'])
    pnl.loc[start_date, ['long pnl', 'short pnl']] = 0
    pnl['net pnl']  = pnl['long pnl'] + pnl['short pnl']
    
    pnl['long ($)'] = pnl['long'] * pnl['price long'].values
    pnl['short ($)'] = pnl['short'] * pnl['price short'].values

    # Margin call = change in equity not explained by P&L
    pnl['long equity'] = pnl['long'] * pnl['price long'] * financing['haircut'][key_long]
    pnl['short equity'] = pnl['short'] * pnl['price short'] * financing['haircut'][key_short]
    pnl['equity'] = pnl['long equity'] + pnl['short equity']
    pnl['margin call'] = pnl['equity'].diff() - pnl['net pnl'].diff()
    pnl.loc[start_date, 'margin call'] = 0
    
    # Capital paid in = initial equity + cumulative margin calls
    pnl['capital paid in'] = pnl['equity'] + pnl['margin call'].cumsum()
    
    # Returns
    pnl['return (init equity)'] = pnl['net pnl'] / pnl.loc[start_date, 'capital paid in']
    pnl['return (avg equity)'] = pnl['net pnl'] / pnl['capital paid in'].expanding().mean()
    pnl.loc[start_date, ['return (init equity)', 'return (avg equity)']] = 0

    # Format index
    pnl.index = pnl.index.strftime('%Y-%m-%d')

    # Format dictionary for display
    fmt_dict = {
        'price long': '{:,.2f}',
        'price short': '{:,.2f}',
        'long': '{:,.2f}',
        'short': '{:,.2f}',
        'hedge ratio': '{:,.2f}',
        'long cpn': '{:,.2f}',
        'short cpn': '{:,.2f}',
        'long ($)': '${:,.2f}',
        'short ($)': '${:,.2f}',
        'long pnl': '${:,.2f}',
        'short pnl': '${:,.2f}',
        'net pnl': '${:,.2f}',
        'long equity': '${:,.2f}',
        'short equity': '${:,.2f}',
        'equity': '${:,.2f}',
        'margin call': '${:,.2f}',
        'capital paid in': '${:,.2f}',
        'return (init equity)': '{:.2%}',
        'return (avg equity)': '{:.2%}'
    }
    return pnl, fmt_dict


def trade_evolution(start_date, date_maturity, n_weeks, balsheet, price_ts, duration_ts, financing, cpn_rates, key_long, key_short):

    dt0 = datetime.datetime.strptime(start_date,'%Y-%m-%d') 
    
    cpn_dates = get_coupon_dates(start_date,date_maturity)
    
    pnl = pd.DataFrame(dtype='float64',index=[dt0],columns=['price change', 'coupons', 'total pnl', 'equity'])
    pnl.loc[dt0] = [0, 0, 0, balsheet['equity'].abs().sum()]

    for i in range(1,n_weeks):
        dt = dt0 + datetime.timedelta(weeks=i)
        dt = prev_bday(dt)

        cpn_payments = (dt > pd.to_datetime(cpn_dates)).sum()
        pnl.loc[dt,'price change'] = (price_ts.loc[[dt0,dt]] * balsheet['contracts']).diff().sum(axis=1).loc[dt]
        pnl.loc[dt,'coupons'] = (cpn_rates * balsheet['contracts'] * cpn_payments / 2).sum()
        pnl.loc[dt,'total pnl'] = pnl.loc[dt,['price change','coupons']].sum()

        temp, _ = trade_balance_sheet(prices=price_ts.loc[dt],
                                      durations=duration_ts.loc[dt],
                                      haircuts=financing['haircut'],
                                      key_long=key_long,
                                      key_short=key_short,
                                      long_asset=balsheet.loc[key_long,'contracts']*price_ts.loc[dt,key_long])
        
        pnl.loc[dt,'equity'] = temp['equity'].abs().sum()

    pnl['margin call'] = pnl['equity'].diff() - pnl['total pnl'].diff()
    pnl.loc[dt0,'margin call'] = 0
    pnl['capital paid in'] = pnl['equity'] + pnl['margin call'].cumsum()

    pnl['return (init equity)'] = pnl['total pnl'] / pnl.loc[dt0,'capital paid in']
    pnl['return (avg equity)'] = pnl['total pnl'] / pnl['capital paid in'].expanding().mean()

    fmt_dict = {'price change':'${:,.2f}','coupons':'${:,.2f}','total pnl':'${:,.2f}','equity':'${:,.2f}','margin call':'${:,.2f}','capital paid in':'${:,.2f}', 'return (init equity)':'{:.2%}', 'return (avg equity)':'{:.2%}'}

    return pnl, fmt_dict