# Dynamic CIB (simulation-first)\n
\n
This notebook demonstrates a minimal multi-period CIB simulation with a cyclic descriptor and an ensemble summary.

In [None]:
import matplotlib.pyplot as plt

from cib import (
    DynamicCIB,
    DynamicVisualizer,
    UncertainCIBMatrix,
    numeric_quantile_timelines,
    state_probability_timelines,
)
from cib.example_data import (
    DATASET_B5_CONFIDENCE,
    DATASET_B5_DESCRIPTORS,
    DATASET_B5_IMPACTS,
    DATASET_B5_INITIAL_SCENARIO,
    DATASET_B5_NUMERIC_MAPPING,
    dataset_b5_cyclic_descriptors,
    dataset_b5_threshold_rule_fast_permitting,
    seeds_for_run,
)
from cib.core import Scenario
from cib.scoring import scenario_diagnostics
from cib.shocks import ShockModel

periods = [2025, 2030, 2035, 2040, 2045]
descriptor = "Electrification_Demand"

matrix = UncertainCIBMatrix(DATASET_B5_DESCRIPTORS)
matrix.set_impacts(DATASET_B5_IMPACTS, confidence=DATASET_B5_CONFIDENCE)

dyn = DynamicCIB(matrix, periods=periods)
for cd in dataset_b5_cyclic_descriptors():
    dyn.add_cyclic_descriptor(cd)

dyn.add_threshold_rule(dataset_b5_threshold_rule_fast_permitting())

# Non-Gaussian dynamic shocks are configured (fat tails + rare jumps).
# A moderate run count is used for interactive notebook execution.
n_runs = 200
base_seed = 123
paths = []
for m in range(n_runs):
    seeds = seeds_for_run(base_seed, m)

    sm = ShockModel(matrix)
    sm.add_dynamic_shocks(
        periods=periods,
        tau=0.26,
        rho=0.6,
        innovation_dist="student_t",
        innovation_df=5.0,
        jump_prob=0.02,
        jump_scale=0.70,
    )
    dynamic_shocks = sm.sample_dynamic_shocks(seeds["dynamic_shock_seed"])

    # A workshop-style assumption is used: uncertainty is increased toward the longer-term future.
    sigma_by_period = {int(t): 1.0 + 0.85 * i for i, t in enumerate(periods)}

    paths.append(
        dyn.simulate_path(
            initial=DATASET_B5_INITIAL_SCENARIO,
            seed=seeds["dynamic_shock_seed"],
            dynamic_shocks_by_period=dynamic_shocks,
            judgment_sigma_scale_by_period=sigma_by_period,
            structural_sigma=0.15,
            structural_seed_base=seeds["structural_shock_seed"],
            equilibrium_mode="relax_unshocked",
        )
    )


def _hamming_distance(a: dict[str, str], b: dict[str, str]) -> int:
    """A simple Hamming distance is computed between two state dictionaries."""
    return sum(1 for k in a.keys() if a.get(k) != b.get(k))


# Scenario quality diagnostics are printed.
print("Scenario diagnostics (initial):")
print(scenario_diagnostics(Scenario(DATASET_B5_INITIAL_SCENARIO, matrix), matrix))

final_period_idx = len(periods) - 1
print("Scenario diagnostics (final period; first 10 runs):")
for i, p in enumerate(paths[:10]):
    s_realised = p.scenarios[final_period_idx]
    s_equilibrium = None
    if p.equilibrium_scenarios is not None:
        s_equilibrium = p.equilibrium_scenarios[final_period_idx]

    d_realised = scenario_diagnostics(s_realised, matrix)
    if s_equilibrium is not None:
        d_equilibrium = scenario_diagnostics(s_equilibrium, matrix)
        h = _hamming_distance(s_realised.to_dict(), s_equilibrium.to_dict())
        print(
            f"  run={i} realised_consistent={d_realised.is_consistent} realised_margin={d_realised.consistency_margin:.3f} "
            f"equilibrium_consistent={d_equilibrium.is_consistent} equilibrium_margin={d_equilibrium.consistency_margin:.3f} "
            f"hamming(realised,equilibrium)={h}"
        )
    else:
        print(
            f"  run={i} realised_consistent={d_realised.is_consistent} realised_margin={d_realised.consistency_margin:.3f}"
        )

# Ensemble summaries are computed.

timelines = state_probability_timelines(paths, scenario_mode="realized")
timelines_equilibrium = state_probability_timelines(paths, scenario_mode="equilibrium")
quantiles = numeric_quantile_timelines(
    paths,
    descriptor=descriptor,
    numeric_mapping=DATASET_B5_NUMERIC_MAPPING[descriptor],
    quantiles=(0.05, 0.5, 0.95),
    scenario_mode="realized",
)

mapping = DATASET_B5_NUMERIC_MAPPING[descriptor]
expectation = {
    int(t): sum(float(p) * float(mapping[s]) for s, p in timelines[t][descriptor].items())
    for t in timelines
}

DynamicVisualizer.plot_descriptor_stochastic_summary(
    timelines=timelines,
    quantiles_by_period=quantiles,
    numeric_expectation_by_period=expectation,
    descriptor=descriptor,
    title="Electrification_Demand (realised): probability bands + fan + spaghetti",
    spaghetti_paths=paths,
    spaghetti_numeric_mapping=mapping,
    spaghetti_max_runs=200,
)
plt.tight_layout()
plt.show()


# Research-facing plots are produced: equilibrium probabilities and realised–equilibrium gap.

key_descriptors = ["Policy_Stringency", "Grid_Flexibility", "Electrification_Demand"]

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 7))
axs = axs.reshape(-1)

DynamicVisualizer.plot_state_probability_bands(
    timelines=timelines_equilibrium,
    descriptor=key_descriptors[0],
    ax=axs[0],
    title=f"{key_descriptors[0]} (equilibrium probabilities)",
)
DynamicVisualizer.plot_state_probability_bands(
    timelines=timelines_equilibrium,
    descriptor=key_descriptors[1],
    ax=axs[1],
    title=f"{key_descriptors[1]} (equilibrium probabilities)",
)
DynamicVisualizer.plot_state_probability_bands(
    timelines=timelines_equilibrium,
    descriptor=key_descriptors[2],
    ax=axs[2],
    title=f"{key_descriptors[2]} (equilibrium probabilities)",
)

# The mean Hamming distance is computed per period across runs.
h_by_period = {int(t): [] for t in periods}
for p in paths:
    if p.equilibrium_scenarios is None:
        continue
    for t, s_realised, s_equilibrium in zip(
        p.periods, p.scenarios, p.equilibrium_scenarios
    ):
        h_by_period[int(t)].append(
            _hamming_distance(s_realised.to_dict(), s_equilibrium.to_dict())
        )

xs = [int(t) for t in periods]
means = [float(sum(h_by_period[t]) / max(1, len(h_by_period[t]))) for t in xs]
axs[3].plot(xs, means, linewidth=2.0)
axs[3].set_xticks(xs)
axs[3].set_xlabel("Period")
axs[3].set_ylabel("Mean Hamming distance")
axs[3].set_title("Realised–equilibrium gap (mean per period)")

plt.tight_layout()
plt.show()

In [None]:
# A hybrid branching pathway graph is built (enumeration when feasible).

from cib import BranchingPathwayBuilder

builder = BranchingPathwayBuilder(
    base_matrix=matrix,
    periods=periods,
    initial=DATASET_B5_INITIAL_SCENARIO,
    cyclic_descriptors=dataset_b5_cyclic_descriptors(),
    threshold_rules=[dataset_b5_threshold_rule_fast_permitting()],
    # Enumeration is used only when the scenario space is sufficiently small.
    # Note: if enumeration mode is selected, structural_sigma and
    # judgment_sigma_scale_by_period are ignored by design.
    # To ensure uncertainty is applied, decrease max_states_to_enumerate to force
    # sampling mode.
    max_states_to_enumerate=2000,
    n_transition_samples=120,
    max_nodes_per_period=40,
    base_seed=123,
    # Shock settings are mirrored from the Monte Carlo simulation above.
    structural_sigma=0.15,
    judgment_sigma_scale_by_period=sigma_by_period,
    dynamic_tau=0.26,
    dynamic_rho=0.6,
    dynamic_innovation_dist="student_t",
    dynamic_innovation_df=5.0,
    dynamic_jump_prob=0.02,
    dynamic_jump_scale=0.70,
)

branching = builder.build(top_k=10)

plt.figure(figsize=(12, 4))
DynamicVisualizer.plot_pathway_tree(
    periods=branching.periods,
    scenarios_by_period=branching.scenarios_by_period,
    edges=branching.edges,
    top_paths=branching.top_paths,
    key_descriptors=["Policy_Stringency", "Grid_Flexibility", "Electrification_Demand"],
    title="Branching pathway tree (top paths only)",
    min_edge_weight=0.03,
)
plt.tight_layout()
plt.show()

print("Top paths:")
for idxs, w in branching.top_paths:
    print(f"  weight={w:.3f}  nodes={idxs}")

In [None]:
# Network analysis examples.

from collections import Counter

from cib import NetworkAnalyzer, ScenarioVisualizer

print("\nNetwork analysis (base matrix; aggregated impacts):")
net = NetworkAnalyzer(matrix)
centrality = net.compute_centrality_measures()
top = sorted(
    ((d, m["degree"], m["betweenness"]) for d, m in centrality.items()),
    key=lambda x: x[1],
    reverse=True,
)
for d, deg, bet in top[:10]:
    print(f"  {d}: degree={deg:.3f} betweenness={bet:.3f}")

# Scenario network (final period): show enough mass to represent the Monte Carlo distribution.
# A "mean scenario" is not well-defined for categorical states.
# This plot scales node sizes by scenario frequency.
final_all = [p.scenarios[final_period_idx] for p in paths]
counts = Counter(final_all)

coverage_target = 0.85
max_nodes = 30
min_nodes = 8
min_count = 2

selected = []
covered = 0
for s, n in counts.most_common():
    if int(n) < int(min_count):
        break
    if len(selected) >= max_nodes:
        break
    selected.append(s)
    covered += int(n)
    if len(selected) >= min_nodes and (covered / len(final_all)) >= float(coverage_target):
        break

if len(selected) < min_nodes:
    selected = [s for s, _n in counts.most_common(min_nodes)]
    covered = sum(int(counts[s]) for s in selected)

final_scenarios = selected
weights = {s: float(counts[s]) for s in final_scenarios}
period_label = int(periods[final_period_idx])

print(
    f"\nScenario network (period {period_label}): plotting {len(final_scenarios)} scenarios "
    f"covering {covered}/{len(final_all)} runs (target={coverage_target:.0%}, min_count={min_count})."
)

print("\nPlotted scenario definitions (node label -> count -> state assignment):")
for i, s in enumerate(final_scenarios):
    c = int(counts[s])
    print(f"  S{i}  count={c}  scenario={s.to_dict()}")

try:
    import os

    import cib.example_data as mod

    package_dir = os.path.dirname(os.path.dirname(mod.__file__))
    results_dir = os.path.join(package_dir, "results")
    os.makedirs(results_dir, exist_ok=True)
    out_path = os.path.join(results_dir, "example_dynamic_cib_final_scenarios.txt")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(f"Final-period scenario network definitions (period={period_label}).\n")
        f.write(f"Selected scenarios: {len(final_scenarios)}.\n")
        f.write(f"Coverage: {covered}/{len(final_all)}.\n\n")
        for i, s in enumerate(final_scenarios):
            c = int(counts[s])
            f.write(f"S{i}  count={c}  scenario={s.to_dict()}\n")
    print(f"\nSaved scenario definitions to results/{os.path.basename(out_path)}")
except Exception as e:
    print(f"\nCould not save scenario definitions: {e}")

if len(final_scenarios) >= 2:
    plt.figure(figsize=(11, 8))
    ScenarioVisualizer.scenario_network(
        final_scenarios,
        matrix,
        layout="spring",
        edge_metric="hamming",
        max_edges_per_node=5,
        show_transitions=False,
        node_weights=weights,
        label_counts=True,
    )
    plt.tight_layout()
    plt.show()
else:
    print("Not enough distinct final-period scenarios to plot a scenario network.")
