In [1]:
!pip install numpy pandas matplotlib seaborn




In [2]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Optional but helpful
try:
    import seaborn as sns
    _HAS_SEABORN = True
except Exception:
    _HAS_SEABORN = False


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def pareto_rvs(rng, s_min, xi, size):
    # Tail form: P(S > s) = (s_min / s)^xi for s >= s_min
    u = rng.random(size=size)
    return s_min * (u ** (-1.0 / xi))


def cvar(x, q=0.99):
    x = np.asarray(x)
    threshold = np.quantile(x, q)
    tail = x[x >= threshold]
    if len(tail) == 0:
        return float(threshold)
    return float(tail.mean())


def bootstrap_iqr_ratio(rng, x, B=400):
    # IQR divided by median for a statistic, here mean of sampled years
    x = np.asarray(x)
    n = len(x)
    stats = np.empty(B)
    for b in range(B):
        idx = rng.integers(0, n, size=n)
        stats[b] = float(np.mean(x[idx]))
    q25, q75 = np.quantile(stats, [0.25, 0.75])
    med = np.median(stats)
    return float((q75 - q25) / (med + 1e-12))


def simulate_losses(
    rng,
    N=200,
    Y=5000,
    mu_e=0.0,
    sigma_e=1.0,
    sigma_u=0.3,
    alpha=-2.5,
    beta_e=1.0,
    beta_c=-0.6,
    s_min=1.0,
    xi=1.3,
    p=0.05,
    delta=1.0,
    dxi=0.3,
    gamma0=-2.0,
    gammas=0.8,
    gammac=0.3,
    regime="C",
    backstop_share=0.0
):
    e = rng.lognormal(mean=mu_e, sigma=sigma_e, size=N)
    c = rng.normal(loc=0.0, scale=1.0, size=N)
    u = rng.normal(loc=0.0, scale=sigma_u, size=N)

    loglam = alpha + beta_e * np.log(e) + beta_c * c + u

    # True annual losses per organization
    L_true = np.zeros((N, Y), dtype=float)

    # Observed annual loss proxy per organization under reporting regime
    # This is not used as true loss, but it affects the estimation precision proxy
    L_obs = np.zeros((N, Y), dtype=float)

    Zs = rng.binomial(1, p, size=Y)

    # Regime controls visibility via intercept shift
    if regime == "A":
        g0_shift = -2.0
    elif regime == "B":
        g0_shift = 1.0
    elif regime == "C":
        g0_shift = 1.0
    else:
        raise ValueError("regime must be A, B, or C")

    for t in range(Y):
        Z = int(Zs[t])
        lam_t = np.exp(loglam + Z * delta)
        K = rng.poisson(lam_t)

        xi_t = xi + Z * dxi

        for i in range(N):
            k = int(K[i])
            if k == 0:
                continue

            S = pareto_rvs(rng, s_min=s_min, xi=xi_t, size=k)
            loss_true = float(np.sum(S))

            # Apply systemic facility backstop only in shock state
            if Z == 1 and backstop_share > 0:
                loss_true = (1.0 - backstop_share) * loss_true

            L_true[i, t] = loss_true

            # Reporting selection to define what is observed
            report_p = sigmoid((gamma0 + g0_shift) + gammas * np.log(S) + gammac * c[i])
            R = rng.binomial(1, report_p, size=k)
            if np.any(R == 1):
                L_obs[i, t] = float(np.sum(S[R == 1]))
            else:
                L_obs[i, t] = 0.0

    return e, c, L_true, L_obs


def compute_metrics(rng, e, c, L_true, L_obs, q=0.99):
    N = L_true.shape[0]
    M = np.zeros(N, dtype=float)
    prem_unc = np.zeros(N, dtype=float)

    # Use L_obs to model estimation uncertainty (proxy for data availability)
    # Use L_true to compute solvency relevant tail multiple
    for i in range(N):
        li_true = L_true[i]
        mu_true = float(np.mean(li_true))
        tail_true = cvar(li_true, q=q)
        M[i] = float(tail_true / (mu_true + 1e-12))

        li_obs = L_obs[i]
        prem_unc[i] = bootstrap_iqr_ratio(rng, li_obs, B=300)

    premium_proxy = np.mean(L_true, axis=1)

    lo = float(np.mean(premium_proxy[c <= np.quantile(c, 0.25)]))
    hi = float(np.mean(premium_proxy[c >= np.quantile(c, 0.75)]))
    discount = float(1.0 - (hi / (lo + 1e-12)))

    out = {
        "median_premium_uncertainty_ratio": float(np.median(prem_unc)),
        "median_capital_load_proxy_multiple": float(np.median(M)),
        "premium_discount_strong_controls": float(discount),
        "exclusion_rate_T6": float(np.mean(M > 6.0)),
        "M": M,
        "prem_unc": prem_unc,
        "premium_proxy": premium_proxy
    }
    return out


def make_tables_for_section_7(seed=0, out_dir="outputs"):
    os.makedirs(out_dir, exist_ok=True)
    rng = np.random.default_rng(seed)

    # Table 1 style comparison across regimes
    rows = []
    for regime in ["A", "B", "C"]:
        e, c, L_true, L_obs = simulate_losses(
            rng=np.random.default_rng(seed + 10),
            regime=regime,
            p=0.05,
            xi=1.3,
            backstop_share=0.0
        )
        metrics = compute_metrics(np.random.default_rng(seed + 20), e, c, L_true, L_obs, q=0.99)
        rows.append({
            "Data regime": regime,
            "Premium uncertainty ratio": metrics["median_premium_uncertainty_ratio"],
            "Capital load proxy multiple": metrics["median_capital_load_proxy_multiple"],
            "Premium discount from strong controls": metrics["premium_discount_strong_controls"]
        })

    df1 = pd.DataFrame(rows)
    df1.to_csv(os.path.join(out_dir, "table_1.csv"), index=False)
    with open(os.path.join(out_dir, "table_1.tex"), "w", encoding="utf8") as f:
        f.write(df1.to_latex(index=False, float_format=lambda x: f"{x:.3f}"))

    # Table 2 style backstop variation under regime C
    rows2 = []
    for b in [0.0, 0.3, 0.6]:
        e, c, L_true, L_obs = simulate_losses(
            rng=np.random.default_rng(seed + 30),
            regime="C",
            p=0.05,
            xi=1.3,
            backstop_share=b
        )
        metrics = compute_metrics(np.random.default_rng(seed + 40), e, c, L_true, L_obs, q=0.99)
        rows2.append({
            "Backstop share": b,
            "Capital load proxy multiple": metrics["median_capital_load_proxy_multiple"],
            "Exclusion rate T equals 6": metrics["exclusion_rate_T6"]
        })

    df2 = pd.DataFrame(rows2)
    df2.to_csv(os.path.join(out_dir, "table_2.csv"), index=False)
    with open(os.path.join(out_dir, "table_2.tex"), "w", encoding="utf8") as f:
        f.write(df2.to_latex(index=False, float_format=lambda x: f"{x:.3f}"))

    return df1, df2


def stress_test_grid(seed=0, out_dir="outputs", regime="C", backstop_share=0.0, T=6.0):
    os.makedirs(out_dir, exist_ok=True)
    rng = np.random.default_rng(seed)

    ps = [0.0, 0.01, 0.02, 0.05, 0.10]
    xis = [1.1, 1.3, 1.6, 2.0]

    records = []
    for p in ps:
        for xi in xis:
            e, c, L_true, L_obs = simulate_losses(
                rng=np.random.default_rng(seed + int(1000 * p) + int(100 * xi)),
                regime=regime,
                p=p,
                xi=xi,
                backstop_share=backstop_share
            )
            metrics = compute_metrics(np.random.default_rng(seed + 999), e, c, L_true, L_obs, q=0.99)
            M = metrics["M"]
            records.append({
                "p": p,
                "xi": xi,
                "median_M": float(np.median(M)),
                "exclusion_rate": float(np.mean(M > T)),
                "median_premium_unc": metrics["median_premium_uncertainty_ratio"]
            })

    df = pd.DataFrame(records)
    df.to_csv(os.path.join(out_dir, f"stress_grid_regime_{regime}.csv"), index=False)
    return df


def plot_heatmap(df, value_col, title, out_path):
    pivot = df.pivot(index="xi", columns="p", values=value_col)
    plt.figure(figsize=(8, 4.8))
    if _HAS_SEABORN:
        ax = sns.heatmap(pivot, annot=True, fmt=".2f", cmap="viridis")
    else:
        ax = plt.gca()
        im = ax.imshow(pivot.values, aspect="auto")
        ax.set_xticks(range(len(pivot.columns)))
        ax.set_xticklabels([str(x) for x in pivot.columns])
        ax.set_yticks(range(len(pivot.index)))
        ax.set_yticklabels([str(x) for x in pivot.index])
        plt.colorbar(im, ax=ax)

    ax.set_title(title)
    ax.set_xlabel("Systemic shock probability p")
    ax.set_ylabel("Tail index xi")
    plt.tight_layout()
    plt.savefig(out_path, dpi=220)
    plt.close()


def plot_exclusion_boundary_curve(seed=0, out_dir="outputs", regime="C", xi=1.3, backstop_share=0.0, T=6.0):
    os.makedirs(out_dir, exist_ok=True)

    ps = np.linspace(0.0, 0.12, 13)
    rates = []
    medMs = []

    for p in ps:
        e, c, L_true, L_obs = simulate_losses(
            rng=np.random.default_rng(seed + int(10000 * p)),
            regime=regime,
            p=float(p),
            xi=float(xi),
            backstop_share=backstop_share
        )
        metrics = compute_metrics(np.random.default_rng(seed + 777), e, c, L_true, L_obs, q=0.99)
        rates.append(float(np.mean(metrics["M"] > T)))
        medMs.append(float(np.median(metrics["M"])))

    plt.figure(figsize=(7.5, 4.5))
    plt.plot(ps, rates, marker="o", label="Exclusion rate")
    plt.axhline(0.5, color="gray", linewidth=1.0)
    plt.title("Exclusion rate as a function of shock probability")
    plt.xlabel("Systemic shock probability p")
    plt.ylabel("Fraction above exclusion threshold")
    plt.ylim(-0.02, 1.02)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "fig_exclusion_boundary_curve.png"), dpi=220)
    plt.close()

    out = pd.DataFrame({"p": ps, "exclusion_rate": rates, "median_M": medMs})
    out.to_csv(os.path.join(out_dir, "exclusion_boundary_curve.csv"), index=False)
    return out


def write_appendix_equations_tex(out_dir="outputs"):
    os.makedirs(out_dir, exist_ok=True)

    tex = r"""
\section{Appendix A: Stochastic loss model and reporting regimes}

This appendix states the full simulator specification used for the stress tests in Section 7.

\subsection{Organization variables}
For organization $i \in \{1,\ldots,N\}$, exposure and control are

$$

e_i \sim \mathrm{LogNormal}(\mu_e,\sigma_e), \qquad c_i \sim \mathcal{N}(0,1),

$$
and an organization random effect may be included

$$

u_i \sim \mathcal{N}(0,\sigma_u).

$$

\subsection{Incident frequency}
Annual incident count is

$$

K_i \sim \mathrm{Poisson}(\lambda_i), \qquad \log \lambda_i = \alpha + \beta_e \log e_i + \beta_c c_i + u_i.

$$

\subsection{Systemic shock correlation}
A systemic shock indicator is drawn each year:

$$

Z \sim \mathrm{Bernoulli}(p).

$$
In shock years, frequency increases by $\delta$:

$$

\log \lambda_i \leftarrow \log \lambda_i + Z\delta.

$$

\subsection{Severity}
Each incident has heavy tailed severity:

$$

S_{ij} \sim \mathrm{Pareto}(s_{\min}, \xi + Z \Delta_{\xi}).

$$
Total annual loss is

$$

L_i = \sum_{j=1}^{K_i} S_{ij}.

$$

\subsection{Reporting and selection}
Visibility of an incident is

$$

R_{ij} \sim \mathrm{Bernoulli}(\pi_{ij}),
\qquad
\pi_{ij} = \sigma(\gamma_0 + \gamma_s \log S_{ij} + \gamma_c c_i),

$$
where $\sigma(\cdot)$ is the logistic sigmoid. Reporting regimes modify the effective intercept to model public only versus confidential reporting.

\subsection{Systemic facility backstop}
When the systemic facility pays share $b$ of losses in shock years, net loss becomes

$$

L_i^{\mathrm{net}} =
\begin{cases}
(1-b)L_i & \text{if } Z=1,\\
L_i & \text{if } Z=0.
\end{cases}

$$

\subsection{Capital load proxy multiple}
Let $q$ be a high percentile. Define conditional value at risk:

$$

\mathrm{CVaR}_q(L_i) = \mathbb{E}[L_i \mid L_i \ge \mathrm{VaR}_q(L_i)].

$$
Define the capital load proxy multiple:

$$

M_i = \frac{\mathrm{CVaR}_q(L_i)}{\mathbb{E}[L_i]}.

$$

\subsection{Exclusion boundary}
Given threshold $T$, insurance collapse into exclusions is operationalized as

$$

M_i > T.

$$
The exclusion rate is the fraction of organizations with $M_i > T$.
"""
    path = os.path.join(out_dir, "appendix_equations.tex")
    with open(path, "w", encoding="utf8") as f:
        f.write(tex.strip() + "\n")
    return path


def write_mermaid_files(out_dir="outputs"):
    os.makedirs(out_dir, exist_ok=True)

    mermaids = {}

    mermaids["mermaid_fig_1_system_overview.mmd"] = r"""
mindmap
  root((Insurance mediated inclusion for frontier AI governance))
    Core objective
      Include non state actors
      Incentivise safer development and deployment
      Support international agreement stability
    Data layer
      Exposure data
        Deployment channels
        Tool access
        Update cadence
        Usage volume bins
      Control data
        Evaluations
        Monitoring and logging
        Access restrictions
        Incident response capability
      Incident data
        Hazard class
        Severity
        Discovery channel
        Remediation action
      Data governance
        Confidential reporting
        Aggregation releases
        Audit triggers
    Insurance and liability
      Underwriting incentives
        Premium gradients
        Control requirements
      Liability shaping
        Negligence standards
        Safe harbor for reporting
      Systemic risk handling
        Pooling facility
        Reinsurance layer
    International agreement hooks
      Qualified coverage requirement
      Mutual recognition
      Procurement linkage
      Supply chain leverage
""".strip()

    mermaids["mermaid_fig_2_inclusion_mechanism.mmd"] = r"""
flowchart TB
  A[International agreement commitments] --> B[Domestic implementing rules]
  B --> C[Qualified coverage requirement for frontier scale training or deployment]
  C --> D[Non state actors seek coverage]
  D --> E[Insurers require reporting and controls]
  E --> F[Standardised risk data submission]
  E --> G[Baseline control adoption]
  C --> H[Market access conditions]
  H --> I[Public procurement eligibility]
  H --> J[Regulated sector contracting]
  H --> K[Cross border cloud contracting norms]
  F --> L[Confidential data trust or insurer consortium]
  L --> M[Aggregate hazard statistics]
  L --> N[Audit triggers and dispute resolution]
  M --> O[Pricing improves and premiums reward controls]
  N --> P[Enforcement through coverage suspension]
""".strip()

    mermaids["mermaid_fig_3_data_flow_confidentiality.mmd"] = r"""
flowchart LR
  A[Organization logs and internal reports] --> B[Local summarisation and validation]
  B --> C[Confidential submission package]
  C --> D[Insurer underwriting view]
  C --> E[Regulator or treaty auditor view]
  C --> F[Public aggregate release]
  D --> G[Premium setting and control requirements]
  E --> H[Compliance checks and targeted audits]
  F --> I[Hazard trend monitoring]
  J[Safe harbor legal protection] -.-> C
  K[Contractual penalties for misreporting] -.-> C
""".strip()

    mermaids["mermaid_fig_4_liability_insurance_interaction.mmd"] = r"""
flowchart TD
  A[Incident occurs] --> B{Organization reports promptly}
  B -->|Yes| C[Safe harbor pathway]
  B -->|No| D[Adverse inference pathway]
  C --> E[Reduced punitive damages]
  C --> F[Lower premium increase]
  C --> G[Eligibility for systemic facility support]
  D --> H[Higher expected damages]
  D --> I[Higher premiums or non renewal]
  D --> J[Regulatory escalation]
  K[Baseline controls present] --> C
  K --> D
""".strip()

    mermaids["mermaid_fig_5_stress_workflow.mmd"] = r"""
flowchart TD
  A[Choose stress parameters p and xi] --> B[Run synthetic loss simulator]
  B --> C[Compute capital load proxy distribution M]
  C --> D[Select exclusion threshold T]
  D --> E{Is M greater than T}
  E -->|Yes| F[Insurance retreats into exclusions]
  E -->|No| G[Insurance remains viable]
  F --> H[Governance response]
  H --> I[Strengthen reporting and audit requirements]
  H --> J[Add systemic risk facility or reinsurance layer]
  G --> K[Governance response]
  K --> L[Require qualified coverage in agreement]
  K --> M[Condition premiums on baseline controls]
  K --> N[Use safe harbor to reward reporting]
""".strip()

    mermaids["mermaid_fig_6_systemic_facility_design.mmd"] = r"""
flowchart TB
  A[Systemic facility trigger definition] --> B[Event classification and verification]
  B --> C[Facility pays share of systemic losses]
  C --> D[Insurers maintain broad coverage]
  C --> E[Premiums remain stable]
  F[Eligibility conditions] --> G[Participation in reporting standard]
  F --> H[Baseline control compliance]
  F --> I[Audit cooperation]
  G --> C
  H --> C
  I --> C
  J[Moral hazard controls] --> K[Deductibles and co insurance]
  J --> L[Penalties for misreporting]
  J --> M[Rate adjustments after repeated negligence]
""".strip()

    mermaids["mermaid_fig_7_mutual_recognition_map.mmd"] = r"""
flowchart LR
  A[Signatory state A rules] --> B[Qualified insurer criteria]
  A --> C[Qualified auditor criteria]
  A --> D[Liability safe harbor statute]
  E[Signatory state B rules] --> B
  E --> C
  E --> D
  B --> F[Mutual recognition list]
  C --> F
  F --> G[Cross border coverage portability]
  F --> H[Cross border audit acceptance]
  G --> I[Non state actors comply to access markets]
  H --> I
""".strip()

    mermaids["mermaid_fig_8_control_incentive_gradient.mmd"] = r"""
flowchart TD
  A[Standardised exposure and control data] --> B[Underwriting risk differentiation]
  B --> C[Premium gradients]
  C --> D[Adoption of baseline controls]
  D --> E[Lower incident frequency and severity]
  E --> F[Lower capital load and broader coverage]
  F --> C
""".strip()

    for name, text in mermaids.items():
        with open(os.path.join(out_dir, name), "w", encoding="utf8") as f:
            f.write(text + "\n")

    return list(mermaids.keys())


def main():
    out_dir = "outputs"
    os.makedirs(out_dir, exist_ok=True)

    # Tables for Section 7 discussion, but you can include them in appendix if you prefer
    df1, df2 = make_tables_for_section_7(seed=0, out_dir=out_dir)

    # Stress grid for regime C and figures for main Section 7
    df_grid = stress_test_grid(seed=0, out_dir=out_dir, regime="C", backstop_share=0.0, T=6.0)

    # Figures for Section 7
    plot_heatmap(
        df_grid,
        value_col="median_M",
        title="Capital load proxy multiple across shock probability and tail index",
        out_path=os.path.join(out_dir, "fig_capital_load_heatmap.png")
    )
    plot_heatmap(
        df_grid,
        value_col="exclusion_rate",
        title="Exclusion rate across shock probability and tail index with threshold T equals 6",
        out_path=os.path.join(out_dir, "fig_exclusion_rate_heatmap.png")
    )
    plot_exclusion_boundary_curve(seed=0, out_dir=out_dir, regime="C", xi=1.3, backstop_share=0.0, T=6.0)

    # Appendix equations
    write_appendix_equations_tex(out_dir=out_dir)

    # Mermaid files
    write_mermaid_files(out_dir=out_dir)

    # Save run metadata for reproducibility
    meta = {
        "seed": 0,
        "outputs_dir": out_dir,
        "notes": "Synthetic simulator for governance stress testing. See appendix_equations.tex for model."
    }
    with open(os.path.join(out_dir, "run_metadata.json"), "w", encoding="utf8") as f:
        json.dump(meta, f, indent=2)

    print("Done. Outputs written to:", out_dir)
    print("Main text figures: fig_capital_load_heatmap.png, fig_exclusion_rate_heatmap.png, fig_exclusion_boundary_curve.png")
    print("Appendix equations: appendix_equations.tex")
    print("Mermaid diagrams: mermaid_fig_*.mmd")


if __name__ == "__main__":
    main()

Done. Outputs written to: outputs
Main text figures: fig_capital_load_heatmap.png, fig_exclusion_rate_heatmap.png, fig_exclusion_boundary_curve.png
Appendix equations: appendix_equations.tex
Mermaid diagrams: mermaid_fig_*.mmd
