Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/fbuild-daemon/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ impl DaemonContext {
port: u16,
shutdown_tx: tokio::sync::watch::Sender<bool>,
spawner_cwd: String,
) -> Self {
Self::with_hub(port, shutdown_tx, spawner_cwd, BroadcastHub::new())
}

/// Construct with a caller-supplied [`BroadcastHub`]. Used by
/// `main.rs` so the tracing layer can be registered against
/// `hub.log_tx` before the daemon emits its first
/// `tracing::info!` (otherwise the layer would need a late-bound
/// handle through a global OnceLock).
pub fn with_hub(
port: u16,
shutdown_tx: tokio::sync::watch::Sender<bool>,
spawner_cwd: String,
broadcast_hub: BroadcastHub,
) -> Self {
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand Down Expand Up @@ -154,7 +168,7 @@ impl DaemonContext {
source_mtime,
last_activity: Arc::new(std::sync::Mutex::new(Instant::now())),
spawner_cwd,
broadcast_hub: BroadcastHub::new(),
broadcast_hub,
avr8js_sessions: DashMap::new(),
gc_mutex: Arc::new(tokio::sync::Mutex::new(())),
}
Expand Down
1 change: 1 addition & 0 deletions crates/fbuild-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@
pub mod context;
pub mod device_manager;
pub mod handlers;
pub mod log_layer;
pub mod models;
pub mod status_manager;
208 changes: 208 additions & 0 deletions crates/fbuild-daemon/src/log_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
//! Tracing-subscriber layer that forwards events to
//! [`BroadcastHub::log_tx`] so `/ws/logs` subscribers see the same
//! events that are written to the daemon's stderr.
//!
//! # Why (issue #66)
//!
//! The native ESP32 `write-flash` path (`fbuild-deploy::esp32_native`)
//! already emits progress via `tracing::info!()` — per-region start,
//! 10%-throttled byte counts, region-complete markers. Before this
//! layer the events only reached stderr; WebSocket clients subscribed
//! to `/ws/logs` received nothing during a flash because no other
//! bridge existed. Installing this layer alongside the existing
//! `tracing_subscriber::fmt` layer makes the WebSocket stream the live
//! progress feed the deploy path was already producing, without adding
//! a separate progress API.
//!
//! # Cycle avoidance
//!
//! Events originating inside the `/ws/logs` handler itself (e.g. a
//! `tracing::info!("Logs WebSocket connected")` from
//! `handlers::websockets`) are dropped by module-path filter — sending
//! them back onto `log_tx` would just re-feed subscribers their own
//! connect/disconnect notices. The broadcast channel is bounded so a
//! cycle could not deadlock, but the filter keeps the stream quieter.
//!
//! # Cost when no clients
//!
//! `broadcast::Sender::receiver_count` is a single atomic load. When
//! no `/ws/logs` clients are connected the layer skips JSON
//! serialization entirely and bottoms out at one atomic read per
//! event.

use std::fmt::Write as _;

use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::{Event, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

/// A `tracing_subscriber` layer that renders every event to JSON and
/// fans it out over a tokio broadcast channel. Forwarding stops as
/// soon as the layer is dropped or every receiver has gone away.
pub struct BroadcastLogLayer {
    tx: broadcast::Sender<String>,
}

impl BroadcastLogLayer {
    /// Wrap an existing broadcast sender; the layer publishes one
    /// JSON-encoded string per tracing event on this handle.
    pub fn new(tx: broadcast::Sender<String>) -> Self {
        BroadcastLogLayer { tx }
    }
}

/// Seconds since the Unix epoch as a float — the `timestamp` shape
/// `/ws/logs` clients already parse. A system clock set before 1970
/// yields `0.0` instead of panicking.
fn now_unix() -> f64 {
    std::time::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .as_secs_f64()
}

impl<S> Layer<S> for BroadcastLogLayer
where
    S: Subscriber,
{
    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
        // `/ws/logs` is an opt-in stream; with zero subscribers we
        // bail out before doing any per-event work beyond this one
        // atomic load.
        if self.tx.receiver_count() == 0 {
            return;
        }

        let meta = event.metadata();
        let module = match meta.module_path() {
            Some(path) => path,
            None => meta.target(),
        };

        // Events emitted by the `/ws/logs` handler itself are dropped
        // so its connect/disconnect notices never loop back onto the
        // very channel they announce. See the module docs.
        if module.contains("handlers::websockets") {
            return;
        }

        let mut collector = MessageVisitor::default();
        event.record(&mut collector);

        // Keep the frame shape identical to the `/ws/logs` welcome
        // message so clients parse every log line with one schema.
        let frame = serde_json::json!({
            "type": "log",
            "level": meta.level().as_str(),
            "message": collector.finish(),
            "timestamp": now_unix(),
            "module": module,
        });

        // A send can still fail if the last receiver dropped between
        // the count check above and this call; that race is benign,
        // so the error is deliberately discarded.
        let _ = self.tx.send(frame.to_string());
    }
}

/// Accumulates an event's `message` plus any named fields into one
/// human-readable line, mirroring what `fmt::Layer` prints to stderr.
/// Lives here rather than reusing `tracing_subscriber::fmt::format`
/// because only the rendered text is needed, not the full formatter
/// machinery.
#[derive(Default)]
struct MessageVisitor {
    message: String,
    fields: String,
}

impl MessageVisitor {
    /// Join the collected pieces: message first, then the
    /// space-separated `name=value` fields, with one space between
    /// the halves when both are present.
    fn finish(self) -> String {
        if self.message.is_empty() {
            return self.fields;
        }
        if self.fields.is_empty() {
            return self.message;
        }
        format!("{} {}", self.message, self.fields)
    }

    /// Append `name=<Debug of value>` to the field buffer.
    fn push_field_debug(&mut self, name: &str, value: &dyn std::fmt::Debug) {
        self.separate();
        let _ = write!(&mut self.fields, "{}={:?}", name, value);
    }

    /// Append `name=value` (unquoted) to the field buffer.
    fn push_field_str(&mut self, name: &str, value: &str) {
        self.separate();
        let _ = write!(&mut self.fields, "{}={}", name, value);
    }

    /// Insert the single-space separator before every field after
    /// the first.
    fn separate(&mut self) {
        if !self.fields.is_empty() {
            self.fields.push(' ');
        }
    }
}

impl Visit for MessageVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        match field.name() {
            // The `tracing` format macros deliver the rendered
            // `Arguments` under the reserved `message` name; its
            // `Debug` output equals its `Display` output, so this is
            // exactly the text written in `info!(...)`.
            "message" => {
                let _ = write!(&mut self.message, "{:?}", value);
            }
            name => self.push_field_debug(name, value),
        }
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        match field.name() {
            "message" => self.message.push_str(value),
            name => self.push_field_str(name, value),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn message_visitor_joins_message_and_fields() {
        let mut v = MessageVisitor::default();
        v.push_field_str("port", "/dev/ttyUSB0");
        v.push_field_debug("size", &42usize);
        v.message.push_str("native write: begin region");
        let rendered = v.finish();
        assert!(rendered.starts_with("native write: begin region "));
        assert!(rendered.contains("port=/dev/ttyUSB0"));
        assert!(rendered.contains("size=42"));
    }

    #[test]
    fn message_visitor_message_only() {
        let mut v = MessageVisitor::default();
        v.message.push_str("hello");
        assert_eq!(v.finish(), "hello");
    }

    #[test]
    fn message_visitor_fields_only() {
        let mut v = MessageVisitor::default();
        v.push_field_str("port", "COM7");
        assert_eq!(v.finish(), "port=COM7");
    }

    /// An event with no message and no fields must render as the
    /// empty string, not a stray separator.
    #[test]
    fn message_visitor_empty() {
        assert_eq!(MessageVisitor::default().finish(), "");
    }

    /// Regression: no clients subscribed means no payload work. The
    /// `_` pattern never binds, so the initial receiver is dropped
    /// immediately and the channel starts at zero receivers — the
    /// state in which `on_event` short-circuits before serializing.
    #[test]
    fn layer_noop_when_no_subscribers() {
        let (tx, _) = broadcast::channel::<String>(4);
        let layer = BroadcastLogLayer::new(tx.clone());
        assert_eq!(layer.tx.receiver_count(), 0);
        // With zero receivers `send` reports an error — the failure
        // mode `on_event` deliberately ignores after its early-out
        // check on `receiver_count`.
        assert!(tx.send("dropped".to_string()).is_err());
    }
}
26 changes: 22 additions & 4 deletions crates/fbuild-daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use axum::routing::{get, post};
use axum::Router;
use clap::Parser;
use fbuild_daemon::context::{
DaemonContext, IDLE_TIMEOUT, SELF_EVICTION_TIMEOUT, STALE_LOCK_CHECK_INTERVAL,
BroadcastHub, DaemonContext, IDLE_TIMEOUT, SELF_EVICTION_TIMEOUT, STALE_LOCK_CHECK_INTERVAL,
};
use fbuild_daemon::handlers::{cache, devices, emulator, health, locks, operations, websockets};
use fbuild_daemon::log_layer::BroadcastLogLayer;
use std::sync::Arc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[derive(Parser)]
#[command(name = "fbuild-daemon", about = "fbuild daemon server")]
Expand Down Expand Up @@ -47,17 +50,32 @@ async fn main() {

let port = args.port.unwrap_or_else(fbuild_paths::get_daemon_port);

tracing_subscriber::fmt()
.with_env_filter(
// Build the broadcast hub before installing the tracing subscriber
// so the `/ws/logs` bridge layer (issue #66 follow-up) can be
// registered with the very first tracing event. Any later event —
// including native ESP32 `write-flash` progress — lands on the same
// channel that `/ws/logs` subscribers read.
let broadcast_hub = BroadcastHub::new();
let log_tx = broadcast_hub.log_tx.clone();

tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.with(tracing_subscriber::fmt::layer())
.with(BroadcastLogLayer::new(log_tx))
.init();

tracing::info!("fbuild daemon starting on port {}", port);

let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
let context = Arc::new(DaemonContext::new(port, shutdown_tx, args.spawner_cwd));
let context = Arc::new(DaemonContext::with_hub(
port,
shutdown_tx,
args.spawner_cwd,
broadcast_hub,
));

let app = Router::new()
.route("/", get(health::root))
Expand Down
Loading