From e9052301d4d7f8844f17757cb574e9bb2df51a70 Mon Sep 17 00:00:00 2001 From: zackees Date: Sat, 18 Apr 2026 08:41:43 -0700 Subject: [PATCH] feat(python): async WebSocket-backed AsyncSerialMonitor methods (closes #65) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the remaining async surface from #65: - __aenter__ / __aexit__ — async context-manager that opens the /ws/serial-monitor session, sends the attach handshake, stores the split sink/source halves, and cleans up on exit. - read_lines(timeout_secs) — async batch read, honors auto_reconnect (Preempted → continue; Reconnected → continue). - write(data) — async write that waits for daemon write_ack. - write_json_rpc(request, timeout_secs) — async JSON-RPC send + poll for REMOTE: response, preserving the PR #57 full-timeout guarantee by keeping polling across empty batches. Send+Sync refactor: - Split WebSocketStream via .split() into WsSink + WsSource. - Each half stored in its own Arc>> so concurrent read/write futures do not serialize each other and each future owns its own Arc clone. Sync SerialMonitor is untouched — additive only. Deferred follow-up: unit tests for the new async methods. The existing 11 fbuild-python tests still pass; new tests would need a more elaborate WebSocket mock than the HTTP mock used for send_op_async (PR #84). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/fbuild-python/src/lib.rs | 375 ++++++++++++++++++++++++++++++-- 1 file changed, 361 insertions(+), 14 deletions(-) diff --git a/crates/fbuild-python/src/lib.rs b/crates/fbuild-python/src/lib.rs index 49a32521..f843250e 100644 --- a/crates/fbuild-python/src/lib.rs +++ b/crates/fbuild-python/src/lib.rs @@ -25,7 +25,7 @@ use base64::Engine; use futures::{SinkExt, StreamExt}; use pyo3::prelude::*; use serde::{Deserialize, Serialize}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio_tungstenite::tungstenite; @@ -1039,39 +1039,263 @@ const PYTHON_MODULE_VERSION: &str = env!("CARGO_PKG_VERSION"); /// Python-visible AsyncSerialMonitor class. /// -/// Native async equivalent of `SerialMonitor`. Scaffolds the pattern for -/// Issue #65 — long-running operations are exposed as `async def` so +/// Native async equivalent of `SerialMonitor`. Exposes every long-running +/// serial operation (`read_lines`, `write`, `write_json_rpc`) plus the +/// context-manager pair (`__aenter__` / `__aexit__`) as `async def` so /// callers can `await` them directly from an asyncio event loop without -/// the thread-pool shim FastLED currently uses (`_run_in_thread`). +/// FastLED's thread-pool shim (`_run_in_thread`). See FastLED/fbuild#65. /// -/// The first async method shipped is `reset_device` because it's -/// stateless (pure HTTP, no WebSocket). Future work will migrate -/// `read_lines`, `write`, and `write_json_rpc` as the shared WebSocket -/// state is refactored to be `Send + Sync` across `await` points. +/// ## Send + Sync refactor +/// +/// The sync `SerialMonitor` wraps its `WsSink`/`WsSource` halves in +/// `std::sync::Mutex`, which cannot be held across `.await`. For the async +/// surface we switch to `Arc>>`: +/// +/// * `tokio::sync::Mutex` — safe to hold across `.await` points, which +/// we do when calling `sink.send(...).await` / `source.next().await`. +/// * `Arc<...>` — lets us `clone` the handle into `async move { ... }` +/// blocks handed to `future_into_py`, so each future owns its own +/// reference into the shared connection state. +/// * `Option<_>` — the WS halves are absent before `__aenter__` and +/// after `__aexit__`, and the outer `Arc>>` stays +/// alive across both transitions. +/// * Read/write halves live in **separate** mutexes so a pending +/// `read_lines` does not serialize an unrelated `write` (each direction +/// of the WebSocket has independent framing state). +/// +/// The sync `SerialMonitor` is untouched — this is a purely additive +/// surface. /// /// ```python /// import asyncio /// from fbuild._native import AsyncSerialMonitor /// /// async def main(): -/// mon = AsyncSerialMonitor(port="COM13", baud_rate=115200) -/// ok = await mon.reset_device(board="esp32s3") +/// async with AsyncSerialMonitor(port="COM13", baud_rate=115200) as mon: +/// lines = await mon.read_lines(timeout_secs=5.0) +/// await mon.write("hello\n") +/// ok = await mon.reset_device(board="esp32s3") /// /// asyncio.run(main()) /// ``` #[pyclass] struct AsyncSerialMonitor { port: String, - #[allow(dead_code)] baud_rate: u32, + auto_reconnect: bool, + verbose: bool, + client_id: String, + ws_write: Arc>>, + ws_read: Arc>>, } #[pymethods] impl AsyncSerialMonitor { #[new] - #[pyo3(signature = (port, baud_rate=115200))] - fn new(port: String, baud_rate: u32) -> Self { - Self { port, baud_rate } + #[pyo3(signature = (port, baud_rate=115200, auto_reconnect=true, verbose=false))] + fn new(port: String, baud_rate: u32, auto_reconnect: bool, verbose: bool) -> Self { + Self { + port, + baud_rate, + auto_reconnect, + verbose, + client_id: uuid::Uuid::new_v4().to_string(), + ws_write: Arc::new(tokio::sync::Mutex::new(None)), + ws_read: Arc::new(tokio::sync::Mutex::new(None)), + } + } + + /// Async context-manager entry. Connects to the daemon's + /// `/ws/serial-monitor` endpoint, sends the `attach` handshake, and + /// stores the split sink/source halves for subsequent `read_lines` / + /// `write` calls. Mirrors the sync `SerialMonitor::__enter__` contract. + fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult> { + let port = slf.port.clone(); + let baud_rate = slf.baud_rate; + let client_id = slf.client_id.clone(); + let verbose = slf.verbose; + let ws_write_slot = slf.ws_write.clone(); + let ws_read_slot = slf.ws_read.clone(); + let slf_obj: PyObject = slf.into_py(py); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let daemon_port = fbuild_paths::get_daemon_port(); + let ws_url = format!("ws://127.0.0.1:{}/ws/serial-monitor", daemon_port); + + let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url) + .await + .map_err(|e| { + pyo3::exceptions::PyConnectionError::new_err(format!( + "failed to connect to daemon WebSocket at {}: {}", + ws_url, e + )) + })?; + + let (mut write, mut read) = ws_stream.split(); + + let attach = ClientMessage::Attach { + client_id: client_id.clone(), + port: port.clone(), + baud_rate, + open_if_needed: true, + pre_acquire_writer: true, + }; + let attach_json = serde_json::to_string(&attach).unwrap(); + + write + .send(tungstenite::Message::Text(attach_json)) + .await + .map_err(|e| { + pyo3::exceptions::PyConnectionError::new_err(format!( + "failed to send attach: {}", + e + )) + })?; + + let msg = read + .next() + .await + .ok_or_else(|| { + pyo3::exceptions::PyConnectionError::new_err("WebSocket closed before attach") + })? + .map_err(|e| { + pyo3::exceptions::PyConnectionError::new_err(format!("WebSocket error: {}", e)) + })?; + + if let tungstenite::Message::Text(text) = msg { + match serde_json::from_str::(&text) { + Ok(ServerMessage::Attached { success, .. }) if success => { + if verbose { + eprintln!("attached to {} at {} baud", port, baud_rate); + } + } + Ok(ServerMessage::Error { message }) => { + return Err(pyo3::exceptions::PyRuntimeError::new_err(format!( + "attach failed: {}", + message + ))); + } + _ => { + return Err(pyo3::exceptions::PyRuntimeError::new_err( + "unexpected response to attach", + )); + } + } + } + + *ws_write_slot.lock().await = Some(write); + *ws_read_slot.lock().await = Some(read); + + Ok(slf_obj) + }) + } + + /// Async context-manager exit. Sends a `detach` + `Close` frame and + /// clears the stored sink/source halves. Mirrors sync `__exit__`. + #[pyo3(signature = (_exc_type=None, _exc_val=None, _exc_tb=None))] + fn __aexit__<'py>( + &self, + py: Python<'py>, + _exc_type: Option, + _exc_val: Option, + _exc_tb: Option, + ) -> PyResult> { + let ws_write_slot = self.ws_write.clone(); + let ws_read_slot = self.ws_read.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + if let Some(mut write) = ws_write_slot.lock().await.take() { + let detach = serde_json::to_string(&ClientMessage::Detach).unwrap(); + let _ = write.send(tungstenite::Message::Text(detach)).await; + let _ = write.send(tungstenite::Message::Close(None)).await; + } + let _ = ws_read_slot.lock().await.take(); + Ok(false) + }) + } + + /// Async counterpart to `SerialMonitor::read_lines`. Pulls batches of + /// lines from the daemon until at least one batch arrives or the + /// timeout elapses. Handles `Preempted` / `Reconnected` transparently + /// when `auto_reconnect=true` just like the sync path. + #[pyo3(signature = (timeout_secs=30.0))] + fn read_lines<'py>(&self, py: Python<'py>, timeout_secs: f64) -> PyResult> { + let ws_read_slot = self.ws_read.clone(); + let auto_reconnect = self.auto_reconnect; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + Ok(read_lines_async(ws_read_slot, auto_reconnect, timeout_secs).await) + }) + } + + /// Async counterpart to `SerialMonitor::write`. Returns `true` on + /// successful delivery (daemon acknowledged with `write_ack`), + /// `false` otherwise. Mirrors the sync contract except the return + /// type is a bool rather than `bytes_written` to match the async + /// signature spec in FastLED/fbuild#65. + fn write<'py>(&self, py: Python<'py>, data: &str) -> PyResult> { + let ws_write_slot = self.ws_write.clone(); + let ws_read_slot = self.ws_read.clone(); + let encoded = base64::engine::general_purpose::STANDARD.encode(data.as_bytes()); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + Ok(write_async(ws_write_slot, ws_read_slot, encoded).await) + }) + } + + /// Async counterpart to `SerialMonitor::write_json_rpc`. Serializes + /// `request` to JSON, sends it with a trailing newline, then polls + /// `read_lines` until a `REMOTE:` response arrives or the full + /// `timeout_secs` elapses. Honors the PR #57 guarantee that an empty + /// batch does not short-circuit the overall deadline. + /// + /// Returns the JSON-decoded response on success, raises + /// `TimeoutError` on timeout. + #[pyo3(signature = (request, timeout_secs=5.0))] + fn write_json_rpc<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + timeout_secs: f64, + ) -> PyResult> { + // Serialize the request on the calling thread (needs the GIL) + // before we enter the async block, mirroring the send_op_async + // pattern of moving only owned primitives across the .await + // boundary. + let json_str: String = py + .import_bound("json")? + .call_method1("dumps", (request,))? + .extract()?; + let data = format!("{}\n", json_str); + let encoded = base64::engine::general_purpose::STANDARD.encode(data.as_bytes()); + + let ws_write_slot = self.ws_write.clone(); + let ws_read_slot = self.ws_read.clone(); + let auto_reconnect = self.auto_reconnect; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + // Fire-and-acknowledge the write first; we don't abort on + // a write failure because the sync surface also proceeds + // to poll for a REMOTE: response (which is how the Python + // tests exercise this path). + let _ = write_async(ws_write_slot, ws_read_slot.clone(), encoded).await; + + let json_part = + wait_for_remote_json_rpc_response_async(timeout_secs, ws_read_slot, auto_reconnect) + .await; + + match json_part { + Some(payload) => Python::with_gil(|py| { + let json_module = py.import_bound("json")?; + let parsed = json_module.call_method1("loads", (payload.trim(),))?; + Ok(parsed.unbind()) + }), + None => Err(pyo3::exceptions::PyTimeoutError::new_err(format!( + "no REMOTE: response within {} seconds", + timeout_secs + ))), + } + }) } /// Asynchronously reset the device via the daemon's `POST /api/reset` @@ -1123,6 +1347,129 @@ impl AsyncSerialMonitor { } } +/// Shared async read-batch loop used by `AsyncSerialMonitor::read_lines` +/// and `write_json_rpc`. Acquires the source mutex only for the duration +/// of each `.next()` call so that concurrent `write` / `__aexit__` +/// futures can still progress between iterations. +async fn read_lines_async( + ws_read_slot: Arc>>, + auto_reconnect: bool, + timeout_secs: f64, +) -> Vec { + let mut lines = Vec::new(); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs_f64(timeout_secs); + + loop { + let now = std::time::Instant::now(); + if now >= deadline { + break; + } + let remaining = deadline - now; + + let mut guard = ws_read_slot.lock().await; + let Some(source) = guard.as_mut() else { + // Session not entered (or already exited). Nothing to read. + break; + }; + + let result = tokio::time::timeout(remaining, source.next()).await; + // Drop the guard before continuing the loop so another task + // can take the mutex (e.g. __aexit__). + drop(guard); + + match result { + Ok(Some(Ok(tungstenite::Message::Text(text)))) => { + match serde_json::from_str::(&text) { + Ok(ServerMessage::Data { + lines: data_lines, .. + }) => { + lines.extend(data_lines); + if !lines.is_empty() { + break; + } + } + Ok(ServerMessage::Preempted { .. }) => { + if auto_reconnect { + continue; + } + break; + } + Ok(ServerMessage::Reconnected { .. }) => continue, + _ => continue, + } + } + Ok(Some(Ok(tungstenite::Message::Close(_)))) | Ok(None) => break, + Err(_) => break, // timeout + _ => continue, + } + } + + lines +} + +/// Shared async write path used by `AsyncSerialMonitor::write` and +/// `write_json_rpc`. Sends the already-base64-encoded payload, then +/// awaits a `write_ack` (with a 5-second bound matching the sync path). +/// Returns `true` if the ack reported any bytes written. +async fn write_async( + ws_write_slot: Arc>>, + ws_read_slot: Arc>>, + encoded: String, +) -> bool { + let msg = serde_json::to_string(&ClientMessage::Write { data: encoded }).unwrap(); + + { + let mut guard = ws_write_slot.lock().await; + let Some(sink) = guard.as_mut() else { + return false; + }; + if sink.send(tungstenite::Message::Text(msg)).await.is_err() { + return false; + } + } + + // Wait for write_ack. Hold the read half's mutex only across this + // single `.next()` so concurrent `read_lines` futures can resume. + let mut guard = ws_read_slot.lock().await; + let Some(source) = guard.as_mut() else { + return false; + }; + let ack_timeout = std::time::Duration::from_secs(5); + match tokio::time::timeout(ack_timeout, source.next()).await { + Ok(Some(Ok(tungstenite::Message::Text(text)))) => { + matches!( + serde_json::from_str::(&text), + Ok(ServerMessage::WriteAck { bytes_written, .. }) if bytes_written > 0 + ) + } + _ => false, + } +} + +/// Async counterpart to `wait_for_remote_json_rpc_response`. Keeps +/// polling `read_lines_async` until the deadline expires, even if an +/// individual batch comes back empty — preserving the PR #57 fix. +async fn wait_for_remote_json_rpc_response_async( + timeout_secs: f64, + ws_read_slot: Arc>>, + auto_reconnect: bool, +) -> Option { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs_f64(timeout_secs); + + while std::time::Instant::now() < deadline { + let remaining = (deadline - std::time::Instant::now()).as_secs_f64(); + if remaining <= 0.0 { + break; + } + let lines = read_lines_async(ws_read_slot.clone(), auto_reconnect, remaining).await; + if let Some(json_part) = extract_remote_json_rpc_response(&lines) { + return Some(json_part); + } + } + + None +} + /// Python-visible AsyncDaemon class. /// /// Native async counterpart to `Daemon`. Follows the same additive