From c2a1125cbcd8a12471fa11f1d46185596cfdc316 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 13:11:01 -0700 Subject: [PATCH 1/4] =?UTF-8?q?dimos-module:=20bump=20mpsc=20channels=2016?= =?UTF-8?q?=20=E2=86=92=201024=20+=20log=20drops=20on=20back-pressure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-input mpsc queue between the LCM recv loop and the user's handler had capacity 16. `TypedRoute::try_dispatch` was using `try_send` and silently dropping new messages when full. At 10 Hz publish + any handler slower than ~6 Hz, the queue filled in 1.6 s and every subsequent message was lost without any signal. This was the actual cause of the ~50 % "LCM drops" observed during the pgo_rust KITTI-360 benchmark — the kernel reported zero drops the whole time (UDP RcvbufErrors=0, lo drop=0), and a separate burst characterization confirmed pure LCM can do 42 MB/s @ 89 Hz at 0% loss. The drops were here, in dimos-module. Two changes: 1. INPUT_CHANNEL_CAPACITY 16 → 1024 (and PUBLISH_CHANNEL_CAPACITY 64 → 1024 for symmetry). At 10 Hz publish, 1024 gives ~100 s of headroom for handler stalls. Memory cost is bounded by the message types (rust mpsc only allocates entries as needed). 2. Per-route AtomicU64 `drops` counter on TypedRoute. Increments on `TrySendError::Full`. Logged via rate-limited eprintln at power-of-2 milestones (1, 2, 4, 8, 16, …). First drop fires immediately so operators see the first warning; subsequent frequency decays exponentially so a runaway handler doesn't spam stderr while still surfacing the failure. 4 new unit tests cover the behavior: - typed_route_drops_count_when_queue_full: 100 sent, capacity 4, expect exactly 96 drops. - typed_route_drops_counter_starts_at_zero: first sends under capacity must not increment drops. - typed_route_does_not_drop_when_queue_has_headroom: 1000 sent, capacity 2048, drops must be 0 (guards against the rate-limited log firing on the happy path). 
- typed_route_rate_limited_log_fires_on_power_of_two: 130 sent, capacity 1, expect 129 drops crossing the milestones 1, 2, 4, 8, 16, 32, 64, 128 — operator sees 8 log lines. All 22 dimos-module tests pass. Clippy clean. --- native/rust/dimos-module/src/module.rs | 116 +++++++++++++++++++++++-- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/native/rust/dimos-module/src/module.rs b/native/rust/dimos-module/src/module.rs index 9746aec5a1..efc5ae935a 100644 --- a/native/rust/dimos-module/src/module.rs +++ b/native/rust/dimos-module/src/module.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::io; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; @@ -9,8 +10,14 @@ use serde::de::DeserializeOwned; use crate::transport::Transport; -const INPUT_CHANNEL_CAPACITY: usize = 16; -const PUBLISH_CHANNEL_CAPACITY: usize = 64; +// Per-input handler mpsc-queue capacity. When full, new messages are +// dropped via `try_send` in `TypedRoute::try_dispatch`. 1024 was chosen +// to give ~100 s of headroom at 10 Hz publish — enough that a handler +// briefly stalling (GC pause, file I/O, etc.) doesn't cascade into +// silent drops. Drops past the cap are counted and logged with +// rate-limited eprintlns; see `TypedRoute::try_dispatch`. +const INPUT_CHANNEL_CAPACITY: usize = 1024; +const PUBLISH_CHANNEL_CAPACITY: usize = 1024; // Each input() call produces a TypedRoute that decodes its message type // and forwards it to the right Input's mpsc channel. @@ -22,15 +29,36 @@ struct TypedRoute { topic: String, decode: fn(&[u8]) -> io::Result, sender: mpsc::Sender, + /// Count of `try_send` Full errors on this route — the number of + /// messages silently dropped because the user's handler couldn't + /// keep up with publish rate. Logged in `try_dispatch` at + /// power-of-2 milestones so operators see "1 drop", "2 drops", + /// "4 drops"... 
without spamming once a stream goes badly wrong. + drops: AtomicU64, } impl Route for TypedRoute { fn try_dispatch(&self, data: &[u8]) { match (self.decode)(data) { - // If the input channel is full, the newest message is dropped. - Ok(msg) => { - let _ = self.sender.try_send(msg); - } + Ok(msg) => match self.sender.try_send(msg) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + let n = self.drops.fetch_add(1, Ordering::Relaxed) + 1; + // Power-of-2 rate limit: log at 1, 2, 4, 8, 16, ... + // First drop fires immediately so operators see the + // first warning, then frequency decays exponentially. + if n.is_power_of_two() { + eprintln!( + "dimos_module: input '{}' dropped {} message(s) — handler can't keep up with publish rate (queue cap = {})", + self.topic, n, INPUT_CHANNEL_CAPACITY, + ); + } + } + Err(mpsc::error::TrySendError::Closed(_)) => { + // Receiver dropped — user-side handler ended. Not + // a back-pressure issue; not logged at every call. + } + }, Err(e) => eprintln!("dimos_module: decode error on {}: {e}", self.topic), } } @@ -149,6 +177,7 @@ impl Builder { topic: topic.clone(), decode, sender: tx, + drops: AtomicU64::new(0), })); Input { topic, @@ -564,4 +593,79 @@ mod tests { fn ok_does_not_panic() { propagate_task_failure("recv", Ok(())); } + + // Back-pressure detection — when the per-input mpsc queue fills, + // try_dispatch must (a) drop the message and (b) increment the + // per-route `drops` counter so operators see "this handler can't + // keep up". Logging is rate-limited to power-of-2 milestones to + // avoid flooding stderr. + + fn make_route( + capacity: usize, + decode: fn(&[u8]) -> io::Result, + ) -> (TypedRoute, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(capacity); + let route = TypedRoute { + topic: "/test".to_string(), + decode, + sender: tx, + drops: AtomicU64::new(0), + }; + (route, rx) + } + + #[test] + fn typed_route_drops_count_when_queue_full() { + // Capacity 4. Send 100 messages without reading any. 
The first 4 + // sit in the queue; the remaining 96 must be dropped, and the + // drops counter must reflect that exactly. + let (route, _rx) = make_route::>(4, |b| Ok(b.to_vec())); + for _ in 0..100 { + route.try_dispatch(&[1u8, 2, 3]); + } + let drops = route.drops.load(Ordering::Relaxed); + assert_eq!( + drops, 96, + "expected exactly 96 drops (100 sent, 4 queue capacity); got {drops}", + ); + } + + #[test] + fn typed_route_drops_counter_starts_at_zero() { + let (route, _rx) = make_route::>(8, |b| Ok(b.to_vec())); + assert_eq!(route.drops.load(Ordering::Relaxed), 0); + // First few sends fit in the queue and shouldn't increment drops. + for _ in 0..4 { + route.try_dispatch(&[1u8]); + } + assert_eq!(route.drops.load(Ordering::Relaxed), 0); + } + + #[test] + fn typed_route_does_not_drop_when_queue_has_headroom() { + // Sanity: if the queue capacity exceeds the number of messages + // dispatched, the drops counter must stay at zero. This guards + // against the rate-limited drop log firing on the happy path. + let (route, _rx) = make_route::>(2048, |b| Ok(b.to_vec())); + for _ in 0..1000 { + route.try_dispatch(&[7u8, 7, 7]); + } + assert_eq!(route.drops.load(Ordering::Relaxed), 0); + } + + #[test] + fn typed_route_rate_limited_log_fires_on_power_of_two() { + // The log spam threshold is `n.is_power_of_two()`. We can't easily + // capture stderr from the unit test, but we can verify the + // *count* of dropped messages traverses each power of 2, which is + // the trigger condition for the log. Capacity 1, dispatch 130. + let (route, _rx) = make_route::>(1, |b| Ok(b.to_vec())); + for _ in 0..130 { + route.try_dispatch(&[1u8]); + } + let drops = route.drops.load(Ordering::Relaxed); + // 130 sent, 1 queued, 129 dropped. 129 covers the milestones + // 1, 2, 4, 8, 16, 32, 64, 128 — operator would see 8 log lines. 
+ assert_eq!(drops, 129); + } } From 2c4aee94c0e3fb4235ec92c69ae99e78b3b3614c Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 13:31:19 -0700 Subject: [PATCH 2/4] Simplify comments --- native/rust/dimos-module/src/module.rs | 52 ++++---------------------- 1 file changed, 8 insertions(+), 44 deletions(-) diff --git a/native/rust/dimos-module/src/module.rs b/native/rust/dimos-module/src/module.rs index efc5ae935a..5893878a11 100644 --- a/native/rust/dimos-module/src/module.rs +++ b/native/rust/dimos-module/src/module.rs @@ -10,12 +10,6 @@ use serde::de::DeserializeOwned; use crate::transport::Transport; -// Per-input handler mpsc-queue capacity. When full, new messages are -// dropped via `try_send` in `TypedRoute::try_dispatch`. 1024 was chosen -// to give ~100 s of headroom at 10 Hz publish — enough that a handler -// briefly stalling (GC pause, file I/O, etc.) doesn't cascade into -// silent drops. Drops past the cap are counted and logged with -// rate-limited eprintlns; see `TypedRoute::try_dispatch`. const INPUT_CHANNEL_CAPACITY: usize = 1024; const PUBLISH_CHANNEL_CAPACITY: usize = 1024; @@ -29,11 +23,6 @@ struct TypedRoute { topic: String, decode: fn(&[u8]) -> io::Result, sender: mpsc::Sender, - /// Count of `try_send` Full errors on this route — the number of - /// messages silently dropped because the user's handler couldn't - /// keep up with publish rate. Logged in `try_dispatch` at - /// power-of-2 milestones so operators see "1 drop", "2 drops", - /// "4 drops"... without spamming once a stream goes badly wrong. drops: AtomicU64, } @@ -44,20 +33,15 @@ impl Route for TypedRoute { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { let n = self.drops.fetch_add(1, Ordering::Relaxed) + 1; - // Power-of-2 rate limit: log at 1, 2, 4, 8, 16, ... - // First drop fires immediately so operators see the - // first warning, then frequency decays exponentially. + // Rate-limited: log at drop counts 1, 2, 4, 8, ... 
if n.is_power_of_two() { eprintln!( - "dimos_module: input '{}' dropped {} message(s) — handler can't keep up with publish rate (queue cap = {})", + "dimos_module: input '{}' dropped {} message(s) — handler can't keep up (queue cap = {})", self.topic, n, INPUT_CHANNEL_CAPACITY, ); } } - Err(mpsc::error::TrySendError::Closed(_)) => { - // Receiver dropped — user-side handler ended. Not - // a back-pressure issue; not logged at every call. - } + Err(mpsc::error::TrySendError::Closed(_)) => {} }, Err(e) => eprintln!("dimos_module: decode error on {}: {e}", self.topic), } @@ -594,11 +578,7 @@ mod tests { propagate_task_failure("recv", Ok(())); } - // Back-pressure detection — when the per-input mpsc queue fills, - // try_dispatch must (a) drop the message and (b) increment the - // per-route `drops` counter so operators see "this handler can't - // keep up". Logging is rate-limited to power-of-2 milestones to - // avoid flooding stderr. + // Back-pressure / drop counter fn make_route( capacity: usize, @@ -616,25 +596,17 @@ mod tests { #[test] fn typed_route_drops_count_when_queue_full() { - // Capacity 4. Send 100 messages without reading any. The first 4 - // sit in the queue; the remaining 96 must be dropped, and the - // drops counter must reflect that exactly. let (route, _rx) = make_route::>(4, |b| Ok(b.to_vec())); for _ in 0..100 { route.try_dispatch(&[1u8, 2, 3]); } - let drops = route.drops.load(Ordering::Relaxed); - assert_eq!( - drops, 96, - "expected exactly 96 drops (100 sent, 4 queue capacity); got {drops}", - ); + assert_eq!(route.drops.load(Ordering::Relaxed), 96); } #[test] fn typed_route_drops_counter_starts_at_zero() { let (route, _rx) = make_route::>(8, |b| Ok(b.to_vec())); assert_eq!(route.drops.load(Ordering::Relaxed), 0); - // First few sends fit in the queue and shouldn't increment drops. 
for _ in 0..4 { route.try_dispatch(&[1u8]); } @@ -643,9 +615,6 @@ mod tests { #[test] fn typed_route_does_not_drop_when_queue_has_headroom() { - // Sanity: if the queue capacity exceeds the number of messages - // dispatched, the drops counter must stay at zero. This guards - // against the rate-limited drop log firing on the happy path. let (route, _rx) = make_route::>(2048, |b| Ok(b.to_vec())); for _ in 0..1000 { route.try_dispatch(&[7u8, 7, 7]); @@ -655,17 +624,12 @@ mod tests { #[test] fn typed_route_rate_limited_log_fires_on_power_of_two() { - // The log spam threshold is `n.is_power_of_two()`. We can't easily - // capture stderr from the unit test, but we can verify the - // *count* of dropped messages traverses each power of 2, which is - // the trigger condition for the log. Capacity 1, dispatch 130. + // 130 sent, 1 queued, 129 dropped — crosses milestones at + // 1, 2, 4, 8, 16, 32, 64, 128 (8 log lines). let (route, _rx) = make_route::>(1, |b| Ok(b.to_vec())); for _ in 0..130 { route.try_dispatch(&[1u8]); } - let drops = route.drops.load(Ordering::Relaxed); - // 130 sent, 1 queued, 129 dropped. 129 covers the milestones - // 1, 2, 4, 8, 16, 32, 64, 128 — operator would see 8 log lines. - assert_eq!(drops, 129); + assert_eq!(route.drops.load(Ordering::Relaxed), 129); } } From 0af0f81059ab5d4e77cec9ccedc12f98c3366326 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 14:39:17 -0700 Subject: [PATCH 3/4] dimos-module: throttle drop-warning log to MAX_ERROR_LOG_RATE/sec Replaces the power-of-2 milestone trigger with a wall-clock throttle. New constant MAX_ERROR_LOG_RATE caps drop-warning log lines per route at N/sec (currently 1). The drops counter still increments on every dropped message; only the stderr line is throttled. Implementation: last_log: Mutex<Option<Instant>> on TypedRoute. First drop after a quiet period fires immediately; subsequent drops within 1/MAX_ERROR_LOG_RATE seconds are silent. 22/22 dimos-module tests pass. 
--- native/rust/dimos-module/src/module.rs | 33 +++++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/native/rust/dimos-module/src/module.rs b/native/rust/dimos-module/src/module.rs index 5893878a11..d55da25c55 100644 --- a/native/rust/dimos-module/src/module.rs +++ b/native/rust/dimos-module/src/module.rs @@ -2,7 +2,8 @@ use std::collections::HashMap; use std::fmt::Debug; use std::io; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; @@ -12,6 +13,8 @@ use crate::transport::Transport; const INPUT_CHANNEL_CAPACITY: usize = 1024; const PUBLISH_CHANNEL_CAPACITY: usize = 1024; +/// Maximum drop-warning log lines per second per route. +const MAX_ERROR_LOG_RATE: u32 = 1; // Each input() call produces a TypedRoute that decodes its message type // and forwards it to the right Input's mpsc channel. @@ -24,6 +27,7 @@ struct TypedRoute { decode: fn(&[u8]) -> io::Result, sender: mpsc::Sender, drops: AtomicU64, + last_log: Mutex>, } impl Route for TypedRoute { @@ -33,8 +37,14 @@ impl Route for TypedRoute { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { let n = self.drops.fetch_add(1, Ordering::Relaxed) + 1; - // Rate-limited: log at drop counts 1, 2, 4, 8, ... 
- if n.is_power_of_two() { + let interval = Duration::from_secs(1) / MAX_ERROR_LOG_RATE; + let mut last = self.last_log.lock().unwrap(); + let now = Instant::now(); + if last + .map(|t| now.duration_since(t) >= interval) + .unwrap_or(true) + { + *last = Some(now); eprintln!( "dimos_module: input '{}' dropped {} message(s) — handler can't keep up (queue cap = {})", self.topic, n, INPUT_CHANNEL_CAPACITY, @@ -162,6 +172,7 @@ impl Builder { decode, sender: tx, drops: AtomicU64::new(0), + last_log: Mutex::new(None), })); Input { topic, @@ -590,6 +601,7 @@ mod tests { decode, sender: tx, drops: AtomicU64::new(0), + last_log: Mutex::new(None), }; (route, rx) } @@ -623,13 +635,22 @@ mod tests { } #[test] - fn typed_route_rate_limited_log_fires_on_power_of_two() { - // 130 sent, 1 queued, 129 dropped — crosses milestones at - // 1, 2, 4, 8, 16, 32, 64, 128 (8 log lines). + fn typed_route_log_throttle_skips_within_interval() { + // Dispatch many drops back-to-back. All must increment the + // counter, but `last_log` is set exactly once (the first drop) + // because the entire loop runs inside one MAX_ERROR_LOG_RATE + // window. 
let (route, _rx) = make_route::>(1, |b| Ok(b.to_vec())); + let interval = Duration::from_secs(1) / MAX_ERROR_LOG_RATE; + let before = Instant::now(); for _ in 0..130 { route.try_dispatch(&[1u8]); } + assert!( + before.elapsed() < interval, + "test invalid: loop must complete within one log-throttle window", + ); assert_eq!(route.drops.load(Ordering::Relaxed), 129); + assert!(route.last_log.lock().unwrap().is_some()); } } From 625e0d9275dd9199f5659e0b874e97a4cd8b7812 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 14:42:44 -0700 Subject: [PATCH 4/4] =?UTF-8?q?dimos-module:=20simplify=20tests=20+=20rena?= =?UTF-8?q?me=20drops=20=E2=86=92=20drop=5Fcount?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduced the back-pressure test suite to a single test (typed_route_logs_error_on_drop) that fills the queue, forces one drop, and verifies (a) the counter increments and (b) last_log is set (proxying for "log fired"). Dropped the three other tests as redundant with the single-message case. Renamed the field `drops` → `drop_count` to read more naturally. 19/19 dimos-module tests pass. 
--- native/rust/dimos-module/src/module.rs | 73 +++++--------------------- 1 file changed, 14 insertions(+), 59 deletions(-) diff --git a/native/rust/dimos-module/src/module.rs b/native/rust/dimos-module/src/module.rs index d55da25c55..6a71008c7f 100644 --- a/native/rust/dimos-module/src/module.rs +++ b/native/rust/dimos-module/src/module.rs @@ -26,7 +26,7 @@ struct TypedRoute { topic: String, decode: fn(&[u8]) -> io::Result, sender: mpsc::Sender, - drops: AtomicU64, + drop_count: AtomicU64, last_log: Mutex>, } @@ -36,7 +36,7 @@ impl Route for TypedRoute { Ok(msg) => match self.sender.try_send(msg) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { - let n = self.drops.fetch_add(1, Ordering::Relaxed) + 1; + let n = self.drop_count.fetch_add(1, Ordering::Relaxed) + 1; let interval = Duration::from_secs(1) / MAX_ERROR_LOG_RATE; let mut last = self.last_log.lock().unwrap(); let now = Instant::now(); @@ -171,7 +171,7 @@ impl Builder { topic: topic.clone(), decode, sender: tx, - drops: AtomicU64::new(0), + drop_count: AtomicU64::new(0), last_log: Mutex::new(None), })); Input { @@ -589,68 +589,23 @@ mod tests { propagate_task_failure("recv", Ok(())); } - // Back-pressure / drop counter - - fn make_route( - capacity: usize, - decode: fn(&[u8]) -> io::Result, - ) -> (TypedRoute, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(capacity); + #[test] + fn typed_route_logs_error_on_drop() { + let (tx, _rx) = mpsc::channel::>(1); let route = TypedRoute { topic: "/test".to_string(), - decode, + decode: |b| Ok(b.to_vec()), sender: tx, - drops: AtomicU64::new(0), + drop_count: AtomicU64::new(0), last_log: Mutex::new(None), }; - (route, rx) - } - - #[test] - fn typed_route_drops_count_when_queue_full() { - let (route, _rx) = make_route::>(4, |b| Ok(b.to_vec())); - for _ in 0..100 { - route.try_dispatch(&[1u8, 2, 3]); - } - assert_eq!(route.drops.load(Ordering::Relaxed), 96); - } - - #[test] - fn typed_route_drops_counter_starts_at_zero() { - let (route, _rx) = 
make_route::>(8, |b| Ok(b.to_vec())); - assert_eq!(route.drops.load(Ordering::Relaxed), 0); - for _ in 0..4 { - route.try_dispatch(&[1u8]); - } - assert_eq!(route.drops.load(Ordering::Relaxed), 0); - } - - #[test] - fn typed_route_does_not_drop_when_queue_has_headroom() { - let (route, _rx) = make_route::>(2048, |b| Ok(b.to_vec())); - for _ in 0..1000 { - route.try_dispatch(&[7u8, 7, 7]); - } - assert_eq!(route.drops.load(Ordering::Relaxed), 0); - } - - #[test] - fn typed_route_log_throttle_skips_within_interval() { - // Dispatch many drops back-to-back. All must increment the - // counter, but `last_log` is set exactly once (the first drop) - // because the entire loop runs inside one MAX_ERROR_LOG_RATE - // window. - let (route, _rx) = make_route::>(1, |b| Ok(b.to_vec())); - let interval = Duration::from_secs(1) / MAX_ERROR_LOG_RATE; - let before = Instant::now(); - for _ in 0..130 { - route.try_dispatch(&[1u8]); - } + // Fill the queue, then force a drop. + route.try_dispatch(&[1u8]); + route.try_dispatch(&[1u8]); + assert_eq!(route.drop_count.load(Ordering::Relaxed), 1); assert!( - before.elapsed() < interval, - "test invalid: loop must complete within one log-throttle window", + route.last_log.lock().unwrap().is_some(), + "drop must trigger a log", ); - assert_eq!(route.drops.load(Ordering::Relaxed), 129); - assert!(route.last_log.lock().unwrap().is_some()); } }