Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{
collections::HashMap,
ffi::OsStr,
path::Path,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

Expand Down Expand Up @@ -990,6 +992,83 @@ pub(crate) fn is_auto_suggestion(output: &str) -> bool {
has_cursor_ghost || has_send_hint
}

/// Case-insensitive comparison for agent names.
///
/// Agent names may have inconsistent casing across registration, WebSocket
/// events, and API responses. Centralising the comparison here prevents
/// recurrences of the case-sensitivity routing bugs seen in commits 64bcb2f7
/// and PR #641.
/// ASCII case-insensitive equality for agent names.
///
/// Agent names may arrive with inconsistent casing from registration,
/// WebSocket events, and API responses. Funnelling every comparison
/// through this helper prevents recurrences of the case-sensitivity
/// routing bugs seen in commits 64bcb2f7 and PR #641.
pub(crate) fn agent_name_eq(a: &str, b: &str) -> bool {
    // Byte-wise comparison: ASCII letters compare case-insensitively,
    // all other bytes must match exactly (same semantics as
    // `str::eq_ignore_ascii_case`, without allocating).
    a.len() == b.len()
        && a.bytes()
            .zip(b.bytes())
            .all(|(x, y)| x.eq_ignore_ascii_case(&y))
}

/// Check whether *any* of the `self_names` match `name` (case-insensitive).
/// Returns `true` when `name` matches *any* entry in `self_names`,
/// ignoring ASCII case.
pub(crate) fn is_self_name<'a, I>(self_names: I, name: &str) -> bool
where
    I: IntoIterator<Item = &'a String>,
{
    // Early-return loop; comparison is the same ASCII case-insensitive
    // rule used by `agent_name_eq`.
    for candidate in self_names {
        if candidate.eq_ignore_ascii_case(name) {
            return true;
        }
    }
    false
}

/// Monotonic count of DMs silently dropped because participant
/// resolution failed.
static DM_DROPS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Total number of DMs silently dropped due to participant resolution
/// failures. Intended for metrics export and incident detection.
pub(crate) fn dm_drops_total() -> u64 {
    // Relaxed is sufficient: this is a standalone counter with no
    // ordering relationship to other memory.
    let total = DM_DROPS_TOTAL.load(Ordering::Relaxed);
    total
}

/// How long a resolved (or failed) participant lookup stays fresh.
pub(crate) const DM_PARTICIPANT_CACHE_TTL: Duration = Duration::from_secs(30);
/// Upper bound on cache entries; the stalest entry is evicted beyond this.
const MAX_DM_CACHE_ENTRIES: usize = 8192;

/// Resolve the participant list for a DM conversation, with a TTL cache
/// keyed by `workspace_id:conversation_id`.
///
/// Both successes and failures are cached for `DM_PARTICIPANT_CACHE_TTL`
/// (failures as an empty list). Negative caching matters: without it,
/// every incoming DM for a failing conversation triggers a fresh API
/// call, multiplying load during an outage; with it, retries are
/// throttled to at most one per TTL window per conversation — matching
/// the pre-refactor behaviour of `resolve_dm_participants` in
/// `src/main.rs`.
///
/// Returns an empty `Vec` for a blank `conversation_id` or on lookup
/// failure (the failure is counted in `DM_DROPS_TOTAL` and logged).
pub(crate) async fn resolve_dm_participants_cached(
    http: &relay_broker::relaycast_ws::RelaycastHttpClient,
    cache: &mut HashMap<String, (Instant, Vec<String>)>,
    workspace_id: &str,
    conversation_id: &str,
) -> Vec<String> {
    let workspace_id = workspace_id.trim();
    let conversation_id = conversation_id.trim();
    if conversation_id.is_empty() {
        return vec![];
    }
    let cache_key = format!("{workspace_id}:{conversation_id}");

    // Fresh cache hit (positive or negative): skip the network entirely.
    if let Some((fetched_at, participants)) = cache.get(&cache_key) {
        if fetched_at.elapsed() < DM_PARTICIPANT_CACHE_TTL {
            return participants.clone();
        }
    }

    let participants: Vec<String> = match http.get_dm_participants(conversation_id).await {
        Ok(fetched) => fetched,
        Err(error) => {
            DM_DROPS_TOTAL.fetch_add(1, Ordering::Relaxed);
            tracing::warn!(
                workspace_id = %workspace_id,
                conversation_id = %conversation_id,
                error = %error,
                dm_drops_total = DM_DROPS_TOTAL.load(Ordering::Relaxed),
                "failed resolving DM participants — DM silently dropped"
            );
            // Fall through to the insert below so the failure is
            // negative-cached for one TTL window.
            Vec::new()
        }
    };

    // Bounded cache: evict the stalest entry, but only when inserting a
    // genuinely new key — refreshing an existing key at capacity must not
    // push out an unrelated entry.
    if cache.len() >= MAX_DM_CACHE_ENTRIES && !cache.contains_key(&cache_key) {
        if let Some(oldest_key) = cache
            .iter()
            .min_by_key(|(_, (ts, _))| *ts)
            .map(|(k, _)| k.clone())
        {
            cache.remove(&oldest_key);
        }
    }
    cache.insert(cache_key, (Instant::now(), participants.clone()));
    participants
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1867,4 +1946,48 @@ mod tests {
assert_eq!(normalize_cli_name("/usr/local/bin/claude"), "claude");
assert_eq!(normalize_cli_name("codex"), "codex");
}

// ==================== agent_name_eq / is_self_name tests ====================

#[test]
fn agent_name_eq_case_insensitive() {
    // Every pair differs only in ASCII letter case and must match.
    let equal_pairs = [("Alice", "alice"), ("alice", "ALICE"), ("Worker-1", "worker-1")];
    for (a, b) in equal_pairs {
        assert!(agent_name_eq(a, b), "{a} should equal {b} ignoring case");
    }
    assert!(!agent_name_eq("Alice", "Bob"));
}

#[test]
fn agent_name_eq_empty_strings() {
    // Two empty names are equal; empty never matches a non-empty name.
    assert!(agent_name_eq("", ""));
    assert!(!agent_name_eq("", "Alice"));
}

#[test]
fn is_self_name_matches_any() {
    let names: Vec<String> = ["Alice", "alice-dev"].iter().map(|s| s.to_string()).collect();
    // Any case variant of any registered name must match.
    for candidate in ["alice", "ALICE", "Alice-Dev"] {
        assert!(is_self_name(&names, candidate), "{candidate} should match");
    }
    assert!(!is_self_name(&names, "Bob"));
}

#[test]
fn is_self_name_empty_list() {
    // With no registered names, nothing can match.
    let no_names: Vec<String> = Vec::new();
    assert!(!is_self_name(&no_names, "Alice"));
}

// ==================== DM participant cache tests ====================

#[test]
fn dm_cache_ttl_constant_is_reasonable() {
    // Sanity bounds: a zero TTL would disable caching; anything past
    // five minutes would serve overly stale participant lists.
    let ttl_secs = DM_PARTICIPANT_CACHE_TTL.as_secs();
    assert!(ttl_secs > 0);
    assert!(ttl_secs <= 300);
}

#[test]
fn dm_cache_eviction_cap_is_set() {
    // Guard against accidental changes to the cache size bound.
    let expected_cap = 8192;
    assert_eq!(MAX_DM_CACHE_ENTRIES, expected_cap);
}
}
56 changes: 9 additions & 47 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ mod swarm_tui;
mod wrap;

use helpers::{
detect_bypass_permissions_prompt, detect_claude_trust_prompt, detect_codex_model_prompt,
detect_gemini_action_required, detect_gemini_trust_prompt, detect_gemini_untrusted_banner,
detect_opencode_permission_prompt, floor_char_boundary, is_auto_suggestion,
is_bypass_selection_menu, is_in_editor_mode, normalize_cli_name, parse_cli_command, strip_ansi,
TerminalQueryParser,
agent_name_eq, detect_bypass_permissions_prompt, detect_claude_trust_prompt,
detect_codex_model_prompt, detect_gemini_action_required, detect_gemini_trust_prompt,
detect_gemini_untrusted_banner, detect_opencode_permission_prompt, floor_char_boundary,
is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_self_name,
normalize_cli_name, parse_cli_command, strip_ansi, TerminalQueryParser,
};
use listen_api::{broadcast_if_relevant, listen_api_router, ListenApiRequest};
use routing::display_target_for_dashboard;
Expand Down Expand Up @@ -64,7 +64,7 @@ use spawner::{spawn_env_vars, terminate_child, Spawner};
const DEFAULT_DELIVERY_RETRY_MS: u64 = 1_000;
const MAX_DELIVERY_RETRIES: u32 = 10;
const DEFAULT_RELAYCAST_BASE_URL: &str = "https://api.relaycast.dev";
const DM_PARTICIPANT_CACHE_TTL: Duration = Duration::from_secs(30);
use helpers::resolve_dm_participants_cached;
const THREAD_HISTORY_LIMIT: usize = 1_000;
const DEFAULT_HTTP_API_LOCAL_DELIVERY_TIMEOUT_MS: u64 = 3_000;
const DEFAULT_HTTP_API_RELAYCAST_SEND_TIMEOUT_MS: u64 = 20_000;
Expand Down Expand Up @@ -2963,7 +2963,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {
if delivery_plan.needs_dm_resolution {
let conversation_id = mapped.target.clone();
tracing::info!(conversation_id = %conversation_id, "resolving DM participants");
let participants = resolve_dm_participants(
let participants = resolve_dm_participants_cached(
&workspace_http,
&mut dm_participants_cache,
&workspace_id,
Expand All @@ -2974,7 +2974,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {

if let Some(participant) = participants
.iter()
.find(|participant| !participant.eq_ignore_ascii_case(&mapped.from))
.find(|participant| !agent_name_eq(participant, &mapped.from))
{
delivery_plan.display_target = participant.clone();
}
Expand Down Expand Up @@ -3003,9 +3003,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> {

let display_target =
display_target_for_dashboard(&delivery_plan.display_target, &workspace_self_names, &workspace_self_name);
let display_from = if workspace_self_names
.iter()
.any(|name| mapped.from.eq_ignore_ascii_case(name))
let display_from = if is_self_name(&workspace_self_names, &mapped.from)
{
workspace_self_name.clone()
} else {
Expand Down Expand Up @@ -5234,42 +5232,6 @@ async fn retry_pending_delivery(
}
}

/// Resolve the participant list for a DM conversation, with a TTL cache
/// keyed by `workspace_id:conversation_id`.
///
/// The result is cached unconditionally for `DM_PARTICIPANT_CACHE_TTL` —
/// including the empty list produced on lookup failure — so retries are
/// throttled to at most one API call per TTL window per conversation.
/// Returns an empty `Vec` for a blank `conversation_id` or on failure.
async fn resolve_dm_participants(
    relaycast_http: &RelaycastHttpClient,
    dm_participants_cache: &mut HashMap<String, (Instant, Vec<String>)>,
    workspace_id: &str,
    conversation_id: &str,
) -> Vec<String> {
    let workspace_id = workspace_id.trim();
    let conversation_id = conversation_id.trim();
    if conversation_id.is_empty() {
        return vec![];
    }
    let cache_key = format!("{workspace_id}:{conversation_id}");

    // Fresh cache hit (positive or negative): skip the network call.
    if let Some((fetched_at, participants)) = dm_participants_cache.get(&cache_key) {
        if fetched_at.elapsed() < DM_PARTICIPANT_CACHE_TTL {
            return participants.clone();
        }
    }

    // On failure, log at debug level and fall back to an empty list;
    // the empty result is still cached below (negative caching).
    let fetched = relaycast_http
        .get_dm_participants(conversation_id)
        .await
        .unwrap_or_else(|error| {
            tracing::debug!(
                workspace_id = %workspace_id,
                conversation_id = %conversation_id,
                error = %error,
                "failed resolving DM participants"
            );
            vec![]
        });

    dm_participants_cache.insert(cache_key, (Instant::now(), fetched.clone()));
    fetched
}

fn drop_pending_for_worker(
pending_deliveries: &mut HashMap<String, PendingDelivery>,
worker_name: &str,
Expand Down
61 changes: 47 additions & 14 deletions src/wrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::time::{Duration, Instant};

use super::*;
use crate::helpers::{
check_echo_in_output, floor_char_boundary, format_injection_for_worker_with_workspace,
agent_name_eq, check_echo_in_output, floor_char_boundary,
format_injection_for_worker_with_workspace, is_self_name, resolve_dm_participants_cached,
ActivityDetector, DeliveryOutcome, PendingActivity, PendingVerification, ThrottleState,
ACTIVITY_BUFFER_KEEP_BYTES, ACTIVITY_BUFFER_MAX_BYTES, ACTIVITY_WINDOW,
MAX_VERIFICATION_ATTEMPTS, VERIFICATION_WINDOW,
Expand Down Expand Up @@ -668,6 +669,7 @@ pub(crate) async fn run_wrap(

// Dedup for WS events
let mut dedup = DedupCache::new(Duration::from_secs(300), 8192);
let mut dm_participants_cache: HashMap<String, (Instant, Vec<String>)> = HashMap::new();

// Buffer for extracting message IDs from MCP tool responses in PTY output.
// When the agent sends messages via MCP, the response contains the message ID.
Expand Down Expand Up @@ -1016,16 +1018,27 @@ pub(crate) async fn run_wrap(
&workspace_id,
workspace_alias.as_deref(),
) {
// Skip presence and reaction events — they carry no content
// to inject and cause agents to respond to empty messages.
if matches!(mapped.kind, InboundKind::Presence | InboundKind::ReactionReceived) {
tracing::debug!(
kind = ?mapped.kind,
from = %mapped.from,
"skipping non-message event in wrap mode"
);
continue;
}

let dedup_key = format!("{}:{}", mapped.workspace_id, mapped.event_id);
if !dedup.insert_if_new(&dedup_key, Instant::now()) {
tracing::debug!(event_id = %mapped.event_id, workspace_id = %mapped.workspace_id, "dedup: skipping relay event");
continue;
}
if workspace_self_names.contains(&mapped.from)
if is_self_name(&workspace_self_names, &mapped.from)
|| mapped
.sender_agent_id
.as_ref()
.is_some_and(|id| workspace_self_agent_ids.contains(id))
.is_some_and(|id| workspace_self_agent_ids.iter().any(|self_id| agent_name_eq(self_id, id)))
{
tracing::debug!(
from = %mapped.from,
Expand All @@ -1037,21 +1050,41 @@ pub(crate) async fn run_wrap(

// DM routing: only deliver DMs addressed to this agent.
// Channel messages (target starts with '#') are broadcast
// to all subscribers. Allow through: empty targets (presence),
// thread replies, conversation_id fallbacks.
// to all subscribers. Allow through: empty targets (presence)
// and thread replies.
if !mapped.target.is_empty()
&& !mapped.target.starts_with('#')
&& mapped.target != "thread"
&& !mapped.target.starts_with("dm_")
&& !mapped.target.starts_with("conv_")
&& !workspace_self_names.contains(&mapped.target)
{
tracing::debug!(
target = %mapped.target,
self_names = ?workspace_self_names,
"skipping DM not addressed to this agent"
);
continue;
if mapped.target.starts_with("dm_") || mapped.target.starts_with("conv_") {
// Conversation-ID target: resolve participants to check
// if this wrapped agent is part of the DM.
let participants = resolve_dm_participants_cached(
&workspace_child_http,
&mut dm_participants_cache,
&workspace_id,
&mapped.target,
).await;
let is_participant = workspace_self_names.iter().any(|name| {
participants.iter().any(|p| agent_name_eq(p, name))
});
if !is_participant {
tracing::debug!(
target = %mapped.target,
participants = ?participants,
self_names = ?workspace_self_names,
"skipping DM — agent not in participants"
);
continue;
}
} else if !is_self_name(&workspace_self_names, &mapped.target) {
tracing::debug!(
target = %mapped.target,
self_names = ?workspace_self_names,
"skipping DM not addressed to this agent"
);
continue;
}
}

let delivery_id = format!("wrap_{}", mapped.event_id);
Expand Down
Loading