## Importing Libraries

In [62]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta
import plotly.graph_objects as go
import re

import holidays
us_holidays = holidays.UnitedStates()

This is a two-week long homework composed of the following portions:

1. Bond construction 
2. Duration calculation (should be pretty simple!)
3. Using the FRED API to plot the yield/forward/zero curve
4. Using treasury coupon bond prices as of 09/19/2025 to bootstrap spot rates from coupon bonds. 
    Write a small piece comparing that with the zero curve. 
5. From the spot curve in Q4, plot the forward curve and classify the yield shape (is it inverted/normal etc)


For all data collection purposes get the data as of **09/19/2025**.

# Question 1

Constructure hypothetical treasury par bond of a given time to maturity (in years) with given yield that pays semiannual coupons. Just define the functions.

In [26]:
class Treasury_Par_Bond:

    n_bonds = 0

    # ----- Instance Details -----
    def __init__(self, fv, ytm, freq, T, issue_date: date): # force issue date to be entered 
        self.fv = fv
        self.ytm = ytm
        self.freq = freq
        self.T = T

        # accept str or date for issue_date
        self.issue_date = issue_date if isinstance(issue_date, date) else date.fromisoformat(issue_date)

        # For a par bond, coupon rate = YTM
        self.coupon_rate = ytm

        # Total number of payments
        self.n_periods = self.T * self.freq # N

        # Counting the number of bonds
        Treasury_Par_Bond.n_bonds += 1

    # ----- HELPER FUNCTIONS -----
    def yield_per_coupon(self):
        return self.ytm / self.freq

    def discount_factor(self, t):
        """ 
        Don't directly use n_periods. 
        Discount factor is individualised for each payment. 
        
        """
        return 1 / ((1 + self.yield_per_coupon()) ** t)

    @staticmethod
    def is_business_day(d: date, holidays=us_holidays) -> bool:
        return d.weekday() < 5 and (not holidays or d not in holidays)

    @staticmethod
    def adjusted_business_day(d: date, holiday_calendar=us_holidays) -> date:
        """
        Roll forward to the next business day if d is weekend/holiday.
        
        """
        while d.weekday() >= 5 or d in holiday_calendar:  # 5=Sat, 6=Sun
            d += timedelta(days=1)
        
        return d

    # ----- Value of Each Coupon Payment -----
    def coupon_payment(self):
        return self.fv * self.coupon_rate / self.freq

    # ----- Cash Flow Schedule for the Bond -----
    def cash_flows(self):
        """
        Returns a list of all coupon payments,
        with the face value added to the last payment. 
        
        """
        c = self.coupon_payment()
        cash_flows = [c] * self.n_periods # N coupon payments
        cash_flows[-1] += self.fv # Nth payment = coupon + principal

        return cash_flows 

    # ----- Bond Price from YTM -----
    def bond_price_from_ytm(self):
        """
        Returns the present value (price) of the bond
        based on its yield to maturity (ytm).

        Note: US Treasuries use discrete compounding discouting
        
        """
        cash_flows = self.cash_flows()
        bond_price = sum(cf * self.discount_factor(t) for t, cf in enumerate (cash_flows, start=1)) # returns (index, value)

        return bond_price

    # ----- Cash Flows of the Bond -----
    def table_of_cash_flows(self):
        """
        Returns a list of dicts: period t, cash_flow, discount_factor, present_value.

        """
        # Setting up coupon payment date calculation 
        months_between_payments = 12 // self.freq
        scheduled_coupon_payment_date = self.issue_date 

        table = []
        for t, cf in enumerate(self.cash_flows(), start=1):
            scheduled_coupon_payment_date += relativedelta(months = months_between_payments) # Add months_between_payments to the issue_date for payment date 
            
            # adjust coupon payment date if it is a holiday 
            actual_coupon_payment_date = Treasury_Par_Bond.adjusted_business_day(scheduled_coupon_payment_date)

            df = self.discount_factor(t)

            table.append({
                "t": t,
                "scheduled_date": scheduled_coupon_payment_date,   # fixed calendar schedule
                "coupon_payment_date": actual_coupon_payment_date, # business-day adjusted
                "cash_flow": cf,
                "discount_factor": df,
                "present_value": cf * df
            })
        
        return table

    # ----- Accrued Interest, Dirty Price and Clean Price -----
    # Dirty Price = Clean Price + Accured Interest 

    @staticmethod
    def get_settlement_date_from_trade_date(trade_date, holidays=us_holidays):
        """
        US Treasury settlement: T+1 (roll forward for weekends/holidays).
        
        """
        # Base case: T+1 calendar day
        settlement_date = trade_date + timedelta(days=1)
        
        # Roll forward until business day
        while settlement_date.weekday() >= 5 or settlement_date in holidays:  # 5=Sat, 6=Sun
            settlement_date += timedelta(days=1)
        
        return settlement_date

    def accrued_days(self, trade_date, holidays=us_holidays) -> int:
        settlement_date = self.get_settlement_date_from_trade_date(trade_date, holidays)
        coupon_payment_dates = [row["coupon_payment_date"] for row in self.table_of_cash_flows()]

        if settlement_date >= coupon_payment_dates[-1]:
            return 0

        if settlement_date < coupon_payment_dates[0]:
            return max(0, (settlement_date - self.issue_date).days)

        for i in range(1, len(coupon_payment_dates)):
            if coupon_payment_dates[i-1] <= settlement_date < coupon_payment_dates[i]:
                last_coupon_date = coupon_payment_dates[i-1]
                return (settlement_date - last_coupon_date).days

        return 0

    def days_in_coupon_period(self, trade_date, holidays=us_holidays) -> int:
        """
        Actual days in the current coupon period (denominator for Actual/Actual).
        If pre-first-coupon: (first_coupon - issue_date). If post-final: 0.

        """
        settlement_date = self.get_settlement_date_from_trade_date(trade_date, holidays)
        coupon_payment_dates = [row["coupon_payment_date"] for row in self.table_of_cash_flows()]

        # After (or on) final coupon -> no period
        if settlement_date >= coupon_payment_dates[-1]:
            return 0

        # Before first coupon -> period is (first_coupon - issue_date)
        if settlement_date < coupon_payment_dates[0]:
            return (coupon_payment_dates[0] - self.issue_date).days

        # Between coupons
        for i in range(1, len(coupon_payment_dates)):
            if coupon_payment_dates[i-1] <= settlement_date < coupon_payment_dates[i]:
                last_coupon_date = coupon_payment_dates[i-1]
                next_coupon_date = coupon_payment_dates[i]
                return (next_coupon_date - last_coupon_date).days

        return 0

    def accrued_fraction(self, trade_date, holidays=us_holidays):
        """
        Actual/Actual fraction elapsed in the current coupon period.
        
        """
        denom = self.days_in_coupon_period(trade_date, holidays)
        
        if denom <= 0:
            return 0.0
        
        numer = self.accrued_days(trade_date, holidays)  

        x = numer / denom
        
        return 0.0 if x < 0 else (0.999999 if x >= 1 else x)
    

    def accrued_interest(self, trade_date, holidays=us_holidays):
        """
        Accrued = coupon per period × accrued_fraction (Actual/Actual).
        
        """
        return self.coupon_payment() * self.accrued_fraction(trade_date, holidays)

    def clean_price(self):
        """
        Quoted clean price (uses coupon-date PV).
        
        """
        return self.bond_price_from_ytm()

    def dirty_price_from_trade(self, trade_date, holidays=us_holidays) -> float:
        """
        Dirty = Clean + Accrued (accrued computed at settlement).
        
        """
        return self.clean_price() + self.accrued_interest(trade_date, holidays)

    # ----- Bond Duration -----
    def macaulay_duration(self):
        """
        Macaulay Duration (in years).
        
        """
        price = self.bond_price_from_ytm()
        weighted_sum = sum(
            t * cf * self.discount_factor(t)
            for t, cf in enumerate(self.cash_flows(), start=1)
        )
        
        return weighted_sum / price / self.freq  # divide by freq to convert periods → years

    def modified_duration(self):
        mod_duration = self.macaulay_duration() / (1 + self.yield_per_coupon())

        return mod_duration

    def pv01(self) -> float:
        """
        Price Value of 1bp (DV01).
        
        """
        return self.modified_duration() * self.clean_price() * 0.0001

    
    def convexity(self) -> float:
        """
        Convexity in years^2 (discrete compounding, standard textbook formula):
        C = [ Σ CF_t * t*(t+1) / (1+y_per)^(t+2) ] / P   (per-period units)
        Convert to years^2 by dividing by freq^2.
        
        """
        price = self.bond_price_from_ytm()
        
        if price <= 0:
            return 0.0
        
        per = self.yield_per_coupon()
        
        conv_per_period = (
            sum(
                cf * t * (t + 1) / ((1 + per) ** (t + 2))
                for t, cf in enumerate(self.cash_flows(), start=1)
            ) / price
        )
        
        return conv_per_period / (self.freq ** 2)

    def dollar_convexity(self) -> float:
        """
        Dollar convexity = price × convexity (units: currency per (Δy)^2).
        
        """
        return self.clean_price() * self.convexity()

In [None]:
# Instantiate a 5y semiannual par Treasury (issue dated 2025-01-01)
bond_1 = Treasury_Par_Bond(1000, 0.03, 2, 5, '2025-01-01')

print("Face Value:", bond_1.fv)
print("YTM:", bond_1.ytm)
print("Coupon Frequency:", bond_1.freq)
print("Maturity (years):", bond_1.T)
print("Number of Periods:", bond_1.n_periods)

print("\nCoupon Payment per Period:", bond_1.coupon_payment())
print("Cash Flows:", bond_1.cash_flows())
print("Bond Price from YTM (Clean):", round(bond_1.bond_price_from_ytm(), 2))

print("\n--- Cash Flow Table ---")
for row in bond_1.table_of_cash_flows():
    print(
        f"t={row['t']:2d}, "
        f"Scheduled={row['scheduled_date']}, "
        f"Payment Date={row['coupon_payment_date']}, "
        f"CF=${row['cash_flow']:8.2f}, "
        f"DF={row['discount_factor']:.6f}, "
        f"PV=${row['present_value']:7.2f}"
    )

# ---- Accrued, Clean, and Dirty Prices ----
trade_date = date(2025, 6, 10)   # example trade date
settlement_date = bond_1.get_settlement_date_from_trade_date(trade_date)

print("\n--- Pricing ---")
print("Trade Date:", trade_date)
print("Settlement Date:", settlement_date)
print("Accrued Days:", bond_1.accrued_days(trade_date))
print("Days in Coupon Period:", bond_1.days_in_coupon_period(trade_date))
print("Accrued Fraction:", round(bond_1.accrued_fraction(trade_date), 6))
print("Accrued Interest:", round(bond_1.accrued_interest(trade_date), 6))
print("Clean Price:", round(bond_1.clean_price(), 6))
print("Dirty Price:", round(bond_1.dirty_price_from_trade(trade_date), 6))

# ---- Duration ----
print("\n--- Duration ---")
print("Macaulay Duration (years):", round(bond_1.macaulay_duration(), 6))
print("Modified Duration (years):", round(bond_1.modified_duration(), 6))
print("PV01: ", round(bond_1.pv01(), 6))

# ---- Convexity ----
print("\n--- Convexity ---")
print("Convexity: ", round(bond_1.convexity(), 6))
print("Dollar Convexity: ", round(bond_1.dollar_convexity(), 6))

Face Value: 1000
YTM: 0.03
Coupon Frequency: 2
Maturity (years): 5
Number of Periods: 10

Coupon Payment per Period: 15.0
Cash Flows: [15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 1015.0]
Bond Price from YTM (Clean): 1000.0

--- Cash Flow Table ---
t= 1, Scheduled=2025-07-01, Payment Date=2025-07-01, CF=$   15.00, DF=0.985222, PV=$  14.78
t= 2, Scheduled=2026-01-01, Payment Date=2026-01-02, CF=$   15.00, DF=0.970662, PV=$  14.56
t= 3, Scheduled=2026-07-01, Payment Date=2026-07-01, CF=$   15.00, DF=0.956317, PV=$  14.34
t= 4, Scheduled=2027-01-01, Payment Date=2027-01-04, CF=$   15.00, DF=0.942184, PV=$  14.13
t= 5, Scheduled=2027-07-01, Payment Date=2027-07-01, CF=$   15.00, DF=0.928260, PV=$  13.92
t= 6, Scheduled=2028-01-01, Payment Date=2028-01-03, CF=$   15.00, DF=0.914542, PV=$  13.72
t= 7, Scheduled=2028-07-01, Payment Date=2028-07-03, CF=$   15.00, DF=0.901027, PV=$  13.52
t= 8, Scheduled=2029-01-01, Payment Date=2029-01-02, CF=$   15.00, DF=0.887711, PV=$  13.32
t= 9, 

# Question 2

Calculate its duration (Macaulay Bond Duration)

### Answered above. 

Method is defined as macaulay_duration. 

# Question 3 
Use the FRED API to get the yields by maturity.  Plot the yield curve, forward curve and zero curve. 

In [None]:
# FRED series IDs for Treasury yields by maturity 

# US Gov bond interest rates for different maturities 
# <= 1 year - T-Bills
# 2 - 10 years - T-Notes 
# 20 and 30 years - T-Bonds 

series_ids = { '3M': 'DGS3MO'
                , '6M': 'DGS6MO'
                , '1Y': 'DGS1'
                , '2Y': 'DGS2'
                , '3Y': 'DGS3'
                , '5Y': 'DGS5'
                , '7Y': 'DGS7'
                , '10Y': 'DGS10'
                , '20Y': 'DGS20'
                , '30Y': 'DGS30' }

API_KEY = "a6710e81fd45204490adffed360ffd42"

In [35]:
def fetch_fred_series(series_id, start=None, end=None, api_key=None):
    """
    Fetch a FRED series into a pandas Series (index = datetime, values = float).
    Skips missing '.' values and raises on HTTP errors.

    ---- Notes ----
    JSON is easiest to parse in Python. 
    `requests` library has .json() built in, so you get a Python dictionary immediately.
    
    """
    url = "https://api.stlouisfed.org/fred/series/observations"

    # adds api_key, start date and end date to the params dictionary if they are passed     
    params = {"series_id": series_id, "file_type": "json"}
    if api_key: params["api_key"] = api_key
    if start:   params["observation_start"] = start
    if end:     params["observation_end"] = end

    r = requests.get(url, params=params) # send request for data and get it back in r
    r.raise_for_status()  # raise if HTTP error

    # go here to see what the data looks like -> https://fred.stlouisfed.org/docs/api/fred/series_observations.html#example_json
    data = r.json() # .json() converts data into a python dict

    # this is a list of dictionaries
    obs = data.get("observations", []) # .get(key, default) is a dictionary method that takes. default -> value to return if data is not available

    # creating two columns extracted from obs to get date and yield for that date
    dates, vals = [], [] # create empty lists 
    for o in obs:
        # what to do if there is no value
        v = o.get("value", ".")
        if v == ".":
            continue  # skip missing
        
        # add the date to date column
        dates.append(datetime.strptime(o["date"], "%Y-%m-%d"))
        # add the yields to values column
        vals.append(float(v))  # FRED yields are in PERCENT (e.g., 4.25)

    # return the data in a series format with index as the date 
    return pd.Series(vals, index=dates, name=series_id)

In [None]:
# ---- Fetch all maturities into one DataFrame ----

# Create an empty dictionary to store all the yield curves
curves = {}

# Loop through each key-value pair in series_ids
#   label   = shorthand (e.g., '3M', '2Y', '10Y')
#   fred_id = actual FRED series ID (e.g., 'DGS3MO', 'DGS2', 'DGS10')
for label, fred_id in series_ids.items():  
    
    # For each maturity, fetch the yield time series from FRED
    # and store it in the curves dict with label as the key.
    
    # Example: curves['3M'] = pandas.Series with 2025 yield data for DGS3MO

    # curves is key=fred_id and value=pd.Series
    curves[label] = fetch_fred_series(
        fred_id, 
        start="2025-01-01", 
        end="2025-12-31", 
        api_key=API_KEY
    )

curve_df = pd.DataFrame(curves)  # rows=dates, cols=labels (3M..30Y)

print("Shape:", curve_df.shape)

Shape: (184, 10)
              3M    6M    1Y    2Y    3Y    5Y    7Y   10Y   20Y   30Y
2025-09-19  4.03  3.81  3.60  3.57  3.56  3.68  3.88  4.14  4.71  4.75
2025-09-22  4.00  3.81  3.61  3.61  3.59  3.71  3.90  4.15  4.73  4.77
2025-09-23  4.00  3.81  3.61  3.53  3.57  3.68  3.87  4.12  4.70  4.73
2025-09-24  4.02  3.82  3.63  3.57  3.60  3.70  3.91  4.16  4.73  4.76
2025-09-25  4.04  3.85  3.68  3.64  3.66  3.75  3.94  4.18  4.73  4.75


In [None]:
# curve_df: rows=dates (DatetimeIndex), cols=['3M','6M','1Y',...,'30Y'] in PERCENT
MAT_YEARS = {"3M":0.25,"6M":0.5,"1Y":1,"2Y":2,"3Y":3,"5Y":5,"7Y":7,"10Y":10,"20Y":20,"30Y":30}

# keep only columns we know, in order
cols = [c for c in MAT_YEARS if c in curve_df.columns]
x = np.array([MAT_YEARS[c] for c in cols])

# Build frames (one per date with complete data)
df_complete = curve_df[cols].dropna()
dates = df_complete.index

frames = []
for d in dates:
    y = df_complete.loc[d].values  # yields in %
    frames.append(go.Frame(data=[go.Scatter(x=x, y=y, mode="lines+markers")],
                           name=str(d.date())))

# Initial frame = first date
fig = go.Figure(
    data=[go.Scatter(x=x, y=df_complete.iloc[0].values, mode="lines+markers")],
    layout=go.Layout(
        title="U.S. Treasury Yield Curve (daily)",
        xaxis=dict(
            title="Maturity (in years)",
            range=[0, 35] # extend to cover all maturities
        ),
        yaxis=dict(
            title="Yield (%)",
            range=[3.3,5.5]  # extend above max yield
        ),
        width=1000,
        height=600,
        updatemenus=[{
            "type": "buttons",
            "buttons": [
                {"label": "Play", "method": "animate",
                 "args": [None, {"frame": {"duration": 100, "redraw": True},
                                 "fromcurrent": True, "transition": {"duration": 0}}]},
                {"label": "Pause", "method": "animate",
                 "args": [[None], {"frame": {"duration": 0, "redraw": False},
                                   "mode": "immediate"}]},
            ],
        }],
        sliders=[{
            "steps": [{"args": [[str(d.date())],
                        {"frame": {"duration": 0, "redraw": True},
                         "mode": "immediate"}],
                       "label": str(d.date()), "method": "animate"} for d in dates],
            "currentvalue": {"prefix": "Date: "}
        }]
    ),
    frames=frames
)


fig.show()


In [58]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go

# ---- 1) Fetch GS zero/spot series ----
zero_ids = {
    1: "GS1",
    2: "GS2",
    3: "GS3",
    5: "GS5",
    7: "GS7",
    10: "GS10",
    20: "GS20",
    30: "GS30",
}

dfs = {T: fetch_fred_series(sid, start="2020-01-01", api_key=API_KEY) for T, sid in zero_ids.items()}
zero_curve = pd.concat(dfs, axis=1)                     # columns are numeric maturities
zero_curve.columns.name = "Maturity"

# Rename columns to strings like "1Y","2Y",... for plotting convenience
rename_map = {T: f"{int(T)}Y" for T in zero_curve.columns}
curve_df_zero = zero_curve.rename(columns=rename_map).sort_index()

In [None]:
MAT_YEARS_ZERO = {"1Y":1,"2Y":2,"3Y":3,"5Y":5,"7Y":7,"10Y":10,"20Y":20,"30Y":30}

# keep only columns we know, in order
cols = [c for c in MAT_YEARS_ZERO if c in curve_df_zero.columns]
x = np.array([MAT_YEARS_ZERO[c] for c in cols])

# Build frames (one per date with complete data)
df_complete = curve_df_zero[cols].dropna()
dates = df_complete.index

frames = []
for d in dates:
    y = df_complete.loc[d].values  # yields in %
    frames.append(
        go.Frame(
            data=[go.Scatter(x=x, y=y, mode="lines+markers")],
            name=str(d.date())
        )
    )

# Initial frame = first date
y0 = df_complete.iloc[0].values
fig = go.Figure(
    data=[go.Scatter(x=x, y=y0, mode="lines+markers", name="Zero (GS)")],
    layout=go.Layout(
        title="U.S. Treasury Zero (Spot) Curve — GS series (daily)",
        xaxis=dict(title="Maturity (years)", range=[0, 35]),
        yaxis=dict(
            title="Yield (%)",
            range=[
                float(df_complete.min().min()) - 0.5,
                float(df_complete.max().max()) + 0.5
            ],
        ),
        width=1000, height=600,
        updatemenus=[{
            "type": "buttons",
            "buttons": [
                {"label": "Play", "method": "animate",
                 "args": [None, {"frame": {"duration": 100, "redraw": True},
                                 "fromcurrent": True, "transition": {"duration": 0}}]},
                {"label": "Pause", "method": "animate",
                 "args": [[None], {"frame": {"duration": 0, "redraw": False},
                                   "mode": "immediate"}]},
            ],
        }],
        sliders=[{
            "steps": [
                {"args": [[str(d.date())],
                          {"frame": {"duration": 0, "redraw": True},
                           "mode": "immediate"}],
                 "label": str(d.date()), "method": "animate"}
                for d in dates
            ],
            "currentvalue": {"prefix": "Date: "}
        }]
    ),
    frames=frames
)

fig.show()

In [75]:
import re
import numpy as np
import pandas as pd

def compute_forward_curve_from_zero(
    curve_df_zero: pd.DataFrame,
    horizon_years: float = 1.0,
    comp: str = "annual",
    interpolate_missing: bool = True,
) -> pd.DataFrame:
    """
    Compute n-year forward rates f_{t,t+n} from a zero-coupon (spot) curve time series.

    Parameters
    ----------
    curve_df_zero : DataFrame
        Index = dates; columns = maturities ("1Y","2Y",... or numeric 1,2,...).
        Values are yields in PERCENT.
    horizon_years : float
        n in years for f_{t,t+n}.
    comp : {'annual','semiannual','continuous'}
        Compounding convention to interpret the input zeros.
        Output forwards are annual-effective (%).
    interpolate_missing : bool
        If True, add missing maturities needed for t and t+n and
        linearly interpolate across columns using maturity as x.

    Returns
    -------
    DataFrame
        Index = dates; columns = start years t (float);
        values = f_{t,t+n} in PERCENT (annual-effective).
    """
    if curve_df_zero is None or curve_df_zero.empty:
        return pd.DataFrame()

    # --- parse maturities to numeric years ---
    def to_year(col):
        if isinstance(col, (int, float, np.integer, np.floating)):
            return float(col)
        m = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*[Yy]\s*", str(col))
        if m:
            return float(m.group(1))
        raise ValueError(f"Cannot parse maturity column '{col}' into years.")

    df = curve_df_zero.copy()
    df.columns = pd.Index([to_year(c) for c in df.columns], name="MaturityYears")
    df = df.sort_index().sort_index(axis=1)

    # % -> decimal
    df_dec = df / 100.0

    # discount factor from zero under comp convention
    def DF(s, T):
        if comp == "annual":
            return 1.0 / ((1.0 + s) ** T)
        elif comp == "semiannual":
            m = 2
            return 1.0 / ((1.0 + s/m) ** (m*T))
        elif comp == "continuous":
            return np.exp(-s * T)
        else:
            raise ValueError("comp must be 'annual', 'semiannual', or 'continuous'.")

    n = float(horizon_years)
    orig_mats = np.array(df_dec.columns, dtype=float)

    # Ensure columns for t+n exist; interpolate across columns using column values as x
    if interpolate_missing:
        target_cols = sorted(set(orig_mats) | {t + n for t in orig_mats})
        # Reindex columns to include t+n points; columns must be numeric and sorted
        df_dec = df_dec.reindex(columns=target_cols)
        # Interpolate along maturity dimension using column values as x
        # (no extrapolation beyond min/max)
        try:
            df_dec = df_dec.interpolate(method="index", axis=1, limit_area="inside")
        except TypeError:
            # Older pandas may not support limit_area; still okay without it
            df_dec = df_dec.interpolate(method="index", axis=1)

    mats = np.array(df_dec.columns, dtype=float)
    starts = [t for t in mats if (t + n) in mats]

    if not starts:
        return pd.DataFrame(index=df_dec.index)  # empty but not None

    # compute forwards date-by-date
    out_rows = []
    for date, row in df_dec.iterrows():
        fwd_vals = {}
        for t in starts:
            s_t  = row[t]
            s_tn = row[t + n]
            if pd.isna(s_t) or pd.isna(s_tn):
                continue
            DF_t  = DF(s_t,  t)
            DF_tn = DF(s_tn, t + n)
            f_eff = (DF_t / DF_tn) ** (1.0 / n) - 1.0  # annual effective
            fwd_vals[float(t)] = 100.0 * f_eff         # back to percent
        out_rows.append(pd.Series(fwd_vals, name=date))

    fwd_df = pd.DataFrame(out_rows).sort_index()
    fwd_df.columns.name = f"StartYear_fwd_{n:g}y"
    return fwd_df


In [83]:
import plotly.graph_objects as go

h = 1.0  # choose horizon (years)
fwd = compute_forward_curve_from_zero(curve_df_zero, horizon_years=h, comp="annual", interpolate_missing=True)
fwd = fwd.dropna(how="all")

dates = fwd.index
# initial frame
row0 = fwd.iloc[0].dropna()
x0 = [float(c) for c in row0.index]
y0 = row0.values.tolist()

# build frames: one per date
frames = []
for d in dates:
    r = fwd.loc[d].dropna()
    frames.append(go.Frame(
        data=[go.Scatter(x=[float(c) for c in r.index], y=r.values.tolist(), mode="lines+markers")],
        name=str(d.date())
    ))

fig = go.Figure(
    data=[go.Scatter(x=x0, y=y0, mode="lines+markers", name=f"{h}Y forward")],
    layout=go.Layout(
        title=f"Forward Curve — {h}Y forwards (animated)",
        xaxis=dict(title="Start (years)", range=[0, 31]),
        yaxis=dict(title="Forward Rate (%)", range=[0, 5]),
        width=1000, height=700,
        updatemenus=[{
            "type":"buttons",
            "buttons":[
                {"label":"Play","method":"animate",
                 "args":[None,{"frame":{"duration":100,"redraw":True},
                               "fromcurrent":True,"transition":{"duration":0}}]},
                {"label":"Pause","method":"animate",
                 "args":[[None],{"frame":{"duration":0,"redraw":False},
                                 "mode":"immediate"}]}
            ]
        }],
        sliders=[{
            "steps":[{"args":[[str(d.date())],
                               {"frame":{"duration":0,"redraw":True},
                                "mode":"immediate"}],
                      "label":str(d.date()),"method":"animate"} for d in dates],
            "currentvalue":{"prefix":"Date: "}
        }]
    ),
    frames=frames
)
fig.show()


# Question 4
Bootstrap spot Rates from coupon bonds - use the FRED API or Bloomberg to get the treasury prices/yields - do this till the 30Y. 

In [84]:
import numpy as np
import pandas as pd
import re

def _mat_to_years(label):
    """Parse '3M','6M','1Y','10Y','1MO' (or numeric) -> float years."""
    if isinstance(label, (int, float, np.integer, np.floating)):
        return float(label)
    s = str(label).strip().upper()
    if s.endswith("MO"):  # 1MO
        return float(s[:-2]) / 12.0
    if s.endswith("M"):   # 3M, 6M
        return float(s[:-1]) / 12.0
    if s.endswith("Y"):   # 1Y, 10Y
        return float(s[:-1])
    return float(s)       # last resort

def bootstrap_zeros_from_par(
    curve_df: pd.DataFrame,
    grid=np.arange(0.5, 30.0 + 0.5, 0.5),   # semiannual nodes to 30Y
    comp: str = "semiannual",               # output zero compounding
    allow_extrapolation: bool = False       # keep False for grading
):
    """
    Bootstrap spot (zero) rates from par yields (values in PERCENT).
    Returns (zeros_df %, dfs_df).
    """
    if curve_df is None or curve_df.empty:
        return pd.DataFrame(), pd.DataFrame()

    # 1) Standardize column maturities to numeric years and sort
    par_df = curve_df.copy()
    par_df.columns = pd.Index([_mat_to_years(c) for c in par_df.columns], name="ParMaturityYears")
    par_df = par_df.sort_index().sort_index(axis=1)

    grid = np.array(sorted(grid), dtype=float)
    zeros_rows, dfs_rows = [], []

    for date, row in par_df.iterrows():
        # Known par points for this date
        x = np.array([m for m, y in row.items() if pd.notna(y)], dtype=float)
        y = np.array([row[m] for m in x], dtype=float)  # percent
        if len(x) < 2:
            zeros_rows.append(pd.Series(index=grid, dtype=float, name=date))
            dfs_rows.append(pd.Series(index=grid, dtype=float, name=date))
            continue

        idx = np.argsort(x); x = x[idx]; y = y[idx]

        # 2) Interpolate par yields to each semiannual node on grid (no extrapolation by default)
        y_grid = np.full_like(grid, np.nan, dtype=float)
        xmin, xmax = x.min(), x.max()
        for i, T in enumerate(grid):
            if (T < xmin or T > xmax) and not allow_extrapolation:
                continue
            y_grid[i] = float(np.interp(T, x, y))

        # 3) Bootstrap discount factors (semiannual coupons)
        DF = {}
        for T, yT_pct in zip(grid, y_grid):
            if np.isnan(yT_pct):
                break
            yT = yT_pct / 100.0
            m  = int(round(T * 2))       # number of semiannual periods
            c  = yT / 2.0                # coupon per semiannual period

            if m == 1:
                # 0.5y: 100 = (100 + 100c) * DF_0.5  → DF_0.5 = 1/(1+c)
                DF[T] = 1.0 / (1.0 + c)
            else:
                coupon_leg = 0.0
                ok = True
                for i_per in range(1, m):
                    Ti = 0.5 * i_per
                    if Ti not in DF or pd.isna(DF[Ti]):
                        ok = False; break
                    coupon_leg += (100.0 * c) * DF[Ti]
                if not ok:
                    break
                DF[T] = (100.0 - coupon_leg) / (100.0 * (1.0 + c))

        df_series = pd.Series({T: DF.get(T, np.nan) for T in grid}, name=date)

        # 4) Convert DF → zero with requested compounding
        def zero_from_DF(DF_T, T):
            if pd.isna(DF_T) or T <= 0:
                return np.nan
            if comp == "semiannual":   # DF = 1 / (1+z/2)^(2T)
                return 2.0 * (DF_T ** (-1.0 / (2.0 * T)) - 1.0)
            if comp == "annual":       # DF = 1 / (1+z)^T
                return DF_T ** (-1.0 / T) - 1.0
            if comp == "continuous":   # DF = exp(-z T)
                return -np.log(DF_T) / T
            raise ValueError("comp must be 'semiannual','annual','continuous'.")

        z_series = pd.Series({T: 100.0 * zero_from_DF(df_series[T], T) for T in grid}, name=date)

        zeros_rows.append(z_series)
        dfs_rows.append(df_series)

    zeros_df = pd.DataFrame(zeros_rows).reindex(index=par_df.index)
    dfs_df   = pd.DataFrame(dfs_rows).reindex(index=par_df.index)
    zeros_df.columns.name = "MaturityYears"
    dfs_df.columns.name   = "MaturityYears"
    return zeros_df, dfs_df


In [85]:
# Bootstrap zeros from your 2025 par curve (percent in, percent out)
zeros_2025, dfs_2025 = bootstrap_zeros_from_par(
    curve_df, grid=np.arange(0.5, 30.0 + 0.5, 0.5), comp="semiannual"
)

# Plot the latest date
import plotly.graph_objects as go

latest = zeros_2025.dropna(how="all").index[-1]
row = zeros_2025.loc[latest].dropna()
fig = go.Figure()
fig.add_trace(go.Scatter(x=row.index.astype(float), y=row.values, mode="lines+markers",
                         name="Bootstrapped zeros (semiannual)"))
fig.update_layout(
    title=f"Treasury Zero Curve bootstrapped from Par — {latest.date()}",
    xaxis_title="Maturity (years)",
    yaxis_title="Zero Rate (%)",
    width=900, height=500
)
fig.show()


# Question 5
Calculate the 1 year forward rates from 1Y - 30Y (so 2Y1Y forward, 3Y1Y forward etc.). Plot the forward curve and classify the shape of it (inverted/normal etc.) Explain what that means!

In [86]:
# ---- Choose your zero curve source ----
# Option A: GS zeros (columns like "1Y","2Y",..., "30Y")
zero_curve_df = curve_df_zero.copy()
comp_in = "annual"         # GS treated as annual

# Option B: Bootstrapped zeros (columns numeric: 0.5, 1.0, ..., 30.0)
# zero_curve_df = zeros_2025.copy()
# comp_in = "semiannual"   # our bootstrap produced semiannual zeros

# Compute 1-year forwards (f_{t,t+1}); interpolate_missing fills gaps like 4Y, 6Y, etc.
fwd_1y = compute_forward_curve_from_zero(
    zero_curve_df, horizon_years=1.0, comp=comp_in, interpolate_missing=True
).dropna(how="all")


In [87]:
import plotly.graph_objects as go
import numpy as np

assert not fwd_1y.empty, "No forwards computed—check your zero curve columns and dates."

latest_date = fwd_1y.index[-1]
row = fwd_1y.loc[latest_date].dropna()

x = np.array([float(c) for c in row.index])  # start years t
y = row.values                               # forward rates in %

fig = go.Figure()
fig.add_trace(go.Scatter(x=x, y=y, mode="lines+markers", name="1Y forwards"))
fig.update_layout(
    title=f"1Y Forward Curve from Zero Rates — {latest_date.date()}",
    xaxis_title="Start (years)  →  (2Y1Y starts at 2 years, 3Y1Y at 3 years, ...)",
    yaxis_title="Forward Rate (%)",
    width=950, height=520
)
fig.show()


In [88]:
import numpy as np

def classify_forward_curve(x_years, y_percent, slope_tol_bps_per_year=2.0, hump_thresh_bps=10.0):
    """
    Heuristic:
    - 'normal'   : upward-sloping overall and roughly nondecreasing
    - 'inverted' : downward-sloping overall and roughly nonincreasing
    - 'humped'   : interior max clearly above both ends (by hump_thresh)
    - 'flat/mixed': otherwise

    Tolerances are in basis points per year (slope) and basis points (hump height).
    """
    x = np.asarray(x_years, dtype=float)
    y = np.asarray(y_percent, dtype=float)

    # overall slope (bp/year)
    slope = np.polyfit(x, y, 1)[0] * 100.0  # %/yr → bp/yr

    # monotonic checks with small tolerance (bp)
    dy = np.diff(y) * 100.0  # % → bp
    eps = 1.0  # 1bp wiggle room per step

    nondec = np.all(dy >= -eps)
    noninc = np.all(dy <=  eps)

    # hump check: interior maximum meaningfully above both ends
    i_max = int(np.argmax(y))
    hump = 0 < i_max < len(y)-1 and (y[i_max] - max(y[0], y[-1]))*100.0 >= hump_thresh_bps

    if slope > slope_tol_bps_per_year and nondec:
        label = "normal (upward-sloping)"
    elif slope < -slope_tol_bps_per_year and noninc:
        label = "inverted (downward-sloping)"
    elif hump:
        label = "humped"
    elif abs(slope) <= slope_tol_bps_per_year:
        label = "flat"
    else:
        label = "mixed/irregular"

    return label, slope

label, slope_bp_per_year = classify_forward_curve(x, y)
print(f"Shape classification: {label}  |  regression slope ≈ {slope_bp_per_year:.1f} bp/year")

# Short interpretation to include in your write-up:
explanations = {
    "normal (upward-sloping)":
        "Market-implied short rates are expected to be higher in the future, or a positive term premium dominates.",
    "inverted (downward-sloping)":
        "Market implies falling future short rates—often associated with easing expectations / recession risk.",
    "humped":
        "Near- to intermediate-term forwards are elevated, but longer-dated forwards decline—peaking mid-curve.",
    "flat":
        "Little change across maturities—transition or uncertainty; forwards roughly constant.",
    "mixed/irregular":
        "No clean single-shape signal (data gaps, noise, or multiple local moves)."
}
print("Interpretation:", explanations.get(label, "See plot for details."))


Shape classification: mixed/irregular  |  regression slope ≈ 7.3 bp/year
Interpretation: No clean single-shape signal (data gaps, noise, or multiple local moves).
