Cap firmware-job output mid-run + extract stream_events helper + WS backpressure handling#117
Conversation
``_trim_job_output`` runs only in the runner's ``finally`` block, so ``job.output`` grew unbounded for the lifetime of a still-running subprocess. A build that streams gigabytes of stderr in a tight loop (chatty ``external_components`` retry, esptool stuck on a repeating error, a misbehaving custom component dumping forever) holds every line in memory until the subprocess exits — the dashboard process OOMs first. Generalise ``_trim_job_output`` with a ``keep=`` kwarg and call it inline whenever ``job.output`` crosses ``_MAX_OUTPUT_LINES_INFLIGHT`` (50_000 lines, comfortably above the worst-case successful build ~10k and a verbose-failure ~30k, but bounds memory at ~5-10MB of strings even on the most adversarial output). The post-completion trim still pulls the kept slice down to ``_MAX_OUTPUT_LINES_RETAINED`` (2_000), and the elided-counter chains across both passes so the prepended ``... [output trimmed: N earlier line(s) elided]`` notice reflects the cumulative drop. Pin the contract with seven focused tests on ``_trim_job_output``: - Below-cap noop, above-cap drops head + prepends notice. - Re-trim is idempotent (same buffer in, same buffer out). - Cumulative elided count across two distinct trim cycles — catches a regression where the second trim would erase the first trim's contribution. - Sanity check that the in-flight cap is strictly larger than the retention cap (locks the relationship; a future tweak inverting them would surface immediately). - ``keep=_MAX_OUTPUT_LINES_INFLIGHT`` preserves more lines than the default retention cap. - The mid-run + post-completion combo chains correctly: in-flight trim plus terminal trim sum the elided counts. Pure refactor of an existing helper plus one new call site in the streaming loop. Full suite still passes (542 / 3 skipped, +7 new).
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #117 +/- ##
==========================================
+ Coverage 71.16% 73.21% +2.05%
==========================================
Files 36 36
Lines 4723 4775 +52
==========================================
+ Hits 3361 3496 +135
+ Misses 1362 1279 -83
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR adds an in-flight output cap for firmware jobs so long-running compile/upload subprocesses do not let job.output grow without bound in the dashboard backend. It extends the existing terminal-state trimming logic in the firmware controller and adds focused tests for the trim helper’s retention and elided-count behavior.
Changes:
- Generalize
_trim_job_output()with akeep=parameter so it can be used for both mid-run and post-completion trimming. - Apply trimming inside the subprocess streaming loop once output exceeds a new in-flight cap.
- Add unit tests covering retention behavior, idempotence, cumulative elided counts, and the interaction between mid-run and final trimming.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
esphome_device_builder/controllers/firmware.py |
Adds the new in-flight output cap and updates the trim helper used by firmware job execution. |
tests/test_job_output_cap.py |
Adds regression tests for _trim_job_output() and the new in-flight trimming behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Previous revision kept tweaking the absolute numbers; the right relationship is parametric. The post-completion retention floor (``_MAX_OUTPUT_LINES_RETAINED = 2000``) was fine — the only thing the audit fix needed to add is bounded mid-run growth. Define the in-flight cap as exactly 2x the retention floor and trim back to the retention floor when crossed: - ``_MAX_OUTPUT_LINES_RETAINED = 2000`` (unchanged from original) - ``_MAX_OUTPUT_LINES_INFLIGHT = _MAX_OUTPUT_LINES_RETAINED * 2`` - ``_INFLIGHT_TRIM_KEEP = _MAX_OUTPUT_LINES_RETAINED`` A user tailing a live build sees up to 2x the kept window during the run before old lines start aging off. Hysteresis: trimming to the retention floor leaves 2000 lines of headroom before the next trim fires, so each O(cap) slice copy is amortised across 2000 appends instead of paying per-line on adversarial output. Choosing ``keep == retained`` also makes the post-completion trim a no-op for any build that already triggered the in-flight trim — never a second round of context loss. Updates the chain test to reflect that ``keep == retained`` collapses the post-completion contribution to zero, and tightens the invariant test to ``cap > keep >= retained`` (equality on the right is intentional, see the docstring). Pure parametric tweak of the previous PR's commit; full suite still passes (550 / 3 skipped, +8 from the new file).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Copilot review on #117 flagged that ``_trim_job_output``'s list reassignment can drop lines for clients following a running job. The pre-existing follow_job shape iterated ``job.output`` directly and only attached the bus listener afterwards, so a line appended during the history-send awaits fired a ``JOB_OUTPUT`` event with no subscriber for that follower and was silently dropped. The in-flight cap exacerbated the gap: post-trim ``job.output`` is a new list reference, so the iteration over the OLD reference was blind to every append after the first cap-crossing. Restructure follow_job so the snapshot and the subscription happen in synchronous-adjacent statements (no awaits between them). The bus fires synchronously and the streaming loop can only interrupt the coroutine at await points, so the loop cannot append between the snapshot and the listener attach — every line is either in the snapshot or captured by the listener, never both, never neither. The listener buffers events into an ``asyncio.Queue`` instead of creating per-event tasks; the body sends history first before draining the queue. That pins ordering: history strictly precedes live, and within each group the original order is preserved. Backpressure flows naturally through the queue — tradeoff against the previous create-task-per-event approach, which delivered concurrently but could deliver out of order on a contended loop. Pin the contract with four focused tests on ``follow_job``: - Terminal-job replay: full history then a single ``result``, exit code carried through. - Live events fired during history send arrive after history, in order — the test fires JOB_OUTPUT mid-flight by yielding control after follower setup. - Listener filters by ``job_id`` so concurrent jobs don't bleed output across followers. - Snapshot+subscribe atomicity: a line fired after the follower's first await yields lands in live (not history, never both, never neither).
Copilot review on #117 flagged that the post-exit error-detail path runs ``"".join(job.output)`` to look for the ``No module named esphome`` substring; with the new in-flight trim in place, a long noisy build can elide that line from ``job.output`` before the post-exit handler runs. The user then gets the generic ``"Process exited 0 but output contains errors"`` fallback instead of the actionable ``"esphome is not importable from the dashboard's Python environment …"`` install hint. Cure: pin the verdict at append time. ``_check_error`` already had the line in hand on each iteration of the streaming loop; add a ``saw_no_esphome_module`` flag alongside the existing ``has_error_in_output`` capture and consult that flag in the post-exit branch instead of re-scanning the (possibly-trimmed) buffer. Set-once, never-cleared. Also fixes the stale docstring on ``_trim_job_output`` that described the in-flight call as overriding ``keep`` to a *larger* value — current shape uses the same retention floor for both paths, with the difference living in the trigger threshold (2x the keep). Reworded the docstring to point at the actual relationship. Pin the at-append capture with a focused closure test that mirrors the runner's exact shape: define the flag + closure, feed a ``ModuleNotFoundError`` line followed by 2x cap worth of unrelated noise, verify the flag stays True after the source line would have been trimmed away.
Follow-up on the previous commit: the original substring check ``"No module named esphome" in full_output`` (note: no quotes around ``esphome``) never matched CPython's actual error format, which is ``No module named 'esphome'`` (single-quoted module name). The check was effectively dead code on real Python error output; the actionable install hint never fired in production. Two-substring match — ``"No module named"`` plus ``"esphome"`` — catches both the quoted CPython form and any unquoted re-thrown wrappers ESPHome emits. Test fixture exercises the realistic quoted form so a future regex tightening doesn't silently regress back to the broken behaviour.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Three follow-ups on Copilot's second-round review: - ``follow_job`` listener now subscribes to ``JOB_CANCELLED`` too — the runner fires it for both queued cancels (``cancel()``) and mid-run cancels (``_execute_job``'s ``cancel_requested`` branch and the ``CancelledError`` handler), but the previous shape only watched ``JOB_COMPLETED`` / ``JOB_FAILED`` so the follower hung forever once the user clicked Cancel. Wired through a new module-level ``_JOB_TERMINAL_EVENTS`` frozenset that mirrors ``_TERMINAL_JOB_STATUSES``; the in-listener ``in`` check now reads against the named constant rather than an inline tuple. - ``_FakeClient.send_event`` in tests/test_follow_job_race.py now ``await``s ``asyncio.sleep(0)`` so the history-send loop yields between iterations. Without that yield the entire snapshot drained in one uninterrupted task slice, so the test's ``bus.fire`` calls never landed mid-history-send and the race-window assertions weren't actually exercising the race they claimed to. With the yield, mid-history-send fires queue through the listener as intended and the strict-ordering assertion is meaningful. - Dropped the four base ``_trim_job_output`` cases from tests/test_job_output_cap.py — those (noop under cap, drop+notice over cap, idempotent re-trim, cumulative elided count) are already covered in tests/test_firmware_helpers.py. Helper expectations belong in one place to avoid drift; the remaining cases in test_job_output_cap.py are specifically about the in-flight cap and its interaction with the post-completion trim, which is unique to this PR.
|
there are leaks. I'm going to make a contextmanager to avoid leak patterns |
PR #117's scope is the in-flight output cap + the issues that surfaced during its review (follow_job race, JOB_CANCELLED hang, listener leak, EventBus.listening helper). The audit document belongs separately, not bundled with the bugfix.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Cap the follow_job queue at _MAX_OUTPUT_LINES_INFLIGHT and drop
newest on full so a slow follower can't accumulate every line in
memory the way the unbounded version did.
- Add _put_evicting helper for the terminal result + sentinel so the
drain loop always breaks even when the queue is full of unread
output (otherwise a stuck follower would park on queue.get forever
after a build finished).
- Switch event_bus.listening to per-iteration append + try/finally
so a partial subscribe (raise mid-loop) still releases listeners
attached so far instead of leaking them.
- Tighten saw_no_esphome_module to look for CPython's exact quoted
form ("No module named 'esphome'") rather than two loose
substrings — the previous check matched siblings like
"No module named 'esphome_dashboard'".
- Cover the new bounded-queue drop and eviction paths and the
JOB_CANCELLED follow_job path with tests.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Pulls the bounded-queue + atomic-snapshot+subscribe + drain-loop machinery out of follow_job and into a shared helpers.event_bus.stream_events helper, then routes follow_job, follow_jobs, and _cmd_subscribe_events through it. All three sites now share the same correctness properties: - Listeners attach inside `bus.listening` *before* the snapshot is awaited. Events fired during the seed-replay queue rather than racing it. - A bounded asyncio.Queue (default 4000 slots) drops newest on full for normal events via push() and evicts oldest for must-land items via push_priority(). A slow follower can no longer pin every fired event in memory. - Cancelling the helper task releases every listener via the with-block finally — same cleanup the previous follow_job had. This closes the race Copilot flagged on follow_jobs and _cmd_subscribe_events: both used create_task fan-out, so a DEVICE_UPDATED / JOB_QUEUED firing during the snapshot await arrived before the snapshot at the client. With stream_events the seed and live events are serialised through a single drain loop and ordering holds. Also clean up the remaining review notes: - Extract _is_no_module_named_esphome to a module-level helper so the regression test calls production code instead of reimplementing the substring check locally. - Update the inline trim-cost comment to reference the current cap=4000/keep=2000 numbers. - Refresh _cmd_subscribe_events docstring to reflect the new path. Tests added: - test_stream_events.py — pins the four helper contracts (snapshot ordering, cancel cleanup, end-to-break drain, push drop-on-full, push_priority eviction). - test_follow_jobs_race.py — locks the snapshot/live ordering for the all-jobs feed and the cancel-cleanup contract. - test_subscribe_events_cleanup.py — adds an ordering test for the device-events feed.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- follow_jobs: serialize the snapshot to dicts synchronously
*before* stream_events attaches listeners. Capturing the
FirmwareJob objects and calling to_dict() inside send_initial
was racy: between listener attach and each to_dict() the
runner could append output or transition status, so the same
update was folded into the snapshot dict AND delivered as a
live event. Dict-freeze closes the duplication.
- follow_jobs: route lifecycle events (queued/started/completed/
failed/cancelled) through push_priority instead of push so a
backlog of job_output can't drop a status transition. Without
that, a missed job_completed leaves the all-jobs panel stuck
on the old status forever (no resync after the snapshot).
job_output / job_progress remain on push (lossy is fine for
log lines).
- test_stream_events.test_push_drops_newest_when_queue_full:
release the drain gate and yield generously *before*
cancellation so the drain actually flushes the backlog. The
earlier shape cancelled immediately after gating, so the
assertion (`<= 1 + cap`) passed even against an unbounded
queue (received never grew past the seed). Strict-equality
assertion now fails against an unbounded queue.
- test_subscribe_events_subscribed_arrives_before_live_events:
give the test real devices and a yielding fake client so
send_initial actually awaits during the seed. The earlier
shape had db.devices=None and a non-yielding send_result, so
the handler finished send_initial in one task slice and the
live event was always queued *after* the seed (whether
ordering was guaranteed or not). With real awaits in
send_initial, the live event must queue mid-seed and arrive
after both initial_state and the subscribed confirm — which
is what the test now pins.
- Replace `lambda j: j.created_at` sort keys with
operator.attrgetter("created_at") in three sites — idiomatic
Python and slightly faster (avoids per-call function call
overhead).
Per maintainer feedback, silent drops on subscribe_events are worse than a forced disconnect: a missed DEVICE_REMOVED leaves the dashboard permanently stale until the user manually reconnects. Forcing the connection closed lets the frontend reconnect, call subscribe_events again, and rebuild from a fresh initial_state snapshot. Mechanism: - New StreamBackpressureError + push_or_terminate() primitive on StreamControls. push_or_terminate enqueues a terminate sentinel via push_priority on QueueFull; the helper's drain loop turns the sentinel into a raised exception so the surrounding handler exits. - subscribe_events now uses push_or_terminate. follow_job / follow_jobs keep their existing semantics (push for log lines, push_priority for must-land lifecycle events) — losing a stray output line is fine, but a state-tracking stream needs every event or the UI diverges. - WS dispatch catches StreamBackpressureError, sends an error payload so the client knows why, and calls schedule_close(). The aiohttp WS closes after the next send, the client reconnects and resyncs. Tests: - test_stream_events.test_push_or_terminate_raises_when_queue_full exercises the overflow path end-to-end and asserts the helper raises. - test_stream_events.test_push_or_terminate_does_not_raise_when_queue_has_room pins the no-overflow path so a regression where every event raises wouldn't slip through.
Verified by temporarily reverting the dict-freeze in follow_jobs to the racy shape (capture FirmwareJob objects, call to_dict inside the loop): the test fails with snapshot[b].output containing both pre-snapshot AND mid-snapshot lines (the racy to_dict captured the post-mutation state). With the dict-freeze restored the test passes — snapshot[b] sees only pre-snapshot content because the dict was captured before listeners attached and before any await. Two-job shape is required to actually exercise the race. With a single job the to_dict runs synchronously before any await, so the mutation can't interleave. With two jobs the helper awaits send_event for snapshot[a], the runner mutates job_b mid-yield, and the next iteration's to_dict captures the now-mutated state.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Three fixes for Copilot review:
1. WS dispatch was calling schedule_close() *after* send_error()
on StreamBackpressureError. send() only closes when the flag
is already set when a message is being written, so the close
never fired — the connection stayed open with the handler
task gone, and the frontend would stop receiving events but
never get the forced reconnect this branch was designed to
provoke. Swapped the order so the close fires as part of the
error send.
2. subscribe_events was using push_or_terminate for every event
type, including JOB_*. But initial_state only reseeds devices
+ importable — there's no job snapshot, so a forced
disconnect on a JOB_* overflow couldn't actually recover the
missed job state. The authoritative job stream is
follow_jobs (which has its own snapshot), so subscribe_events
now uses push (lossy drop-newest) for JOB_* events and keeps
push_or_terminate for the device events that ARE
resync-able via initial_state.
3. Add tests/test_ws_dispatch_backpressure.py with three cases:
- close-after-send ordering (verified the bug detection by
temporarily reverting the swap)
- generic exceptions don't close the connection
- send-failure during the error frame still closes
Plus mid-policy coverage in test_subscribe_events_cleanup.py
for the JOB_*-uses-push path and the DEVICE_*-uses-terminate
path so the per-event-type policy is locked in.
The 1-line _push_priority(name, payload) -> _push_priority_item((name, payload)) wrapper existed only to bridge the (name, payload) public signature with the item-based eviction helper. Drop the wrapper and inline the bridge as lambdas at the StreamControls construction site — same behaviour, one less indirection. _push_priority_item renamed to _force_enqueue since it's now the only function with that role.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
A client 4000+ events behind is already broken — its UI is showing wildly stale state regardless of policy. Selective handling (lossy push for log lines, push_priority for lifecycle, push_or_terminate for device events) added complexity without a real user-visible win: the connection is in a degraded state either way. Replaced the three-way split with uniform push_or_terminate. On overflow the WS closes, the client reconnects, and: - Device state reseeds from initial_state. - Authoritative job state comes from follow_jobs (which has its own snapshot for clean reconnect resync). For all-in-one clients that depend on subscribe_events for job events too, follow_jobs is the documented recovery path. The docstring on _cmd_subscribe_events spells this out so a future reader knows the design intent. Tests collapse the three policy-specific cases (drops_job_events_silently / terminates_on_device_event / lifecycle_events_always_land) into one parametrised test_subscribe_events_terminates_on_overflow_uniformly that exercises both event families with the same expected outcome.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary
Started as a targeted
_trim_job_outputcap on the firmware streaming loop; review surfaced adjacent correctness gaps that landed in this same PR. Now covers four overlapping concerns:_trim_job_outputruns inline in the streaming loop whenjob.outputcrosses_MAX_OUTPUT_LINES_INFLIGHT(4,000 lines), trimming back to_MAX_OUTPUT_LINES_RETAINED(2,000) with hysteresis so the nextcap - keepappends don't each pay an O(cap) slice copy. The post-completion trim still fires infinally; the elided-counter chains across both passes.helpers.event_bus.stream_eventshelper. Boundedasyncio.Queue(default 4,000 slots), atomic snapshot+subscribe, drain loop, cleanup-on-cancel. Replaces three ad-hoc patterns with one shared primitive that gets the OOM bound, the seed/live race window, and listener-leak-on-disconnect right in one place.follow_job/follow_jobs/_cmd_subscribe_eventsthroughstream_events. Closes:follow_jobsand_cmd_subscribe_events(previous shape usedcreate_taskfan-out; events fired during the seed could outrace it).follow_jobs(capturing liveFirmwareJobobjects and callingto_dict()later let mid-snapshot mutations duplicate into both the snapshot dict and the live event).subscribe_eventsreconnect (pre-existing; helper'swith bus.listeningcleans up on cancel).StreamBackpressureError+push_or_terminateprimitive: whensubscribe_eventsoverflows its bounded queue (client 4,000+ events behind), the dispatcher closes the WS so the client reconnects and reseeds device state frominitial_state. Authoritative job state lives infollow_jobs(which has its own snapshot), so a client needing reliable jobs subscribes there. Per-event-type splitting was tried then simplified to uniform fail-closed: a client that far behind has a wildly stale UI either way.Also: tightened the
"No module named 'esphome'"detection to the exact CPython quoted form (avoids false-positive sibling matches like'esphome_dashboard'); promoted the detection to append-time so the in-flight trim can't elide the line; replacedlambda j: j.created_atsort keys withoperator.attrgetter.Why
_trim_job_outputonly ran in the runner'sfinallyblock, sojob.outputgrew without bound for the lifetime of a still-running subprocess. A build streaming gigabytes of stderr in a tight loop held every line in memory until the subprocess exited; the dashboard process OOMs first.The other concerns landed here because they share the same memory/race surface — every WS-streaming command was implementing its own bounded-queue + snapshot+subscribe + cleanup-on-disconnect logic, and each had a slightly different bug. Extracting one helper and migrating the three sites through it was strictly safer than fixing each one in isolation, and Copilot's review surfaced the gaps systematically.
Test plan
uv run pytest tests/test_job_output_cap.py— original cap tests +_is_no_module_named_esphomepredicate test.uv run pytest tests/test_stream_events.py— pins the helper's contracts (snapshot/live ordering, cancel cleanup, end-breaks-drain, push drop-on-full, push_priority eviction, push_or_terminate raises).uv run pytest tests/test_follow_job_race.py— single-job follow race + slow-follower drop + terminal eviction + JOB_CANCELLED path.uv run pytest tests/test_follow_jobs_race.py— multi-job snapshot/live ordering + dict-freeze concurrent-mutation pin.uv run pytest tests/test_subscribe_events_cleanup.py— listener cleanup on cancel + seed-vs-live ordering + parametrised fail-closed-on-overflow.uv run pytest tests/test_ws_dispatch_backpressure.py— WS dispatch's close-after-send ordering (verified by temporarily reverting the swap), generic-exception non-close, send-failure-still-closes.uv run pytest— 663 passed.Security context
Discovered during a security audit; mid-run cap was the #1 actionable finding (#120). The follower-queue OOM and listener leak surfaced during this PR's review and bundle here because they share the streaming surface. Other audit findings either turned out to be false positives or warrant separate scope.