# StoneGate Leak Check (Simulator)

This notebook performs a simple leak-check procedure against the simulator devices:
- Controller: `press_ctrl0` (seal / pump enable / setpoint)
- Sensor: `press0` (pressure_kPa)

Model assumption for the estimate:
$$\frac{dP}{dt} = -k(P - P_{atm})$$
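so $P(t) - P_{atm} = (P_0 - P_{atm}) e^{-kt}$, where $P_0$ is the pressure at the moment the pump is disabled, $k$ is the leak rate (1/s), and $\tau = 1/k$ is the corresponding time constant.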

Run the backend in simulator mode first:
- `./backend/build/StoneGate --sim` (default `ws://localhost:8080/status`)

In [None]:
import asyncio
import json
import time

import matplotlib.pyplot as plt

import stonegate_api as sg

# --- Backend connection ---
# Point the API module at the simulator's status WebSocket.
sg.WS_URL = 'ws://localhost:8080/status'

# --- Devices used by the procedure ---
PRESS_SENSOR_ID = 'press0'        # sensor polled for 'pressure_kPa'
PRESS_CTRL_ID = 'press_ctrl0'     # controller receiving seal / pump / setpoint commands

# --- Physical reference ---
P_ATM_KPA = 101.3                 # ambient pressure the leak model relaxes toward (kPa)

# --- Run parameters (tune these) ---
TARGET_KPA = 40.0                 # pressure setpoint to pump to before the observation (kPa)
SAMPLE_PERIOD_S = 1.0             # pause between consecutive pressure polls (s)
OBSERVE_S = 60.0                  # how long to record drift once the pump is off (s)


In [None]:
# Leak-check run

devs = await sg.rpc('devices.list', {})
ids = {d.get('id') for d in devs.get('devices', []) if isinstance(d, dict)}
assert PRESS_CTRL_ID in ids and PRESS_SENSOR_ID in ids, f'Missing required devices; have: {sorted(i for i in ids if isinstance(i, str))}'

# Seal and pump to target pressure
await sg.device_action(PRESS_CTRL_ID, json.loads(r"""{"set":{"sealed":true,"pump_enabled":true}}"""))
await sg.device_action(PRESS_CTRL_ID, json.loads(rf"""{{"set":{{"pressure_setpoint_kPa":{TARGET_KPA}}}}}"""))

await sg.wait_for_stable(PRESS_SENSOR_ID, 'pressure_kPa', tolerance=0.3, window_s=4.0, consecutive=6, timeout_s=60.0)
p0 = await sg.poll_required_number(PRESS_SENSOR_ID, 'pressure_kPa')
print(f'Stabilized at ~{p0:.3f} kPa; disabling pump…')

# Disable pump and observe drift
await sg.device_action(PRESS_CTRL_ID, json.loads(r"""{"set":{"pump_enabled":false}}"""))
samples = []  # (t, P_kPa)
t_start = time.time()
while True:
    t = time.time()
    p = await sg.poll_required_number(PRESS_SENSOR_ID, 'pressure_kPa')
    samples.append((t, p))
    if (t - t_start) >= OBSERVE_S:
        break
    await asyncio.sleep(SAMPLE_PERIOD_S)

# Estimate the leak rate k from the recorded samples and report it.
k = sg.estimate_leak_rate_per_s(samples, p_atm_kpa=P_ATM_KPA)

# Report the time span actually covered by the samples; the loop only stops
# once OBSERVE_S has been reached, so the real span can exceed the nominal value.
elapsed_s = samples[-1][0] - samples[0][0]
print(f'End pressure: {samples[-1][1]:.3f} kPa after {elapsed_s:.1f}s')

if k is None:
    print('Leak estimate: insufficient/unstable data (try longer observe time).')
elif not k > 0:
    # k == 0 would make 1/k raise ZeroDivisionError, and k < 0 (or NaN) has no
    # meaningful time constant under the decay-toward-P_atm model.
    print(f'Estimated leak_rate_per_s: {k:.6f} (no measurable drift toward P_atm; tau undefined)')
else:
    tau = 1.0 / k
    print(f'Estimated leak_rate_per_s: {k:.6f} (tau ≈ {tau:.1f}s)')

# Plot the recorded pressure against time since the first sample, with the
# atmospheric reference drawn as a dashed horizontal line.
first_stamp = samples[0][0]
ts = []
ps = []
for stamp, pressure in samples:
    ts.append(stamp - first_stamp)
    ps.append(pressure)

# Work on the current axes explicitly instead of through the pyplot wrappers.
ax = plt.gca()
ax.plot(ts, ps)
ax.axhline(P_ATM_KPA, linestyle='--', label='P_atm')
ax.set_xlabel('time (s)')
ax.set_ylabel('pressure (kPa)')
ax.set_title('Leak check: pressure drift with pump disabled')
ax.legend()
plt.show()