From 5ad1d78d8c4565a63321e01ebe1ab3e526cd1d26 Mon Sep 17 00:00:00 2001 From: zackees Date: Sat, 18 Apr 2026 11:51:55 -0700 Subject: [PATCH] perf(daemon): bridge tracing events to /ws/logs (partial #66) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `BroadcastLogLayer`, a `tracing-subscriber` layer that forwards events to the existing `BroadcastHub::log_tx` channel so WebSocket clients subscribed to `/ws/logs` see the same events the daemon writes to stderr. The native ESP32 `write-flash` path in `fbuild-deploy::esp32_native` already emits per-region and 10%-throttled progress via `tracing::info!()`; this layer surfaces that progress on the WebSocket stream without any new API surface — closing the follow-up noted in #66 comments. Design notes: - Layer early-outs on `receiver_count() == 0`, so the common "no `/ws/logs` clients attached" case pays a single atomic load per event — no JSON serialization, no allocation. - Events from `handlers::websockets` are dropped by module filter to avoid feeding client connect/disconnect notices back onto the channel they announce. - `BroadcastHub` is now built in `main.rs` before tracing init and passed into `DaemonContext::with_hub(...)` so the layer can be registered against `log_tx` before the first `tracing::info!`. The original `DaemonContext::new(...)` is retained as a thin wrapper for tests. - Payload shape matches the welcome frame `/ws/logs` already emits (`{"type":"log","level":..,"message":..,"timestamp":..,"module":..}`) so clients parse every line with one schema. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/fbuild-daemon/src/context.rs | 16 +- crates/fbuild-daemon/src/lib.rs | 1 + crates/fbuild-daemon/src/log_layer.rs | 208 ++++++++++++++++++++++++++ crates/fbuild-daemon/src/main.rs | 26 +++- 4 files changed, 246 insertions(+), 5 deletions(-) create mode 100644 crates/fbuild-daemon/src/log_layer.rs diff --git a/crates/fbuild-daemon/src/context.rs b/crates/fbuild-daemon/src/context.rs index 43aa369e..5a904994 100644 --- a/crates/fbuild-daemon/src/context.rs +++ b/crates/fbuild-daemon/src/context.rs @@ -127,6 +127,20 @@ impl DaemonContext { port: u16, shutdown_tx: tokio::sync::watch::Sender, spawner_cwd: String, + ) -> Self { + Self::with_hub(port, shutdown_tx, spawner_cwd, BroadcastHub::new()) + } + + /// Construct with a caller-supplied [`BroadcastHub`]. Used by + /// `main.rs` so the tracing layer can be registered against + /// `hub.log_tx` before the daemon emits its first + /// `tracing::info!` (otherwise the layer would need a late-bound + /// handle through a global OnceLock). + pub fn with_hub( + port: u16, + shutdown_tx: tokio::sync::watch::Sender, + spawner_cwd: String, + broadcast_hub: BroadcastHub, ) -> Self { let now_unix = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -154,7 +168,7 @@ impl DaemonContext { source_mtime, last_activity: Arc::new(std::sync::Mutex::new(Instant::now())), spawner_cwd, - broadcast_hub: BroadcastHub::new(), + broadcast_hub, avr8js_sessions: DashMap::new(), gc_mutex: Arc::new(tokio::sync::Mutex::new(())), } diff --git a/crates/fbuild-daemon/src/lib.rs b/crates/fbuild-daemon/src/lib.rs index 9a1d5028..a12a5cf9 100644 --- a/crates/fbuild-daemon/src/lib.rs +++ b/crates/fbuild-daemon/src/lib.rs @@ -32,5 +32,6 @@ pub mod context; pub mod device_manager; pub mod handlers; +pub mod log_layer; pub mod models; pub mod status_manager; diff --git a/crates/fbuild-daemon/src/log_layer.rs b/crates/fbuild-daemon/src/log_layer.rs new file mode 100644 index 00000000..7be57ed8 --- /dev/null +++ b/crates/fbuild-daemon/src/log_layer.rs @@ -0,0 +1,208 @@ +//! Tracing-subscriber layer that forwards events to +//! [`BroadcastHub::log_tx`] so `/ws/logs` subscribers see the same +//! events that are written to the daemon's stderr. +//! +//! # Why (issue #66) +//! +//! The native ESP32 `write-flash` path (`fbuild-deploy::esp32_native`) +//! already emits progress via `tracing::info!()` — per-region start, +//! 10%-throttled byte counts, region-complete markers. Before this +//! layer the events only reached stderr; WebSocket clients subscribed +//! to `/ws/logs` received nothing during a flash because no other +//! bridge existed. Installing this layer alongside the existing +//! `tracing_subscriber::fmt` layer makes the WebSocket stream the live +//! progress feed the deploy path was already producing, without adding +//! a separate progress API. +//! +//! # Cycle avoidance +//! +//! Events originating inside the `/ws/logs` handler itself (e.g. a +//! `tracing::info!("Logs WebSocket connected")` from +//! `handlers::websockets`) are dropped by module-path filter — sending +//! them back onto `log_tx` would just re-feed subscribers their own +//! connect/disconnect notices. The broadcast channel is bounded so a +//! cycle could not deadlock, but the filter keeps the stream quieter. +//! +//! # Cost when no clients +//! +//! `broadcast::Sender::receiver_count` is a single atomic load. When +//! no `/ws/logs` clients are connected the layer skips JSON +//! serialization entirely and bottoms out at one atomic read per +//! event. + +use std::fmt::Write as _; + +use tokio::sync::broadcast; +use tracing::field::{Field, Visit}; +use tracing::{Event, Subscriber}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::Layer; + +/// Tracing layer that JSON-serializes each event and publishes it on +/// the provided broadcast channel. Drop the layer (or drop every +/// receiver) to stop forwarding. +pub struct BroadcastLogLayer { + tx: broadcast::Sender, +} + +impl BroadcastLogLayer { + pub fn new(tx: broadcast::Sender) -> Self { + Self { tx } + } +} + +fn now_unix() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() +} + +impl Layer for BroadcastLogLayer +where + S: Subscriber, +{ + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + // Cheap early-out when nothing is subscribed. `/ws/logs` is an + // opt-in stream; the common case is zero clients and we should + // not pay serialization cost for events that nobody reads. + if self.tx.receiver_count() == 0 { + return; + } + + let meta = event.metadata(); + let module = meta.module_path().unwrap_or_else(|| meta.target()); + + // Drop events from the `/ws/logs` handler itself so client + // connect/disconnect notices don't feed themselves back onto + // the same channel they announced. See module docstring. + if module.contains("handlers::websockets") { + return; + } + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + let message = visitor.finish(); + + // Same shape that `/ws/logs` already sends for its welcome + // frame — clients can parse every log line with one schema. + let payload = serde_json::json!({ + "type": "log", + "level": meta.level().as_str(), + "message": message, + "timestamp": now_unix(), + "module": module, + }) + .to_string(); + + // Ignore send errors: the only failure mode for a bounded + // broadcast with no active receivers is `SendError` — which is + // harmless and already guarded above, but subscribers can race + // between the `receiver_count` check and the send. + let _ = self.tx.send(payload); + } +} + +/// Collects the event's `message` and any named fields into a single +/// human-readable string matching the shape `fmt::Layer` writes to +/// stderr. Kept in this file (not `tracing_subscriber::fmt::format`) +/// because we only need the rendered message, not the full formatter +/// machinery. +#[derive(Default)] +struct MessageVisitor { + message: String, + fields: String, +} + +impl MessageVisitor { + fn finish(self) -> String { + match (self.message.is_empty(), self.fields.is_empty()) { + (true, true) => String::new(), + (false, true) => self.message, + (true, false) => self.fields, + (false, false) => format!("{} {}", self.message, self.fields), + } + } + + fn push_field_debug(&mut self, name: &str, value: &dyn std::fmt::Debug) { + if !self.fields.is_empty() { + self.fields.push(' '); + } + let _ = write!(&mut self.fields, "{}={:?}", name, value); + } + + fn push_field_str(&mut self, name: &str, value: &str) { + if !self.fields.is_empty() { + self.fields.push(' '); + } + let _ = write!(&mut self.fields, "{}={}", name, value); + } +} + +impl Visit for MessageVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + // `tracing`'s format macros pass the rendered `Arguments` + // here; its `Debug` impl is `Display`-equivalent, so this + // prints the same text the user wrote in `info!(...)`. + let _ = write!(&mut self.message, "{:?}", value); + } else { + self.push_field_debug(field.name(), value); + } + } + + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.message.push_str(value); + } else { + self.push_field_str(field.name(), value); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn message_visitor_joins_message_and_fields() { + let mut v = MessageVisitor::default(); + v.push_field_str("port", "/dev/ttyUSB0"); + v.push_field_debug("size", &42usize); + v.message.push_str("native write: begin region"); + let rendered = v.finish(); + assert!(rendered.starts_with("native write: begin region ")); + assert!(rendered.contains("port=/dev/ttyUSB0")); + assert!(rendered.contains("size=42")); + } + + #[test] + fn message_visitor_message_only() { + let mut v = MessageVisitor::default(); + v.message.push_str("hello"); + assert_eq!(v.finish(), "hello"); + } + + #[test] + fn message_visitor_fields_only() { + let mut v = MessageVisitor::default(); + v.push_field_str("port", "COM7"); + assert_eq!(v.finish(), "port=COM7"); + } + + /// Regression: no clients subscribed means no payload work. A + /// fresh `broadcast::channel` has zero receivers once the initial + /// `_rx` is dropped, and `send` returns `Err`. The layer treats + /// this as the common case and short-circuits before serializing. + #[test] + fn layer_noop_when_no_subscribers() { + let (tx, _) = broadcast::channel::(4); + // Drop the initial receiver by letting `_` fall out of scope. + let layer = BroadcastLogLayer::new(tx.clone()); + assert_eq!(layer.tx.receiver_count(), 0); + // If there were an attempt to serialize we'd observe zero sends + // against zero receivers — with receiver_count==0 the layer + // exits before building any string. + assert_eq!(tx.receiver_count(), 0); + } +} diff --git a/crates/fbuild-daemon/src/main.rs b/crates/fbuild-daemon/src/main.rs index 828c1c01..c122eb17 100644 --- a/crates/fbuild-daemon/src/main.rs +++ b/crates/fbuild-daemon/src/main.rs @@ -5,10 +5,13 @@ use axum::routing::{get, post}; use axum::Router; use clap::Parser; use fbuild_daemon::context::{ - DaemonContext, IDLE_TIMEOUT, SELF_EVICTION_TIMEOUT, STALE_LOCK_CHECK_INTERVAL, + BroadcastHub, DaemonContext, IDLE_TIMEOUT, SELF_EVICTION_TIMEOUT, STALE_LOCK_CHECK_INTERVAL, }; use fbuild_daemon::handlers::{cache, devices, emulator, health, locks, operations, websockets}; +use fbuild_daemon::log_layer::BroadcastLogLayer; use std::sync::Arc; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; #[derive(Parser)] #[command(name = "fbuild-daemon", about = "fbuild daemon server")] @@ -47,17 +50,32 @@ async fn main() { let port = args.port.unwrap_or_else(fbuild_paths::get_daemon_port); - tracing_subscriber::fmt() - .with_env_filter( + // Build the broadcast hub before installing the tracing subscriber + // so the `/ws/logs` bridge layer (issue #66 follow-up) can be + // registered with the very first tracing event. Any later event — + // including native ESP32 `write-flash` progress — lands on the same + // channel that `/ws/logs` subscribers read. + let broadcast_hub = BroadcastHub::new(); + let log_tx = broadcast_hub.log_tx.clone(); + + tracing_subscriber::registry() + .with( tracing_subscriber::EnvFilter::from_default_env() .add_directive(tracing::Level::INFO.into()), ) + .with(tracing_subscriber::fmt::layer()) + .with(BroadcastLogLayer::new(log_tx)) .init(); tracing::info!("fbuild daemon starting on port {}", port); let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false); - let context = Arc::new(DaemonContext::new(port, shutdown_tx, args.spawner_cwd)); + let context = Arc::new(DaemonContext::with_hub( + port, + shutdown_tx, + args.spawner_cwd, + broadcast_hub, + )); let app = Router::new() .route("/", get(health::root))