rpc: wire up reader thread + per-command-id dispatcher#2
Open
halbothpa wants to merge 2 commits into
Open
Conversation
The reader-thread / pending-call infrastructure has been declared but
dead since the file was written: `_reader_thread`, `_handle_message`,
`_pending`, `_notifications` all existed, none were used. Every call
went through synchronous `_send` + bare `_recv`, with `_recv`
unprotected by the `_io_lock`. Consequence: concurrent callers (e.g.
MCP tool + a viewer) raced on serial reads, log subscription was
silently broken (push messages with no one to dispatch them), and
`subscribe_logs` would corrupt the next command response by leaving
notifications in the buffer.
This commit makes the dispatcher real:
* `_reader_loop` runs in a daemon thread started by `open()` and
stopped by `close()`. It's the only caller of `_recv_raw` (renamed
from `_recv`).
* `_handle_message` routes by content kind:
- `system_log_response` -> `_log_lines`, `_on_log`, and any
queues from `subscribe_log_queue()`
- `gui_screen_frame` -> queues from `subscribe_frames()`
- everything else -> `_pending[command_id]` (via
`_PendingCall.event`), or `_notifications` if unrouted.
* Per-command correlation uses `_PendingCall` (Event + msg list +
optional error). `_request` waits for `has_next=False`, returns the
final response; `_request_stream` returns the full chain.
* Subscriber queues are bounded (frames: 30, logs: 4096) and drop
oldest on overflow, so a slow consumer can't pin memory.
* On close / EOF / reader-thread exit, every still-waiting call is
failed with `ConnectionError` so request methods don't hang.
Public surface gains:
* `start_screen_stream()` / `stop_screen_stream()`
* `subscribe_frames()` / `unsubscribe_frames()`
* `subscribe_log_queue()` / `unsubscribe_log_queue()`
`screen_frame()` is reimplemented in terms of `subscribe_frames` +
`start_screen_stream`, so one-shot grabs and a long-lived viewer can
coexist on the same `RpcSession` (every subscriber receives every
frame).
`device_info`, `storage_list`, `storage_read`, `storage_write` are
updated to use the new `_request*` contract; previously they bypassed
`_request` and did their own send + recv loops which the dispatcher
would have mis-routed.
Verified against a real Flipper (Momentum mntm-dev, COM4):
* multi-message responses drain correctly
* screen subscriber queue receives push frames
* concurrent `device_info()` from two threads returns matching results
(the prior code couldn't do this — same-port races corrupted both
responses).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Addresses two issues found in code review of the dispatcher commit: * The reader-loop `except Exception: continue` branch had no backoff and no cap, so a persistent non-EOF parse error (e.g. a varint overflow or a protobuf decode failure on a wedged stream) would spin a CPU core at 100 % indefinitely. Now: sleep 10 ms per error iteration and give up after 50 consecutive failures, treating it the same as EOF so pending callers unblock. * `screen_frame()` subscribed to the frame stream before sending `StartScreenStreamRequest`. If a previous-but-not-yet-stopped stream still had frames in flight, one of those would arrive in the new subscriber queue and be returned as the "new" capture. Now: drain the queue between `start_screen_stream` (which blocks on the ACK so all earlier wire traffic has been processed) and the awaited `q.get()`, so we never return a frame that predates our intended start point. Verified against a real Flipper: * three back-to-back `screen_frame()` calls all return 1024 bytes * two concurrent `screen_frame()` from separate threads both succeed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR wires RpcSession to use a background serial reader and command-id-based response dispatching, enabling concurrent RPC calls and fan-out of pushed log/frame messages.
Changes:
- Adds pending-call tracking, reader-thread dispatch, and failure propagation for in-flight calls.
- Adds bounded queue subscribers for logs and screen frames.
- Migrates device info, storage, and screen frame APIs onto the new request/stream infrastructure.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+510
to
+512
| Activate device-side log delivery with :meth:`subscribe_logs` first. | ||
| """ | ||
| q: "queue.Queue[str]" = queue.Queue(maxsize=maxsize) |
Comment on lines
566
to
+587
| # Each fork patches certain keys uniquely; sniff them. | ||
| origin = (props.get("firmware_origin_fork") or "").lower() | ||
| commit = (props.get("firmware_commit") or "").lower() | ||
| branch = (props.get("firmware_branch") or "").lower() | ||
| for key in (origin, commit, branch): | ||
| if "momentum" in key: | ||
| return "momentum" | ||
| if "unleashed" in key: | ||
| return "unleashed" | ||
| if "roguemaster" in key: | ||
| return "roguemaster" | ||
| if "xtreme" in key: | ||
| return "xtreme" | ||
| return "ofw" | ||
|
|
||
| def screen_frame(self) -> bytes: | ||
| """Return the current 128x64 framebuffer (1024 bytes, 1bpp packed).""" | ||
| self._request(gui_start_screen_stream_request=gui_pb2.StartScreenStreamRequest()) | ||
| # First frame arrives as a notification; capture it | ||
| frame = self._recv() | ||
| data = bytes(frame.gui_screen_frame.data) | ||
| self._request(gui_stop_screen_stream_request=gui_pb2.StopScreenStreamRequest()) | ||
| return data | ||
| def screen_frame(self, timeout: float = 5.0) -> bytes: | ||
| """Return the current 128x64 framebuffer (1024 bytes, 1bpp packed). | ||
|
|
||
| One-shot: starts the stream, pulls a single fresh frame off the | ||
| subscriber queue, then stops the stream. Safe to call concurrently | ||
| with a long-lived viewer subscription — every subscriber gets every | ||
| frame. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Activates the reader-thread / pending-call infrastructure in
flipper_core.pythat's been declared but dead since the file was written (_reader_thread,_handle_message,_pending,_notificationsall existed, none were used). Every call went through synchronous_send+ bare_recv, with_recvunprotected by_io_lock.This unblocks:
RpcSession).Stacked on #1 — merge that first.
What changes
_reader_loopruns in a daemon thread started byopen(), stopped byclose(). The only caller of_recv_raw(renamed from_recv)._handle_messageroutes by content kind:system_log_response→_log_lines,_on_logcallback, queues fromsubscribe_log_queue()gui_screen_frame→ queues fromsubscribe_frames()_pending[command_id](via_PendingCall.event), or_notificationsif unrouted_PendingCall(Event + msg list + optional error) handles per-command correlation._requestwaits forhas_next=False, returns the final response;_request_streamreturns the full chain.ConnectionErrorso request methods don't hang.screen_frame()is reimplemented in terms ofsubscribe_frames+start_screen_stream, so one-shot grabs and a long-lived viewer can coexist (every subscriber receives every frame).device_info,storage_list,storage_read,storage_writemigrated to the new_request*contract.New public surface
Test plan
Verified against a real Flipper (Momentum mntm-dev, COM4) — all 10 checks passed:
device_info()drains 60-property multi-message streamscreen_frame()returns 1024 bytes via subscriber queuesubscribe_frames()+start_screen_stream()pushes ≥3 frames in 2.5sstorage_list("/ext")returns 32 entriesdevice_info()from two threads both succeed with matching firmware_version — proves the dispatcher routes bycommand_idinstead of FIFO-on-the-wire (the old code couldn't do this)flipper_screen.py --jsonstill works (no regression to the CLI surface)Trade-offs
_recv(). Acceptable for interactive use._recv()removed; downstream code that imported it directly would break, but it was always intended as internal.🤖 Generated with Claude Code