# CTMC simulations

In [2]:
"""CTMC simulator
================
Simulates each CTMC (generated by *ctmc_generator_absorbing.py*) under a
*discrete-time* approximation with Δt = 1 hour.  All λ rates are converted to
per-step transition probabilities on the fly.

Major fixes over the previous draft
----------------------------------
* **Rates → probabilities**: uses the exponential formula so the per-step
action matches the underlying CTMC.
* **Spatial contagion**: probability derived from λ instead of treating λ as a
  probability directly.
* **Pathlib everywhere**: works in notebooks and scripts alike.
* **RNG reproducibility**: single `random.Random(SEED)` instance.
* **Neighbour sets**: duplicates removed.
* **Clean plotting**: x-axis labelled "Time (h)", y-axis capped at total
  number of nodes.

Feel free to adjust `N_STEPS`, `DT_HOURS`, or `SEED`.
"""

from __future__ import annotations

import math
import os
import random
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# ── PARAMETERS ────────────────────────────────────────────────────────────
N_STEPS: int = 100          # number of time steps (hours)
N_RUNS: int = 100           # Monte-Carlo replications
DT_HOURS: float = 1.0       # discrete time slice (h)
INITIAL_ATTACK_CHANCE = 0.01
DECAY_FACTOR = 0.995        # spatial λ decay per step while attacker lives
ATTACKER_LIFESPAN = 15      # steps
SEED: int = 42              # RNG seed for reproducibility

# ── STATES ───────────────────────────────────────────────────────────────
states: list[str] = [
    "Normal",
    "Compromised",
    "Down",
    "Mitigated",
    "Recovered",  # absorbing
]

# ── DIRECTORIES ──────────────────────────────────────────────────────────
BASE_DIR = Path.cwd()
ctmc_dir = BASE_DIR / "csv" / "ctmcs"
output_csv_dir = BASE_DIR / "csv" / "simulations"
output_plot_dir = BASE_DIR / "plots" / "ctmc_results"
output_csv_dir.mkdir(parents=True, exist_ok=True)
output_plot_dir.mkdir(parents=True, exist_ok=True)

# ── UTILITY FUNCTIONS ────────────────────────────────────────────────────

def sample_next_state(
    current: str,
    rate_list: list[tuple[str, float]],
    rng: random.Random,
    dt: float = DT_HOURS,
) -> str:
    """Return a destination state sampled from exponential rates.

    If *rate_list* is empty → returns *current* (absorbing).
    """
    if not rate_list:
        return current

    total_rate = sum(lam for _, lam in rate_list if lam is not None)
    if total_rate == 0:
        return current

    # Probability of *any* jump out of the source state within Δt.
    p_leave = 1.0 - math.exp(-total_rate * dt)

    # Probabilities to each destination conditional on leaving.
    probs = [(dst, p_leave * lam / total_rate) for dst, lam in rate_list if lam]

    # Add stay-probability.
    probs.append((current, 1.0 - p_leave))

    dsts, weights = zip(*probs)
    return rng.choices(dsts, weights=weights)[0]


def contagion_probability(
    n_infected_neighbors: int,
    lam: float,
    decay: float,
    dt: float = DT_HOURS,
) -> float:
    """Probability that at least one infection attempt succeeds during Δt."""
    if n_infected_neighbors == 0:
        return 0.0
    effective_rate = lam * n_infected_neighbors * decay
    return 1.0 - math.exp(-effective_rate * dt)


# ── MAIN SIMULATION LOOP ─────────────────────────────────────────────────
rng = random.Random(SEED)
all_histories: dict[str, pd.DataFrame] = {}

for ctmc_file in ctmc_dir.glob("ctmc_*.csv"):
    ctmc_name = ctmc_file.stem.replace("ctmc_", "")
    print(f"Simulating CTMC for class: {ctmc_name}")

    df = pd.read_csv(ctmc_file)
    if df.empty:
        print(f"⚠️  Skipping {ctmc_name}: empty CTMC file")
        continue

    # ── Build structures ────────────────────────────────────────────────
    nodes = sorted({src.split("|")[0] for src in df["source"]})
    n_nodes = len(nodes)

    # Internal transitions as (node, src_state) → list[(dst_state, λ)]
    internal_transitions: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list)
    for _, row in df[df["type"] == "internal"].iterrows():
        src_node, src_state = row["source"].split("|")
        _, dst_state = row["target"].split("|")
        internal_transitions[(src_node, src_state)].append((dst_state, row["rate"]))

    # Neighbour sets for spatial contagion
    neighbors: dict[str, set[str]] = defaultdict(set)
    spatial_subset = df[df["type"] == "spatial"]
    for _, row in spatial_subset.iterrows():
        src_hex, _ = row["source"].split("|")
        tgt_hex, _ = row["target"].split("|")
        neighbors[src_hex].add(tgt_hex)

    spatial_rate = spatial_subset["rate"].iloc[0] if not spatial_subset.empty else 0.01

    # ── Accumulate averages over runs ───────────────────────────────────
    history_acc = np.zeros((N_STEPS, len(states)), dtype=float)

    for _ in range(N_RUNS):
        # Initial states
        current_state = {h: "Normal" for h in nodes}
        initially_infected = rng.sample(nodes, k=int(n_nodes * INITIAL_ATTACK_CHANCE))
        for h in initially_infected:
            current_state[h] = "Compromised"

        spatial_decay = 1.0
        run_history = []

        for t in range(N_STEPS):
            next_state: dict[str, str] = {}

            # Internal transitions
            for h in nodes:
                src_state = current_state[h]
                rate_list = internal_transitions.get((h, src_state), [])
                next_state[h] = sample_next_state(src_state, rate_list, rng)

            # Spatial contagion (while attacker active)
            if t < ATTACKER_LIFESPAN:
                for h in nodes:
                    if current_state[h] == "Normal":
                        inf_neighbors = [n for n in neighbors[h] if current_state.get(n) == "Compromised"]
                        p_inf = contagion_probability(len(inf_neighbors), spatial_rate, spatial_decay)
                        if rng.random() < p_inf:
                            next_state[h] = "Compromised"
                spatial_decay *= DECAY_FACTOR

            current_state = next_state

            # Tally state counts
            counts = defaultdict(int)
            for s in current_state.values():
                counts[s] += 1
            run_history.append([counts[s] for s in states])

        history_acc += np.asarray(run_history)

    # ── Aggregate & save ────────────────────────────────────────────────
    avg_history = history_acc / N_RUNS
    df_hist = pd.DataFrame(avg_history, columns=states)
    df_hist.index.name = "time"
    df_hist.to_csv(output_csv_dir / f"simulation_{ctmc_name}.csv")

    # ── Plot ────────────────────────────────────────────────────────────
    plt.figure(figsize=(12, 6))
    cmap = plt.get_cmap("tab10")
    for i, s in enumerate(states):
        plt.plot(df_hist.index, df_hist[s], label=s, color=cmap(i), linewidth=2)

    plt.title(f"CTMC Simulation: {ctmc_name}")
    plt.xlabel("Time (h)")
    plt.ylabel("Average number of nodes")
    plt.ylim(0, n_nodes)
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_plot_dir / f"ctmc_{ctmc_name}.pdf")
    plt.close()

    all_histories[ctmc_name] = df_hist

print("Simulation batch finished.")


Simulating CTMC for class: ambulance_station
Simulating CTMC for class: atm
Simulating CTMC for class: bank
Simulating CTMC for class: gov_office
Simulating CTMC for class: hospital
Simulating CTMC for class: power
Simulating CTMC for class: surveillance
Simulating CTMC for class: telecom
Simulation batch finished.
