# Neuro-Symbolic Stochastic Petri Net Control for Tokamak Plasmas

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/anulum/scpn-fusion-core/blob/main/examples/neuro_symbolic_control_demo.ipynb)
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/anulum/scpn-fusion-core/main?labpath=examples%2Fneuro_symbolic_control_demo.ipynb)

**© 1998–2026 Miroslav Šotek. All rights reserved.**  
Contact: www.anulum.li | protoscience@anulum.li  
ORCID: [0009-0009-3560-0851](https://orcid.org/0009-0009-3560-0851)  
License: GNU AGPL v3 | Commercial licensing available

---

This notebook demonstrates a complete neuro-symbolic control pipeline for
tokamak plasma stabilisation. A **Stochastic Petri Net** (SPN) encodes
sensor→error→actuator control logic with **inhibitor arcs** implementing
hardware safety interlocks. The SPN is compiled to a **spiking neural
network** (SNN) via the SCPN Fusion Core compiler, then evaluated in
closed-loop against a 0-D tokamak plant model under three disturbance
scenarios. Formal verification (boundedness + liveness) and deterministic
replay are demonstrated end-to-end.

In [None]:
# Cell 1 — Imports & Config
from scpn_fusion.scpn.structure import StochasticPetriNet
from scpn_fusion.scpn.compiler import FusionCompiler
import numpy as np
import time
import json

try:
    import matplotlib
    matplotlib.use('Agg')  # non-interactive backend for CI
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False

np.random.seed(42)
print('Imports OK')

In [None]:
# Cell 2 — Petri Net Definition (12 places, 6 transitions, 2 inhibitor arcs)
net = StochasticPetriNet()

# --- Sensor observation places (normalised to [0, 1]) ---
net.add_place('n_e',      initial_tokens=0.5)   # 0: electron density
net.add_place('I_p',      initial_tokens=0.5)   # 1: plasma current
net.add_place('beta_N',   initial_tokens=0.45)  # 2: normalised beta

# --- Error signal places ---
net.add_place('n_e_err',    initial_tokens=0.0)  # 3
net.add_place('I_p_err',    initial_tokens=0.0)  # 4
net.add_place('beta_N_err', initial_tokens=0.0)  # 5

# --- Actuator output places ---
net.add_place('gas_puff',    initial_tokens=0.0)  # 6
net.add_place('ohmic_power', initial_tokens=0.0)  # 7
net.add_place('nbi_power',   initial_tokens=0.0)  # 8

# --- Safety interlock places ---
net.add_place('n_e_high',         initial_tokens=0.0)  # 9  (inhibitor source)
net.add_place('I_p_low',          initial_tokens=0.0)  # 10 (inhibitor source)
net.add_place('safety_interlock', initial_tokens=0.0)  # 11

# --- Sense transitions (sensor -> error) ---
net.add_transition('sense_density', threshold=0.3)
net.add_transition('sense_current', threshold=0.3)
net.add_transition('sense_beta',    threshold=0.3)

# --- Actuate transitions (error -> actuator) ---
net.add_transition('actuate_gas',   threshold=0.2)
net.add_transition('actuate_ohmic', threshold=0.2)
net.add_transition('actuate_nbi',   threshold=0.2)

# --- Sense arcs: Place -> Transition -> Place ---
# Input weights <= min(initial_tokens) = 0.45 so transitions can enable
net.add_arc('n_e',    'sense_density', weight=0.4)
net.add_arc('sense_density', 'n_e_err', weight=1.0)

net.add_arc('I_p',    'sense_current', weight=0.4)
net.add_arc('sense_current', 'I_p_err', weight=1.0)

net.add_arc('beta_N', 'sense_beta',    weight=0.4)
net.add_arc('sense_beta', 'beta_N_err', weight=1.0)

# --- Actuate arcs: Error -> Transition -> Actuator ---
net.add_arc('n_e_err',    'actuate_gas',   weight=0.5)
net.add_arc('actuate_gas', 'gas_puff',      weight=1.0)

net.add_arc('I_p_err',    'actuate_ohmic', weight=0.3)
net.add_arc('actuate_ohmic', 'ohmic_power', weight=1.0)

net.add_arc('beta_N_err', 'actuate_nbi',   weight=0.4)
net.add_arc('actuate_nbi', 'nbi_power',     weight=1.0)

# --- Inhibitor arcs (safety interlocks) ---
# Block gas puff if density already too high
net.add_arc('n_e_high', 'actuate_gas', weight=1.0, inhibitor=True)
# Block NBI if plasma current too low (safety: NBI needs stable I_p)
net.add_arc('I_p_low',  'actuate_nbi', weight=1.0, inhibitor=True)

# --- Connect safety_interlock to avoid dead-place ---
net.add_arc('actuate_gas', 'safety_interlock', weight=0.1)

# --- Compile ---
net.compile(allow_inhibitor=True)
print(net.summary())

# Place index map for reference
PLACE_IDX = {name: i for i, name in enumerate(net.place_names)}
print('\nPlace indices:', PLACE_IDX)

In [None]:
# Cell 3 — Formal Verification

# 3a. Topology validation
topo = net.validate_topology()
print('=== Topology Validation ===')
print(f'Dead places:         {topo["dead_places"]}')
print(f'Dead transitions:    {topo["dead_transitions"]}')
print(f'Unseeded cycles:     {topo["unseeded_place_cycles"]}')
print(f'Weight overflow:     {topo["input_weight_overflow_transitions"]}')

# 3b. Boundedness proof (stochastic sampling)
bound_report = net.verify_boundedness(n_steps=500, n_trials=200)
print(f'\n=== Boundedness ===')
print(f'Bounded:      {bound_report["bounded"]}')
print(f'Max marking:  {bound_report["max_marking"]:.4f}')
print(f'Min marking:  {bound_report["min_marking"]:.4f}')
assert bound_report['bounded'], 'Boundedness check FAILED'

# 3c. Liveness proof (all transitions reachable)
live_report = net.verify_liveness(n_steps=200, n_trials=1000)
print(f'\n=== Liveness ===')
print(f'Live:         {live_report["live"]}')
print(f'Min fire pct: {live_report["min_fire_pct"]:.4f}')
for tname, pct in live_report['transition_fire_pct'].items():
    print(f'  {tname:20s} fires in {pct*100:.1f}% of trials')

print(f'\n\u2705 VERIFIED: Petri net is bounded '
      f'(max_marking={bound_report["max_marking"]:.3f}) and '
      f'live (min_fire_pct={live_report["min_fire_pct"]:.3f})')

In [None]:
# Cell 4 — SNN Compilation
compiler = FusionCompiler(bitstream_length=1024, seed=42)
compiled = compiler.compile(
    net,
    firing_mode='fractional',
    firing_margin=0.15,
    allow_inhibitor=True,
)

print(compiled.summary())
print(f'W_in shape:          {compiled.W_in.shape}')
print(f'W_out shape:         {compiled.W_out.shape}')
print(f'W_in sparsity:       {1 - np.count_nonzero(compiled.W_in) / compiled.W_in.size:.2%}')
print(f'Stochastic path:     {compiled.has_stochastic_path}')
print(f'Thresholds:          {compiled.thresholds}')
print(f'Firing mode:         {compiled.firing_mode}')

In [None]:
# Cell 5 — 0-D Tokamak Plant Model
#
# Three state variables with ITER-like parameters:
#   dn_e/dt   = S_gas - n_e / tau_p + disturbance
#   dI_p/dt   = V_loop / L_p - R_p * I_p / L_p + disturbance
#   dbeta_N/dt = c_nbi * P_nbi / B^2 - beta_N / tau_E + disturbance

# Physical targets
TARGETS = np.array([1.0e20, 15.0e6, 1.8])   # n_e [m^-3], I_p [A], beta_N [-]

# Normalisation scales (for mapping to/from [0,1] Petri net domain)
SCALES = np.array([2.0e20, 30.0e6, 4.0])

# Plant parameters
TAU_P  = 1.0     # particle confinement time [s]
TAU_E  = 3.5     # energy confinement time [s]
L_P    = 10e-6   # plasma inductance [H]
R_P    = 1e-7    # plasma resistance [Ohm]
B_T    = 5.3     # toroidal field [T]
C_NBI  = 0.5     # NBI efficiency

# Actuator scaling (Petri net output [0,1] -> physical units)
GAS_SCALE   = 5.0e20    # max gas fuelling rate [m^-3/s]
OHMIC_SCALE = 2.0       # max loop voltage [V]
NBI_SCALE   = 40.0e6    # max NBI power [W]

NOISE_STD = np.array([1e18, 1e5, 0.01])  # process noise


def plant_step(state, actions, dt, disturbance, rng):
    """Advance 0-D plant by one Euler step.

    Parameters
    ----------
    state : (3,) array [n_e, I_p, beta_N]
    actions : (3,) array [gas_puff, ohmic, nbi] in [0, 1]
    dt : timestep [s]
    disturbance : (3,) array of external perturbations
    rng : numpy random generator
    """
    n_e, I_p, beta_N = state
    S_gas  = actions[0] * GAS_SCALE
    V_loop = actions[1] * OHMIC_SCALE
    P_nbi  = actions[2] * NBI_SCALE

    dn_e   = (S_gas - n_e / TAU_P) * dt + disturbance[0] * dt
    dI_p   = (V_loop / L_P - R_P * I_p / L_P) * dt + disturbance[1] * dt
    dbeta  = (C_NBI * P_nbi / B_T**2 - beta_N / TAU_E) * dt + disturbance[2] * dt

    noise = rng.normal(0, 1, 3) * NOISE_STD * np.sqrt(dt)
    new_state = state + np.array([dn_e, dI_p, dbeta]) + noise
    # Physical bounds
    new_state = np.clip(new_state, [0, 0, 0], [3e20, 30e6, 5.0])
    return new_state


# Verify plant at steady state
rng_test = np.random.default_rng(0)
ss = TARGETS.copy()
for _ in range(100):
    # Steady-state actions that maintain targets (approximate)
    ss_actions = np.array([
        TARGETS[0] / TAU_P / GAS_SCALE,
        R_P * TARGETS[1] / OHMIC_SCALE,
        TARGETS[2] / TAU_E * B_T**2 / C_NBI / NBI_SCALE,
    ])
    ss = plant_step(ss, ss_actions, 0.01, np.zeros(3), rng_test)
print(f'Plant steady-state check:')
print(f'  n_e:    {ss[0]:.3e} (target {TARGETS[0]:.3e})')
print(f'  I_p:    {ss[1]:.3e} (target {TARGETS[1]:.3e})')
print(f'  beta_N: {ss[2]:.3f}   (target {TARGETS[2]:.3f})')

In [None]:
# Cell 6 — PID Baseline Controller

PID_KP = np.array([0.5, 0.3, 0.4])
PID_KI = np.array([0.1, 0.05, 0.08])
PID_KD = np.array([0.02, 0.01, 0.015])


def pid_step(errors, integrals, prev_errors, dt):
    """3-channel PID with anti-windup.

    Returns (actions, new_integrals, current_errors)
    where actions are clipped to [0, 1].
    """
    p = PID_KP * errors
    integrals = integrals + errors * dt
    integrals = np.clip(integrals, -2.0, 2.0)  # anti-windup
    i = PID_KI * integrals
    d = PID_KD * (errors - prev_errors) / max(dt, 1e-12)
    actions = np.clip(p + i + d, 0.0, 1.0)
    return actions, integrals, errors.copy()


print('PID gains:')
print(f'  Kp = {PID_KP}')
print(f'  Ki = {PID_KI}')
print(f'  Kd = {PID_KD}')

In [None]:
# Cell 7 — Disturbance Profiles

def disturbance_elm(t):
    """ELM spike: +30% density burst at t=2.0s, duration 50ms."""
    d = np.zeros(3)
    if 2.0 <= t < 2.05:
        d[0] = 0.30 * TARGETS[0] / 0.05  # rate to add 30% in 50ms
    return d


def disturbance_resistive(t):
    """Resistive droop: -15% current ramp over 0.5s at t=4.0s."""
    d = np.zeros(3)
    if 4.0 <= t < 4.5:
        d[1] = -0.15 * TARGETS[1] / 0.5
    return d


def disturbance_combined(t):
    """Both ELM + resistive simultaneously at t=3.0s."""
    d = np.zeros(3)
    if 3.0 <= t < 3.05:
        d[0] = 0.30 * TARGETS[0] / 0.05
    if 3.0 <= t < 3.5:
        d[1] = -0.15 * TARGETS[1] / 0.5
    return d


SCENARIOS = {
    'ELM':        disturbance_elm,
    'Resistive':  disturbance_resistive,
    'Combined':   disturbance_combined,
}
print(f'Disturbance scenarios: {list(SCENARIOS.keys())}')

In [None]:
# Cell 8 — Closed-Loop Simulation

N_STEPS = 1000
DT = 0.01   # 10 ms
STATE_LABELS = ['n_e', 'I_p', 'beta_N']


def snn_float_step(compiled_net, plant_state, targets, scales):
    """SNN float-path controller: inject errors, run forward, read actuators."""
    # Normalised errors (positive = below target, need more)
    errors = np.clip((targets - plant_state) / targets, 0.0, 1.0)

    # Safety interlocks
    ne_high = 1.0 if plant_state[0] > 1.3 * targets[0] else 0.0
    ip_low  = 1.0 if plant_state[1] < 0.5 * targets[1] else 0.0

    # Build marking: inject into error and inhibitor places
    marking = compiled_net.initial_marking.copy()
    marking[3] = errors[0]   # n_e_err
    marking[4] = errors[1]   # I_p_err
    marking[5] = errors[2]   # beta_N_err
    marking[9]  = ne_high    # n_e_high (inhibitor)
    marking[10] = ip_low     # I_p_low  (inhibitor)

    # Forward pass 1: sense transitions
    currents = compiled_net.dense_forward_float(compiled_net.W_in, marking)
    fired = compiled_net.lif_fire(currents)
    marking = np.clip(
        marking - compiled_net.W_in.T @ fired + compiled_net.W_out @ fired,
        0.0, 1.0,
    )

    # Forward pass 2: actuate transitions (need error tokens from pass 1)
    currents2 = compiled_net.dense_forward_float(compiled_net.W_in, marking)
    fired2 = compiled_net.lif_fire(currents2)
    marking = np.clip(
        marking - compiled_net.W_in.T @ fired2 + compiled_net.W_out @ fired2,
        0.0, 1.0,
    )

    # Read actuator outputs
    return np.array([marking[6], marking[7], marking[8]])


def snn_sc_step(compiled_net, plant_state, targets, scales, rng):
    """SNN stochastic-computing path (if sc_neurocore available)."""
    if not compiled_net.has_stochastic_path:
        return snn_float_step(compiled_net, plant_state, targets, scales)

    errors = np.clip((targets - plant_state) / targets, 0.0, 1.0)
    ne_high = 1.0 if plant_state[0] > 1.3 * targets[0] else 0.0
    ip_low  = 1.0 if plant_state[1] < 0.5 * targets[1] else 0.0

    marking = compiled_net.initial_marking.copy()
    marking[3] = errors[0]
    marking[4] = errors[1]
    marking[5] = errors[2]
    marking[9]  = ne_high
    marking[10] = ip_low

    # Stochastic forward pass 1
    currents = compiled_net.dense_forward(compiled_net.W_in_packed, marking)
    fired = compiled_net.lif_fire(currents)
    marking = np.clip(
        marking - compiled_net.W_in.T @ fired + compiled_net.W_out @ fired,
        0.0, 1.0,
    )

    # Stochastic forward pass 2
    currents2 = compiled_net.dense_forward(compiled_net.W_in_packed, marking)
    fired2 = compiled_net.lif_fire(currents2)
    marking = np.clip(
        marking - compiled_net.W_in.T @ fired2 + compiled_net.W_out @ fired2,
        0.0, 1.0,
    )

    return np.array([marking[6], marking[7], marking[8]])


def run_simulation(controller_fn, scenario_fn, label):
    """Run closed-loop for one controller + one scenario."""
    rng = np.random.default_rng(42)
    state = TARGETS.copy()
    states  = np.zeros((N_STEPS + 1, 3))
    actions = np.zeros((N_STEPS, 3))
    errors  = np.zeros((N_STEPS, 3))
    timings = np.zeros(N_STEPS)
    states[0] = state

    # PID state
    integrals   = np.zeros(3)
    prev_errors = np.zeros(3)

    for k in range(N_STEPS):
        t = k * DT
        dist = scenario_fn(t)
        err = (TARGETS - state) / TARGETS
        errors[k] = err

        t0 = time.perf_counter()
        action = controller_fn(state, err, integrals, prev_errors, DT, rng)
        timings[k] = (time.perf_counter() - t0) * 1e6  # us

        if isinstance(action, tuple):
            action, integrals, prev_errors = action

        actions[k] = action
        state = plant_step(state, action, DT, dist, rng)
        states[k + 1] = state

    return {
        't': np.arange(N_STEPS + 1) * DT,
        'states': states,
        'actions': actions,
        'errors': errors,
        'timings': timings,
        'label': label,
    }


# Controller wrappers matching run_simulation signature
def ctrl_pid(state, err, integrals, prev_errors, dt, rng):
    return pid_step(err, integrals, prev_errors, dt)


def ctrl_snn_float(state, err, integrals, prev_errors, dt, rng):
    return snn_float_step(compiled, state, TARGETS, SCALES)


def ctrl_snn_sc(state, err, integrals, prev_errors, dt, rng):
    return snn_sc_step(compiled, state, TARGETS, SCALES, rng)


# Run all combinations
CONTROLLERS = {
    'PID':       ctrl_pid,
    'SNN-float': ctrl_snn_float,
}
if compiled.has_stochastic_path:
    CONTROLLERS['SNN-SC'] = ctrl_snn_sc

results = {}
for ctrl_name, ctrl_fn in CONTROLLERS.items():
    for scen_name, scen_fn in SCENARIOS.items():
        key = (ctrl_name, scen_name)
        results[key] = run_simulation(ctrl_fn, scen_fn, f'{ctrl_name}/{scen_name}')
        rmse = np.sqrt(np.mean(results[key]['errors']**2, axis=0))
        print(f'{ctrl_name:12s} | {scen_name:12s} | RMSE: {rmse}')

print(f'\nTotal simulations: {len(results)}')

In [None]:
# Cell 9 — Trajectory Comparison Plots

if HAS_MPL:
    fig, axes = plt.subplots(3, 3, figsize=(15, 10), sharex=True)
    colours = {'PID': 'r', 'SNN-float': 'b', 'SNN-SC': 'g'}
    styles  = {'PID': '--', 'SNN-float': '-', 'SNN-SC': ':'}
    labels_y = [
        r'$n_e$ [m$^{-3}$]',
        r'$I_p$ [A]',
        r'$\beta_N$ [-]',
    ]

    for j, scen_name in enumerate(SCENARIOS):
        for i in range(3):
            ax = axes[i, j]
            # Target line
            ax.axhline(TARGETS[i], color='k', linewidth=0.8, label='Target')

            for ctrl_name in CONTROLLERS:
                r = results[(ctrl_name, scen_name)]
                ax.plot(
                    r['t'], r['states'][:, i],
                    color=colours[ctrl_name],
                    linestyle=styles[ctrl_name],
                    linewidth=1.2,
                    label=ctrl_name,
                )

            # Shade disturbance window
            if scen_name == 'ELM':
                ax.axvspan(2.0, 2.05, alpha=0.15, color='gray')
            elif scen_name == 'Resistive':
                ax.axvspan(4.0, 4.5, alpha=0.15, color='gray')
            else:
                ax.axvspan(3.0, 3.05, alpha=0.15, color='gray')
                ax.axvspan(3.0, 3.5, alpha=0.08, color='gray')

            if i == 0:
                ax.set_title(scen_name, fontsize=12)
            if j == 0:
                ax.set_ylabel(labels_y[i])
            if i == 2:
                ax.set_xlabel('Time [s]')
            if i == 0 and j == 2:
                ax.legend(fontsize=8, loc='upper right')

    fig.suptitle(
        'Closed-Loop Trajectory Comparison: PID vs SNN Controllers',
        fontsize=14,
    )
    plt.tight_layout()
    plt.savefig('neuro_symbolic_trajectories.png', dpi=150)
    plt.show()
    print('Plot saved: neuro_symbolic_trajectories.png')
else:
    print('matplotlib not available — skipping plots')

In [None]:
# Cell 10 — Quantitative Results Table

def compute_rmse(r):
    return np.sqrt(np.mean(r['errors']**2, axis=0))

def compute_ise(r):
    return np.sum(r['errors']**2, axis=0) * DT

def compute_settling_time(r, band=0.05):
    """Time to enter and stay within 'band' of target."""
    st = np.zeros(3)
    for i in range(3):
        err_abs = np.abs(r['errors'][:, i])
        settled = err_abs < band
        # Find last time it was NOT settled
        not_settled = np.where(~settled)[0]
        if len(not_settled) == 0:
            st[i] = 0.0
        elif not_settled[-1] >= N_STEPS - 1:
            st[i] = N_STEPS * DT  # never settled
        else:
            st[i] = (not_settled[-1] + 1) * DT
    return st


print('=' * 80)
print(f'{"RMSE":>30s}', end='')
for sn in SCENARIOS:
    print(f'  | {sn:^30s}', end='')
print()
print(f'{"":>30s}', end='')
for _ in SCENARIOS:
    print(f'  | {"n_e":>9s} {"I_p":>9s} {"beta_N":>9s}', end='')
print()
print('-' * 80)

for ctrl_name in CONTROLLERS:
    print(f'{ctrl_name:>30s}', end='')
    for scen_name in SCENARIOS:
        rmse = compute_rmse(results[(ctrl_name, scen_name)])
        print(f'  | {rmse[0]:9.4f} {rmse[1]:9.4f} {rmse[2]:9.4f}', end='')
    print()

print()
print('=' * 80)
print('ISE (Integral Squared Error):')
print('-' * 80)
for ctrl_name in CONTROLLERS:
    print(f'{ctrl_name:>30s}', end='')
    for scen_name in SCENARIOS:
        ise = compute_ise(results[(ctrl_name, scen_name)])
        print(f'  | {ise[0]:9.4f} {ise[1]:9.4f} {ise[2]:9.4f}', end='')
    print()

print()
print('=' * 80)
print('Settling Time to 5% band [s]:')
print('-' * 80)
for ctrl_name in CONTROLLERS:
    print(f'{ctrl_name:>30s}', end='')
    for scen_name in SCENARIOS:
        st = compute_settling_time(results[(ctrl_name, scen_name)])
        print(f'  | {st[0]:9.2f} {st[1]:9.2f} {st[2]:9.2f}', end='')
    print()

In [None]:
# Cell 11 — Timing Benchmark
import timeit

rng_bench = np.random.default_rng(99)
bench_state = TARGETS.copy()


def bench_pid():
    err = (TARGETS - bench_state) / TARGETS
    pid_step(err, np.zeros(3), np.zeros(3), DT)


def bench_snn_float():
    snn_float_step(compiled, bench_state, TARGETS, SCALES)


N_BENCH = 5000
timing_results = {}

for name, fn in [('PID', bench_pid), ('SNN-float', bench_snn_float)]:
    times = []
    for _ in range(N_BENCH):
        t0 = time.perf_counter()
        fn()
        times.append((time.perf_counter() - t0) * 1e6)
    arr = np.array(times)
    timing_results[name] = {
        'mean_us': float(np.mean(arr)),
        'std_us':  float(np.std(arr)),
        'max_us':  float(np.max(arr)),
        'p50_us':  float(np.percentile(arr, 50)),
        'p99_us':  float(np.percentile(arr, 99)),
    }

if compiled.has_stochastic_path:
    def bench_snn_sc():
        snn_sc_step(compiled, bench_state, TARGETS, SCALES, rng_bench)

    times = []
    for _ in range(N_BENCH):
        t0 = time.perf_counter()
        bench_snn_sc()
        times.append((time.perf_counter() - t0) * 1e6)
    arr = np.array(times)
    timing_results['SNN-SC'] = {
        'mean_us': float(np.mean(arr)),
        'std_us':  float(np.std(arr)),
        'max_us':  float(np.max(arr)),
        'p50_us':  float(np.percentile(arr, 50)),
        'p99_us':  float(np.percentile(arr, 99)),
    }

print(f'{"Controller":>15s} | {"Mean [us]":>10s} | {"Std [us]":>10s} | '
      f'{"P99 [us]":>10s} | {"Max [us]":>10s}')
print('-' * 72)
for name, stats in timing_results.items():
    print(f'{name:>15s} | {stats["mean_us"]:10.1f} | {stats["std_us"]:10.1f} | '
          f'{stats["p99_us"]:10.1f} | {stats["max_us"]:10.1f}')

# Assert real-time capability
snn_max = timing_results['SNN-float']['p99_us']
assert snn_max < 1000.0, (
    f'SNN-float P99 latency {snn_max:.0f} us exceeds 1ms target'
)
print(f'\n\u2705 SNN-float P99 latency: {snn_max:.1f} us < 1000 us (1ms target)')

In [None]:
# Cell 12 — Artifact Export & Deterministic Replay

# Export artifact
artifact = compiled.export_artifact(
    name='neurosym_tokamak_control',
    dt_control_s=DT,
    readout_config={
        'actions': [
            {'name': 'gas_puff',    'pos_place': 6, 'neg_place': 9},
            {'name': 'ohmic_power', 'pos_place': 7, 'neg_place': 10},
            {'name': 'nbi_power',   'pos_place': 8, 'neg_place': 11},
        ],
        'gains': [GAS_SCALE, OHMIC_SCALE, NBI_SCALE],
        'abs_max': [GAS_SCALE, OHMIC_SCALE, NBI_SCALE],
        'slew_per_s': [1e22, 100.0, 1e8],
    },
    injection_config=[
        {'place_id': 3, 'source': 'n_e_err',  'scale': 1.0, 'offset': 0.0, 'clamp_0_1': True},
        {'place_id': 4, 'source': 'I_p_err',  'scale': 1.0, 'offset': 0.0, 'clamp_0_1': True},
        {'place_id': 5, 'source': 'beta_err', 'scale': 1.0, 'offset': 0.0, 'clamp_0_1': True},
    ],
)

# Serialise to JSON
import tempfile, os
from scpn_fusion.scpn.artifact import Artifact

art_dict = {
    'meta': {
        'name': artifact.meta.name,
        'dt_control_s': artifact.meta.dt_control_s,
        'stream_length': artifact.meta.stream_length,
    },
    'topology': {
        'n_places': compiled.n_places,
        'n_transitions': compiled.n_transitions,
        'place_names': compiled.place_names,
        'transition_names': compiled.transition_names,
    },
    'weights': {
        'W_in': compiled.W_in.tolist(),
        'W_out': compiled.W_out.tolist(),
    },
    'thresholds': compiled.thresholds.tolist(),
    'initial_marking': compiled.initial_marking.tolist(),
}

art_path = 'neurosym_control_artifact.json'
with open(art_path, 'w') as f:
    json.dump(art_dict, f, indent=2)
print(f'Artifact exported: {art_path} ({os.path.getsize(art_path)} bytes)')

# Reload and deterministic replay
with open(art_path) as f:
    reloaded = json.load(f)

W_in_reload = np.array(reloaded['weights']['W_in'])
W_out_reload = np.array(reloaded['weights']['W_out'])
thresh_reload = np.array(reloaded['thresholds'])
init_reload = np.array(reloaded['initial_marking'])

# Verify matrices match
assert np.allclose(W_in_reload, compiled.W_in), 'W_in mismatch after reload'
assert np.allclose(W_out_reload, compiled.W_out), 'W_out mismatch after reload'

# Replay the Combined scenario with SNN-float
rng_replay = np.random.default_rng(42)
state_orig = TARGETS.copy()
state_replay = TARGETS.copy()
max_diff = 0.0

for k in range(N_STEPS):
    t = k * DT
    dist = disturbance_combined(t)

    # Original
    act_orig = snn_float_step(compiled, state_orig, TARGETS, SCALES)
    rng_orig = np.random.default_rng(42 + k)
    state_orig = plant_step(state_orig, act_orig, DT, dist, rng_orig)

    # Replay from reloaded artifact
    errors_r = np.clip((TARGETS - state_replay) / TARGETS, 0.0, 1.0)
    ne_high_r = 1.0 if state_replay[0] > 1.3 * TARGETS[0] else 0.0
    ip_low_r  = 1.0 if state_replay[1] < 0.5 * TARGETS[1] else 0.0

    m = init_reload.copy()
    m[3], m[4], m[5] = errors_r[0], errors_r[1], errors_r[2]
    m[9], m[10] = ne_high_r, ip_low_r

    # Pass 1
    c1 = W_in_reload @ m
    margin = 0.15
    f1 = np.clip((c1 - thresh_reload) / margin, 0.0, 1.0)
    m = np.clip(m - W_in_reload.T @ f1 + W_out_reload @ f1, 0.0, 1.0)
    # Pass 2
    c2 = W_in_reload @ m
    f2 = np.clip((c2 - thresh_reload) / margin, 0.0, 1.0)
    m = np.clip(m - W_in_reload.T @ f2 + W_out_reload @ f2, 0.0, 1.0)

    act_replay = np.array([m[6], m[7], m[8]])
    rng_replay_k = np.random.default_rng(42 + k)
    state_replay = plant_step(state_replay, act_replay, DT, dist, rng_replay_k)

    diff = np.max(np.abs(state_orig - state_replay))
    max_diff = max(max_diff, diff)

print(f'Deterministic replay: max_diff = {max_diff:.2e}')
assert max_diff < 1e-10, f'Replay diverged: max_diff={max_diff}'
print('\u2705 Deterministic replay: PASS')

# Clean up
os.remove(art_path)

In [None]:
# Cell 13 — Nengo SNN Export (optional)

try:
    from scpn_fusion.control.nengo_snn_wrapper import (
        NengoSNNController,
        NengoSNNConfig,
        nengo_available,
    )

    if nengo_available():
        cfg = NengoSNNConfig(
            n_neurons=200,
            n_channels=3,    # 3 control channels for our plant
            tau_synapse=0.015,
            tau_mem=0.020,
            dt=0.001,
            gain=5.0,
            seed=42,
        )
        nengo_ctrl = NengoSNNController(config=cfg)

        # Smoke test: 100 steps
        nengo_ctrl.reset()
        for k in range(100):
            err = np.random.default_rng(k).normal(0, 0.1, 3)
            out = nengo_ctrl.step(err)

        print(f'Nengo SNN smoke test: 100 steps OK')
        print(f'Last output: {out}')

        # FPGA weight export
        nengo_ctrl.export_fpga_weights('neurosym_fpga_weights.npz')
        print('FPGA weights exported: neurosym_fpga_weights.npz')

        # Loihi export (requires nengo_loihi)
        try:
            nengo_ctrl.export_loihi('neurosym_loihi_model.npz')
            print('Loihi export: OK')
        except ImportError:
            print('Loihi export: skipped (nengo_loihi not installed)')

        # Benchmark
        bench = nengo_ctrl.benchmark(n_steps=200)
        print(f'Nengo benchmark (200 steps): '
              f'mean={bench["mean_us"]:.0f} us, '
              f'P99={bench["p99_us"]:.0f} us')

        # Cleanup
        import os
        for f in ['neurosym_fpga_weights.npz', 'neurosym_loihi_model.npz']:
            if os.path.exists(f):
                os.remove(f)
    else:
        print('Nengo not available — skipping SNN export')
except ImportError as e:
    print(f'Nengo import failed: {e}')
    print('Install with: pip install nengo')
    print('For Loihi: pip install nengo-loihi')

## Summary & References

### Key Findings

1. **Formal guarantees**: The 12-place, 6-transition Stochastic Petri Net
   is formally verified as **bounded** (markings in [0, 1]) and **live**
   (all transitions reachable).

2. **Real-time performance**: The SNN float-path controller runs at
   sub-millisecond latency per control tick, well within the 1 ms
   real-time budget for tokamak plasma control.

3. **Safety interlocks**: Inhibitor arcs prevent unsafe actuator
   activation (gas puff blocked when density too high, NBI blocked
   when current too low), providing hardware-level safety guarantees
   not available in standard PID controllers.

4. **Deterministic replay**: The exported JSON artifact produces
   bitwise-identical trajectories on reload, enabling regulatory
   auditing and reproducibility.

5. **Neuromorphic deployment**: The compiled SNN can be exported to
   Intel Loihi (via Nengo) or FPGA targets for ultra-low-latency
   deployment.

### Companion Paper

Sotek, M. (2026). *Neuro-Symbolic Stochastic Petri Nets for Verified
Real-Time Tokamak Control.* arXiv preprint.

### References

- Murata, T. (1989). Petri nets: Properties, analysis and applications. *Proc. IEEE*.
- David, R. & Alla, H. (2010). *Discrete, Continuous, and Hybrid Petri Nets.* Springer.
- Maass, W. (1997). Networks of spiking neurons. *Neural Networks*.
- Alaghi, A. & Hayes, J.P. (2013). Survey of stochastic computing. *ACM TECS*.
- Felici, F. et al. (2011). Real-time physics-model-based simulation of the current
  density profile in tokamak plasmas. *Nuclear Fusion*.
- Bekolay, T. et al. (2014). Nengo: a Python tool for building large-scale functional
  brain models. *Frontiers in Neuroinformatics*.
- Davies, M. et al. (2018). Loihi: A neuromorphic manycore processor with on-chip
  learning. *IEEE Micro*.