# Reconciliation #10: End-to-End Pipeline

## Overview

This notebook validates the **complete workflow** from configuration through
simulation to analysis, verifying that key cross-module invariants hold at
every stage of the pipeline.

**Pipeline stages tested:**

1. **Config** -- All required fields populated, no NaN/None in critical parameters  
2. **Manufacturer** -- Initial balance sheet balances (A = L + E)  
3. **Losses** -- Generated losses have positive amounts, correct types  
4. **Insurance** -- Recoveries do not exceed losses for any event  
5. **Simulation** -- Ruined paths stay at zero, no negative time steps  
6. **Analysis** -- Time-average growth <= ensemble-average growth (non-ergodicity)  
7. **Metrics** -- VaR(99%) >= VaR(95%) >= VaR(90%) (monotonicity)  
8. **Final** -- Ruin probability in [0, 1], growth rates finite, no NaN in results

## Prerequisites

- `ergodic_insurance` package installed
- All core modules importable

## Runtime target

< 60 seconds (200 paths, 10-year horizon)

## Audience

Developers and actuaries validating framework correctness.

In [None]:
# --- Setup and Imports ---
import sys
import os
import time
import warnings

import numpy as np
import pandas as pd

# Ensure helpers are importable
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath("__file__")), ""))
sys.path.insert(0, os.path.dirname(os.path.abspath(".")))

from _reconciliation_helpers import (
    ReconciliationChecker,
    final_summary,
    section_header,
    notebook_header,
    timed_cell,
    fmt_dollar,
    display_df,
    create_standard_manufacturer,
    create_standard_loss_generator,
    create_simple_insurance_program,
)

# Core framework imports
from ergodic_insurance.config import Config, ManufacturerConfig
from ergodic_insurance.manufacturer import WidgetManufacturer
from ergodic_insurance.loss_distributions import ManufacturingLossGenerator, LossEvent
from ergodic_insurance.insurance_program import InsuranceProgram
from ergodic_insurance.simulation import Simulation, SimulationResults
from ergodic_insurance.ergodic_analyzer import ErgodicAnalyzer
from ergodic_insurance.risk_metrics import RiskMetrics

# Suppress deprecation warnings from legacy InsurancePolicy usage inside framework
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Reproducibility: a single fixed seed drives NumPy's legacy global RNG here;
# the simulation stage derives per-path seeds from it as SEED + path index.
SEED = 42
np.random.seed(SEED)

# Pipeline size: Monte Carlo path count and years simulated per path.
N_PATHS, TIME_HORIZON = 200, 10

print(
    f"Pipeline parameters: {N_PATHS} paths, {TIME_HORIZON}-year horizon, seed={SEED}"
)

In [None]:
# Render the standard reconciliation banner: notebook number, title, and a
# one-sentence description of what this notebook validates.
_notebook_description = (
    "Validates the complete workflow from configuration through simulation to analysis, "
    "checking cross-module invariants at every stage."
)
notebook_header(10, "End-to-End Pipeline", _notebook_description)

---
## Stage 1: Configuration

Verify that a manufacturing profile configuration has all required fields
populated with valid (non-NaN, non-None) values in critical parameters.

In [None]:
section_header("Stage 1: Configuration")
chk_config = ReconciliationChecker("Configuration Checks")


def _is_finite_number(value) -> bool:
    """Return True if ``value`` converts to a finite float.

    Rejects ``None``, NaN, +/-inf, and anything ``float()`` cannot convert,
    without raising -- so a missing or corrupt config field shows up as a
    failed check instead of crashing the cell with a TypeError.
    """
    if value is None:
        return False
    try:
        return bool(np.isfinite(float(value)))
    except (TypeError, ValueError):
        return False


with timed_cell("Config"):
    # Create a manufacturing-profile configuration
    config = Config.from_company(
        initial_assets=10_000_000,
        operating_margin=0.10,
        industry="manufacturing",
        tax_rate=0.25,
        growth_rate=0.05,
        time_horizon_years=TIME_HORIZON,
    )

    # Check required sections exist
    chk_config.check(
        config.manufacturer is not None,
        "ManufacturerConfig present",
    )
    chk_config.check(
        config.simulation is not None,
        "SimulationConfig present",
    )
    chk_config.check(
        config.growth is not None,
        "GrowthConfig present",
    )

    # Critical numeric fields must be real, finite numbers (no None/NaN/inf).
    mfg_cfg = config.manufacturer
    critical_fields = {
        "initial_assets": mfg_cfg.initial_assets,
        "base_operating_margin": mfg_cfg.base_operating_margin,
        "tax_rate": mfg_cfg.tax_rate,
        "asset_turnover_ratio": mfg_cfg.asset_turnover_ratio,
        "retention_ratio": mfg_cfg.retention_ratio,
    }
    finite = {name: _is_finite_number(value) for name, value in critical_fields.items()}
    bad_fields = [name for name, ok in finite.items() if not ok]
    chk_config.check(
        not bad_fields,
        "No None/NaN/inf in critical parameters",
        f"bad_fields={bad_fields}" if bad_fields else "All critical fields finite",
    )

    # Range checks are guarded by the finiteness test so a None/NaN value
    # fails the check rather than raising on comparison or formatting.
    chk_config.check(
        finite["initial_assets"] and mfg_cfg.initial_assets > 0,
        "initial_assets > 0 and finite",
        f"initial_assets={mfg_cfg.initial_assets:,.0f}"
        if finite["initial_assets"]
        else f"initial_assets={mfg_cfg.initial_assets!r}",
    )
    chk_config.check(
        finite["base_operating_margin"] and 0 < mfg_cfg.base_operating_margin < 1,
        "base_operating_margin in (0, 1)",
        f"margin={mfg_cfg.base_operating_margin}",
    )
    chk_config.check(
        finite["tax_rate"] and 0 < mfg_cfg.tax_rate < 1,
        "tax_rate in (0, 1)",
        f"tax_rate={mfg_cfg.tax_rate}",
    )
    chk_config.check(
        finite["asset_turnover_ratio"] and mfg_cfg.asset_turnover_ratio > 0,
        "asset_turnover_ratio > 0",
        f"turnover={mfg_cfg.asset_turnover_ratio}",
    )
    chk_config.check(
        finite["retention_ratio"] and 0 < mfg_cfg.retention_ratio <= 1,
        "retention_ratio in (0, 1]",
        f"retention={mfg_cfg.retention_ratio}",
    )

    # Validate completeness (framework method)
    issues = config.validate_completeness()
    chk_config.check(
        len(issues) == 0,
        "Config validates as complete",
        f"issues={issues}" if issues else "No issues",
    )

    if not bad_fields:
        print(f"  Config: {mfg_cfg.initial_assets:,.0f} initial assets, "
              f"{mfg_cfg.base_operating_margin:.0%} margin, {TIME_HORIZON}yr horizon")
    else:
        print(f"  Config has non-finite critical fields: {bad_fields}")

chk_config.display_results()

---
## Stage 2: Manufacturer

Create the manufacturer and verify the initial balance sheet equation
A = L + E holds.

In [None]:
section_header("Stage 2: Manufacturer")
chk_mfg = ReconciliationChecker("Manufacturer Checks")

with timed_cell("Manufacturer"):
    manufacturer = create_standard_manufacturer(
        initial_assets=10_000_000,
        asset_turnover=0.8,
        operating_margin=0.10,
        tax_rate=0.25,
        retention_ratio=0.70,
    )

    # Balance sheet equation: A = L + E
    total_assets = float(manufacturer.total_assets)
    total_liabilities = float(manufacturer.total_liabilities)
    equity = float(manufacturer.equity)

    chk_mfg.assert_close(
        total_assets,
        total_liabilities + equity,
        tol=1.0,  # $1 tolerance for rounding
        message="Balance sheet: A = L + E",
        label_actual="Assets",
        label_expected="L + E",
    )

    chk_mfg.check(
        total_assets > 0,
        "Total assets > 0",
        f"assets={fmt_dollar(total_assets)}",
    )
    chk_mfg.check(
        equity > 0,
        "Initial equity > 0",
        f"equity={fmt_dollar(equity)}",
    )

    # Verify initial assets against the Stage 1 config itself rather than a
    # re-typed literal, so editing the config cannot silently desynchronize
    # the manufacturer from it without this check failing.
    config_initial_assets = float(config.manufacturer.initial_assets)
    chk_mfg.assert_close(
        total_assets,
        config_initial_assets,
        tol=1.0,
        message=f"Initial assets match config ({fmt_dollar(config_initial_assets)})",
        label_actual="Actual assets",
        label_expected="Config target",
    )

    print(f"  Assets: {fmt_dollar(total_assets)}")
    print(f"  Liabilities: {fmt_dollar(total_liabilities)}")
    print(f"  Equity: {fmt_dollar(equity)}")
    print(f"  A - (L + E) = {fmt_dollar(total_assets - total_liabilities - equity)}")

chk_mfg.display_results()

---
## Stage 3: Loss Generation

Generate losses and verify all amounts are positive and types are valid.

In [None]:
section_header("Stage 3: Loss Generation")
chk_loss = ReconciliationChecker("Loss Generation Checks")

# Generation window tied to the pipeline horizon instead of a separate literal.
loss_window_years = float(TIME_HORIZON)

with timed_cell("Loss Generation"):
    loss_gen = ManufacturingLossGenerator.create_simple(
        frequency=3.0,
        severity_mean=150_000,
        severity_std=100_000,
        seed=SEED,
    )

    # Generate losses over the full pipeline horizon.
    # NOTE(review): the Stage 2 manufacturer (10M assets, 0.8 turnover)
    # presumably implies ~8M revenue, not 12M -- confirm the intended basis.
    all_losses, loss_stats = loss_gen.generate_losses(
        duration=loss_window_years,
        revenue=12_000_000,
    )

    valid_loss_types = {"attritional", "large", "catastrophic", "extreme", "operational"}

    chk_loss.check(
        len(all_losses) > 0,
        "At least one loss generated",
        f"n_losses={len(all_losses)}",
    )

    # Check all loss amounts are positive
    all_positive = all(loss.amount > 0 for loss in all_losses)
    chk_loss.check(
        all_positive,
        "All loss amounts are positive",
        f"n_losses={len(all_losses)}",
    )

    # Check all loss types are valid
    actual_types = {loss.loss_type for loss in all_losses}
    types_valid = actual_types.issubset(valid_loss_types)
    chk_loss.check(
        types_valid,
        "All loss types are recognized",
        f"types={actual_types}",
    )

    # Check loss times are within the generation window. min()/max() raise on
    # an empty sequence, so only compute the range when losses exist -- an
    # empty result is already reported by the "At least one loss" check.
    times_valid = all(0 <= loss.time <= loss_window_years for loss in all_losses)
    if all_losses:
        time_detail = (
            f"min_t={min(loss.time for loss in all_losses):.2f}, "
            f"max_t={max(loss.time for loss in all_losses):.2f}"
        )
    else:
        time_detail = "no losses generated"
    chk_loss.check(
        times_valid,
        f"All loss times in [0, {loss_window_years:g}]",
        time_detail,
    )

    # Check statistics dict is populated
    chk_loss.check(
        loss_stats["total_losses"] == len(all_losses),
        "Statistics dict matches generated count",
        f"stats_count={loss_stats['total_losses']}, actual={len(all_losses)}",
    )

    total_loss_amount = sum(loss.amount for loss in all_losses)
    print(f"  Generated {len(all_losses)} losses over {loss_window_years:g} years")
    print(f"  Total loss amount: {fmt_dollar(total_loss_amount)}")
    print(f"  Average per loss: {fmt_dollar(total_loss_amount / max(len(all_losses), 1))}")
    print(f"  Types: {actual_types}")

chk_loss.display_results()

---
## Stage 4: Insurance

Create an insurance program and verify that recoveries never exceed losses.

In [None]:
section_header("Stage 4: Insurance")
chk_ins = ReconciliationChecker("Insurance Checks")

with timed_cell("Insurance"):
    insurance = InsuranceProgram.simple(
        deductible=50_000,
        limit=5_000_000,
        rate=0.025,
    )

    premium = insurance.calculate_premium()
    total_coverage = insurance.get_total_coverage()

    chk_ins.check(
        premium > 0,
        "Premium is positive",
        f"premium={fmt_dollar(premium)}",
    )
    chk_ins.check(
        total_coverage > 0,
        "Total coverage is positive",
        f"coverage={fmt_dollar(total_coverage)}",
    )

    # Process each loss event and verify 0 <= recovery <= loss. All Stage 3
    # losses (a multi-year window) go through one program before a single
    # reset; that is fine for these bounds checks.
    max_ratio = 0.0
    violations = 0
    negative_recoveries = 0
    try:
        for loss in all_losses:
            recovery = insurance.process_claim(loss.amount).insurance_recovery
            if recovery < 0:
                negative_recoveries += 1
            if recovery > loss.amount + 0.01:  # small tolerance
                violations += 1
            if loss.amount > 0:
                max_ratio = max(max_ratio, recovery / loss.amount)
    finally:
        # Always reset insurance state, even if a claim raised mid-loop.
        insurance.reset_annual()

    chk_ins.check(
        violations == 0,
        "No recovery exceeds loss amount",
        f"violations={violations}/{len(all_losses)}, max_ratio={max_ratio:.4f}",
    )
    chk_ins.check(
        negative_recoveries == 0,
        "No recovery is negative",
        f"negative={negative_recoveries}/{len(all_losses)}",
    )

    # Recovery ratio must be <= 1.0
    chk_ins.check(
        max_ratio <= 1.0 + 1e-9,
        "Max recovery/loss ratio <= 1.0",
        f"max_ratio={max_ratio:.6f}",
    )

    # Deductible check: claims below deductible should have zero recovery
    try:
        small_claim_result = insurance.process_claim(10_000)  # Below $50K deductible
    finally:
        insurance.reset_annual()
    chk_ins.check(
        small_claim_result.insurance_recovery == 0.0,
        "Claim below deductible gets zero recovery",
        f"recovery={small_claim_result.insurance_recovery} for $10K claim",
    )

    print(f"  Annual premium: {fmt_dollar(premium)}")
    print(f"  Total coverage: {fmt_dollar(total_coverage)}")
    print(f"  Tested {len(all_losses)} claims, max recovery ratio: {max_ratio:.4f}")

chk_ins.display_results()

---
## Stage 5: Simulation

Run N_PATHS simulations and verify:
- Ruined paths stay at zero equity after ruin year
- No negative equity before ruin (equity goes to zero, not negative, on ruin)

In [None]:
section_header("Stage 5: Simulation")
chk_sim = ReconciliationChecker("Simulation Checks")

with timed_cell("Simulation"):
    sim_results_list = []
    equity_trajectories = []
    n_ruined = 0
    ruin_post_zero_violations = 0

    for i in range(N_PATHS):
        # Distinct but reproducible seed for every path.
        path_seed = SEED + i

        # Fresh manufacturer, loss generator, and insurance program per path
        # so no object state carries over between paths.
        mfg = create_standard_manufacturer(
            initial_assets=10_000_000,
            asset_turnover=0.8,
            operating_margin=0.10,
            tax_rate=0.25,
            retention_ratio=0.70,
        )
        loss_gen_i = ManufacturingLossGenerator.create_simple(
            frequency=3.0,
            severity_mean=150_000,
            severity_std=100_000,
            seed=path_seed,
        )
        ins_i = InsuranceProgram.simple(
            deductible=50_000,
            limit=5_000_000,
            rate=0.025,
        )

        sim = Simulation(
            manufacturer=mfg,
            loss_generator=loss_gen_i,
            insurance_policy=ins_i,
            time_horizon=TIME_HORIZON,
            seed=path_seed,
        )
        result = sim.run()
        sim_results_list.append(result)
        equity_trajectories.append(result.equity.copy())

        # Check if ruined
        if result.insolvency_year is not None:
            n_ruined += 1
            # Verify post-ruin equity stays at zero.
            # NOTE(review): treats insolvency_year as a 0-based index into
            # result.equity -- confirm against SimulationResults.
            ruin_yr = result.insolvency_year
            if ruin_yr + 1 < TIME_HORIZON:
                post_ruin_equity = result.equity[ruin_yr + 1:]
                if np.any(post_ruin_equity != 0):
                    ruin_post_zero_violations += 1

    print(f"  Completed {N_PATHS} simulations")
    print(f"  Ruined paths: {n_ruined}/{N_PATHS} ({100*n_ruined/N_PATHS:.1f}%)")

    # Check: all simulations produced results
    chk_sim.check(
        len(sim_results_list) == N_PATHS,
        f"All {N_PATHS} simulations completed",
        f"completed={len(sim_results_list)}",
    )

    # Check: ruined paths stay at zero
    chk_sim.check(
        ruin_post_zero_violations == 0,
        "Ruined paths stay at zero equity after ruin",
        f"violations={ruin_post_zero_violations}",
    )

    # Check: each result has correct time horizon length
    correct_lengths = all(len(r.years) == TIME_HORIZON for r in sim_results_list)
    chk_sim.check(
        correct_lengths,
        f"All results have {TIME_HORIZON} time steps",
    )

    # Check: no negative time steps -- years are non-negative and strictly increasing
    bad_time_paths = sum(
        1
        for r in sim_results_list
        if np.any(np.asarray(r.years) < 0) or np.any(np.diff(r.years) <= 0)
    )
    chk_sim.check(
        bad_time_paths == 0,
        "Time steps are non-negative and strictly increasing",
        f"paths_with_bad_time_steps={bad_time_paths}",
    )

    # Check: no NaN in equity trajectories (NaN in ROE is acceptable for ruined paths)
    nan_equity_count = sum(
        1 for r in sim_results_list if np.any(np.isnan(r.equity))
    )
    chk_sim.check(
        nan_equity_count == 0,
        "No NaN values in equity trajectories",
        f"paths_with_nan_equity={nan_equity_count}",
    )

    # Check: equity never goes negative -- per the stage description, ruin
    # takes equity to zero, not below it.
    negative_equity_paths = sum(
        1 for r in sim_results_list if np.any(np.asarray(r.equity) < 0)
    )
    chk_sim.check(
        negative_equity_paths == 0,
        "No negative equity in any path",
        f"paths_with_negative_equity={negative_equity_paths}",
    )

    # Check: surviving paths have positive final equity
    surviving = [r for r in sim_results_list if r.insolvency_year is None]
    if len(surviving) > 0:
        all_positive_final = all(r.equity[-1] > 0 for r in surviving)
        chk_sim.check(
            all_positive_final,
            "All surviving paths have positive final equity",
            f"n_surviving={len(surviving)}",
        )
    else:
        chk_sim.check(True, "(No surviving paths to check)", "All paths ruined")

chk_sim.display_results()

---
## Stage 6: Ergodic Analysis

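Compare time-average growth versus ensemble-average growth. For multiplicative
dynamics with volatility, theory predicts:

**time-average growth rate <= ensemble-average growth rate**

This gap is a hallmark of non-ergodicity. Both averages are computed over
surviving paths only, and the check allows a tolerance of 0.05 to absorb
finite-sample noise from the 200 simulated paths.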

In [None]:
section_header("Stage 6: Ergodic Analysis")
chk_ergodic = ReconciliationChecker("Ergodic Analysis Checks")

with timed_cell("Ergodic Analysis"):
    analyzer = ErgodicAnalyzer()

    # Both averages use only paths that never recorded an insolvency year.
    surviving_results = [res for res in sim_results_list if res.insolvency_year is None]
    surviving_equity = [res.equity for res in surviving_results]

    # Per-path time-average growth; non-finite values are discarded.
    time_avg_growths = [
        growth
        for growth in map(analyzer.calculate_time_average_growth, surviving_equity)
        if np.isfinite(growth)
    ]
    mean_time_avg = np.mean(time_avg_growths) if time_avg_growths else float("-inf")

    # Ensemble view: the analyzer's cross-path "growth_rate" statistic.
    # With no survivors there is nothing to average, so fall back to 0.0.
    ensemble_avg_growth = 0.0
    if surviving_equity:
        ensemble_avg_growth = analyzer.calculate_ensemble_average(
            surviving_equity,
            metric="growth_rate",
        )["mean"]

    print(f"  Surviving paths: {len(surviving_results)}/{N_PATHS}")
    print(f"  Mean time-average growth: {mean_time_avg:.6f}")
    print(f"  Ensemble-average growth:  {ensemble_avg_growth:.6f}")

    # Non-ergodicity check with a generous allowance for finite-sample noise.
    tolerance = 0.05
    gap = ensemble_avg_growth - mean_time_avg
    chk_ergodic.check(
        mean_time_avg <= ensemble_avg_growth + tolerance,
        "Time-avg growth <= ensemble-avg growth (non-ergodicity)",
        f"time_avg={mean_time_avg:.6f}, ensemble_avg={ensemble_avg_growth:.6f}, "
        f"gap={gap:.6f}",
    )

    # Finiteness of each growth estimate, checked in the same order as before.
    for value, label in (
        (mean_time_avg, "Mean time-average growth is finite"),
        (ensemble_avg_growth, "Ensemble-average growth is finite"),
    ):
        chk_ergodic.check(np.isfinite(value), label, f"value={value:.6f}")

chk_ergodic.display_results()

---
## Stage 7: Risk Metrics

Compute VaR at multiple confidence levels and verify monotonicity. VaR is
non-decreasing in the confidence level, so ties are allowed:

**VaR(99%) >= VaR(95%) >= VaR(90%)**

In [None]:
section_header("Stage 7: Risk Metrics")
chk_risk = ReconciliationChecker("Risk Metrics Checks")

with timed_cell("Risk Metrics"):
    # Collect total claim amounts per path as our "loss" distribution
    total_claims_per_path = np.array([
        float(np.sum(r.claim_amounts)) for r in sim_results_list
    ])

    # Use the full (unconditional) per-path distribution. Zero-claim paths are
    # legitimate outcomes; dropping them would make every quantile a
    # conditional-on-loss quantile and can only push VaR/TVaR upward for
    # non-negative data. RiskMetrics just needs a non-empty array, so the
    # zero test below only detects the degenerate "no claims anywhere" case.
    losses_for_metrics = total_claims_per_path
    n_zero_claim_paths = int(np.sum(losses_for_metrics == 0))
    print(f"  Paths with zero claims: {n_zero_claim_paths}/{len(losses_for_metrics)}")

    if losses_for_metrics.size > 0 and np.any(losses_for_metrics > 0):
        rm = RiskMetrics(losses_for_metrics, seed=SEED)

        var_90 = rm.var(0.90)
        var_95 = rm.var(0.95)
        var_99 = rm.var(0.99)
        tvar_95 = rm.tvar(0.95)

        print(f"  VaR(90%): {fmt_dollar(var_90)}")
        print(f"  VaR(95%): {fmt_dollar(var_95)}")
        print(f"  VaR(99%): {fmt_dollar(var_99)}")
        print(f"  TVaR(95%): {fmt_dollar(tvar_95)}")

        # Monotonicity check: VaR(99%) >= VaR(95%) >= VaR(90%)
        chk_risk.check(
            var_99 >= var_95,
            "VaR(99%) >= VaR(95%) (monotonicity)",
            f"VaR99={fmt_dollar(var_99)}, VaR95={fmt_dollar(var_95)}",
        )
        chk_risk.check(
            var_95 >= var_90,
            "VaR(95%) >= VaR(90%) (monotonicity)",
            f"VaR95={fmt_dollar(var_95)}, VaR90={fmt_dollar(var_90)}",
        )

        # TVaR >= VaR (coherent risk measure property)
        chk_risk.check(
            tvar_95 >= var_95,
            "TVaR(95%) >= VaR(95%) (coherence)",
            f"TVaR={fmt_dollar(tvar_95)}, VaR={fmt_dollar(var_95)}",
        )

        # All metrics should be finite and non-negative
        chk_risk.check(
            all(np.isfinite([var_90, var_95, var_99, tvar_95])),
            "All risk metrics are finite",
        )
        chk_risk.check(
            all(v >= 0 for v in [var_90, var_95, var_99, tvar_95]),
            "All risk metrics are non-negative",
        )
    else:
        chk_risk.check(False, "Insufficient loss data for risk metrics", "No non-zero claims")

chk_risk.display_results()

---
## Stage 8: Final Cross-Cutting Checks

Verify global invariants across the entire pipeline output:
- Ruin probability in [0, 1]
- Growth rates are finite
- No NaN in final equity values

In [None]:
section_header("Stage 8: Final Cross-Cutting Checks")
chk_final = ReconciliationChecker("Final Invariant Checks")

with timed_cell("Final Checks"):
    # Ruin probability in [0, 1]
    ruin_prob = n_ruined / N_PATHS
    chk_final.assert_in_range(
        ruin_prob,
        0.0,
        1.0,
        message="Ruin probability in [0, 1]",
    )

    # Growth rates are finite for surviving paths. Stage 6 already dropped
    # non-finite values when building time_avg_growths, so re-testing that
    # list would always pass. Instead, require that no survivor was dropped,
    # i.e. every surviving path produced a finite growth rate.
    n_finite_growth = len(time_avg_growths)
    chk_final.check(
        n_finite_growth == len(surviving_results),
        "All surviving-path growth rates are finite",
        f"n_finite={n_finite_growth}/{len(surviving_results)}",
    )

    # No NaN in final equity across all results
    final_equities = np.array([r.equity[-1] for r in sim_results_list])
    chk_final.check(
        not np.any(np.isnan(final_equities)),
        "No NaN in final equity values",
        f"n_nan={np.sum(np.isnan(final_equities))}",
    )

    # No NaN in revenue arrays
    nan_revenue = sum(
        1 for r in sim_results_list if np.any(np.isnan(r.revenue))
    )
    chk_final.check(
        nan_revenue == 0,
        "No NaN in revenue arrays",
        f"paths_with_nan_revenue={nan_revenue}",
    )

    # No NaN in assets arrays
    nan_assets = sum(
        1 for r in sim_results_list if np.any(np.isnan(r.assets))
    )
    chk_final.check(
        nan_assets == 0,
        "No NaN in assets arrays",
        f"paths_with_nan_assets={nan_assets}",
    )

    # Summary stats are computable. An exception is recorded and surfaced in
    # the check detail rather than silently discarded.
    sample_stats = None
    stats_error = None
    try:
        sample_stats = sim_results_list[0].summary_stats()
    except Exception as exc:
        stats_error = f"{type(exc).__name__}: {exc}"
    stats_ok = isinstance(sample_stats, dict) and len(sample_stats) > 0
    if stats_ok:
        stats_detail = f"n_keys={len(sample_stats)}"
    elif stats_error is not None:
        stats_detail = f"error={stats_error}"
    else:
        stats_detail = "n_keys=0"
    chk_final.check(
        stats_ok,
        "summary_stats() returns valid dict",
        stats_detail,
    )

    # Mean final equity is finite
    mean_final_equity = np.mean(final_equities)
    chk_final.check(
        np.isfinite(mean_final_equity),
        "Mean final equity is finite",
        f"mean={fmt_dollar(mean_final_equity)}",
    )

    print(f"  Ruin probability: {ruin_prob:.2%}")
    print(f"  Mean final equity: {fmt_dollar(mean_final_equity)}")
    print(f"  Surviving paths: {len(surviving_results)}/{N_PATHS}")

chk_final.display_results()

---
## Summary

In [None]:
# Aggregate every stage's checker, in pipeline order, into one final report.
stage_checkers = (
    chk_config,
    chk_mfg,
    chk_loss,
    chk_ins,
    chk_sim,
    chk_ergodic,
    chk_risk,
    chk_final,
)
final_summary(*stage_checkers)