diff --git a/.gitignore b/.gitignore index f838ffa..a99f6ef 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ dmypy.json coverage_output.log **/.DS_Store .local_coverage_data + +# Local credentialed runner for validate_lug_derivation +scripts/validate_lug_derivation/run_local.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 625ab93..2449c46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.6.3] - 04/2026 + +### Fixed + +- **Feedthrough values now derived via Kirchhoff instead of read from `downstream-lugs`** — `SpanPanelSnapshot.feedthrough_power_w`, `feedthrough_energy_consumed_wh`, and `feedthrough_energy_produced_wh` are computed as `main − Σ(branches)` inside + `HomieDeviceConsumer._build_snapshot` (and mirrored in the dirty-circuit rebuild path) rather than sourced from the native `energy.ebus.device.lugs.downstream` `active-power` / `imported-energy` / `exported-energy` properties. The native MQTT readings + exhibit a systematic ~400–550 W offset on `active-power` and can emit non-monotonic (including negative) cumulative values on `imported-energy`, making them unusable for either instantaneous power or lifetime energy accounting. Main-meter and per-branch + readings remain accurate, so Kirchhoff at the main bus produces a physically-consistent result: `P_main = P_feedthrough + Σ(branches, load-perspective)` for instantaneous power, and the same identity applied to _net_ energy + (`main.consumed − main.produced − Σ(branch.net)`) split into non-negative consumed / produced counters. Net-based energy handling is required because a per-direction subtraction would emit negative cumulative counters whenever circuits flow + bidirectionally (the classic case is PV self-consumption: `Σ(branch.consumed)` can exceed `main.consumed` even when the net balance is valid). The synthesized PV virtual circuit participates with the correct load-perspective sign, and unmapped tab + entries are zero-power, so both contribute safely to the sum. No public interface change — field names and types are unchanged; only the source of the values shifts. `downstream_l1_current_a` / `downstream_l2_current_a` continue to be read directly from + the downstream-lugs node — those per-phase readings are orthogonal to the defect. The underlying firmware defect is tracked upstream at [spanio/SPAN-API-Client-Docs#13](https://github.com/spanio/SPAN-API-Client-Docs/issues/13). + ## [2.6.2] - 04/2026 ### Changed diff --git a/pyproject.toml b/pyproject.toml index 2a66bed..f1fa266 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "span-panel-api" -version = "2.6.2" +version = "2.6.3" description = "A client library for SPAN Panel API" authors = [ {name = "SpanPanel"} diff --git a/scripts/validate_lug_derivation/README.md b/scripts/validate_lug_derivation/README.md new file mode 100644 index 0000000..707897e --- /dev/null +++ b/scripts/validate_lug_derivation/README.md @@ -0,0 +1,71 @@ +# validate_lug_derivation + +Diagnostic harness that compares **v1 REST**, **v2 MQTT (via the current library)**, and **Kirchhoff-derived** values for the SPAN panel's downstream-lugs (feedthrough) power and energy. Used to: + +1. Validate that the library's Kirchhoff derivation (introduced in `span-panel-api` 2.6.3) stays consistent with v1 REST as an independent ground truth. +2. **Track when the upstream firmware defect on `downstream-lugs` is fixed.** The SPAN API is in beta and carries no version signal, so we detect the fix empirically by watching the raw MQTT properties converge on the Kirchhoff-derived values. Upstream + issue: [spanio/SPAN-API-Client-Docs#13](https://github.com/spanio/SPAN-API-Client-Docs/issues/13). + +## Files + +- **`v1_reader.py`** — Captures N v1-REST snapshots using an isolated `span-panel-api==1.1.15` env (the last version that still shipped the v1 client). Output: JSON on stdout. +- **`v2_reader.py`** — Captures N v2-MQTT snapshots from the current workspace install. Also reaches into the library's accumulator to grab the **raw** `downstream-lugs/active-power`, `imported-energy`, `exported-energy` — pre-derivation — for firmware-fix + tracking. +- **`compare.py`** — Driver. Runs both readers in parallel via `uv run`, zips samples by index, prints a side-by-side table per sample, and flags anomalies. +- **`run_local.sh.example`** — Template for the credentialed wrapper. Copy to `run_local.sh` (which is gitignored), fill in the three credential fields, run. + +## Setup + +1. Obtain credentials for a live panel: + + - v1 token: existing pre-issued JWT (no re-registration needed). + - v2 passphrase: `hopPassphrase` for `/api/v2/auth/register`. + +2. Copy `run_local.sh.example` to `run_local.sh` (the latter is gitignored, so credentials won't leak), then edit: + + ```bash + HOST="192.168.X.Y" + V1_TOKEN="..." + V2_PASSPHRASE="..." + SAMPLES=5 + INTERVAL=3 + ``` + +3. Run: `./run_local.sh`. + +Requires `uv`. The v1 reader is auto-provisioned via `uv run --no-project --with span-panel-api==1.1.15`; the v2 reader uses the current workspace install. + +## Output — what to look for + +Per sample, `compare.py` prints: + +- **Power table** — `main_power_w`, reported vs derived feedthrough, Σcircuits partitioned (PV vs loads), for both APIs. +- **v2 `power_flows`** — the panel's own `pv/battery/grid/site` aggregates, as indicators. +- **v2 downstream-lugs raw (pre-derivation)** — the three raw MQTT properties the library **stopped** reading into the snapshot in 2.6.3. This is the firmware-fix tracker. +- **Energy net** — `(consumed − produced)` for main, reported feedthrough, Σcircuits, and derived feedthrough. +- **Flags** — anomalies (see next section). + +Post-2.6.3, `feedthrough_power_w (reported)` equals `(derived)` by construction on v2 (the library derives internally). That's expected — convergence on that row is itself the confirmation signal that the library-side fix is working. The interesting row +for ongoing tracking is the **raw** block. + +## Flags + +- **`firmware downstream-lugs active-power still offset by X W vs Kirchhoff — upstream defect present`** — The raw MQTT `active-power` differs from the Kirchhoff-derived feedthrough by more than 100 W. Current state while the upstream firmware defect is + unpatched. +- **`firmware downstream-lugs active-power within X W of Kirchhoff — upstream defect MAY be fixed (confirm over sustained samples)`** — The delta has dropped below 50 W. Could be sensor noise on a single sample — confirm across a longer run (e.g. + `SAMPLES=30 INTERVAL=10`) before declaring the upstream fix has shipped. +- **`firmware downstream-lugs imported-energy is NEGATIVE (X Wh) — upstream counter still broken`** — The cumulative `imported-energy` counter went negative, which is physically impossible for a monotonic counter. Historically observed; current live panels + sometimes emit positive values, so the flag is only armed when `< 0`. +- **`v1 feedthrough_consumed_wh is NEGATIVE (...)`** — The v1 REST feedthrough energy counter is broken on this panel too, independent of the MQTT defect. Included for completeness; v1 is not a viable fallback. +- **`v1 reported net energy off Kirchhoff by X Wh`** / **`v1 reported feedthrough off Kirchhoff by X W`** — Same shape of check on the v1 side. v1 active-power tends to track Kirchhoff within sensor noise; v1 energy diverges heavily due to the broken + counter. +- **`derived feedthrough power diverges across APIs: v1=A vs v2=B`** — The Kirchhoff-derived values from v1 and v2 disagree by more than 100 W. Usually explained by sample-timing skew when load is shifting quickly; a persistent gap would warrant + investigation of the sign-partitioning logic. + +## When the firmware is fixed + +Watch for the `active-power ... MAY be fixed` flag to fire on every sample across a sustained run (e.g. 30+ samples over several minutes). When that holds: + +1. Verify `imported-energy` stays non-negative and its delta from the derived `consumed_wh` stabilizes near zero. +2. Consider whether the library should switch back to reading the native `downstream-lugs` values directly, or continue deriving. Deriving is robust regardless of firmware state, so the change is optional — potentially valuable only if the panel's own + measurement is more precise than the computed one (which is not yet established). diff --git a/scripts/validate_lug_derivation/compare.py b/scripts/validate_lug_derivation/compare.py new file mode 100644 index 0000000..6f19718 --- /dev/null +++ b/scripts/validate_lug_derivation/compare.py @@ -0,0 +1,311 @@ +"""Drive v1 and v2 readers in parallel; derive feedthrough from main minus +Σcircuits; print a side-by-side comparison for each sample. + +Usage: + + python compare.py \ + --host 192.168.65.70 \ + --v1-token "$V1_TOKEN" \ + --v2-passphrase "$V2_PASSPHRASE" \ + --samples 5 --interval 3 + +Physics — Kirchhoff at the main bus (grid-perspective on main/feedthrough, +load-perspective on branch circuits where positive = consumption): + + P_main = P_feedthrough + Σ(branches, load-perspective) + => P_feedthrough_derived = P_main - Σ(branches) + +PV handling. A solar inverter connected to a branch appears as: + * v1 REST: two raw physical tab circuits in grid-perspective (positive = + power flowing INTO the bus from the inverter). No virtual PV entry. + * v2 MQTT: one synthesized "PV" virtual circuit in load-perspective + (negative = producing), AND the underlying physical tabs are suppressed. + +The v1-only circuits (by UUID set-difference with v2) therefore identify the +physical PV tabs. To get comparable load-perspective totals, we negate them: + + Σ_v1_load = Σ_v1_raw - 2 * Σ(v1-only circuits) + +Energy uses the Kirchhoff identity on NET counters: + + net_feedthrough = (main_consumed − main_produced) + − Σ(c.consumed − c.produced) +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +from pathlib import Path +import sys +from typing import Any + +HERE = Path(__file__).resolve().parent +SPAN_API_ROOT = HERE.parent.parent + + +def _fmt(x: float | None, width: int = 12) -> str: + if x is None: + return f"{'—':>{width}}" + return f"{x:>{width}.2f}" + + +async def _run(cmd: list[str], cwd: Path) -> dict[str, Any]: + proc = await asyncio.create_subprocess_exec( + *cmd, + cwd=str(cwd), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_b, stderr_b = await proc.communicate() + if proc.returncode != 0: + raise RuntimeError( + f"reader exited {proc.returncode}: {stderr_b.decode(errors='replace')}" + ) + return json.loads(stdout_b.decode()) + + +def _partition_v1(v1: dict[str, Any], shared_ids: set[str]) -> dict[str, float]: + """Partition v1 circuits into 'load' (shared with v2) and 'pv_tabs' (v1-only, + grid-perspective) and return comparable sums.""" + load_p = load_c = load_pe = 0.0 + pv_p = pv_c = pv_pe = 0.0 + for c in v1["circuits"]: + p = float(c["instant_power_w"]) + cons = float(c["consumed_energy_wh"]) + prod = float(c["produced_energy_wh"]) + if c["circuit_id"] in shared_ids: + load_p += p + load_c += cons + load_pe += prod + else: + pv_p += p + pv_c += cons + pv_pe += prod + return { + "load_power_w": load_p, + "load_consumed_wh": load_c, + "load_produced_wh": load_pe, + "pv_tabs_power_w_grid": pv_p, + "pv_tabs_consumed_wh_grid": pv_c, + "pv_tabs_produced_wh_grid": pv_pe, + # Load-perspective total for Kirchhoff: flip pv_tabs sign for power. + # For energy we can't symmetrically swap consumed/produced without + # knowing which counter corresponds to which direction in raw REST. + # Power-space correction is what we need for Kirchhoff balance. + "sigma_load_persp_power_w": load_p - pv_p, + "sigma_all_raw_power_w": load_p + pv_p, + } + + +def _sum_v2_circuits(v2: dict[str, Any]) -> dict[str, float]: + p = sum(float(c["instant_power_w"]) for c in v2["circuits"]) + cons = sum(float(c["consumed_energy_wh"]) for c in v2["circuits"]) + prod = sum(float(c["produced_energy_wh"]) for c in v2["circuits"]) + return { + "sigma_power_w": p, + "sigma_consumed_wh": cons, + "sigma_produced_wh": prod, + } + + +def _print_sample(idx: int, v1: dict[str, Any], v2: dict[str, Any]) -> None: + shared_ids = {c["circuit_id"] for c in v1["circuits"]} & { + c["circuit_id"] for c in v2["circuits"] + } + p1 = _partition_v1(v1, shared_ids) + s2 = _sum_v2_circuits(v2) + + main_v1 = float(v1["main_power_w"]) + main_v2 = float(v2["main_power_w"]) + feed_v1 = float(v1["feedthrough_power_w"]) + feed_v2 = float(v2["feedthrough_power_w"]) + + # Derived feedthrough power (Kirchhoff, load-perspective Σ) + derived_v1 = main_v1 - p1["sigma_load_persp_power_w"] + derived_v2 = main_v2 - s2["sigma_power_w"] + + # Energy nets + net_main_v1 = float(v1["main_consumed_wh"]) - float(v1["main_produced_wh"]) + net_main_v2 = float(v2["main_consumed_wh"]) - float(v2["main_produced_wh"]) + net_feed_rpt_v1 = float(v1["feedthrough_consumed_wh"]) - float(v1["feedthrough_produced_wh"]) + net_feed_rpt_v2 = float(v2["feedthrough_consumed_wh"]) - float(v2["feedthrough_produced_wh"]) + net_circ_v1 = p1["load_consumed_wh"] + p1["pv_tabs_consumed_wh_grid"] - ( + p1["load_produced_wh"] + p1["pv_tabs_produced_wh_grid"] + ) + net_circ_v2 = s2["sigma_consumed_wh"] - s2["sigma_produced_wh"] + net_feed_der_v1 = net_main_v1 - net_circ_v1 + net_feed_der_v2 = net_main_v2 - net_circ_v2 + + dt = float(v2["t"]) - float(v1["t"]) + print(f"\n=== sample {idx} (v2 vs v1 capture offset: {dt:+.2f}s) ===") + print(f" shared circuits: {len(shared_ids)} " + f"v1-only (PV tabs): {len(v1['circuits']) - len(shared_ids)} " + f"v2-only (PV virtual): {len(v2['circuits']) - len(shared_ids)}") + + pv = v2.get("pv") or {} + if pv.get("feed_circuit_id"): + print(f" v2 pv: feed={pv['feed_circuit_id'][:8]} " + f"vendor={pv.get('vendor_name')} " + f"capacity={pv.get('nameplate_capacity_w')} W " + f"position={pv.get('relative_position')}") + + print("\n power (W):") + print(f"{' field':<44}{'v1':>12}{'v2':>12}{'Δ(v2-v1)':>12}") + rows_p: list[tuple[str, float, float]] = [ + ("main_power_w", main_v1, main_v2), + ("feedthrough_power_w (reported)", feed_v1, feed_v2), + ("Σ circuits (raw, v1 grid+load mixed)", + p1["sigma_all_raw_power_w"], s2["sigma_power_w"]), + ("Σ circuits (load-perspective)", + p1["sigma_load_persp_power_w"], s2["sigma_power_w"]), + ("Σ v1-only / v2-only (PV)", + p1["pv_tabs_power_w_grid"], + sum(float(c["instant_power_w"]) for c in v2["circuits"] + if c["circuit_id"] not in shared_ids)), + ("feedthrough_power_w (derived)", derived_v1, derived_v2), + ] + for label, a, b in rows_p: + print(f"{' ' + label:<44}{_fmt(a)}{_fmt(b)}{_fmt(b - a)}") + + # v2-only: power flows indicators + pfp = v2.get("power_flow_pv") + pfb = v2.get("power_flow_battery") + pfg = v2.get("power_flow_grid") + pfs = v2.get("power_flow_site") + print("\n v2 power_flows (W):") + print(f" pv={_fmt(pfp, 9)} battery={_fmt(pfb, 9)} " + f"grid={_fmt(pfg, 9)} site={_fmt(pfs, 9)}") + + # v2 raw downstream-lugs (pre-derivation) — captured directly from the + # MQTT accumulator before the library applies its Kirchhoff fix. + # Watch the delta vs derived to detect when the upstream firmware is + # patched (spanio/SPAN-API-Client-Docs#13). Because the SPAN API is in + # beta, there is no version signal that would announce the fix — we + # detect it empirically. + raw = v2.get("downstream_lugs_raw") or {} + raw_ap = raw.get("active_power_w") + raw_ie = raw.get("imported_energy_wh") + raw_ee = raw.get("exported_energy_wh") + print("\n v2 downstream-lugs raw (pre-derivation, upstream-firmware view):") + delta_ap = (raw_ap - derived_v2) if raw_ap is not None else None + print(f" active-power: {_fmt(raw_ap, 12)} " + f"Δ vs derived: {_fmt(delta_ap, 10)}") + print(f" imported-energy: {_fmt(raw_ie, 12)} " + f"(library-derived consumed: {_fmt(float(v2['feedthrough_consumed_wh']), 10)})") + print(f" exported-energy: {_fmt(raw_ee, 12)} " + f"(library-derived produced: {_fmt(float(v2['feedthrough_produced_wh']), 10)})") + + print("\n energy net (Wh = consumed - produced):") + print(f"{' field':<44}{'v1':>12}{'v2':>12}{'Δ(v2-v1)':>12}") + rows_e: list[tuple[str, float, float]] = [ + ("net_main", net_main_v1, net_main_v2), + ("net_feedthrough (reported)", net_feed_rpt_v1, net_feed_rpt_v2), + ("net_Σcircuits", net_circ_v1, net_circ_v2), + ("net_feedthrough (derived)", net_feed_der_v1, net_feed_der_v2), + ] + for label, a, b in rows_e: + print(f"{' ' + label:<44}{_fmt(a)}{_fmt(b)}{_fmt(b - a)}") + + # Cross-API consistency: derived should match across v1 and v2. + # Note: v2 library >= 2.6.3 derives feedthrough, so `feed_v2 == + # derived_v2` by construction — the defect signal now lives on the raw + # downstream-lugs values (see raw_ap / raw_ie / raw_ee above). + flags: list[str] = [] + if abs(derived_v1 - derived_v2) > 100.0: + flags.append( + f"derived feedthrough power diverges across APIs: " + f"v1={derived_v1:+.1f} W vs v2={derived_v2:+.1f} W" + ) + dp1 = derived_v1 - feed_v1 + if abs(dp1) > 100.0: + flags.append(f"v1 reported feedthrough off Kirchhoff by {dp1:+.1f} W") + if float(v1["feedthrough_consumed_wh"]) < 0: + flags.append( + f"v1 feedthrough_consumed_wh is NEGATIVE ({v1['feedthrough_consumed_wh']:.0f}) — " + f"counter cannot decrease" + ) + de1 = net_feed_der_v1 - net_feed_rpt_v1 + if abs(de1) > 1000.0: + flags.append(f"v1 reported net energy off Kirchhoff by {de1:+,.0f} Wh") + + # Raw downstream-lugs vs derived: this is the upstream-firmware tracker. + # When SPAN ships the fix, raw_ap will converge to derived_v2 and raw_ie + # will stop emitting negative values. + if raw_ap is not None and delta_ap is not None: + if abs(delta_ap) > 100.0: + flags.append( + f"firmware downstream-lugs active-power still offset by " + f"{delta_ap:+.1f} W vs Kirchhoff — upstream defect present" + ) + elif abs(delta_ap) < 50.0: + flags.append( + f"firmware downstream-lugs active-power within {abs(delta_ap):.1f} W of " + f"Kirchhoff — upstream defect MAY be fixed (confirm over sustained samples)" + ) + if raw_ie is not None and raw_ie < 0.0: + flags.append( + f"firmware downstream-lugs imported-energy is NEGATIVE ({raw_ie:.0f} Wh) — " + f"upstream counter still broken" + ) + + for f in flags: + print(f" ! {f}") + + +async def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--host", required=True) + parser.add_argument("--v1-token", required=True) + parser.add_argument("--v2-passphrase", required=True) + parser.add_argument("--port", type=int, default=80) + parser.add_argument("--samples", type=int, default=5) + parser.add_argument("--interval", type=float, default=3.0) + parser.add_argument("--dump-json", type=Path) + args = parser.parse_args() + + v1_cmd = [ + "uv", "run", "--no-project", "--with", "span-panel-api==1.1.15", + "python", str(HERE / "v1_reader.py"), + "--host", args.host, + "--token", args.v1_token, + "--port", str(args.port), + "--samples", str(args.samples), + "--interval", str(args.interval), + ] + v2_cmd = [ + "uv", "run", + "python", str(HERE / "v2_reader.py"), + "--host", args.host, + "--passphrase", args.v2_passphrase, + "--port", str(args.port), + "--samples", str(args.samples), + "--interval", str(args.interval), + ] + + v1_task = asyncio.create_task(_run(v1_cmd, cwd=HERE)) + v2_task = asyncio.create_task(_run(v2_cmd, cwd=SPAN_API_ROOT)) + v1_result, v2_result = await asyncio.gather(v1_task, v2_task) + + v1_samples = v1_result["samples"] + v2_samples = v2_result["samples"] + n = min(len(v1_samples), len(v2_samples)) + if n == 0: + print("no samples captured", file=sys.stderr) + return 1 + + for i in range(n): + _print_sample(i, v1_samples[i], v2_samples[i]) + + if args.dump_json is not None: + args.dump_json.write_text( + json.dumps({"v1": v1_result, "v2": v2_result}, indent=2) + ) + print(f"\nraw JSON written to {args.dump_json}") + + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/scripts/validate_lug_derivation/run_local.sh.example b/scripts/validate_lug_derivation/run_local.sh.example new file mode 100644 index 0000000..b5d2dac --- /dev/null +++ b/scripts/validate_lug_derivation/run_local.sh.example @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# Template for validate_lug_derivation. Copy to `run_local.sh` (gitignored), +# fill in the three credential fields, and run: `./run_local.sh`. + +set -euo pipefail + +HOST="192.168.X.Y" +V1_TOKEN="" +V2_PASSPHRASE="" + +SAMPLES=5 +INTERVAL=3 +DUMP="/tmp/lug-readings.json" + +cd "$(dirname "$0")" +exec python compare.py \ + --host "$HOST" \ + --v1-token "$V1_TOKEN" \ + --v2-passphrase "$V2_PASSPHRASE" \ + --samples "$SAMPLES" \ + --interval "$INTERVAL" \ + --dump-json "$DUMP" diff --git a/scripts/validate_lug_derivation/v1_reader.py b/scripts/validate_lug_derivation/v1_reader.py new file mode 100644 index 0000000..745ecc7 --- /dev/null +++ b/scripts/validate_lug_derivation/v1_reader.py @@ -0,0 +1,86 @@ +"""Read N v1 snapshots from a live panel and print JSON to stdout. + +Run in an isolated environment with span-panel-api==1.1.15: + + uv run --no-project --with 'span-panel-api==1.1.15' \ + python v1_reader.py --host 192.168.X.Y --token T --samples 5 --interval 3 + +Emits a single JSON object: {"api": "v1", "samples": [{...}, ...]}. +Each sample records `t` (unix seconds), main power/energy, feedthrough +power/energy, and per-circuit instant_power_w / consumed_energy_wh / +produced_energy_wh. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import sys +import time + +from span_panel_api import SpanPanelClient + + +def _circuit_to_dict(circuit_id: str, circuit: object) -> dict[str, object]: + return { + "circuit_id": circuit_id, + "name": getattr(circuit, "name", "") or "", + "instant_power_w": float(getattr(circuit, "instant_power_w", 0.0)), + "consumed_energy_wh": float(getattr(circuit, "consumed_energy_wh", None) or 0.0), + "produced_energy_wh": float(getattr(circuit, "produced_energy_wh", None) or 0.0), + "tabs": list(getattr(circuit, "tabs", None) or []), + "relay_state": getattr(circuit, "relay_state").value, + } + + +async def read_once(client: SpanPanelClient) -> dict[str, object]: + panel = await client.get_panel_state() + circuits_out = await client.get_circuits() + + main_energy = panel.main_meter_energy + feed_energy = panel.feedthrough_energy + + circuits = [ + _circuit_to_dict(cid, c) + for cid, c in circuits_out.circuits.additional_properties.items() + ] + + return { + "t": time.time(), + "main_power_w": float(panel.instant_grid_power_w), + "feedthrough_power_w": float(panel.feedthrough_power_w), + "main_consumed_wh": float(main_energy.consumed_energy_wh), + "main_produced_wh": float(main_energy.produced_energy_wh), + "feedthrough_consumed_wh": float(feed_energy.consumed_energy_wh), + "feedthrough_produced_wh": float(feed_energy.produced_energy_wh), + "circuits": circuits, + } + + +async def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--host", required=True) + parser.add_argument("--token", required=True) + parser.add_argument("--port", type=int, default=80) + parser.add_argument("--samples", type=int, default=1) + parser.add_argument("--interval", type=float, default=3.0) + args = parser.parse_args() + + samples: list[dict[str, object]] = [] + async with SpanPanelClient( + host=args.host, port=args.port, use_ssl=False, timeout=15.0 + ) as client: + client.set_access_token(args.token) + for i in range(args.samples): + if i > 0: + await asyncio.sleep(args.interval) + samples.append(await read_once(client)) + + json.dump({"api": "v1", "samples": samples}, sys.stdout) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/scripts/validate_lug_derivation/v2_reader.py b/scripts/validate_lug_derivation/v2_reader.py new file mode 100644 index 0000000..0443582 --- /dev/null +++ b/scripts/validate_lug_derivation/v2_reader.py @@ -0,0 +1,168 @@ +"""Read N v2 (MQTT) snapshots from a live panel and print JSON to stdout. + +Run from the span-panel-api workspace so the current editable install is used: + + uv run python v2_reader.py --host 192.168.X.Y --passphrase P \ + --samples 5 --interval 3 + +Emits a single JSON object: {"api": "v2", "samples": [{...}, ...]}. +Registers once, connects once, takes N snapshots at the requested interval. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import sys +import time + +from span_panel_api import ( + MqttClientConfig, + SpanCircuitSnapshot, + SpanMqttClient, + SpanPanelSnapshot, + register_v2, +) + +# Same-repo diagnostic access to library internals: the purpose of this +# script is to observe the raw panel-published downstream-lugs properties +# alongside the library's Kirchhoff-derived feedthrough values, so we can +# detect empirically when the upstream firmware defect is fixed (the SPAN +# API is in beta and does not carry a version signal). +from span_panel_api.mqtt.const import LUGS_DOWNSTREAM # noqa: PLC2701 + + +def _parse_float_or_none(value: str | None) -> float | None: + if value is None or value == "": + return None + try: + return float(value) + except (ValueError, TypeError): + return None + + +def _read_downstream_lugs_raw(client: SpanMqttClient) -> dict[str, float | None]: + """Read the pre-derivation downstream-lugs MQTT properties. + + Reaches into the library's accumulator intentionally — these are the + raw values the library *stopped reading into the snapshot* in 2.6.3. + We surface them here so ``compare.py`` can flag when the firmware + offset collapses (indicating SPAN has shipped the upstream fix). + """ + homie = client._homie # noqa: SLF001 + if homie is None: + return {"active_power_w": None, "imported_energy_wh": None, "exported_energy_wh": None} + node = homie._find_lugs_node(LUGS_DOWNSTREAM) # noqa: SLF001 + acc = homie._acc # noqa: SLF001 + if node is None: + return {"active_power_w": None, "imported_energy_wh": None, "exported_energy_wh": None} + return { + "active_power_w": _parse_float_or_none(acc.get_prop(node, "active-power")), + "imported_energy_wh": _parse_float_or_none(acc.get_prop(node, "imported-energy")), + "exported_energy_wh": _parse_float_or_none(acc.get_prop(node, "exported-energy")), + } + + +def _circuit_to_dict(circuit_id: str, c: SpanCircuitSnapshot) -> dict[str, object]: + return { + "circuit_id": circuit_id, + "name": c.name, + "device_type": c.device_type, + "instant_power_w": c.instant_power_w, + "consumed_energy_wh": c.consumed_energy_wh, + "produced_energy_wh": c.produced_energy_wh, + "tabs": list(c.tabs), + "relay_state": c.relay_state, + } + + +def _snapshot_to_dict( + snap: SpanPanelSnapshot, + downstream_raw: dict[str, float | None], +) -> dict[str, object]: + return { + "t": time.time(), + "main_power_w": snap.instant_grid_power_w, + "feedthrough_power_w": snap.feedthrough_power_w, + "main_consumed_wh": snap.main_meter_energy_consumed_wh, + "main_produced_wh": snap.main_meter_energy_produced_wh, + "feedthrough_consumed_wh": snap.feedthrough_energy_consumed_wh, + "feedthrough_produced_wh": snap.feedthrough_energy_produced_wh, + "downstream_lugs_raw": downstream_raw, + "power_flow_pv": snap.power_flow_pv, + "power_flow_battery": snap.power_flow_battery, + "power_flow_grid": snap.power_flow_grid, + "power_flow_site": snap.power_flow_site, + "upstream_l1_current_a": snap.upstream_l1_current_a, + "upstream_l2_current_a": snap.upstream_l2_current_a, + "downstream_l1_current_a": snap.downstream_l1_current_a, + "downstream_l2_current_a": snap.downstream_l2_current_a, + "pv": { + "feed_circuit_id": snap.pv.feed_circuit_id, + "vendor_name": snap.pv.vendor_name, + "product_name": snap.pv.product_name, + "nameplate_capacity_w": snap.pv.nameplate_capacity_w, + "relative_position": snap.pv.relative_position, + }, + "battery": { + "connected": snap.battery.connected, + "soe_kwh": snap.battery.soe_kwh, + "soe_percentage": snap.battery.soe_percentage, + }, + "evse_node_ids": list(snap.evse.keys()), + "circuits": [_circuit_to_dict(cid, c) for cid, c in snap.circuits.items()], + } + + +async def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--host", required=True) + parser.add_argument("--passphrase", required=True) + parser.add_argument("--port", type=int, default=80) + parser.add_argument("--samples", type=int, default=1) + parser.add_argument("--interval", type=float, default=3.0) + args = parser.parse_args() + + auth = await register_v2( + host=args.host, + name="validate-lug-derivation", + passphrase=args.passphrase, + port=args.port, + ) + + broker = MqttClientConfig( + broker_host=auth.ebus_broker_host, + username=auth.ebus_broker_username, + password=auth.ebus_broker_password, + mqtts_port=auth.ebus_broker_mqtts_port, + ws_port=auth.ebus_broker_ws_port, + wss_port=auth.ebus_broker_wss_port, + ) + + client = SpanMqttClient( + host=args.host, + serial_number=auth.serial_number, + broker_config=broker, + panel_http_port=args.port, + ) + + samples: list[dict[str, object]] = [] + try: + await client.connect() + for i in range(args.samples): + if i > 0: + await asyncio.sleep(args.interval) + snap = await client.get_snapshot() + raw = _read_downstream_lugs_raw(client) + samples.append(_snapshot_to_dict(snap, raw)) + finally: + await client.close() + + json.dump({"api": "v2", "samples": samples}, sys.stdout) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/src/span_panel_api/mqtt/field_metadata.py b/src/span_panel_api/mqtt/field_metadata.py index a888506..4a55908 100644 --- a/src/span_panel_api/mqtt/field_metadata.py +++ b/src/span_panel_api/mqtt/field_metadata.py @@ -36,8 +36,15 @@ # Static mapping: (node_type, property_id) → snapshot field path # # This encodes the library's internal knowledge of how _build_snapshot() -# maps Homie properties to snapshot dataclass fields. The mapping must be -# kept in sync with homie.py. +# populates snapshot dataclass fields. For most fields the Homie property +# is both the runtime source and the authority for unit/datatype. For the +# feedthrough power/energy fields the runtime values are derived via +# Kirchhoff from main meter - Σbranches (see ``_derive_feedthrough`` in +# homie.py) and the downstream-lugs entries below contribute unit / +# datatype only — the panel still publishes those properties (with the +# correct schema units), they are just no longer read for their values. +# +# The mapping must be kept in sync with homie.py. # --------------------------------------------------------------------------- _PROPERTY_FIELD_MAP: tuple[tuple[str, str, str], ...] = ( @@ -61,9 +68,13 @@ (TYPE_LUGS_UPSTREAM, "l1-current", "panel.upstream_l1_current_a"), (TYPE_LUGS_UPSTREAM, "l2-current", "panel.upstream_l2_current_a"), # --- Downstream lugs → panel.* (feedthrough) ----------------------------- + # feedthrough_power_w and feedthrough_energy_* are DERIVED at runtime + # (main - Σbranches via ``_derive_feedthrough``); these entries exist so + # the derived fields inherit the downstream-lugs schema unit/datatype. (TYPE_LUGS_DOWNSTREAM, "active-power", "panel.feedthrough_power_w"), (TYPE_LUGS_DOWNSTREAM, "imported-energy", "panel.feedthrough_energy_consumed_wh"), (TYPE_LUGS_DOWNSTREAM, "exported-energy", "panel.feedthrough_energy_produced_wh"), + # Per-phase currents below are read directly from downstream-lugs at runtime. (TYPE_LUGS_DOWNSTREAM, "l1-current", "panel.downstream_l1_current_a"), (TYPE_LUGS_DOWNSTREAM, "l2-current", "panel.downstream_l2_current_a"), # --- Circuit → circuit.* ------------------------------------------------- diff --git a/src/span_panel_api/mqtt/homie.py b/src/span_panel_api/mqtt/homie.py index 88767e8..597da0f 100644 --- a/src/span_panel_api/mqtt/homie.py +++ b/src/span_panel_api/mqtt/homie.py @@ -58,6 +58,52 @@ def _parse_int(value: str, default: int = 0) -> int: return default +def _derive_feedthrough( + circuits: dict[str, SpanCircuitSnapshot], + grid_power: float, + main_consumed: float, + main_produced: float, +) -> tuple[float, float, float]: + """Derive feedthrough power/energy via Kirchhoff at the main bus. + + The panel's native downstream-lugs readings are unreliable on MQTT: + active-power carries a systematic ~400-550 W offset and imported-energy + can emit non-monotonic / negative cumulative values. Main meter and + per-branch readings are accurate, so the energy-balance identities + + P_main = P_feedthrough + Σ(branches, load-perspective) + E_main,net = E_feedthrough,net + Σ(branches, net) + + yield physically-consistent feedthrough values. Branches must be in + load-perspective (positive = consumption) — which is the library's + canonical sign convention, enforced by ``_build_circuit``. The + synthesized PV virtual circuit is already included with the correct sign, + and unmapped tab entries are zero-power, so both participate safely. + + Cumulative feedthrough energy must be derived from the *net* identity + (imported - exported), not by subtracting per-direction counters + independently. A circuit can both consume and produce over time — + the classic case is PV self-consumption on the main panel, where + Σ(consumed) exceeds main.consumed and Σ(produced) exceeds main.produced + even when the net balance is correct. Per-direction subtraction would + emit negative cumulative counters in that regime. Instead we derive the + net feedthrough energy and split it into non-negative consumed/produced + components (only one direction is non-zero at any given snapshot, which + is the best a stateless derivation can produce). + + Returns ``(power_w, consumed_wh, produced_wh)``. + """ + sigma_power = sum(c.instant_power_w for c in circuits.values()) + sigma_net_energy = sum(c.consumed_energy_wh - c.produced_energy_wh for c in circuits.values()) + main_net_energy = main_consumed - main_produced + feedthrough_net_energy = main_net_energy - sigma_net_energy + return ( + grid_power - sigma_power, + max(feedthrough_net_energy, 0.0), + max(-feedthrough_net_energy, 0.0), + ) + + class HomieDeviceConsumer: """Build SPAN-specific snapshots from accumulated Homie property state. @@ -164,7 +210,23 @@ def _rebuild_dirty_circuits(self, dirty: frozenset[str]) -> SpanPanelSnapshot: unmapped = self._build_unmapped_tabs(updated_circuits) updated_circuits.update(unmapped) - return dataclasses.replace(cached, circuits=updated_circuits) + # Re-derive feedthrough using current Σcircuits against the cached + # main meter values. If upstream-lugs had been dirty the full-rebuild + # path would run instead, so the cached main values are still fresh. + feed_power, feed_consumed, feed_produced = _derive_feedthrough( + updated_circuits, + cached.instant_grid_power_w, + cached.main_meter_energy_consumed_wh, + cached.main_meter_energy_produced_wh, + ) + + return dataclasses.replace( + cached, + circuits=updated_circuits, + feedthrough_power_w=feed_power, + feedthrough_energy_consumed_wh=feed_consumed, + feedthrough_energy_produced_wh=feed_produced, + ) def _find_lugs_node(self, direction: str) -> str | None: """Find the lugs node with a specific direction. @@ -525,17 +587,13 @@ def _build_snapshot(self) -> SpanPanelSnapshot: l2_i = self._acc.get_prop(upstream_lugs, "l2-current") upstream_l2_current = _parse_float(l2_i) if l2_i else None - # Downstream lugs → feedthrough - feedthrough_power = 0.0 - feedthrough_consumed = 0.0 - feedthrough_produced = 0.0 + # Downstream lugs → per-phase currents only. + # Feedthrough power/energy are derived from Kirchhoff further below; + # the panel's native active-power / imported-energy / exported-energy + # on downstream-lugs are unreliable (see comment at derivation site). downstream_l1_current: float | None = None downstream_l2_current: float | None = None if downstream_lugs is not None: - feedthrough_power = _parse_float(self._acc.get_prop(downstream_lugs, "active-power")) - feedthrough_consumed = _parse_float(self._acc.get_prop(downstream_lugs, "imported-energy")) - feedthrough_produced = _parse_float(self._acc.get_prop(downstream_lugs, "exported-energy")) - dl1_i = self._acc.get_prop(downstream_lugs, "l1-current") downstream_l1_current = _parse_float(dl1_i) if dl1_i else None dl2_i = self._acc.get_prop(downstream_lugs, "l2-current") @@ -574,6 +632,10 @@ def _build_snapshot(self) -> SpanPanelSnapshot: unmapped = self._build_unmapped_tabs(circuits) circuits.update(unmapped) + feedthrough_power, feedthrough_consumed, feedthrough_produced = _derive_feedthrough( + circuits, grid_power, main_consumed, main_produced + ) + # Battery, PV, and EVSE metadata battery = self._build_battery() pv = self._build_pv() diff --git a/tests/test_mqtt_homie.py b/tests/test_mqtt_homie.py index fa92a0e..16fab43 100644 --- a/tests/test_mqtt_homie.py +++ b/tests/test_mqtt_homie.py @@ -563,20 +563,93 @@ def test_upstream_lugs_to_main_meter(self): assert snapshot.main_meter_energy_consumed_wh == 100000.0 assert snapshot.main_meter_energy_produced_wh == 5000.0 - def test_downstream_lugs_to_feedthrough(self): - """Test typed lugs (energy.ebus.device.lugs.downstream) map to feedthrough.""" + def test_feedthrough_derived_from_main_minus_circuits(self): + """Feedthrough power/energy are derived via Kirchhoff. + + Power: ``P_main − Σ(branches)``. + Energy: the *net* identity ``(main.consumed − main.produced) − + Σ(branch.net)``, then split into non-negative consumed/produced. + The panel's native downstream-lugs active-power / imported-energy / + exported-energy are ignored because they are unreliable on MQTT. + """ acc, consumer = _build_ready_consumer() - acc.handle_message(f"{PREFIX}/lugs-downstream/active-power", "1000.0") - acc.handle_message(f"{PREFIX}/lugs-downstream/imported-energy", "50000.0") - acc.handle_message(f"{PREFIX}/lugs-downstream/exported-energy", "1000.0") + # Main meter + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "5000.0") + acc.handle_message(f"{PREFIX}/lugs-upstream/imported-energy", "100000.0") + acc.handle_message(f"{PREFIX}/lugs-upstream/exported-energy", "8000.0") + # One circuit: 1500 W of consumption, 3000 Wh consumed, 200 Wh produced. + # Homie raw active-power is grid-perspective; _build_circuit negates to + # load-perspective. Counters: exported-energy → consumed, imported → produced. + cid = "aabbccdd-1122-3344-5566-778899001122" + acc.handle_message(f"{PREFIX}/{cid}/active-power", "-1500.0") + acc.handle_message(f"{PREFIX}/{cid}/exported-energy", "3000.0") + acc.handle_message(f"{PREFIX}/{cid}/imported-energy", "200.0") + # Downstream-lugs active-power / energy values that would be used + # pre-derivation — set to obviously wrong numbers to prove they are + # ignored by the derivation path. + acc.handle_message(f"{PREFIX}/lugs-downstream/active-power", "99999.0") + acc.handle_message(f"{PREFIX}/lugs-downstream/imported-energy", "99999.0") + acc.handle_message(f"{PREFIX}/lugs-downstream/exported-energy", "99999.0") + + snapshot = consumer.build_snapshot() + # feedthrough_power = 5000 − 1500 = 3500 + assert snapshot.feedthrough_power_w == 3500.0 + # main_net = 100000 − 8000 = 92000; Σ(branch.net) = 3000 − 200 = 2800; + # feedthrough_net = 92000 − 2800 = 89200 (positive → consumed). + assert snapshot.feedthrough_energy_consumed_wh == 89200.0 + assert snapshot.feedthrough_energy_produced_wh == 0.0 + + def test_feedthrough_energy_derivation_handles_pv_self_consumption(self): + """Per-direction Σ(circuit) can exceed main-meter per-direction totals + when circuits flow bidirectionally (the classic case: PV producing on + one branch while loads consume on another on the same main panel). + A naive per-direction subtraction would emit negative cumulative + feedthrough counters; the net-based derivation stays non-negative. + + Scenario: + - Main meter: panel net-exported 500 Wh over its lifetime + (main.consumed = 0, main.produced = 500). + - PV circuit: produced 1000 Wh, consumed 0. + - Load circuit: consumed 500 Wh, produced 0. + - Σ(branch.consumed) = 500 > main.consumed; Σ(branch.produced) = + 1000 > main.produced. Per-direction subtraction would yield + consumed = −500 and produced = −500 — both impossible. + - Net derivation: main_net = −500, Σ(branch.net) = −500, + feedthrough_net = 0 → consumed = 0, produced = 0. + """ + pv_id = "aaaaaaaa-0000-0000-0000-000000000001" + load_id = "bbbbbbbb-0000-0000-0000-000000000002" + acc, consumer = _build_ready_consumer( + { + "core": {"type": TYPE_CORE}, + "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}, + "lugs-downstream": {"type": TYPE_LUGS_DOWNSTREAM}, + pv_id: {"type": TYPE_CIRCUIT}, + load_id: {"type": TYPE_CIRCUIT}, + "bess-0": {"type": TYPE_BESS}, + } + ) + acc.handle_message(f"{PREFIX}/lugs-upstream/imported-energy", "0.0") + acc.handle_message(f"{PREFIX}/lugs-upstream/exported-energy", "500.0") + # PV: produced 1000 Wh → imported-energy on the Homie wire. + acc.handle_message(f"{PREFIX}/{pv_id}/imported-energy", "1000.0") + acc.handle_message(f"{PREFIX}/{pv_id}/exported-energy", "0.0") + # Load: consumed 500 Wh → exported-energy on the Homie wire. + acc.handle_message(f"{PREFIX}/{load_id}/imported-energy", "0.0") + acc.handle_message(f"{PREFIX}/{load_id}/exported-energy", "500.0") snapshot = consumer.build_snapshot() - assert snapshot.feedthrough_power_w == 1000.0 - assert snapshot.feedthrough_energy_consumed_wh == 50000.0 - assert snapshot.feedthrough_energy_produced_wh == 1000.0 + assert snapshot.feedthrough_energy_consumed_wh == 0.0 + assert snapshot.feedthrough_energy_produced_wh == 0.0 def test_generic_lugs_with_direction_property(self): - """Test fallback: generic TYPE_LUGS + direction property.""" + """Test fallback: generic TYPE_LUGS + direction property. + + Verifies both lugs nodes are detected via the generic-type + + direction-property path. Downstream detection is proven via its + l1/l2-current properties (the only fields still read from that node + since feedthrough power/energy are derived via Kirchhoff). + """ acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, @@ -591,17 +664,21 @@ def test_generic_lugs_with_direction_property(self): acc.handle_message(f"{PREFIX}/upstream-lugs/exported-energy", "3000.0") acc.handle_message(f"{PREFIX}/downstream-lugs/direction", "DOWNSTREAM") - acc.handle_message(f"{PREFIX}/downstream-lugs/active-power", "200.0") - acc.handle_message(f"{PREFIX}/downstream-lugs/imported-energy", "40000.0") - acc.handle_message(f"{PREFIX}/downstream-lugs/exported-energy", "500.0") + acc.handle_message(f"{PREFIX}/downstream-lugs/l1-current", "7.5") + acc.handle_message(f"{PREFIX}/downstream-lugs/l2-current", "6.2") snapshot = consumer.build_snapshot() assert snapshot.instant_grid_power_w == 800.0 assert snapshot.main_meter_energy_consumed_wh == 90000.0 assert snapshot.main_meter_energy_produced_wh == 3000.0 - assert snapshot.feedthrough_power_w == 200.0 - assert snapshot.feedthrough_energy_consumed_wh == 40000.0 - assert snapshot.feedthrough_energy_produced_wh == 500.0 + # No circuits set up → Σ=0 → feedthrough_power = main directly; + # feedthrough net energy = 90000 − 3000 = 87000 (positive → consumed). + assert snapshot.feedthrough_power_w == 800.0 + assert snapshot.feedthrough_energy_consumed_wh == 87000.0 + assert snapshot.feedthrough_energy_produced_wh == 0.0 + # Downstream-lugs detection proven via per-phase currents. + assert snapshot.downstream_l1_current_a == 7.5 + assert snapshot.downstream_l2_current_a == 6.2 # --------------------------------------------------------------------------- diff --git a/uv.lock b/uv.lock index c9fdd0f..949dfc4 100644 --- a/uv.lock +++ b/uv.lock @@ -1310,7 +1310,7 @@ wheels = [ [[package]] name = "span-panel-api" -version = "2.6.2" +version = "2.6.3" source = { editable = "." } dependencies = [ { name = "httpx" },