diff --git a/diagnostics/index.html b/diagnostics/index.html
index 403e39b2d..129965653 100644
--- a/diagnostics/index.html
+++ b/diagnostics/index.html
@@ -485,8 +485,13 @@
ws.addEventListener('message', event => {
try {
const data = JSON.parse(event.data);
- if (Array.isArray(data)) {
- applyBatch(data);
+ // Wire format is one Frame envelope per WebSocket message:
+ // { type: "Frame", ts_us: <u64 micros>, updates: [...] }
+ // The server only emits a Frame once the capture frontier has
+ // advanced past `ts_us`, so each Frame is a complete view at that
+ // closed logical timestamp.
+ if (data && data.type === 'Frame' && Array.isArray(data.updates)) {
+ applyBatch(data.updates);
}
} catch (e) {
// ignore parse errors
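For reference, here is what one message under the new envelope looks like on the wire. This is a minimal sketch, not part of the patch: the field values and update records are made up, and in the real server the `updates` array is filled by `update_to_json`.

```rust
use serde_json::json;

fn main() {
    // Hypothetical Frame; `ts_us` and the two update records are illustrative.
    let frame = json!({
        "type": "Frame",
        "ts_us": 1_250_000u64, // the closed logical timestamp, in microseconds
        "updates": [
            { "kind": "insert", "diff": 1 },
            { "kind": "retract", "diff": -1 },
        ],
    });
    // Exactly one Frame per WebSocket text message:
    // {"type":"Frame","ts_us":1250000,"updates":[{"kind":"insert",...},...]}
    println!("{}", serde_json::to_string(&frame).unwrap());
}
```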
diff --git a/diagnostics/src/server.rs b/diagnostics/src/server.rs
index a0a40c2b6..be7d0ed16 100644
--- a/diagnostics/src/server.rs
+++ b/diagnostics/src/server.rs
@@ -14,7 +14,7 @@
//! (e.g., `python3 -m http.server 8000`). A future improvement could embed
//! static file serving here so only one port is needed.

-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
@@ -155,6 +155,23 @@ fn run_server(port: u16, sink: SinkHandle) {
let mut clients: HashMap<usize, WebSocket<TcpStream>> = HashMap::new();
let mut next_client_id: usize = 0;

+ // Per-client buffer of records awaiting their timestamp to close, keyed
+ // by the inner `ts` of each record. A timestamp is "closed" when the
+ // capture stream's frontier (tracked in `progress_counts`) advances past
+ // it — at which point we flush the bucket as a single Frame to the client.
+ let mut pending: HashMap<usize, BTreeMap<Duration, Vec<serde_json::Value>>> =
+ HashMap::new();
+ // Running multiplicities from `Event::Progress` updates. The current
+ // frontier is the smallest key with positive count; an entry at zero is
+ // removed. Anything strictly less than the smallest live key is closed.
+ //
+ // Timely's capture protocol sends progress as *deltas* relative to an
+ // assumed initial frontier of `{T::default(): 1}` — so we must seed the
+ // counter that way, otherwise the first event's `(0ns, -1)` retraction
+ // leaves us at `{0ns: -1}` and the frontier sticks at 0 forever.
+ let mut progress_counts: BTreeMap<Duration, i64> = BTreeMap::new();
+ progress_counts.insert(Duration::default(), 1);
+
loop {
// Accept pending connections.
loop {
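The seeding detail called out in the comment above is easy to get wrong, so here is a self-contained sketch of the same bookkeeping, outside the patch. The names mirror the diff; the delta sequence is hypothetical.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Fold progress deltas into the running multiset of timestamp
/// multiplicities, dropping any entry that reaches zero.
fn apply_progress(counts: &mut BTreeMap<Duration, i64>, updates: Vec<(Duration, i64)>) {
    for (t, diff) in updates {
        let entry = counts.entry(t).or_insert(0);
        *entry += diff;
        if *entry == 0 {
            counts.remove(&t);
        }
    }
}

fn main() {
    // Seed with the assumed initial frontier {0ns: 1}, exactly as the patch does.
    let mut counts: BTreeMap<Duration, i64> = BTreeMap::new();
    counts.insert(Duration::default(), 1);

    // First progress event: retract 0ns, assert 5ms. Without the seed this
    // would leave {0ns: -1} and the minimum key would never advance.
    apply_progress(&mut counts, vec![
        (Duration::default(), -1),
        (Duration::from_millis(5), 1),
    ]);

    // The frontier is the smallest live key; anything strictly below it is closed.
    assert_eq!(counts.keys().next().copied(), Some(Duration::from_millis(5)));
}
```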
@@ -183,17 +200,34 @@ fn run_server(port: u16, sink: SinkHandle) {
}
}

- // Drain diagnostic updates and group by client.
- let mut batches_by_client: HashMap<usize, Vec<serde_json::Value>> = HashMap::new();
+ // Drain diagnostic updates: bucket records by their inner timestamp
+ // per client; absorb progress updates into the running frontier.
+ let mut frontier_changed = false;
loop {
match receiver.try_recv() {
- Ok(Event::Messages(_time, data)) => {
- for ((client_id, update), _ts, diff) in data {
+ Ok(Event::Messages(_envelope_time, data)) => {
+ // The capture envelope time is incidental; the meaningful
+ // logical time is the per-record `ts`.
+ for ((client_id, update), ts, diff) in data {
let json = update_to_json(&update, diff);
- batches_by_client.entry(client_id).or_default().push(json);
+ pending
+ .entry(client_id)
+ .or_default()
+ .entry(ts)
+ .or_default()
+ .push(json);
}
}
- Ok(Event::Progress(_)) => {}
+ Ok(Event::Progress(updates)) => {
+ for (t, diff) in updates {
+ let entry = progress_counts.entry(t).or_insert(0);
+ *entry += diff;
+ if *entry == 0 {
+ progress_counts.remove(&t);
+ }
+ }
+ frontier_changed = true;
+ }
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
eprintln!("Diagnostics output channel closed, shutting down server");
@@ -206,14 +240,37 @@ fn run_server(port: u16, sink: SinkHandle) {
}
}

- // Send batched updates to each client.
+ // If the frontier moved, flush every closed timestamp bucket. One
+ // Frame per closed `ts`, in timestamp order, so each Frame is one
+ // atomic transaction on the client.
let mut disconnected = Vec::new();
- for (client_id, updates) in &batches_by_client {
- if let Some(ws) = clients.get_mut(client_id) {
- if !updates.is_empty() {
- let payload = serde_json::to_string(updates).unwrap();
+ if frontier_changed {
+ // Anything strictly less than the smallest timestamp with a live
+ // progress count is closed. If `progress_counts` is empty, every
+ // buffered timestamp is closed.
+ let frontier: Option<Duration> = progress_counts.keys().next().copied();
+ for (client_id, buckets) in pending.iter_mut() {
+ let closed: Vec<Duration> = match frontier {
+ Some(f) => buckets.range(..f).map(|(t, _)| *t).collect(),
+ None => buckets.keys().copied().collect(),
+ };
+ for ts in closed {
+ let updates = buckets.remove(&ts).unwrap_or_default();
+ if updates.is_empty() {
+ continue;
+ }
+ let Some(ws) = clients.get_mut(client_id) else {
+ continue;
+ };
+ let frame = serde_json::json!({
+ "type": "Frame",
+ "ts_us": ts.as_micros() as u64,
+ "updates": updates,
+ });
+ let payload = serde_json::to_string(&frame).unwrap();
if ws.send(Message::Text(payload.into())).is_err() {
disconnected.push(*client_id);
+ break;
}
}
}
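The ordering guarantee in this hunk comes from `BTreeMap::range(..frontier)`, which yields keys in ascending order, so closed buckets are always flushed oldest-first. A small sketch under the same assumptions, with hypothetical bucket contents:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

fn main() {
    let mut buckets: BTreeMap<Duration, Vec<&str>> = BTreeMap::new();
    buckets.insert(Duration::from_millis(2), vec!["a"]);
    buckets.insert(Duration::from_millis(3), vec!["b", "c"]);
    buckets.insert(Duration::from_millis(5), vec!["d"]); // not yet closed

    // Everything strictly below the frontier is closed; drain those buckets
    // in ascending timestamp order, one Frame per timestamp.
    let frontier = Duration::from_millis(5);
    let closed: Vec<Duration> = buckets.range(..frontier).map(|(t, _)| *t).collect();
    for ts in closed {
        let updates = buckets.remove(&ts).unwrap_or_default();
        println!("Frame ts_us={} updates={:?}", ts.as_micros(), updates);
    }
    assert_eq!(buckets.len(), 1); // the 5ms bucket waits for the next frontier move
}
```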
@@ -229,6 +286,7 @@ fn run_server(port: u16, sink: SinkHandle) {
}
for client_id in disconnected {
clients.remove(&client_id);
+ pending.remove(&client_id);
client_input.disconnect(client_id, start.elapsed());
eprintln!("Diagnostics client {client_id} disconnected");
}