# Monte Carlo Default Simulation

We aim to estimate the expected loss up to time T:

$$E[L_T] = E\left[\sum_{i=1}^{N} \ell_i \cdot 1\{T_i \leq T\}\right]$$

Where:

- $\ell_i$ represents the loss amount from default $i$
- $T_i$ represents the default time for firm $i$
- $1\{T_i \leq T\}$ is the indicator function for defaults occurring before time $T$

## Model Components

### Firm Intensity Process
Each firm's default intensity is modeled as:

$$\lambda_t^i = X_i + \sum_{j=1}^{J} w_{ij} Y_t^j$$

Where:
- $X_i$ is the idiosyncratic component for firm $i$
- $Y_t^j$ is the sectoral intensity component
- $w_{ij}$ are loading factors representing sensitivity to sectoral factors

### Sectoral Intensity 

**We assume orthogonal sectors.**

$$dY_t^j = \kappa_j(\theta_j - Y_t^j)dt + \sigma_{j}\sqrt{Y_t^j}dW_t^j + \sum_{i=1}^{N}\delta_{ij}dL_t^i$$

Where:
- $\kappa_j$ is the mean reversion speed
- $\theta_j$ is the long-term mean level
- $\sigma_{j}$ are volatility parameters
- $W_t^j$ are standard Brownian motions
- $\delta_{ij}$ are contagion coefficients
- $L_t$ is the loss process 


## We experiment with integrating amortisation schedules in the above intensity based model.

### Hence the loss becomes LGD times the Exposure-at-Default.

We consider four repayment schedules. 
1) "Bullet" repayment where the outstanding principal is repaid in full at the time of "maturity".

2) "Linear/Italian" repayment scheme characterised by a constant repayment of principal in each installment.

3) "French" repayment scheme where a constant total installment (Principal + Interest) throughout the life of the loan is paid.

4) "Negative Amortisation" where the obligor continues to borrow and accrue further interest on the sum owed.

## In this notebook, instead of using plain exposure as the loss marked (i.e. LGD = 1), we take LGD $\sim$ Beta($\alpha$, $\beta$)

$\alpha$, $\beta$ parameters calculated using LGD_MEAN = 0.45, LGD_STDDEV = 0.15 (in accordance with regularoty assumptions)

In [1]:
import numpy as np
import scipy
from scipy.stats import ncx2
from scipy.special import hyp1f1
from scipy.optimize import brentq
from math import exp
import matplotlib.pyplot as plt
import pandas as pd
from scipy.stats import kurtosis

DECIMALS = 6

np.set_printoptions(precision=DECIMALS, suppress=True)
pd.options.display.float_format = lambda x: f"{x:.{DECIMALS}f}"
pd.set_option("display.precision", DECIMALS)

In [2]:
def lambda_max_generator(epsilon, y, theta, sigma, kappa, max_attempts=5, tol=1e-8):
    """
    
    Compute H^*_epsilon such that P_y(sigma_H < tau) = G_y(H; H) <= epsilon.
    This function implements Equation (11) using the closed-form in Equation (19).

    """

    def G_y(H):

        a = H / kappa  # Now using H as the Laplace exponent (since tau ~ Exp(H))
        b = 2 * kappa * theta / sigma**2
        z_y = 2 * kappa * y / sigma**2
        z_H = 2 * kappa * H / sigma**2

        num = hyp1f1(a, b, z_y)
        denom = hyp1f1(a, b, z_H)
        
        return num / denom  # This is G_y(H; H) = E[e^{-H * sigma_H}]

    def root_function(H):
        return G_y(H) - epsilon

    # Bracketing interval
    H_min = y + 1e-8
    H_max = y + 10.0

    # Expand H_max until G_y(H_max) < epsilon
    for _ in range(max_attempts):
        try:
          H_star = scipy.optimize.toms748(root_function, H_min, H_max, xtol=tol)
          
          if root_function(H_star)<=0:
            
            return H_star
          
          else:
          
            while root_function(H_star)>0:
              H_star = H_star + 1e-2
          
            return H_star
          
        except Exception:
            pass
        
        H_max += 5.0

    # Fallback: scan manually to find conservative H
    print("[Warning] brentq failed to converge. Using fallback grid search.")
    H_vals = np.linspace(H_min, H_max + 50, 1000)
    for H in H_vals:
        if G_y(H) <= epsilon:
            return H

    raise RuntimeError("Unable to find H^*_epsilon. Try expanding search space.")


In [3]:
def cir_transition_sample_per_sector(y_vec, tau, kappa_vec, theta_vec, sigma_vec, rng):
    """
    y_vec, kappa_vec, theta_vec, sigma_vec are arrays of length J (sectors).
    Returns xi_vec: sampled Y_{t+tau} per sector (length J).
    """
    if tau <= 0:
        return y_vec.copy()
    
    one_minus = -np.expm1(-kappa_vec * tau)  # = 1 - exp(-kappa*tau)
    # avoid zeros
    one_minus = np.where(one_minus <= 0, 1e-16, one_minus)
    
    c = (sigma_vec * sigma_vec * one_minus) / (4.0 * kappa_vec)
    d = 4.0 * kappa_vec * theta_vec / (sigma_vec * sigma_vec)
    
    # noncentrality parameters
    nc = (4.0 * kappa_vec * np.exp(-kappa_vec * tau) * y_vec) / (sigma_vec * sigma_vec * one_minus)
    
    # guard
    d = np.maximum(d, 1e-12)
    nc = np.maximum(nc, 0.0)
    
    # sample per sector (simple looping works fine since J would be typically small)
    xi = np.empty_like(y_vec, dtype=float)

    for j in range(len(y_vec)):
    
        # If df or nc are extreme, ncx2.rvs might throw an error
        try:
            Z = ncx2.rvs(df=d[j], nc=nc[j], random_state=rng)
    
        except Exception:
            Z = ncx2.rvs(df=max(d[j],1e-6), nc=0.0, random_state=rng) + nc[j]
    
        xi[j] = c[j] * Z
    
    xi = np.maximum(xi, 0.0)
    
    return xi


In [4]:
LGD_MEAN = 0.45
LGD_STD = 0.15

def get_beta_params(mu, sigma):
    if sigma**2 >= mu * (1 - mu):
        raise ValueError("Standard deviation is too high for this mean.")

    nu = (mu * (1 - mu) / sigma**2) - 1
    alpha = mu * nu
    beta = (1 - mu) * nu
    return alpha, beta

alpha_lgd, beta_lgd = get_beta_params(LGD_MEAN, LGD_STD)

In [5]:
alpha_lgd, beta_lgd

(4.500000000000001, 5.500000000000002)

In [12]:
from scipy.interpolate import interp1d

def loss_distribution_plot(Payoff_T,index):
  # Get histogram data (without plotting)
  counts, bin_edges = np.histogram(Payoff_T, bins=10, density=True)

  # Get bin centers
  bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

  # Interpolation function
  f_interp = interp1d(bin_centers, counts, kind='cubic', fill_value="extrapolate")

  # New x-values for smooth curve
  x_smooth = np.linspace(bin_centers.min(), bin_centers.max(), 500)
  y_smooth = f_interp(x_smooth)

  # Plot histogram and interpolated curve
  plt.hist(Payoff_T, bins=100, density=True, label='Histogram_'+str(index),alpha = 0.4)
  plt.plot(x_smooth, y_smooth, label='Loss Distribution Case_'+str(index))
  plt.xlabel('Loss')
  plt.ylabel('Density')
  # plt.legend()
  plt.title('Loss Distribution for '+str(index))


In [7]:
def expected_shortfall(losses, alpha=0.95):
    var_alpha = np.quantile(losses, alpha)
    tail_losses = losses[losses >= var_alpha]
    return float(np.round(tail_losses.mean(), DECIMALS))

def compute_metrics(losses, alpha=0.95):
    var_alpha = np.quantile(losses, alpha)
    es = expected_shortfall(losses, alpha)
    metrics = {
        "Mean": np.mean(losses),
        "Std": np.std(losses),
        "VaR": var_alpha,
        "ES": es,
        "Excess ES": es - var_alpha,
        "ExcessKurtosis": kurtosis(losses)
    }
    return {key: float(np.round(value, DECIMALS)) for key, value in metrics.items()}

In [None]:
loan_types = ["bullet", "linear", "french", "negative"]

def bullet_exposure(contract, T, t):
    P = float(contract["P"])
    return P if t < T else 0.0


def linear_exposure(contract, T, t):
    P = float(contract["P"])
    periods_per_year = float(contract["N"])
    if periods_per_year <= 0 or T <= 0:
        return 0.0

    period = 1.0 / periods_per_year  # years per period
    total_periods = int(np.ceil(T * periods_per_year))
    payments_made = int(np.floor(t / period))
    payments_made = int(np.clip(payments_made, 0, total_periods))
    principal_paid = (payments_made / total_periods) * P
    return max(P - principal_paid, 0.0)


def french_exposure(contract, T, t):
    P = float(contract["P"])
    periods_per_year = float(contract["N"])
    r_annual = float(contract["r"])
    if periods_per_year <= 0 or T <= 0:
        return 0.0

    period = 1.0 / periods_per_year  # years per period
    total_periods = int(np.ceil(T * periods_per_year))
    payments_made = int(np.floor(t / period))
    payments_made = int(np.clip(payments_made, 0, total_periods))
    periods_remaining = total_periods - payments_made
    if periods_remaining <= 0:
        return 0.0

    r_period = r_annual / periods_per_year  # annual to per-period rate

    if r_period == 0.0:
        payment = P / total_periods
        return payment * periods_remaining

    annuity_payment = P * r_period / (1.0 - (1.0 + r_period) ** (-total_periods))
    pv_remaining = annuity_payment * (1.0 - (1.0 + r_period) ** (-periods_remaining)) / r_period
    return pv_remaining


def negative_amortization_exposure(contract, T, t):
    P = float(contract["P"])
    periods_per_year = float(contract["N"])
    r_annual = float(contract.get("r", 0.0))
    if periods_per_year <= 0 or T <= 0:
        return max(P, 0.0)

    period = 1.0 / periods_per_year  # years per period
    total_periods = int(np.ceil(T * periods_per_year))
    periods_elapsed = int(np.floor(t / period))
    periods_elapsed = int(np.clip(periods_elapsed, 0, total_periods))

    r_period = r_annual / periods_per_year  # annual to per-period rate

    exposure = P * ((1.0 + r_period) ** periods_elapsed)

    return exposure if t < T else 0.0


def exposure_at_time(contract, T, t):
    loan_type = str(contract["type"]).lower()
    dispatch = {
        "bullet": bullet_exposure,
        "linear": linear_exposure,
        "french": french_exposure,
        "negative_amortisation": negative_amortization_exposure,
    }
    if loan_type not in dispatch:
        raise ValueError(f"Unknown loan type: {loan_type}")
    return dispatch[loan_type](contract, T, t)


In [9]:
def simulate_sector_cir_model(kappa, theta, sigma, T, delta, W, eps, lambda_benchmark,
                              Nfirms, idiosyncratic_factor, loan_contracts, alpha_lgd, beta_lgd, rng=None):

    import warnings

    if rng is None:
        rng = np.random.default_rng()

    # ensure shapes
    J = len(theta)
    assert W.shape == (Nfirms, J)
    idiosyncratic_factor = np.asarray(idiosyncratic_factor).reshape(Nfirms,)
    lambda_benchmark = np.asarray(lambda_benchmark).reshape(J,)

    delta = np.asarray(delta)
    if delta.ndim == 1:
        delta = np.tile(delta.reshape(1, J), (Nfirms, 1))
    assert delta.shape == (Nfirms, J)

    # init
    t = 0.0
    Y_t = theta.copy().astype(float)   # sectoral intensities (J-vector), initialised at theta
    events = []
    owed = []
    marks = []
    defaulter_idio_factor = []
    sector_contributions = []
    alive = np.arange(Nfirms, dtype=int)

    # precompute some arrays for speed
    kappa = np.asarray(kappa, dtype=float)
    sigma = np.asarray(sigma, dtype=float)
    theta = np.asarray(theta, dtype=float)

    # event loop
    while (t < T) and (alive.size > 0):

        lambda_max = np.maximum(Y_t, lambda_benchmark)  # J-vector
        # Hepsilon = sum_i [ X_i + w_i Â· lambda_max ] over alive firms
        
        # We can compute per-firm value and sum
        per_firm_sys = W[alive].dot(lambda_max)   # length alive.size
        per_firm_idio = idiosyncratic_factor[alive]
        Hepsilon = per_firm_sys.sum() + per_firm_idio.sum()

        if Hepsilon <= 0:
            break

        # sample a candidate waiting time
        tau = rng.exponential(1.0 / Hepsilon)
        t_candidate = t + tau
        if t_candidate >= T:
            break

        # sample sectoral Y at t_candidate conditional on no defaults in the waiting time
        Y_proposed = cir_transition_sample_per_sector(Y_t, tau, kappa, theta, sigma, rng)

        # compute proposed per-firm intensities (scalar values) using Y_proposed
        per_firm_sys_prop = W[alive].dot(Y_proposed)
        per_firm_total_prop = per_firm_sys_prop + per_firm_idio
        lambda_proposed_total = per_firm_total_prop.sum()

        # acceptance probability: Xi / Hepsilon where Xi = lambda_proposed_total
        accept_prob = min(lambda_proposed_total / Hepsilon, 1.0)
        if rng.random() < accept_prob:
            # accept a default at t_candidate
            t = t_candidate
            events.append(t)

            # choose which alive firm defaulted
            probs = per_firm_total_prop / lambda_proposed_total
            probs = np.maximum(probs, 0.0)
            probs = probs / probs.sum()
            selected_local_idx = rng.choice(len(alive), p=probs)
            selected_firm = int(alive[selected_local_idx])

            # record idiosyncratic factor
            defaulter_idio_factor.append(float(idiosyncratic_factor[selected_firm]))

            sector_contributions.append(W[selected_firm].copy())

            # mark (loss) from contract exposure
            contract = loan_contracts[selected_firm]
            exposure = exposure_at_time(contract, T, t)
            if exposure < 0:
                warnings.warn("Exposure computed negative; clamping to zero.")
                exposure = max(exposure, 0.0)
            if t >= T and exposure > 0:
                warnings.warn("Exposure positive after maturity; forcing to zero.")
                exposure = 0.0

            owed.append(exposure)
            
            LGD = rng.beta(alpha_lgd, beta_lgd)
            mark = LGD * exposure
            # mark =  exposure
            if mark < 0:
                warnings.warn("Loss mark negative; clamping to zero.")
                mark = max(mark, 0.0)
            marks.append(mark)

            # update Y_t with contagion
            Y_t = Y_proposed + delta[selected_firm] * mark/contract["P"]

            # remove defaulted firm from alive set
            alive = np.delete(alive, selected_local_idx)

        else:
            # reject, ie no default: advance time and set Y_t = Y_proposed
            t = t_candidate
            Y_t = Y_proposed

    return (np.array(events), np.array(marks), np.array(owed),
            np.array(defaulter_idio_factor), np.array(sector_contributions))

In [10]:
def generate_portfolio_weights(N, J, composition_type, concentration_params=None, rng=None):
    """
    Build portfolio weights under three stylised configurations plus a random fallback.

    - concentrated: one dominant sector for every obligor (eg for J=3 [0.7, 0.2, 0.1])
    - balanced: evenly distributed across sectors (eg for J=3  [1/3, 1/3, 1/3])
    - mixed: partial concentration with residual diversified weights (eg for J=3  [0.5, 0.3, 0.2])
    - random: symmetric Dirichlet for variation.
    """
    if rng is None:
        rng = np.random.default_rng()

    if concentration_params is None:
        concentration_params = {}

    noise_level = concentration_params.get("noise_level", 0.0)
    alpha_random = concentration_params.get("alpha", 1.0)

    if composition_type == "concentrated":
        pattern = _pattern_concentrated(J)
        W = _apply_noise_and_normalize(pattern, N, noise_level, rng)
    elif composition_type == "balanced":
        pattern = _pattern_balanced(J)
        W = _apply_noise_and_normalize(pattern, N, noise_level, rng)
    elif composition_type == "mixed":
        pattern = _pattern_mixed(J)
        W = _apply_noise_and_normalize(pattern, N, noise_level, rng)
    elif composition_type == "random":
        W = _generate_random_weights(N, J, {"alpha": alpha_random}, rng)
    else:
        raise ValueError(f"Unknown composition_type: {composition_type}")

    return W


def _pattern_concentrated(J, dominant=0.7):
    if J <= 0:
        return np.array([])
    if J == 1:
        return np.array([1.0])
    residual = max(1.0 - dominant, 0.0)
    tail = residual / (J - 1)
    vec = np.full(J, tail)
    vec[0] = dominant
    return vec


def _pattern_balanced(J):
    if J <= 0:
        return np.array([])
    return np.full(J, 1.0 / J)


def _pattern_mixed(J, primary=0.5, secondary=0.3):
    if J <= 0:
        return np.array([])
    vec = np.zeros(J)
    vec[0] = min(primary, 1.0)
    if J >= 2:
        vec[1] = min(secondary, max(1.0 - vec[0], 0.0))
    residual = max(1.0 - vec.sum(), 0.0)
    if J > 2:
        vec[2:] = residual / (J - 2)
    elif J == 1:
        vec[0] = 1.0
    else:
        vec[1] += residual
    return vec


def _apply_noise_and_normalize(pattern, N, noise_level, rng):
    base = np.tile(pattern, (N, 1))
    if noise_level > 0:
        noise = rng.normal(0.0, noise_level, size=base.shape)
        base = base + noise
    base = np.maximum(base, 1e-6)
    base = base / base.sum(axis=1, keepdims=True)
    return base


def _generate_random_weights(N, J, params, rng):
    alpha = params.get("alpha", 1.0)
    W = np.zeros((N, J))
    for i in range(N):
        W[i] = rng.dirichlet(np.full(J, alpha))
    return W

In [11]:
def run_simulation(params, loan_contracts, num_trials, seed=42):

    rng = np.random.default_rng(seed)
    eps = 1e-5
    losses = np.zeros(num_trials)
    for i in range(num_trials):
        events, marks, _, _, _ = simulate_sector_cir_model(params['kappa'], params['theta'], params['sigma'], 
                          params['T'], params['delta'], params['W'], 1e-5, 
                          params['lambda_benchmark'], 
                          params['Firms'], params['idio_factor'], 
                          loan_contracts, alpha_lgd, beta_lgd,
                          rng)
        losses[i] = np.sum(marks)
    return losses