In [3]:
# ---------------------------
# Fannie Mae 2025-21 REMIC – Cashflow Model
# Scope: Cashflow waterfall, collateral simulation, tranche interest, WAL calculation
# ---------------------------

!pip install openpyxl --quiet

from dataclasses import dataclass, field
from typing import Optional

import numpy as np
import pandas as pd

# ---------------------------
# Deal Inputs
# ---------------------------
# Deal-level dates. Neither constant is referenced by the functions visible
# in this file; presumably the settlement date and the monthly distribution
# day of the deal -- confirm against the offering documents.
SETTLEMENT = pd.Timestamp('2025-02-28')
DIST_DAY = 25

# Collateral pools keyed by group number. Each entry carries the starting
# balance, the annual coupon rate used to accrue pool interest ('wac'), and
# the number of months over which the pool amortizes.
group_coll = {
    group: dict(balance=balance, wac=wac, remaining_months=term)
    for group, balance, wac, term in (
        (1, 146_244_678.0, 0.06905, 334),
        (2, 186_362_730.0, 0.07454, 336),
    )
}

# ---------------------------
# Tranche Class Definition
# ---------------------------
@dataclass
class Tranche:
    """
    Represents a REMIC tranche with balance, coupon, type, and waterfall characteristics.

    Only ``name``, ``group`` and ``original_balance`` are required; everything
    else has a default. ``curr_balance`` is not a constructor argument: it is
    set to ``original_balance`` when the instance is created and is the field
    the cashflow engine mutates as principal is paid.
    """
    # Class identifier (also used as the key in the module-level tranche map).
    name: str
    # Collateral group the class is paid from.
    group: int
    # Face amount at issuance; never modified after construction.
    original_balance: float
    # Running outstanding balance; initialised in __post_init__.
    curr_balance: float = field(init=False)
    coupon_type: str = 'fixed'  # fixed, float, accrual, notional, residual
    # Annual coupon as a decimal (0.055 == 5.5%).
    coupon: float = 0.0
    # Optional annual rate bounds; None means "no bound supplied".
    cap: Optional[float] = None
    floor: Optional[float] = None
    # True when interest is accrued rather than paid in cash.
    accrual: bool = False
    # For notional classes: name of the class whose balance is the notional base.
    notional_of: Optional[str] = None
    delay: bool = True  # Fixed-rate interest is delayed; floaters/inverse = no-delay

    def __post_init__(self):
        # Every tranche starts fully outstanding.
        self.curr_balance = self.original_balance

# ---------------------------
# Tranche Setup
# ---------------------------
# Every class in the deal, keyed by class name. Insertion order (group 1,
# group 2, then the residual) is the order in which output columns are emitted.
tranches = {
    t.name: t
    for t in (
        # Group 1
        Tranche(name='BA', group=1, original_balance=49_911_000.0, coupon_type='fixed', coupon=0.055),
        Tranche(name='BE', group=1, original_balance=6_232_000.0, coupon_type='fixed', coupon=0.055),
        Tranche(name='BV', group=1, original_balance=7_620_000.0, coupon_type='fixed', coupon=0.055),
        Tranche(name='BZ', group=1, original_balance=9_359_339.0, coupon_type='accrual', coupon=0.055,
                accrual=True),
        Tranche(name='FB', group=1, original_balance=73_122_339.0, coupon_type='float',
                cap=0.0650, floor=0.0130, delay=False),
        Tranche(name='SB', group=1, original_balance=0.0, coupon_type='notional',
                notional_of='FB', delay=False),

        # Group 2
        Tranche(name='CA', group=2, original_balance=82_000_000.0, coupon_type='fixed', coupon=0.050),
        Tranche(name='CZ', group=2, original_balance=11_181_365.0, coupon_type='accrual', coupon=0.050,
                accrual=True),
        Tranche(name='FC', group=2, original_balance=93_181_365.0, coupon_type='float',
                cap=0.0800, floor=0.0090, delay=False),
        Tranche(name='SC', group=2, original_balance=0.0, coupon_type='notional',
                notional_of='FC', delay=False),

        # Residual placeholder (group 0 is not tied to either collateral pool)
        Tranche(name='R', group=0, original_balance=0.0, coupon_type='residual', delay=False),
    )
}

# ---------------------------
# PSA Functions
# ---------------------------
def psa_to_cpr(psa, month):
    """
    Convert a PSA speed to an annual CPR for a given month.

    The 100% PSA benchmark ramps linearly from 0.2% CPR in month 1 to 6% CPR
    in month 30 and stays flat afterwards; other speeds scale that curve by
    ``psa / 100``. Only the ramp is limited at month 30 -- the scaled result
    is not capped at 6%, so e.g. 400 PSA plateaus at 24% CPR.

    Parameters
    ----------
    psa : float
        PSA speed in percent (100 == benchmark curve, 0 == no prepayments).
    month : int or float
        Month index on the ramp; values below 0 are treated as 0.

    Returns
    -------
    float
        Annual CPR as a decimal, limited to 1.0 because a prepayment rate
        cannot exceed 100% of the balance.
    """
    ramp = min(max(month, 0), 30) / 30.0
    return min(1.0, 0.06 * ramp * (psa / 100.0))

def cpr_to_smm(cpr):
    """
    Turn an annual prepayment rate (CPR) into its monthly equivalent (SMM).

    The SMM is the monthly rate that, compounded over twelve months, leaves
    the same surviving fraction ``1 - cpr`` as the annual rate.
    """
    monthly_survival = (1 - cpr) ** (1 / 12.0)
    return 1 - monthly_survival

# ---------------------------
# Collateral Simulation
# ---------------------------
def simulate_group_collateral(group_id, psa, periods=360):
    """
    Simulate collateral cashflows (scheduled + prepayments) for a pool.

    The pool is treated as a single level-pay loan. Each month the level
    payment is re-derived from the *current* balance and the remaining term,
    so that balance removed by prepayments also lowers the scheduled payment;
    a payment fixed at the original balance would overstate scheduled
    principal whenever prepayments occur.

    Parameters
    ----------
    group_id : int
        Key into the module-level ``group_coll`` mapping.
    psa : float
        Prepayment speed in percent of PSA (0 == scheduled amortization only).
    periods : int
        Number of monthly rows to produce.

    Returns
    -------
    pandas.DataFrame
        One row per month with columns ``month``, ``start_bal``,
        ``sched_prin``, ``prepay``, ``prin_pay``, ``int_pay`` and ``end_bal``.
        Months after the pool is paid off are emitted as all-zero rows.
    """
    pool = group_coll[group_id]
    bal = float(pool['balance'])
    r = pool['wac'] / 12.0
    term = pool['remaining_months']
    rows = []

    for m in range(1, periods + 1):
        if bal <= 0:
            rows.append({'month': m, 'start_bal': 0.0, 'sched_prin': 0.0, 'prepay': 0.0,
                         'prin_pay': 0.0, 'int_pay': 0.0, 'end_bal': 0.0})
            continue

        months_left = term - m + 1
        interest = bal * r
        if months_left <= 1:
            # Final scheduled month (or beyond the stated term): retire what is left.
            sched_prin = bal
        elif r == 0:
            # Zero-coupon pool: the annuity formula degenerates to straight-line.
            sched_prin = bal / months_left
        else:
            pmt = bal * r / (1 - (1 + r) ** (-months_left))  # re-levelled payment
            sched_prin = min(bal, pmt - interest)

        # NOTE(review): the PSA ramp is driven by the simulation month, not by
        # loan age, so any seasoning of the pool is not reflected here.
        smm = cpr_to_smm(psa_to_cpr(psa, m))
        # Prepayments apply to the balance that survives scheduled principal.
        prepay = (bal - sched_prin) * smm
        total_prin = sched_prin + prepay
        end_bal = max(0.0, bal - total_prin)

        rows.append({
            'month': m,
            'start_bal': bal,
            'sched_prin': sched_prin,
            'prepay': prepay,
            'prin_pay': total_prin,
            'int_pay': interest,
            'end_bal': end_bal
        })
        bal = end_bal

    return pd.DataFrame(rows)

# ---------------------------
# Tranche Interest Rates
# ---------------------------
def tranche_interest_rate(tr_name, sofr):
    """
    Return the annual interest rate (as a decimal) for the named tranche.

    Resolution order:
      1. fixed-rate classes return their stated coupon;
      2. floaters FB / FC pay ``sofr`` plus a margin, bounded by the tranche's
         own floor and cap;
      3. inverse classes SB / SC pay a strike minus ``sofr``, bounded to
         the range [0, strike];
      4. accrual classes return their stated coupon;
      5. anything else earns 0.
    """
    tranche = tranches[tr_name]
    if tranche.coupon_type == 'fixed':
        return tranche.coupon

    floater_margins = {'FB': 0.0130, 'FC': 0.0090}
    if tr_name in floater_margins:
        indexed = sofr + floater_margins[tr_name]
        floored = max(indexed, tranche.floor)
        return min(floored, tranche.cap)

    inverse_strikes = {'SB': 0.052, 'SC': 0.071}
    if tr_name in inverse_strikes:
        strike = inverse_strikes[tr_name]
        return max(0.0, min(strike, strike - sofr))

    return tranche.coupon if tranche.coupon_type == 'accrual' else 0.0

# ---------------------------
# Principal Waterfall
# ---------------------------
# Per-group principal rules: the accrual (Z) class, the class that receives the
# Z's accrued interest first, the sequential pay order for one half of the
# collateral principal, and the floater that receives the other half.
_WATERFALL_RULES = {
    1: {'accrual': 'BZ', 'accretion_directed': 'BV',
        'sequential': ['BA', 'BE', 'BV', 'BZ'], 'floater': 'FB'},
    2: {'accrual': 'CZ', 'accretion_directed': 'CA',
        'sequential': ['CA', 'CZ'], 'floater': 'FC'},
}


def _pay_in_order(names, amount, payments):
    """Pay `amount` as principal down `names` in order, capped by each balance; return what is left."""
    remaining = amount
    for name in names:
        if remaining <= 0:
            break
        paid = min(remaining, tranches[name].curr_balance)
        payments[name]['prin'] += paid
        tranches[name].curr_balance -= paid
        remaining -= paid
    return remaining


def apply_principal_waterfall(group_id, principal_amount, accrual_map):
    """
    Apply principal distribution rules to tranches within a group.

    Steps, in order:
      1. The accrual class's interest for the period is added to its own
         balance (accretion) and the same amount is paid out as principal,
         first to the accretion-directed class and then to the accrual class
         itself. Adding the accrued amount to the balance keeps the sum of
         tranche balances in line with the collateral balance.
      2. Half of the collateral principal is paid sequentially through the
         group's fixed-rate / accrual classes.
      3. The other half is paid to the group's floater.

    Side effect: ``curr_balance`` of the module-level ``tranches`` is updated
    in place.

    Parameters
    ----------
    group_id : int
        Collateral group to process. Groups without waterfall rules receive
        no payments.
    principal_amount : float
        Collateral principal (scheduled + prepaid) available this period.
    accrual_map : dict
        Maps accrual class name to the interest it accrued this period;
        missing names are treated as 0.

    Returns
    -------
    dict
        ``{tranche_name: {'prin': float, 'int': float}}`` for every tranche
        in the group. ``'int'`` is always 0.0 here; interest is handled by
        the caller.
    """
    payments = {n: {'prin': 0.0, 'int': 0.0} for n, t in tranches.items() if t.group == group_id}

    rules = _WATERFALL_RULES.get(group_id)
    if rules is None:
        return payments

    z_name = rules['accrual']
    accrued = accrual_map.get(z_name, 0.0)

    # Accretion: unpaid Z interest becomes Z principal before being redirected.
    tranches[z_name].curr_balance += accrued
    _pay_in_order([rules['accretion_directed'], z_name], accrued, payments)

    # Collateral principal is split evenly between the sequential side and the floater.
    half = principal_amount * 0.5
    _pay_in_order(rules['sequential'], half, payments)
    _pay_in_order([rules['floater']], principal_amount - half, payments)

    return payments

# ---------------------------
# Run REMIC Model
# ---------------------------
def run_remic(psa, months=120, sofr_curve=None):
    """
    Main cashflow runner for REMIC.

    Resets every tranche in the module-level ``tranches`` map to its original
    balance, simulates both collateral groups at the given PSA speed, and then
    steps month by month: interest is computed on beginning-of-period
    balances, principal is run through each group's waterfall, and the
    resulting cashflows are recorded.

    Interest timing:
      * accrual classes receive no cash interest (their interest is handed to
        the principal waterfall instead);
      * delay classes are paid with a one-period lag -- the amount paid in
        month ``m`` is what accrued in month ``m - 1`` (so month 1 pays 0 and
        the final period's accrual is not paid inside the horizon);
      * non-delay classes are paid what accrued in the same month.

    Parameters
    ----------
    psa : float
        Prepayment speed in percent of PSA.
    months : int
        Number of monthly periods to run.
    sofr_curve : sequence of float, optional
        Index rate per period; entry ``m - 1`` is used for month ``m``, so it
        needs at least ``months`` entries. Defaults to a flat 3%.

    Returns
    -------
    pandas.DataFrame
        One row per month with ``month``, ``psa``, collateral principal and
        interest per group, and ``prin_<name>``, ``int_<name>``,
        ``endbal_<name>`` for every tranche.
    """
    if sofr_curve is None:
        sofr_curve = [0.03] * months  # default flat SOFR

    # Reset balances so repeated runs do not see the previous run's paydowns.
    for t in tranches.values():
        t.curr_balance = t.original_balance

    coll1 = simulate_group_collateral(1, psa, periods=months)
    coll2 = simulate_group_collateral(2, psa, periods=months)
    records = []
    # Interest accrued in the previous period by delay classes, awaiting payment.
    carried_int = {n: 0.0 for n in tranches}

    for m in range(1, months + 1):
        sofr = sofr_curve[m - 1]
        row = {'month': m, 'psa': psa}
        c1 = coll1.loc[m - 1]
        c2 = coll2.loc[m - 1]
        accrual_map = {}
        due_now = {n: 0.0 for n in tranches}

        # Interest for the period, on balances before this month's principal.
        for n, t in tranches.items():
            if t.group not in (1, 2):
                continue
            r = tranche_interest_rate(n, sofr)
            base_bal = t.curr_balance
            if t.coupon_type == 'notional' and t.notional_of:
                # Notional classes earn interest on the referenced class's balance.
                base_bal = tranches[t.notional_of].curr_balance
            interest_due = base_bal * (r / 12.0)
            if t.accrual:
                accrual_map[n] = interest_due
            else:
                due_now[n] = interest_due

        # Apply principal waterfalls
        pay1 = apply_principal_waterfall(1, c1['prin_pay'], accrual_map)
        pay2 = apply_principal_waterfall(2, c2['prin_pay'], accrual_map)

        row.update({'coll1_prin': c1['prin_pay'], 'coll1_int': c1['int_pay'],
                    'coll2_prin': c2['prin_pay'], 'coll2_int': c2['int_pay']})

        # Record tranche-level cashflows
        for name, t in tranches.items():
            row[f'prin_{name}'] = (pay1.get(name, {}).get('prin', 0.0)
                                   + pay2.get(name, {}).get('prin', 0.0))
            if t.accrual:
                int_paid = 0.0
            elif t.delay:
                # Pay last period's accrual, then park this period's for next month.
                int_paid = carried_int[name]
                carried_int[name] = due_now[name]
            else:
                int_paid = due_now[name]
            row[f'int_{name}'] = int_paid
            row[f'endbal_{name}'] = t.curr_balance

        records.append(row)

    return pd.DataFrame(records)

# ---------------------------
# WAL Calculation
# ---------------------------
def compute_wal(df):
    """
    Build a table of weighted average lives, in years, one row per tranche.

    For each tranche other than 'R' that has a ``prin_<name>`` column in
    ``df``, the WAL is the principal-weighted average of ``month`` divided by
    12. A tranche with no positive total principal gets a WAL of 0.0.

    Returns a DataFrame with columns ``Tranche`` and ``WAL (years)``.
    """
    results = []
    for name in tranches:
        column = f'prin_{name}'
        if name == 'R' or column not in df.columns:
            continue

        month_index = df['month']
        principal = df[column]
        paid = principal.sum()

        if paid <= 0:
            wal_years = 0.0
        else:
            wal_years = (month_index * principal).sum() / (paid * 12.0)
        results.append((name, wal_years))

    return pd.DataFrame(results, columns=['Tranche', 'WAL (years)'])

# ---------------------------
# Example Run
# ---------------------------
# Two prepayment scenarios over a 360-month horizon: no prepayments and 400 PSA.
df0 = run_remic(psa=0, months=360)
df400 = run_remic(psa=400, months=360)

# Weighted average lives for each scenario.
wal0 = compute_wal(df0)
wal400 = compute_wal(df400)

# Write one workbook: a cashflow sheet and a WAL sheet per scenario, followed
# by a side-by-side WAL comparison.
with pd.ExcelWriter('FNM2025_21_outputs.xlsx') as writer:
    for sheet_name, frame in (
        ('PSA0_CF', df0),
        ('PSA0_WAL', wal0),
        ('PSA400_CF', df400),
        ('PSA400_WAL', wal400),
    ):
        frame.to_excel(writer, sheet_name=sheet_name, index=False)

    # Summary WAL
    summary = wal0.merge(wal400, on='Tranche', suffixes=('_PSA0','_PSA400'))
    summary.to_excel(writer, sheet_name='Summary_WAL', index=False)

print("Done. File saved: FNM2025_21_outputs.xlsx (cashflows, WALs, summary)")


Done. File saved: FNM2025_21_outputs.xlsx (cashflows, WALs, summary)
