# ⚡️ Designing Market Shock Scenarios Notebook (Abdymomunov et al. (2024))
[Paper Link](https://www.richmondfed.org/-/media/RichmondFedOrg/publications/research/working_papers/2024/wp24-17.pdf)

This notebook re-implements the framework from Abdymomunov et al. (2024) using simulated data:
1. Two-stage modelling of risk-factor relationships

2. Quantile regressions (primary → secondary)

3. t-copula with marginal GARCH (secondary → all remaining)

4. Scenario generation & selection

4. Historical simulation of primary factors

5. Material-risk-factor screening (PnL coverage)

6. Tail-loss filter (1st percentile)

7. K-means reduction to a small representative set

When real market or portfolio data are unavailable the notebook fabricates synthetic but statistically plausible data so every block runs out-of-the-box.


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import QuantileRegressor
from arch import arch_model
from scipy.stats import t
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import warnings

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

np.random.seed(42)
sns.set(style="whitegrid", context="notebook")

# Helper functions
def simulate_ou(mu, theta, sigma, n):
    """Simulate Ornstein-Uhlenbeck process for interest rate factors"""
    x = np.zeros(n); x[0] = mu
    for i in range(1, n):
        x[i] = x[i-1] + theta*(mu - x[i-1]) + sigma*np.random.randn()
    return x

def fix_corr_matrix(corr):
    """Fix correlation matrix to be positive semi-definite"""
    corr = (corr + corr.T)/2
    eigvals, eigvecs = np.linalg.eigh(corr)
    eigvals[eigvals < 1e-6] = 1e-6
    corr_fixed = eigvecs @ np.diag(eigvals) @ eigvecs.T
    d = np.sqrt(np.diag(corr_fixed))
    corr_fixed = corr_fixed / np.outer(d, d)
    np.fill_diagonal(corr_fixed, 1.0)
    return corr_fixed

def get_tau(hist, x0):
    """Calculate tau based on historical severity of shock (Equation 3)"""
    p = np.mean(np.abs(hist) >= abs(x0))
    tau = min(max(round(p/0.05)*0.05, 0.10), 0.90)
    return tau

def quantile_predict_shocks(Xh, yh, xs):
    """Standard quantile regression for secondary factors (Equation 1)"""
    preds = []
    for x0 in xs:
        tau = get_tau(Xh, x0)
        qr = QuantileRegressor(quantile=tau, alpha=0)
        qr.fit(Xh.reshape(-1,1), yh)
        preds.append(qr.predict([[x0]])[0])
    return np.array(preds)

def quantile_autoreg_predict(Xh, yh, xs, horizon=3):
    """
    Quantile autoregression for factors with strong autocorrelation (Equation 2)
    - yh: factor levels (not shocks)
    - Xh: primary factor shocks
    - xs: new primary factor shocks to predict for
    - horizon: shock calibration horizon (e.g., 3 months)
    """
    # Estimate quantile autoregression model
    tau = get_tau(Xh, xs[0])  # Use first shock to determine tau
    
    # Prepare data for autoregression
    y_lagged = yh[:-1]  # Y_{t-1}
    X = Xh[1:]         # X_{Δt}
    y = yh[1:]         # Y_t
    
    # Fit quantile regression
    qr = QuantileRegressor(quantile=tau, alpha=0)
    qr.fit(np.column_stack([X, y_lagged]), y)
    
    # Predict shocks recursively
    preds = []
    for x0 in xs:
        # Start with current level
        y0 = yh[-1]
        y_pred = y0
        
        # Recursively predict h months ahead
        for _ in range(horizon):
            y_pred = qr.predict([[x0, y_pred]])[0]
        
        # Shock = Y_h - Y_0
        preds.append(y_pred - y0)
    
    return np.array(preds)

def safe_garch_fit(returns):
    """Apply GARCH filtering for marginal distributions"""
    try:
        m = arch_model(returns, vol='Garch', p=1, q=1)
        r = m.fit(disp='off')
        if r.converged:
            return r.resid
    except:
        pass
    return (returns - returns.mean()) / (returns.std() + 1e-8)

def filter_implausible_scenarios(primary_shocks, primary_levels):
    """
    Filter out implausible scenarios (e.g., negative interest rates)
    Paper: "filter out historical realizations of the U.S. Treasury bond yield shocks 
    that cause post-shock rate levels to be negative"
    """
    # Check if post-shock rates would be negative
    plausible_mask = np.ones(len(primary_shocks), dtype=bool)
    
    # For interest rate factors (UST10, TERM, UST3)
    rate_factors = ['UST10', 'TERM', 'UST3']
    for factor in rate_factors:
        if factor in primary_levels.columns:
            # Current level + shock <= 0 would be implausible
            implausible = (primary_levels[factor].iloc[-1] + primary_shocks[factor] <= 0)
            plausible_mask &= ~implausible.values
    
    return primary_shocks[plausible_mask]

# 1. Simulate primary factors with realistic interest rate dynamics
dates = pd.date_range("1990-01-01", "2025-06-30", freq="M")
n = len(dates)

# Simulate interest rate factors with realistic mean-reversion
ust10 = simulate_ou(0.025, 0.05, 0.005, n)  # 10-year Treasury yield
term = simulate_ou(0.005, 0.1, 0.003, n)    # Term spread
ust3 = ust10 - term                        # 3-month Treasury yield

# Ensure interest rates stay positive
ust10 = np.maximum(ust10, 0.001)
ust3 = np.maximum(ust3, 0.001)

primary = pd.DataFrame({
    "SP500": simulate_ou(0.002, 0.1, 0.05, n),
    "BAA_AAA": simulate_ou(0.02, 0.2, 0.01, n),
    "UST10": ust10,
    "TERM": term,
    "USD_EUR": simulate_ou(0, 0.1, 0.03, n),
    "ENERGY": simulate_ou(0.001, 0.1, 0.04, n),
    "METAL": simulate_ou(0.001, 0.1, 0.035, n),
    "GOLD": simulate_ou(0.001, 0.15, 0.06, n)
}, index=dates)

# Add 3-month Treasury yield
primary["UST3"] = ust3

# Calculate rolling 3-month shocks (paper: "rolling-window of three-month changes")
primary_shocks = pd.DataFrame(index=primary.index[3:], columns=primary.columns)
for col in primary.columns:
    primary_shocks[col] = primary[col].values[3:] - primary[col].values[:-3]

# Filter out implausible scenarios (e.g., negative interest rates)
primary_levels = primary.iloc[3:]  # Current levels corresponding to shocks
primary_shocks = filter_implausible_scenarios(primary_shocks, primary_levels)

# Display primary shocks
display(primary_shocks.head().style.format("{:.5f}").set_caption("Primary Factor 3-Month Shocks Sample"))

# 2. Simulate secondary factors with appropriate modeling choices
n_sec = 100
sec_names = [f"SEC{i+1}" for i in range(n_sec)]
secondary = pd.DataFrame(index=primary_shocks.index, columns=sec_names)

# Create secondary factors with different characteristics
for i, name in enumerate(sec_names):
    noise = np.random.normal(0, 0.02, len(primary_shocks))
    
    # Some secondary factors have strong autocorrelation (like volatility)
    if i % 10 == 0:  # Every 10th factor has strong autocorrelation
        # Simulate as levels with autocorrelation
        secondary[name] = 0.8 * secondary[name].shift(1).fillna(0.0) + 0.6 * primary_shocks["SP500"] + 0.4 * primary_shocks["TERM"] + noise
    else:
        # Standard secondary factors
        secondary[name] = 0.6 * primary_shocks["SP500"] + 0.4 * primary_shocks["TERM"] + noise

secondary_sim = pd.DataFrame(index=primary_shocks.index, columns=sec_names)
Xh = primary_shocks["SP500"].values

# Use appropriate model for each secondary factor
for i, name in enumerate(sec_names):
    if i % 10 == 0:  # Every 10th factor uses quantile autoregression
        # For autoregressive factors, we need the levels (not shocks)
        yh_levels = secondary[name].cumsum()  # Convert shocks to levels
        secondary_sim[name] = quantile_autoreg_predict(Xh, yh_levels.values, Xh)
    else:
        secondary_sim[name] = quantile_predict_shocks(Xh, secondary[name].values, Xh)

display(secondary_sim.iloc[:5,:5].describe().style.set_caption("Sample Secondary Factor Shocks Summary"))

# 3. Simulate remaining factors and build t-copula
n_rem = 500
rem_names = [f"REM{i+1}" for i in range(n_rem)]
rem_data = pd.DataFrame({name: safe_garch_fit(np.random.normal(0,0.01,n)) for name in rem_names}, index=dates)
rem_data = rem_data.loc[primary_shocks.index]
pits = rem_data.rank(pct=True).values
pits_ann = pd.DataFrame(pits, index=primary_shocks.index).groupby(primary_shocks.index.year).last()
df_t = 4
tq = t.ppf(np.clip(pits_ann.values,1e-6,1-1e-6),df=df_t)
rho = fix_corr_matrix(np.corrcoef(tq.T))

# 4. Joint scenario simulation
N = 1000
# Use rolling-window historical simulation approach from paper
idx = np.random.choice(len(primary_shocks)-30, N, replace=True)  # Avoid edge cases
idx = idx + 15  # Center the window
pri_sim = primary_shocks.iloc[idx].reset_index(drop=True)

sec_sim = secondary_sim.iloc[idx].reset_index(drop=True)
zs = np.random.standard_t(df=df_t, size=(N,n_rem))
L = np.linalg.cholesky(rho)
rem_sim = (zs @ L.T) * rem_data.std().values
scenarios = np.hstack([pri_sim.values, sec_sim.values, rem_sim])
all_names = list(primary_shocks.columns) + sec_names + rem_names

# 5. Synthetic PnL sensitivities & distributions
dv01 = pd.DataFrame(np.random.uniform(-10000,20000,(2,len(all_names))),
                    index=["Firm_A","Firm_B"], columns=all_names)
conv = pd.DataFrame(np.random.uniform(-50,50,(2,len(all_names))),
                    index=["Firm_A","Firm_B"], columns=all_names)
def pnl(f,sh):
    bps = sh*10000
    return -(dv01.loc[f].values*bps).sum() + 0.5*(conv.loc[f].values*(bps**2)).sum()

# === PnL Calculation Section ===
pnlA = np.array([pnl("Firm_A",scenarios[i]) for i in range(N)])/1e6
pnlB = np.array([pnl("Firm_B",scenarios[i]) for i in range(N)])/1e6

# PnL summary table
pnl_stats = pd.DataFrame({
    "Firm_A": pd.Series(pnlA).describe(),
    "Firm_B": pd.Series(pnlB).describe()
}).T.round(2)
display(pnl_stats.style.set_caption("PnL Distribution Statistics ($ Millions)"))

# PnL distributions plot
plt.figure(figsize=(8,4))
sns.kdeplot(pnlA, label="Firm_A", fill=True)
sns.kdeplot(pnlB, label="Firm_B", fill=True)
cutA,cutB = np.percentile(pnlA,1), np.percentile(pnlB,1)
plt.axvline(cutA, color='blue', linestyle='--', label="Firm_A 1% Tail")
plt.axvline(cutB, color='orange', linestyle='--', label="Firm_B 1% Tail")
plt.title("PnL Distributions with 1% Tail Thresholds")
plt.xlabel("PnL ($ Millions)")
plt.legend()
plt.tight_layout()
plt.show()

# 6. Identify material factors (~70% coverage)
# Paper uses standardized shocks (+200/-200 bps) for materiality assessment
material_factors = []
material_coverages = []

# Test both positive and negative standardized shocks as in the paper
for sign in [-1, 1]:
    standardized_shock = sign * 0.02  # 200 bps
    
    # Calculate PnL impacts with standardized shock
    standardized_impacts = {}
    for factor in all_names:
        idx = all_names.index(factor)
        # Create shock vector with only this factor shocked
        shock_vector = np.zeros(len(all_names))
        shock_vector[idx] = standardized_shock
        
        # Calculate PnL impact
        impact = pnl("Firm_A", shock_vector) + pnl("Firm_B", shock_vector)
        standardized_impacts[factor] = abs(impact)
    
    # Sort by impact magnitude
    sorted_impacts = sorted(standardized_impacts.items(), key=lambda x: x[1], reverse=True)
    
    # Calculate cumulative coverage
    total_impact = sum(standardized_impacts.values())
    cum_impact = 0
    factors = []
    
    for factor, impact in sorted_impacts:
        cum_impact += impact
        coverage = cum_impact / total_impact
        factors.append(factor)
        
        if coverage >= 0.7 and len(factors) >= 2:  # At least 2 factors
            break
    
    material_factors.extend(factors)
    material_coverages.extend([cum_impact/total_impact] * len(factors))

# Remove duplicates while preserving order
seen = set()
material = []
for factor in material_factors:
    if factor not in seen:
        seen.add(factor)
        material.append(factor)

# Get material indices
mat_idx = [all_names.index(m) for m in material]

mat_df = pd.DataFrame({
    "Material Factor": material,
    "Cum. Coverage": [material_coverages[material_factors.index(m)] for m in material]
})
display(mat_df.style.set_caption("Material Risk Factors (~70% DV01 Coverage)"))

# 7. Tail-loss scenarios
tail = np.unique(np.concatenate([np.where(pnlA<=cutA)[0], np.where(pnlB<=cutB)[0]]))
tail_mat = scenarios[tail][:,mat_idx]

# 8. Cluster analysis for tail scenarios
k = 2
km = KMeans(n_clusters=k,random_state=42).fit(tail_mat)
clusters = km.labels_
centers = km.cluster_centers_
representatives = []
for cl in range(k):
    mem = np.where(clusters==cl)[0]
    d = np.linalg.norm(tail_mat[mem]-centers[cl],axis=1)
    representatives.append(tail[mem[np.argmin(d)]])

print(f"Representative scenarios: {representatives}")

# Annotate silhouette scores for k=2..5
sil_scores = [silhouette_score(tail_mat, KMeans(n_clusters=i,random_state=42).fit_predict(tail_mat)) for i in range(2,6)]
sil_df = pd.DataFrame({"k":range(2,6),"silhouette":sil_scores})
display(sil_df.style.format({"silhouette":"{:.3f}"}).set_caption("Silhouette Scores by k"))

# 9. Plot optimal number of clusters (like paper)
plt.figure(figsize=(6,4))
plt.plot(range(2,6), sil_scores, marker='o', color='black')
plt.axvline(2, linestyle='--', color='grey')
plt.title("Optimal Number of Clusters for Interest Rate Shocks")
plt.xlabel("Number of clusters k")
plt.ylabel("Average silhouette width")
plt.xticks(range(2,6))
plt.tight_layout()
plt.show()

# 10. Cluster scatter plot
plt.figure(figsize=(6,5))
palette = {0:"tomato",1:"teal"}
for cl in range(k):
    pts = tail_mat[clusters==cl]
    plt.scatter(pts[:,0], pts[:,1], color=palette[cl], label=f"Cluster {cl+1}", alpha=0.6)
plt.scatter(centers[:,0], centers[:,1], marker="^", s=150, color="gold", label="Centers")
plt.axhline(0,color='black'); plt.axvline(0,color='black')
plt.xlabel(f"{material[0]} Shock")
plt.ylabel(f"{material[1]} Shock")
plt.title("Clusters of Tail-Loss Scenarios")
plt.legend()
plt.tight_layout()
plt.show()

# --- Other diagnostics and scenario tables ---
for i, rep in enumerate(representatives, 1):
    print(f"\nScenario {i}:")
    scenario_vals = pd.Series(scenarios[rep], index=all_names)
    mat_vals = scenario_vals[material].round(5)
    pnl_a_val = pnlA[rep]
    pnl_b_val = pnlB[rep]
    display(pd.DataFrame({
        "Material Factor Shock": mat_vals
    }))
    print(f"Firm_A PnL Impact: ${pnl_a_val:,.2f}M")
    print(f"Firm_B PnL Impact: ${pnl_b_val:,.2f}M")

# Primary shocks over time
plt.figure(figsize=(14, 6))
for col in primary_shocks.columns:
    plt.plot(primary_shocks.index, primary_shocks[col], lw=1.2, label=col)
plt.title("Primary Factor 3-Month Shocks Over Time")
plt.xlabel("Date")
plt.ylabel("Shock")
plt.legend()
plt.show()

# Histogram secondary factor shocks (first 5)
plt.figure(figsize=(10, 5))
secondary_sim.iloc[:, :5].hist(bins=30, alpha=0.7, layout=(1,5), figsize=(15,3))
plt.suptitle("Histogram of Example Secondary Factor Shocks")
plt.show()

# Bar plot: Scenario PnLs vs Tail Percentiles
plt.figure(figsize=(10,6))
data_to_plot = {
    'Scenario 1 Firm_A': pnlA[representatives[0]],
    'Scenario 1 Firm_B': pnlB[representatives[0]],
    'Scenario 2 Firm_A': pnlA[representatives[1]],
    'Scenario 2 Firm_B': pnlB[representatives[1]],
    'Firm_A 1% Tail': cutA,
    'Firm_A Median': np.median(pnlA),
    'Firm_A 99% Percentile': np.percentile(pnlA, 99),
    'Firm_B 1% Tail': cutB,
    'Firm_B Median': np.median(pnlB),
    'Firm_B 99% Percentile': np.percentile(pnlB, 99)
}
plt.bar(data_to_plot.keys(), data_to_plot.values(), color=sns.color_palette("Set1", 10))
plt.xticks(rotation=45, ha="right")
plt.ylabel("PnL ($ Millions)")
plt.title("Scenario PnLs Compared to Firm Percentiles")
plt.tight_layout()
plt.show()