In [2]:
import json
import random
from dataclasses import dataclass
from typing import Dict, Tuple
import numpy as np
import pandas as pd

In [3]:
RNG_SEED = 42
np.random.seed(RNG_SEED)
random.seed(RNG_SEED)

In [4]:
@dataclass
class Scenario:
    name: str
    equity_index: float   # e.g., -0.10 for -10%
    jgb_10y_bps: float    # e.g., +50 for +50 bps
    usd_jpy_change: float # e.g., -5.0 means JPY appreciation by 5 yen (USDJPY down 5)

In [5]:
def generate_mock_data(num_accounts: int = 20, num_instruments: int = 50, as_of_date: str = "2025-10-27"):
    """
    Generates mock instruments, accounts, holdings, exposures.
    Returns dict of DataFrames.
    """
    # Instruments universe: mix of equity (JPY), bond (JPY), foreign equity (USD)
    instrument_ids = [f"INS{1000+i}" for i in range(num_instruments)]
    types = np.random.choice(["equity", "bond", "equity_fx"], size=num_instruments, p=[0.5, 0.3, 0.2])
    currencies = np.where(types=="equity_fx", "USD", "JPY")
    names = [f"{t.upper()}_{i}" for i,t in zip(instrument_ids, types)]
    prices = np.round(np.random.uniform(500, 10000, size=num_instruments), 2)

    instruments = pd.DataFrame({
        "instrument_id": instrument_ids,
        "name": names,
        "type": types,
        "currency": currencies,
        "price": prices
    })

    # Factor exposures
    # equity: beta ~ N(1.0, 0.3)
    # bond: duration Uniform(2,8), convexity Uniform(0.1, 0.6)
    betas = np.clip(np.random.normal(1.0, 0.3, size=num_instruments), 0.2, 2.0)
    durations = np.random.uniform(2.0, 8.0, size=num_instruments)
    convexities = np.random.uniform(0.1, 0.6, size=num_instruments)

    exposures = []
    for ins_id, t, b, d, c in zip(instrument_ids, types, betas, durations, convexities):
        if t in ["equity", "equity_fx"]:
            exposures.append({"instrument_id": ins_id, "factor": "equity_beta", "value": float(b)})
        if t == "bond":
            exposures.append({"instrument_id": ins_id, "factor": "duration", "value": float(d)})
            exposures.append({"instrument_id": ins_id, "factor": "convexity", "value": float(c)})
    exposures = pd.DataFrame(exposures)

    # Accounts
    account_ids = [f"ACC{10000+i}" for i in range(num_accounts)]
    customers = [f"CUST{200+i}" for i in range(num_accounts)]
    account_currencies = np.random.choice(["JPY","JPY","JPY","USD"], size=num_accounts, p=[0.6,0.2,0.1,0.1])  # mostly JPY
    accounts = pd.DataFrame({
        "account_id": account_ids,
        "customer_id": customers,
        "currency": account_currencies,
        "status": ["active"]*num_accounts
    })

    # Holdings: each account holds between 6 and 15 names
    rows = []
    for acc in account_ids:
        n_hold = random.randint(6, 15)
        pick_idx = np.random.choice(range(num_instruments), size=n_hold, replace=False)
        for idx in pick_idx:
            ins = instruments.iloc[idx]
            qty = float(np.random.uniform(5, 200))
            rows.append({
                "account_id": acc,
                "instrument_id": ins["instrument_id"],
                "quantity": qty,
                "as_of_date": as_of_date
            })
    holdings = pd.DataFrame(rows)

    # FX snapshot: USDJPY level (for converting USD to JPY and vice versa)
    fx_snapshot = {
        "as_of_date": as_of_date,
        "USDJPY": 150.0  # base level
    }

    return {
        "instruments": instruments,
        "exposures": exposures,
        "accounts": accounts,
        "holdings": holdings,
        "fx": fx_snapshot
    }

In [6]:
def apply_scenario(data: Dict[str, pd.DataFrame], scenario: Scenario) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Apply shock scenario to all holdings and aggregate to account-level results.
    Returns (account_results, holding_results).
    """
    instruments = data["instruments"].copy()
    exposures = data["exposures"].copy()
    accounts = data["accounts"].copy()
    holdings = data["holdings"].copy()
    fx = data["fx"]

    # Merge holdings with instrument info
    h = holdings.merge(instruments, on="instrument_id", how="left")
    h = h.merge(exposures.pivot_table(index="instrument_id", columns="factor", values="value", aggfunc="first").reset_index(), on="instrument_id", how="left")

    # Base prices and values in local currency
    h["base_price"] = h["price"]
    h["base_value_local"] = h["quantity"] * h["base_price"]

    # Compute local price shock by type
    # Equity: dP/P = beta * equity_index
    equity_mask = h["type"].isin(["equity", "equity_fx"])
    h.loc[equity_mask, "dP_over_P_local"] = h.loc[equity_mask, "equity_beta"].fillna(1.0) * scenario.equity_index

    # Bond: dP/P = -Dur * dy + 0.5 * Conv * dy^2 (dy in absolute)
    bond_mask = h["type"] == "bond"
    dy = scenario.jgb_10y_bps / 10000.0
    h.loc[bond_mask, "dP_over_P_local"] = (
        - h.loc[bond_mask, "duration"].fillna(0.0) * dy
        + 0.5 * h.loc[bond_mask, "convexity"].fillna(0.0) * (dy ** 2)
    )

    # Default zero move for others
    h["dP_over_P_local"] = h["dP_over_P_local"].fillna(0.0)

    # Shocked local price & value
    h["shocked_price_local"] = h["base_price"] * (1.0 + h["dP_over_P_local"])
    h["shocked_value_local"] = h["quantity"] * h["shocked_price_local"]

    # FX conversion to account currency:
    USDJPY_base = fx["USDJPY"]
    USDJPY_shocked = USDJPY_base + scenario.usd_jpy_change  # e.g., -5 means JPY strengthens (USDJPY down)
    USDJPY_shocked = max(USDJPY_shocked, 0.0001)  # guard

    def convert_value(row, value_col: str, shocked: bool) -> float:
        ins_ccy = row["currency"]
        acc_ccy = row["account_currency"]
        if ins_ccy == acc_ccy:
            return row[value_col]
        if ins_ccy == "USD" and acc_ccy == "JPY":
            rate = USDJPY_shocked if shocked else USDJPY_base
            return row[value_col] * rate
        if ins_ccy == "JPY" and acc_ccy == "USD":
            rate = USDJPY_shocked if shocked else USDJPY_base
            return row[value_col] / rate
        # Other pairs omitted in PoC
        return row[value_col]

    # attach account currency
    acc_ccy_map = dict(zip(accounts["account_id"], accounts["currency"]))
    h["account_currency"] = h["account_id"].map(acc_ccy_map)

    # Convert base & shocked values into account currency
    h["base_value_acc"] = h.apply(lambda r: convert_value(r, "base_value_local", shocked=False), axis=1)
    h["shocked_value_acc"] = h.apply(lambda r: convert_value(r, "shocked_value_local", shocked=True), axis=1)

    # Factor contributions via isolated shocks (equity-only, rate-only, fx-only)
    h_eq = h.copy()
    h_eq["dP_over_P_local_eq"] = 0.0
    h_eq.loc[equity_mask, "dP_over_P_local_eq"] = h_eq.loc[equity_mask, "equity_beta"].fillna(1.0) * scenario.equity_index
    h_eq["shocked_value_local_eq"] = h_eq["quantity"] * h_eq["base_price"] * (1.0 + h_eq["dP_over_P_local_eq"])
    h_eq["value_acc_eq"] = h_eq.apply(lambda r: convert_value(r, "shocked_value_local_eq", shocked=False), axis=1)  # FX unchanged for pure equity factor

    h_rt = h.copy()
    h_rt["dP_over_P_local_rt"] = 0.0
    h_rt.loc[bond_mask, "dP_over_P_local_rt"] = (
        - h_rt.loc[bond_mask, "duration"].fillna(0.0) * dy
        + 0.5 * h_rt.loc[bond_mask, "convexity"].fillna(0.0) * (dy ** 2)
    )
    h_rt["shocked_value_local_rt"] = h_rt["quantity"] * h_rt["base_price"] * (1.0 + h_rt["dP_over_P_local_rt"])
    h_rt["value_acc_rt"] = h_rt.apply(lambda r: convert_value(r, "shocked_value_local_rt", shocked=False), axis=1)  # FX unchanged for pure rate factor

    h_fx = h.copy()
    h_fx["value_acc_fx"] = h_fx.apply(lambda r: convert_value(r, "base_value_local", shocked=True), axis=1)

    # Aggregate to account
    grp_cols = ["account_id", "account_currency"]
    base_sum = h.groupby(grp_cols)["base_value_acc"].sum().rename("base_value")
    shocked_sum = h.groupby(grp_cols)["shocked_value_acc"].sum().rename("shocked_value")

    eq_only = h_eq.groupby(grp_cols)["value_acc_eq"].sum().rename("value_eq_only")
    rt_only = h_rt.groupby(grp_cols)["value_acc_rt"].sum().rename("value_rt_only")
    fx_only = h_fx.groupby(grp_cols)["value_acc_fx"].sum().rename("value_fx_only")

    results = pd.concat([base_sum, shocked_sum, eq_only, rt_only, fx_only], axis=1).reset_index()
    results["pnl_abs"] = results["shocked_value"] - results["base_value"]
    results["pnl_pct"] = np.where(results["base_value"] != 0, results["pnl_abs"] / results["base_value"], np.nan)

    results["contrib_eq"] = results["value_eq_only"] - results["base_value"]
    results["contrib_rt"] = results["value_rt_only"] - results["base_value"]
    results["contrib_fx"] = results["value_fx_only"] - results["base_value"]

    # Scale contributions to match total PnL (handles cross-terms approx.)
    contrib_sum = results[["contrib_eq","contrib_rt","contrib_fx"]].sum(axis=1)
    mismatch = results["pnl_abs"] - contrib_sum
    weight = results[["contrib_eq","contrib_rt","contrib_fx"]].abs().sum(axis=1).replace(0.0, np.nan)
    for col in ["contrib_eq", "contrib_rt", "contrib_fx"]:
        results[col] = results[col] + (results[col].abs()/weight).fillna(1/3) * mismatch

    # Holding-level details
    h_detail = h.copy()
    h_detail["pnl_abs"] = h_detail["shocked_value_acc"] - h_detail["base_value_acc"]
    h_detail["pnl_pct"] = np.where(h_detail["base_value_acc"] != 0, h_detail["pnl_abs"]/h_detail["base_value_acc"], np.nan)

    return results, h_detail

In [7]:
data = generate_mock_data(num_accounts=30, num_instruments=80, as_of_date="2025-10-27")

In [8]:
scenario = Scenario(
    name="Equity -10%, Rates +50bp, JPY +5 (USDJPY -5)",
    equity_index=-0.10,
    jgb_10y_bps=50.0,
    usd_jpy_change=-5.0
)

In [9]:
account_results, holding_results = apply_scenario(data, scenario)

In [10]:
accounts = data["accounts"][["account_id","customer_id","currency"]].rename(columns={"currency":"account_currency"})
account_results = account_results.merge(accounts, on=["account_id","account_currency"], how="left")
