diff --git a/.trajectories/completed/2026-05/traj_zqwco4gl76g3.json b/.trajectories/completed/2026-05/traj_zqwco4gl76g3.json new file mode 100644 index 000000000..93898c261 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_zqwco4gl76g3.json @@ -0,0 +1,65 @@ +{ + "id": "traj_zqwco4gl76g3", + "version": 1, + "task": { + "title": "Fix issue 878" + }, + "status": "completed", + "startedAt": "2026-05-19T04:18:25.024Z", + "completedAt": "2026-05-19T04:27:18.903Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T04:22:25.398Z" + } + ], + "chapters": [ + { + "id": "chap_5yldelefew3p", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T04:22:25.398Z", + "endedAt": "2026-05-19T04:27:18.903Z", + "events": [ + { + "ts": 1779164545398, + "type": "decision", + "content": "Route broker binary through library entry point: Route broker binary through library entry point", + "raw": { + "question": "Route broker binary through library entry point", + "chosen": "Route broker binary through library entry point", + "alternatives": [], + "reasoning": "Moving main.rs to call relay_broker::run_cli lets implementation modules live in the library crate and become pub(crate) instead of public Rust API while preserving the binary behavior." + }, + "significance": "high" + }, + { + "ts": 1779164631929, + "type": "decision", + "content": "Keep protocol and snippets as public Rust modules: Keep protocol and snippets as public Rust modules", + "raw": { + "question": "Keep protocol and snippets as public Rust modules", + "chosen": "Keep protocol and snippets as public Rust modules", + "alternatives": [], + "reasoning": "Current external Rust references only need snippets and the wire protocol remains the crate's intentional stable surface; broker runtime, relaycast plumbing, PTY, scheduling, metrics, and worker internals can be crate-private." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Narrowed the broker Rust crate public API to protocol/snippets/run_cli, moved the binary entry through the library, colocated broker and worker tests, documented the Rust API break, and verified cargo fmt, cargo clippy -D warnings, and cargo test --release.", + "approach": "Standard approach", + "confidence": 0.9 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/will/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "c54e118806e98b9defe0e0b5022c35be8e64a52f", + "endRef": "c54e118806e98b9defe0e0b5022c35be8e64a52f" + } +} diff --git a/.trajectories/completed/2026-05/traj_zqwco4gl76g3.md b/.trajectories/completed/2026-05/traj_zqwco4gl76g3.md new file mode 100644 index 000000000..a405b7faa --- /dev/null +++ b/.trajectories/completed/2026-05/traj_zqwco4gl76g3.md @@ -0,0 +1,39 @@ +# Trajectory: Fix issue 878 + +> **Status:** ✅ Completed +> **Confidence:** 90% +> **Started:** May 19, 2026 at 12:18 AM +> **Completed:** May 19, 2026 at 12:27 AM + +--- + +## Summary + +Narrowed the broker Rust crate public API to protocol/snippets/run_cli, moved the binary entry through the library, colocated broker and worker tests, documented the Rust API break, and verified cargo fmt, cargo clippy -D warnings, and cargo test --release. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Route broker binary through library entry point + +- **Chose:** Route broker binary through library entry point +- **Reasoning:** Moving main.rs to call relay_broker::run_cli lets implementation modules live in the library crate and become pub(crate) instead of public Rust API while preserving the binary behavior. + +### Keep protocol and snippets as public Rust modules + +- **Chose:** Keep protocol and snippets as public Rust modules +- **Reasoning:** Current external Rust references only need snippets and the wire protocol remains the crate's intentional stable surface; broker runtime, relaycast plumbing, PTY, scheduling, metrics, and worker internals can be crate-private. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Route broker binary through library entry point: Route broker binary through library entry point +- Keep protocol and snippets as public Rust modules: Keep protocol and snippets as public Rust modules diff --git a/.trajectories/index.json b/.trajectories/index.json index 8aba14657..1288b5493 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-19T03:54:07.051Z", + "lastUpdated": "2026-05-19T04:27:19.071Z", "trajectories": { "traj_05xg7j388bc4": { "title": "Add browser workflow step integration", @@ -995,6 +995,13 @@ "startedAt": "2026-05-19T03:40:40.798Z", "completedAt": "2026-05-19T03:54:06.889Z", "path": "/Users/will/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_5qbla7w4kzoi.json" + }, + "traj_zqwco4gl76g3": { + "title": "Fix issue 878", + "status": "completed", + "startedAt": "2026-05-19T04:18:25.024Z", + "completedAt": "2026-05-19T04:27:18.903Z", + "path": "/Users/will/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_zqwco4gl76g3.json" } } } diff --git a/CHANGELOG.md b/CHANGELOG.md index 3263ed402..6e04faa34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `relay.spawn({ task })` now returns `success: false` and terminates the agent when task delivery fails after retries. - `agent-relay send` now uses the orchestrator identity by default so `agent-relay replies ` can correlate worker DMs. +- The `relay_broker` Rust crate now exposes only `protocol`, `snippets`, and `run_cli`; broker implementation modules are crate-private. ### Migration Guidance @@ -69,15 +70,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.2.2] - 2026-05-18 ### Technical Perspective + #### Architecture & API Changes + - Share interactive-attach prep helpers via attach.ts - Split runDriveSession to drop below complexity 15 (#897) #### Dependencies & Tooling + - Align trajectory title with retrospective scope - Sanitize absolute paths in metadata (#899) #### Releases + - v6.2.2 --- @@ -85,7 +90,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.2.1] - 2026-05-18 ### Technical Perspective + #### Releases + - v6.2.1 --- @@ -93,13 +100,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.2.0] - 2026-05-18 ### Product Perspective + #### User-Facing Features & Improvements + - **`new` / `relay` / `run` / `rm` verbs + `-n` silent alias (#864 sub-4)** (#864) - **`agent-relay drive ` interactive take-over client (#864 sub-3)** (#864) - **Per-agent session mode + pending-queue routes (#864 sub-2)** (#864) - **`agent-relay view ` read-only PTY stream client (#864 sub-1)** (#864) #### User-Impacting Fixes + - Defer spawn-and-attach import until --attach is set - Surface drainer write failures from pty write_all - Resolve #800 — broker: composable wait-conditions for CLI readiness (steal from ht) (#800) @@ -107,13 +117,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Resolve #802 — broker: add VT grid via alacritty_terminal (steal from ht, don't use libghostty) (#802) ### Technical Perspective + #### Architecture & API Changes + - Rename session-mode `relay` → `passthrough` across all surfaces - `new` takes positional NAME (drop `-n` flag) + scrub PR refs (#864) - Drop `run` verb, fold spawn-and-attach into `new --attach` (#889) - Unify worker request/response correlation (#871) (#871) #### Performance & Reliability + - Assert X-API-Key on every broker request - Actually assert on the API-key header in the harness - Cover drainer flush failure ack propagation @@ -122,6 +135,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add stale preview environment cleanup #### Dependencies & Tooling + - Drop PR references and legacy framing from code comments (#864) - Record `run` -> `new --attach` refactor decision - Record decisions for sub-PR 4 (#864) (#864) @@ -130,6 +144,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Inject PostHog key at build time (P0.5 of #881) (#881) #### Releases + - v6.2.0 --- diff --git a/crates/broker/src/broker.rs b/crates/broker/src/broker.rs index 7f75583f7..20e502612 100644 --- a/crates/broker/src/broker.rs +++ b/crates/broker/src/broker.rs @@ -1,10 +1,10 @@ use std::{collections::HashMap, io::Write, path::Path}; -use anyhow::{Context, Result}; -use relay_broker::{ +use crate::{ protocol::{AgentRuntime, AgentSpec}, supervisor::RestartPolicy, }; +use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; pub(crate) mod continuity; @@ -108,3 +108,86 @@ impl BrokerState { dead } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::AgentRuntime; + + #[test] + fn broker_state_default_is_empty() { + let state = BrokerState::default(); + assert!(state.agents.is_empty()); + } + + #[test] + fn broker_state_save_and_load_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("state.json"); + let mut state = BrokerState::default(); + state.agents.insert( + "w1".into(), + PersistedAgent { + runtime: AgentRuntime::Pty, + parent: None, + channels: vec![], + pid: Some(1), + started_at: None, + spec: None, + restart_policy: None, + initial_task: None, + }, + ); + state.save(&path).unwrap(); + let loaded = BrokerState::load(&path).unwrap(); + assert_eq!(loaded.agents.len(), 1); + assert!(loaded.agents.contains_key("w1")); + } + + #[test] + fn broker_state_load_missing_file_errors() { + let result = BrokerState::load(Path::new("/nonexistent/state.json")); + assert!(result.is_err()); + } + + #[test] + fn reap_dead_agents_removes_stale_no_pid() { + let mut state = BrokerState::default(); + state.agents.insert( + "ghost".into(), + PersistedAgent { + runtime: AgentRuntime::Pty, + parent: None, + channels: vec![], + pid: None, + started_at: None, + spec: None, + restart_policy: None, + initial_task: None, + }, + ); + let reaped = state.reap_dead_agents(); + assert_eq!(reaped, vec!["ghost"]); + assert!(state.agents.is_empty()); + } + + #[test] + fn reap_dead_agents_keeps_live_processes() { + let mut state = BrokerState::default(); + state.agents.insert( + "alive".into(), + PersistedAgent { + runtime: AgentRuntime::Pty, + parent: None, + channels: vec![], + pid: Some(std::process::id()), + started_at: None, + spec: None, + restart_policy: None, + initial_task: None, + }, + ); + assert!(state.reap_dead_agents().is_empty()); + assert_eq!(state.agents.len(), 1); + } +} diff --git a/crates/broker/src/broker_tests.rs b/crates/broker/src/broker_tests.rs deleted file mode 100644 index 0e37ef427..000000000 --- a/crates/broker/src/broker_tests.rs +++ /dev/null @@ -1,94 +0,0 @@ -//! Tests for broker.rs module. -//! -//! broker.rs public API: -//! - BrokerState::default() -> Self (empty agents HashMap) -//! - BrokerState::load(path: &Path) -> Result -//! - BrokerState::save(path: &Path) -> Result<()> -//! - BrokerState::reap_dead_agents(&mut self) -> Vec - -#[cfg(test)] -mod tests { - use std::path::Path; - - use crate::broker::{BrokerState, PersistedAgent}; - use relay_broker::protocol::AgentRuntime; - - #[test] - fn broker_state_default_is_empty() { - let state = BrokerState::default(); - assert!(state.agents.is_empty()); - } - - #[test] - fn broker_state_save_and_load_roundtrip() { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("state.json"); - let mut state = BrokerState::default(); - state.agents.insert( - "w1".into(), - PersistedAgent { - runtime: AgentRuntime::Pty, - parent: None, - channels: vec![], - pid: Some(1), - started_at: None, - spec: None, - restart_policy: None, - initial_task: None, - }, - ); - state.save(&path).unwrap(); - let loaded = BrokerState::load(&path).unwrap(); - assert_eq!(loaded.agents.len(), 1); - assert!(loaded.agents.contains_key("w1")); - } - - #[test] - fn broker_state_load_missing_file_errors() { - let result = BrokerState::load(Path::new("/nonexistent/state.json")); - assert!(result.is_err()); - } - - #[test] - fn reap_dead_agents_removes_stale_no_pid() { - // Agents with pid=None are stale → reap removes them - let mut state = BrokerState::default(); - state.agents.insert( - "ghost".into(), - PersistedAgent { - runtime: AgentRuntime::Pty, - parent: None, - channels: vec![], - pid: None, - started_at: None, - spec: None, - restart_policy: None, - initial_task: None, - }, - ); - let reaped = state.reap_dead_agents(); - assert_eq!(reaped, vec!["ghost"]); - assert!(state.agents.is_empty()); - } - - #[test] - fn reap_dead_agents_keeps_live_processes() { - // Agents with pid=Some(current_pid) survive reap - let mut state = BrokerState::default(); - state.agents.insert( - "alive".into(), - PersistedAgent { - runtime: AgentRuntime::Pty, - parent: None, - channels: vec![], - pid: Some(std::process::id()), - started_at: None, - spec: None, - restart_policy: None, - initial_task: None, - }, - ); - assert!(state.reap_dead_agents().is_empty()); - assert_eq!(state.agents.len(), 1); - } -} diff --git a/crates/broker/src/cli/mod.rs b/crates/broker/src/cli/mod.rs index 92e39089e..eb7002909 100644 --- a/crates/broker/src/cli/mod.rs +++ b/crates/broker/src/cli/mod.rs @@ -1,11 +1,11 @@ use std::path::PathBuf; -use anyhow::Result; -use clap::{Parser, Subcommand, ValueEnum}; -use relay_broker::{ +use crate::{ protocol::HeadlessProvider as ProtocolHeadlessProvider, telemetry::{TelemetryClient, TelemetryEvent}, }; +use anyhow::Result; +use clap::{Parser, Subcommand, ValueEnum}; use crate::{cli_mcp_args, pty_worker, runtime, swarm, wrap}; diff --git a/crates/broker/src/cli_mcp_args.rs b/crates/broker/src/cli_mcp_args.rs index 6887be5c5..a58a66004 100644 --- a/crates/broker/src/cli_mcp_args.rs +++ b/crates/broker/src/cli_mcp_args.rs @@ -3,10 +3,10 @@ use std::{ time::Duration, }; -use anyhow::{anyhow, bail, Context, Result}; -use relay_broker::relaycast::{ +use crate::relaycast::{ configure_relaycast_mcp_with_token, RelaycastHttpClient, RelaycastRegistrationError, }; +use anyhow::{anyhow, bail, Context, Result}; use serde::{Deserialize, Serialize}; use crate::cli::McpArgsCommand; @@ -253,8 +253,8 @@ fn home_dir_from_env() -> Option { #[cfg(test)] mod tests { + use crate::relaycast::configure_relaycast_mcp_with_token; use httpmock::{Method::POST, MockServer}; - use relay_broker::relaycast::configure_relaycast_mcp_with_token; use serde_json::{json, Value}; use std::sync::{Mutex, MutexGuard, PoisonError}; use tempfile::tempdir; diff --git a/crates/broker/src/lib.rs b/crates/broker/src/lib.rs index 5c5376660..d28733d15 100644 --- a/crates/broker/src/lib.rs +++ b/crates/broker/src/lib.rs @@ -1,25 +1,54 @@ -pub mod auth; -pub mod config; -pub mod control; -pub mod conversation_log; -pub mod crash_insights; -pub mod dedup; -pub mod events; -pub mod inject; -pub mod message_bridge; -pub mod metrics; -pub mod multi_workspace; -pub mod priorities; pub mod protocol; -pub mod pty; -pub mod queue; -pub mod redact; -pub mod relaycast; -pub mod relaycast_ws; -pub mod replay_buffer; -pub mod scheduler; -pub mod snapshot; pub mod snippets; -pub mod supervisor; -pub mod telemetry; -pub mod types; + +pub(crate) mod broker; +pub(crate) mod cli; +pub(crate) mod cli_mcp_args; +#[allow(dead_code)] +pub(crate) mod config; +pub(crate) mod control; +#[allow(dead_code)] +pub(crate) mod conversation_log; +pub(crate) mod crash_insights; +#[allow(dead_code)] +pub(crate) mod dedup; +#[allow(dead_code)] +pub(crate) mod events; +pub(crate) mod listen_api; +#[allow(dead_code)] +pub(crate) mod metrics; +pub(crate) mod priorities; +#[allow(dead_code)] +pub(crate) mod pty; +pub(crate) mod pty_worker; +#[allow(dead_code)] +pub(crate) mod queue; +pub(crate) mod readiness; +#[allow(dead_code)] +pub(crate) mod redact; +#[allow(dead_code)] +pub(crate) mod relaycast; +pub(crate) mod replay_buffer; +pub(crate) mod routing; +pub(crate) mod runtime; +#[allow(dead_code)] +pub(crate) mod scheduler; +pub(crate) mod snapshot; +pub(crate) mod spawner; +#[allow(dead_code)] +pub(crate) mod supervisor; +pub(crate) mod swarm; +pub(crate) mod swarm_tui; +#[allow(dead_code)] +pub(crate) mod telemetry; +#[allow(dead_code)] +pub(crate) mod types; +pub(crate) mod util; +pub(crate) mod wait; +pub(crate) mod worker; +pub(crate) mod worker_request; +pub(crate) mod wrap; + +pub async fn run_cli() -> anyhow::Result<()> { + cli::run().await +} diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index a96a4979c..d0e0bc591 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; -use relay_broker::{ +use crate::{ protocol::MessageInjectionMode, relaycast::WorkspaceMembershipSummary, replay_buffer::ReplayBuffer, @@ -1758,7 +1758,7 @@ pub async fn broadcast_if_relevant( #[cfg(test)] mod wave0_contract_tests { - use relay_broker::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; + use crate::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; use serde_json::{json, Value}; use tokio::sync::broadcast; @@ -1812,7 +1812,7 @@ mod wave0_contract_tests { #[cfg(test)] mod tests { use super::broadcast_if_relevant; - use relay_broker::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; + use crate::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; use serde_json::{json, Value}; use tokio::sync::broadcast; @@ -1898,11 +1898,11 @@ mod tests { #[cfg(test)] mod auth_tests { + use crate::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; use axum::{ body::{to_bytes, Body}, http::{Request, StatusCode}, }; - use relay_broker::replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}; use serde_json::{json, Value}; use tokio::sync::{broadcast, mpsc}; use tower::ServiceExt; @@ -1911,9 +1911,9 @@ mod auth_tests { listen_api_router_with_auth, DeliveryRouteError, ListenApiConfig, ListenApiRequest, SetInboundDeliveryModeOk, }; + use crate::protocol::MessageInjectionMode; + use crate::types::{InboundDeliveryMode, PendingRelayMessage}; use crate::worker_request::RequestWorkerError; - use relay_broker::protocol::MessageInjectionMode; - use relay_broker::types::{InboundDeliveryMode, PendingRelayMessage}; fn test_router( broker_api_key: Option<&str>, @@ -2163,10 +2163,7 @@ mod auth_tests { let send_replier = tokio::spawn(async move { match rx.recv().await { Some(ListenApiRequest::Send { mode, reply, .. }) => { - assert!(matches!( - mode, - relay_broker::protocol::MessageInjectionMode::Wait - )); + assert!(matches!(mode, crate::protocol::MessageInjectionMode::Wait)); let _ = reply.send(Ok(json!({ "success": true, "event_id": "evt_1" }))); } other => panic!("unexpected request: {:?}", other.map(|_| "other")), @@ -2198,10 +2195,7 @@ mod auth_tests { let send_replier = tokio::spawn(async move { match rx.recv().await { Some(ListenApiRequest::Send { mode, reply, .. }) => { - assert!(matches!( - mode, - relay_broker::protocol::MessageInjectionMode::Steer - )); + assert!(matches!(mode, crate::protocol::MessageInjectionMode::Steer)); let _ = reply.send(Ok(json!({ "success": true, "event_id": "evt_2" }))); } other => panic!("unexpected request: {:?}", other.map(|_| "other")), diff --git a/crates/broker/src/main.rs b/crates/broker/src/main.rs index 94033596b..55ae6a01a 100644 --- a/crates/broker/src/main.rs +++ b/crates/broker/src/main.rs @@ -1,28 +1,6 @@ -mod broker; -mod cli; -mod cli_mcp_args; -mod listen_api; -mod pty_worker; -mod readiness; -mod routing; -mod runtime; -mod spawner; -mod swarm; -mod swarm_tui; -mod util; -mod wait; -mod worker; -mod worker_request; -mod wrap; - use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { - cli::run().await + relay_broker::run_cli().await } - -#[cfg(test)] -mod broker_tests; -#[cfg(test)] -mod worker_tests; diff --git a/crates/broker/src/pty_worker.rs b/crates/broker/src/pty_worker.rs index ed2686ff9..98cac4abf 100644 --- a/crates/broker/src/pty_worker.rs +++ b/crates/broker/src/pty_worker.rs @@ -3,11 +3,11 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{Context, Result}; -use relay_broker::{ +use crate::{ protocol::{MessageInjectionMode, ProtocolEnvelope, RelayDelivery}, pty::PtySession, }; +use anyhow::{Context, Result}; use serde_json::{json, Value}; use tokio::{ io::{AsyncBufReadExt, BufReader}, @@ -29,11 +29,11 @@ use crate::cli::command_parse::parse_cli_command; use crate::cli::PtyCommand; use crate::readiness::{cli_prompt_ready, detect_cli_ready, GridReadinessSnapshot}; use crate::runtime::{get_terminal_size, send_frame}; +use crate::snapshot::Snapshot; use crate::util::ansi::{floor_char_boundary, strip_ansi}; use crate::worker::detection::ActivityDetector; use crate::wrap::{PtyAutoState, AUTO_SUGGESTION_BLOCK_TIMEOUT}; use base64::Engine; -use relay_broker::snapshot::Snapshot; #[derive(Debug, Clone)] struct PendingWorkerInjection { diff --git a/crates/broker/src/relaycast/mod.rs b/crates/broker/src/relaycast/mod.rs index e3a139ba2..6b344782f 100644 --- a/crates/broker/src/relaycast/mod.rs +++ b/crates/broker/src/relaycast/mod.rs @@ -1,24 +1,19 @@ -pub mod auth; -pub mod bridge; -pub mod dm_participants; -pub mod identity; -pub mod workspace; -pub mod ws; +pub(crate) mod auth; +pub(crate) mod bridge; +pub(crate) mod dm_participants; +pub(crate) mod identity; +pub(crate) mod workspace; +pub(crate) mod ws; -pub use crate::snippets::{ - configure_relaycast_mcp, configure_relaycast_mcp_with_token, ensure_relaycast_mcp_config, - relaycast_mcp_config_json, relaycast_mcp_config_json_with_token, -}; -pub use auth::{AuthClient, AuthSession, AuthSessionSet, CredentialCache, CredentialSet}; -pub use bridge::{map_ws_broker_command, map_ws_event, to_inject_request}; -pub use dm_participants::{resolve_dm_participants_cached, DmParticipantsCache}; -pub use identity::{agent_name_eq, is_self_name}; -pub use workspace::{ +pub(crate) use crate::snippets::{configure_relaycast_mcp_with_token, ensure_relaycast_mcp_config}; +pub(crate) use auth::AuthClient; +pub(crate) use bridge::{map_ws_broker_command, map_ws_event}; +pub(crate) use dm_participants::{resolve_dm_participants_cached, DmParticipantsCache}; +pub(crate) use identity::{agent_name_eq, is_self_name}; +pub(crate) use workspace::{ MultiWorkspaceSession, WorkspaceInboundMessage, WorkspaceMembershipSummary, - WorkspaceSessionHandle, }; -pub use ws::{ - format_worker_preregistration_error, registration_is_retryable, registration_retry_after_secs, - retry_agent_registration, RegRetryOutcome, RelaycastHttpClient, RelaycastRegistrationError, - RelaycastWsClient, WsControl, +pub(crate) use ws::{ + format_worker_preregistration_error, registration_retry_after_secs, retry_agent_registration, + RegRetryOutcome, RelaycastHttpClient, RelaycastRegistrationError, WsControl, }; diff --git a/crates/broker/src/relaycast/ws.rs b/crates/broker/src/relaycast/ws.rs index 82e40cfd2..98721eb23 100644 --- a/crates/broker/src/relaycast/ws.rs +++ b/crates/broker/src/relaycast/ws.rs @@ -322,7 +322,9 @@ pub struct RelaycastHttpClient { pub type RelaycastRegistrationError = AgentRegistrationError; pub type RegRetryOutcome = AgentRegistrationRetryOutcome; -pub use relaycast::{registration_is_retryable, registration_retry_after_secs}; +#[cfg(test)] +pub(crate) use relaycast::registration_is_retryable; +pub(crate) use relaycast::registration_retry_after_secs; impl RelaycastHttpClient { pub fn new( diff --git a/crates/broker/src/routing.rs b/crates/broker/src/routing.rs index 3e9e67419..47c1c90e8 100644 --- a/crates/broker/src/routing.rs +++ b/crates/broker/src/routing.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use relay_broker::types::{InboundKind, InboundRelayEvent}; +use crate::types::{InboundKind, InboundRelayEvent}; use crate::runtime::normalize_channel; @@ -245,7 +245,7 @@ pub(crate) fn display_target_for_dashboard( mod tests { use std::collections::HashSet; - use relay_broker::types::{InboundKind, InboundRelayEvent, RelayPriority, SenderKind}; + use crate::types::{InboundKind, InboundRelayEvent, RelayPriority, SenderKind}; use super::{ display_target_for_dashboard, is_self_echo, resolve_delivery_targets, diff --git a/crates/broker/src/runtime/delivery.rs b/crates/broker/src/runtime/delivery.rs index e8f59e4c5..ade5a6f19 100644 --- a/crates/broker/src/runtime/delivery.rs +++ b/crates/broker/src/runtime/delivery.rs @@ -165,7 +165,7 @@ pub(crate) fn queue_inbound_for_delivery_mode( dropped_from = %dropped_from, mode = state.mode.as_wire_str(), queue_len, - max_pending = relay_broker::types::MAX_PENDING_PER_WORKER, + max_pending = crate::types::MAX_PENDING_PER_WORKER, "pending queue full — evicting oldest message" ); } @@ -265,7 +265,7 @@ pub(crate) async fn queue_and_try_delivery( workers: &mut WorkerRegistry, pending_deliveries: &mut HashMap, worker_name: &str, - mapped: &relay_broker::types::InboundRelayEvent, + mapped: &crate::types::InboundRelayEvent, retry_interval: Duration, ) -> Result<()> { queue_and_try_delivery_raw( diff --git a/crates/broker/src/runtime/event_loop.rs b/crates/broker/src/runtime/event_loop.rs index e92ae17f5..1aa715523 100644 --- a/crates/broker/src/runtime/event_loop.rs +++ b/crates/broker/src/runtime/event_loop.rs @@ -21,7 +21,7 @@ pub(crate) struct BrokerRuntime { pub(super) worker_event_rx: mpsc::Receiver, pub(super) worker_events_open: bool, pub(super) workers: WorkerRegistry, - pub(super) crash_insights: relay_broker::crash_insights::CrashInsights, + pub(super) crash_insights: crate::crash_insights::CrashInsights, pub(super) crash_insights_path: PathBuf, pub(super) sdk_lines: tokio::io::Lines>, pub(super) stdin_open: bool, diff --git a/crates/broker/src/runtime/init.rs b/crates/broker/src/runtime/init.rs index 492431b5a..509105dd6 100644 --- a/crates/broker/src/runtime/init.rs +++ b/crates/broker/src/runtime/init.rs @@ -364,7 +364,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re // Load crash insights from previous session let crash_insights_path = paths.state.parent().unwrap().join("crash-insights.json"); - let crash_insights = relay_broker::crash_insights::CrashInsights::load(&crash_insights_path); + let crash_insights = crate::crash_insights::CrashInsights::load(&crash_insights_path); let sdk_lines = BufReader::new(tokio::io::stdin()).lines(); let stdin_open = true; diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index 44bcf42f3..ba10d9bef 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -109,8 +109,8 @@ impl BrokerRuntime { for (name, code, signal) in &exited { // Record crash in insights let (category, description) = - relay_broker::crash_insights::CrashInsights::analyze(*code, signal.as_deref()); - crash_insights.record(relay_broker::crash_insights::CrashRecord { + crate::crash_insights::CrashInsights::analyze(*code, signal.as_deref()); + crash_insights.record(crate::crash_insights::CrashRecord { agent_name: name.clone(), exit_code: *code, signal: signal.clone(), @@ -130,7 +130,7 @@ impl BrokerRuntime { }); // Check supervisor for restart decision - use relay_broker::supervisor::RestartDecision; + use crate::supervisor::RestartDecision; match workers.supervisor.on_exit(name, *code, signal.as_deref()) { Some(RestartDecision::Restart { delay }) => { // Keep pending deliveries — we'll redeliver after restart diff --git a/crates/broker/src/runtime/mod.rs b/crates/broker/src/runtime/mod.rs index 029d1235a..ddb5af25c 100644 --- a/crates/broker/src/runtime/mod.rs +++ b/crates/broker/src/runtime/mod.rs @@ -25,7 +25,7 @@ use tokio::{ }; use uuid::Uuid; -use relay_broker::{ +use crate::{ dedup::DedupCache, protocol::{ AgentRuntime, AgentSpec, HeadlessProvider as ProtocolHeadlessProvider, diff --git a/crates/broker/src/runtime/session.rs b/crates/broker/src/runtime/session.rs index 73fee5c28..9ae5d82e9 100644 --- a/crates/broker/src/runtime/session.rs +++ b/crates/broker/src/runtime/session.rs @@ -223,7 +223,7 @@ timestamp='{}' opts.channels, opts.read_mcp_identity, opts.runtime_cwd, - relay_broker::events::EventEmitter::new(false), + crate::events::EventEmitter::new(false), ); log_startup_phase( startup_debug, diff --git a/crates/broker/src/runtime/tests.rs b/crates/broker/src/runtime/tests.rs index 54b2115f4..60ee0e2eb 100644 --- a/crates/broker/src/runtime/tests.rs +++ b/crates/broker/src/runtime/tests.rs @@ -6,6 +6,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::protocol::{AgentSpec, MessageInjectionMode, RelayDelivery}; use crate::worker::{WorkerEvent, WorkerHandle, WorkerRegistry}; use crate::{ broker::injection_format::format_injection, @@ -17,7 +18,6 @@ use crate::{ }, }, }; -use relay_broker::protocol::{AgentSpec, MessageInjectionMode, RelayDelivery}; use serde_json::{json, Value}; use tokio::sync::mpsc; @@ -34,9 +34,9 @@ use super::{ sender_is_dashboard_label, should_clear_pending_delivery_for_event, AgentRuntime, InboundContext, InboundQueueOutcome, PendingDelivery, ProtocolHeadlessProvider, }; -use relay_broker::dedup::DedupCache; -use relay_broker::relaycast::{format_worker_preregistration_error, RelaycastRegistrationError}; -use relay_broker::types::{InboundDeliveryMode, InboundDeliveryState}; +use crate::dedup::DedupCache; +use crate::relaycast::{format_worker_preregistration_error, RelaycastRegistrationError}; +use crate::types::{InboundDeliveryMode, InboundDeliveryState}; fn env_test_lock() -> &'static Mutex<()> { static LOCK: OnceLock> = OnceLock::new(); diff --git a/crates/broker/src/spawner.rs b/crates/broker/src/spawner.rs index 7d85b14b8..f267c24fd 100644 --- a/crates/broker/src/spawner.rs +++ b/crates/broker/src/spawner.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, process::Stdio, time::Duration}; +use crate::relaycast::configure_relaycast_mcp_with_token; use anyhow::{Context, Result}; -use relay_broker::relaycast::configure_relaycast_mcp_with_token; use tokio::{ process::{Child, Command}, time::timeout, diff --git a/crates/broker/src/wait.rs b/crates/broker/src/wait.rs index 2d80e3e54..db1637667 100644 --- a/crates/broker/src/wait.rs +++ b/crates/broker/src/wait.rs @@ -343,7 +343,7 @@ fn preferred_trigger(conds: &[WaitCondition]) -> Trigger { /// Pre-built text-only `WaitSet`s expressing each CLI's "screen looks /// ready" rule. #[allow(dead_code)] -pub mod for_cli { +pub(crate) mod for_cli { use std::time::Duration; use super::{WaitSet, IDLE_SETTLE}; diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 3b93244d4..0fc9f3e3e 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -5,13 +5,13 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{Context, Result}; -use relay_broker::{ +use crate::{ metrics::MetricsCollector, protocol::{AgentRuntime, AgentSpec, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION}, relaycast::configure_relaycast_mcp_with_token, supervisor::Supervisor, }; +use anyhow::{Context, Result}; use serde_json::{json, Value}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -946,6 +946,50 @@ fn spawn_worker_reader( mod tests { use super::*; + fn make_registry(env: Vec<(String, String)>) -> WorkerRegistry { + let (tx, _rx) = mpsc::channel::(16); + WorkerRegistry::new(tx, env, PathBuf::from("/tmp/worker-tests"), Instant::now()) + } + + #[test] + fn worker_registry_starts_empty() { + let reg = make_registry(vec![]); + assert!(!reg.has_any_worker()); + assert!(reg.list().is_empty()); + } + + #[test] + fn has_worker_returns_false_for_unknown() { + let reg = make_registry(vec![]); + assert!(!reg.has_worker("nonexistent")); + } + + #[test] + fn worker_log_path_rejects_path_traversal() { + let reg = make_registry(vec![]); + assert!(reg.worker_log_path("..").is_none()); + assert!(reg.worker_log_path("../etc/passwd").is_none()); + assert!(reg.worker_log_path("foo/../bar").is_none()); + assert!(reg.worker_log_path("foo/bar").is_none()); + assert!(reg.worker_log_path("foo\\bar").is_none()); + assert!(reg.worker_log_path("valid-name").is_some()); + assert!(reg.worker_log_path("worker.1").is_some()); + } + + #[test] + fn env_value_lookup() { + let env = vec![("KEY".into(), "val".into())]; + let reg = make_registry(env); + assert_eq!(reg.env_value("KEY"), Some("val")); + assert_eq!(reg.env_value("MISSING"), None); + } + + #[test] + fn routing_workers_empty_when_no_workers() { + let reg = make_registry(vec![]); + assert!(reg.routing_workers().is_empty()); + } + #[test] fn args_include_model_override_detects_supported_forms() { assert!(args_include_model_override(&[ diff --git a/crates/broker/src/worker_tests.rs b/crates/broker/src/worker_tests.rs deleted file mode 100644 index 4f65052e0..000000000 --- a/crates/broker/src/worker_tests.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Tests for worker.rs module. -//! -//! worker.rs public API: -//! - WorkerRegistry::new(event_tx, worker_env, logs_dir, broker_start) -> Self -//! - WorkerRegistry::has_worker(name) -> bool -//! - WorkerRegistry::has_any_worker() -> bool -//! - WorkerRegistry::list() -> Vec -//! - WorkerRegistry::env_value(key) -> Option<&str> -//! - WorkerRegistry::worker_log_path(name) -> Option -//! - WorkerRegistry::routing_workers() -> Vec - -#[cfg(test)] -mod tests { - use std::path::PathBuf; - use std::time::Instant; - - use tokio::sync::mpsc; - - use crate::worker::{WorkerEvent, WorkerRegistry}; - - fn make_registry(env: Vec<(String, String)>) -> WorkerRegistry { - let (tx, _rx) = mpsc::channel::(16); - WorkerRegistry::new(tx, env, PathBuf::from("/tmp/worker-tests"), Instant::now()) - } - - #[test] - fn worker_registry_starts_empty() { - let reg = make_registry(vec![]); - assert!(!reg.has_any_worker()); - assert!(reg.list().is_empty()); - } - - #[test] - fn has_worker_returns_false_for_unknown() { - let reg = make_registry(vec![]); - assert!(!reg.has_worker("nonexistent")); - } - - #[test] - fn worker_log_path_rejects_path_traversal() { - let reg = make_registry(vec![]); - // ".." as a name component must be rejected - assert!(reg.worker_log_path("..").is_none()); - assert!(reg.worker_log_path("../etc/passwd").is_none()); - assert!(reg.worker_log_path("foo/../bar").is_none()); - assert!(reg.worker_log_path("foo/bar").is_none()); - assert!(reg.worker_log_path("foo\\bar").is_none()); - // Valid names are allowed - assert!(reg.worker_log_path("valid-name").is_some()); - assert!(reg.worker_log_path("worker.1").is_some()); - } - - #[test] - fn env_value_lookup() { - let env = vec![("KEY".into(), "val".into())]; - let reg = make_registry(env); - assert_eq!(reg.env_value("KEY"), Some("val")); - assert_eq!(reg.env_value("MISSING"), None); - } - - #[test] - fn routing_workers_empty_when_no_workers() { - let reg = make_registry(vec![]); - assert!(reg.routing_workers().is_empty()); - } -} diff --git a/crates/broker/src/wrap.rs b/crates/broker/src/wrap.rs index e1733a403..df4706083 100644 --- a/crates/broker/src/wrap.rs +++ b/crates/broker/src/wrap.rs @@ -1,8 +1,7 @@ use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; -use anyhow::{Context, Result}; -use relay_broker::{ +use crate::{ control::{can_release_child, is_human_sender}, dedup::DedupCache, pty::PtySession, @@ -14,6 +13,7 @@ use relay_broker::{ telemetry::{ActionSource, TelemetryClient, TelemetryEvent}, types::{BrokerCommandPayload, InboundKind, SenderKind}, }; +use anyhow::{Context, Result}; use tokio::{sync::mpsc, time::MissedTickBehavior}; use crate::broker::{