From 85c31f65331cfb0a8976f83f1b5890fb79b91f6f Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Mon, 4 May 2026 13:06:57 -0400
Subject: [PATCH] Update WS protocol to communicate progress messages

---
 diagnostics/index.html    |  9 ++++-
 diagnostics/src/server.rs | 82 +++++++++++++++++++++++++++++++++------
 2 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/diagnostics/index.html b/diagnostics/index.html
index 403e39b2d..129965653 100644
--- a/diagnostics/index.html
+++ b/diagnostics/index.html
@@ -485,8 +485,13 @@
         ws.addEventListener('message', event => {
           try {
             const data = JSON.parse(event.data);
-            if (Array.isArray(data)) {
-              applyBatch(data);
+            // Wire format is one Frame envelope per WebSocket message:
+            //   { type: "Frame", ts_us: <u64 micros>, updates: [...] }
+            // The server only emits a Frame once the capture frontier has
+            // advanced past `ts_us`, so each Frame is the complete set of
+            // updates at that closed logical timestamp.
+            if (data && data.type === 'Frame' && Array.isArray(data.updates)) {
+              applyBatch(data.updates);
             }
           } catch (e) {
             // ignore parse errors

diff --git a/diagnostics/src/server.rs b/diagnostics/src/server.rs
index a0a40c2b6..be7d0ed16 100644
--- a/diagnostics/src/server.rs
+++ b/diagnostics/src/server.rs
@@ -14,7 +14,7 @@
 //! (e.g., `python3 -m http.server 8000`). A future improvement could embed
 //! static file serving here so only one port is needed.
 
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::net::TcpListener;
 use std::sync::mpsc;
 use std::thread;
@@ -155,6 +155,23 @@ fn run_server(port: u16, sink: SinkHandle) {
     let mut clients: HashMap<usize, WebSocket<std::net::TcpStream>> = HashMap::new();
     let mut next_client_id: usize = 0;
 
+    // Per-client buffer of records awaiting their timestamp to close, keyed
+    // by the inner `ts` of each record. A timestamp is "closed" when the
+    // capture stream's frontier (tracked in `progress_counts`) advances past
+    // it — at which point we flush the bucket as a single Frame to the client.
+    let mut pending: HashMap<usize, BTreeMap<Duration, Vec<serde_json::Value>>> =
+        HashMap::new();
+    // Running multiplicities from `Event::Progress` updates. The current
+    // frontier is the smallest key with positive count; an entry at zero is
+    // removed. Anything strictly less than the smallest live key is closed.
+    //
+    // Timely's capture protocol sends progress as *deltas* relative to an
+    // assumed initial frontier of `{T::default(): 1}` — so we must seed the
+    // counter that way, otherwise the first event's `(0ns, -1)` retraction
+    // leaves us at `{0ns: -1}` and the frontier sticks at 0 forever.
+    let mut progress_counts: BTreeMap<Duration, i64> = BTreeMap::new();
+    progress_counts.insert(Duration::default(), 1);
+
     loop {
         // Accept pending connections.
         loop {
@@ -183,17 +200,34 @@
             }
         }
 
-        // Drain diagnostic updates and group by client.
-        let mut batches_by_client: HashMap<usize, Vec<serde_json::Value>> = HashMap::new();
+        // Drain diagnostic updates: bucket records by their inner timestamp
+        // per client; absorb progress updates into the running frontier.
+        let mut frontier_changed = false;
         loop {
             match receiver.try_recv() {
-                Ok(Event::Messages(_time, data)) => {
-                    for ((client_id, update), _ts, diff) in data {
+                Ok(Event::Messages(_envelope_time, data)) => {
+                    // The capture envelope time is incidental; the meaningful
+                    // logical time is the per-record `ts`.
+                    for ((client_id, update), ts, diff) in data {
                         let json = update_to_json(&update, diff);
-                        batches_by_client.entry(client_id).or_default().push(json);
+                        pending
+                            .entry(client_id)
+                            .or_default()
+                            .entry(ts)
+                            .or_default()
+                            .push(json);
                     }
                 }
-                Ok(Event::Progress(_)) => {}
+                Ok(Event::Progress(updates)) => {
+                    for (t, diff) in updates {
+                        let entry = progress_counts.entry(t).or_insert(0);
+                        *entry += diff;
+                        if *entry == 0 {
+                            progress_counts.remove(&t);
+                        }
+                    }
+                    frontier_changed = true;
+                }
                 Err(mpsc::TryRecvError::Empty) => break,
                 Err(mpsc::TryRecvError::Disconnected) => {
                     eprintln!("Diagnostics output channel closed, shutting down server");
@@ -206,14 +240,37 @@
                 }
             }
         }
 
-        // Send batched updates to each client.
+        // If the frontier moved, flush every closed timestamp bucket. One
+        // Frame per closed `ts`, in timestamp order, so each Frame is one
+        // atomic transaction on the client.
         let mut disconnected = Vec::new();
-        for (client_id, updates) in &batches_by_client {
-            if let Some(ws) = clients.get_mut(client_id) {
-                if !updates.is_empty() {
-                    let payload = serde_json::to_string(updates).unwrap();
+        if frontier_changed {
+            // Anything strictly less than the smallest timestamp still in
+            // `progress_counts` is closed. If `progress_counts` is empty,
+            // every buffered timestamp is closed.
+            let frontier: Option<Duration> = progress_counts.keys().next().copied();
+            for (client_id, buckets) in pending.iter_mut() {
+                let closed: Vec<Duration> = match frontier {
+                    Some(f) => buckets.range(..f).map(|(t, _)| *t).collect(),
+                    None => buckets.keys().copied().collect(),
+                };
+                for ts in closed {
+                    let updates = buckets.remove(&ts).unwrap_or_default();
+                    if updates.is_empty() {
+                        continue;
+                    }
+                    let Some(ws) = clients.get_mut(client_id) else {
+                        continue;
+                    };
+                    let frame = serde_json::json!({
+                        "type": "Frame",
+                        "ts_us": ts.as_micros() as u64,
+                        "updates": updates,
+                    });
+                    let payload = serde_json::to_string(&frame).unwrap();
                     if ws.send(Message::Text(payload.into())).is_err() {
                         disconnected.push(*client_id);
+                        break;
                     }
                 }
             }
@@ -229,6 +286,7 @@
         }
         for client_id in disconnected {
             clients.remove(&client_id);
+            pending.remove(&client_id);
             client_input.disconnect(client_id, start.elapsed());
             eprintln!("Diagnostics client {client_id} disconnected");
         }
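
--
Note (outside the patch): the frontier seeding subtlety is easiest to see with
a worked example. Below is a minimal, self-contained Rust sketch of the
progress accounting, assuming the same `BTreeMap<Duration, i64>` counter shape
as the patch; `apply_progress` and the sample delta values are illustrative
names, not code from this repository.

    use std::collections::BTreeMap;
    use std::time::Duration;

    // Fold one batch of capture-protocol progress deltas into the counter.
    fn apply_progress(counts: &mut BTreeMap<Duration, i64>, updates: Vec<(Duration, i64)>) {
        for (t, diff) in updates {
            let entry = counts.entry(t).or_insert(0);
            *entry += diff;
            if *entry == 0 {
                counts.remove(&t);
            }
        }
    }

    fn main() {
        let mut counts = BTreeMap::new();
        // Seed with the protocol's assumed initial frontier {0ns: 1}.
        counts.insert(Duration::default(), 1);

        // A typical first delta from the stream: retract 0ns, assert 5ms.
        apply_progress(&mut counts, vec![
            (Duration::default(), -1),
            (Duration::from_millis(5), 1),
        ]);

        // The frontier (smallest remaining key) is now 5ms, so every
        // buffered `ts` below 5ms is closed and may be flushed.
        assert_eq!(counts.keys().next().copied(), Some(Duration::from_millis(5)));

        // Without the seed, the counter would read {0ns: -1, 5ms: 1}, the
        // smallest key would stay pinned at 0ns, and we would hit exactly
        // the "frontier sticks at 0 forever" failure the patch comment names.
    }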
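A second sketch, for the flush path: the closed-bucket selection from the
patch, extracted into a standalone function so the ordering property is
visible in isolation. `drain_closed` is a hypothetical helper (the patch
inlines this logic per client); it assumes the `serde_json` crate and the
same bucket shape as `pending`.

    use std::collections::BTreeMap;
    use std::time::Duration;

    // One Frame per closed timestamp. BTreeMap iterates in key order, so
    // the returned Frames are already in ascending `ts` order.
    fn drain_closed(
        buckets: &mut BTreeMap<Duration, Vec<serde_json::Value>>,
        frontier: Option<Duration>,
    ) -> Vec<serde_json::Value> {
        // Closed = strictly less than the frontier; if no frontier remains,
        // the stream is finished and everything buffered is closed.
        let closed: Vec<Duration> = match frontier {
            Some(f) => buckets.range(..f).map(|(t, _)| *t).collect(),
            None => buckets.keys().copied().collect(),
        };
        closed
            .into_iter()
            .filter_map(|ts| {
                let updates = buckets.remove(&ts)?;
                if updates.is_empty() {
                    return None;
                }
                Some(serde_json::json!({
                    "type": "Frame",
                    "ts_us": ts.as_micros() as u64,
                    "updates": updates,
                }))
            })
            .collect()
    }

Factoring the selection this way would also make the "one atomic transaction
per Frame" claim straightforward to unit-test against synthetic frontiers.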