From ebbc7727392c802d15cb5d273db4da743d1a12d1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Jun 2026 13:43:44 +0800 Subject: [PATCH 1/5] fix(agent): parallel write batch must pass the safety gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The parallel write fast path executed tools directly via the ToolExecutor, bypassing ToolSafetyGate entirely — so with multiple write calls in one turn, permission checks and skill restrictions were not enforced. Make can_run_parallel_write_batch consult the gate itself (single source of truth): only fast-path when, for every call, no active skill restriction forbids the tool AND the permission checker explicitly Allows it. A missing checker resolves to Ask (a Deny without a confirmation manager), so it correctly refuses. This preserves the optimization for the explicit-allow case while closing the bypass. Exposes ToolSafetyGate::{check_skill_restrictions,permission_decision} as pub(crate). --- core/src/agent/parallel_tool_runtime.rs | 25 +++++++ core/src/agent/tests.rs | 98 +++++++++++++++++++++++++ core/src/safety_gate.rs | 8 +- 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/core/src/agent/parallel_tool_runtime.rs b/core/src/agent/parallel_tool_runtime.rs index 8c72c8f..c4e20ab 100644 --- a/core/src/agent/parallel_tool_runtime.rs +++ b/core/src/agent/parallel_tool_runtime.rs @@ -83,6 +83,13 @@ impl AgentLoop { } pub(super) fn can_run_parallel_write_batch(&self, tool_calls: &[ToolCall]) -> bool { + // The parallel fast path executes tools directly via the ToolExecutor + // (see `execute_parallel_write_batch`), bypassing `ToolSafetyGate`. It is + // only safe when the gate would unconditionally EXECUTE every call. + // + // Hooks and HITL confirmation make the decision stateful/interactive + // (pre-tool hooks have side effects; an `Ask` must round-trip to a + // human), so never fast-path when either is configured. if self.config.hook_engine.is_some() || self.config.confirmation_manager.is_some() || tool_calls.len() <= 1 @@ -90,6 +97,24 @@ impl AgentLoop { return false; } + // Skill restrictions and permission checks live in `ToolSafetyGate`, + // which the parallel path skips entirely. Consult the gate itself (the + // single source of truth) and only fast-path when, for EVERY call, no + // active skill restriction forbids the tool AND the permission checker + // explicitly Allows it. A missing checker resolves to `Ask`, which with + // no confirmation manager is a Deny — so it correctly refuses here. + let gate = crate::safety_gate::ToolSafetyGate::new(&self.config); + let all_allowed = tool_calls.iter().all(|tc| { + gate.check_skill_restrictions(&tc.name).is_none() + && matches!( + gate.permission_decision(&tc.name, &tc.args), + crate::permissions::PermissionDecision::Allow + ) + }); + if !all_allowed { + return false; + } + if !tool_calls .iter() .all(|tc| is_parallel_safe_write(&tc.name, &tc.args)) diff --git a/core/src/agent/tests.rs b/core/src/agent/tests.rs index 991b4f3..68d3cc2 100644 --- a/core/src/agent/tests.rs +++ b/core/src/agent/tests.rs @@ -2003,3 +2003,101 @@ async fn test_agent_end_event_contains_final_text() { assert_eq!(usage.total_tokens, 15); } } + +/// Regression: the parallel write fast path bypasses `ToolSafetyGate`, so it +/// may run only when the gate would unconditionally EXECUTE every call. Before +/// the fix it ignored the permission checker and skill restrictions entirely, +/// letting denied / ask / skill-restricted writes land ungated. +#[test] +fn parallel_write_batch_only_fast_paths_when_gate_would_execute() { + use crate::llm::ToolCall; + use crate::permissions::{PermissionChecker, PermissionDecision}; + use crate::skills::{Skill, SkillKind, SkillRegistry}; + + struct Static(PermissionDecision); + impl PermissionChecker for Static { + fn check(&self, _tool: &str, _args: &serde_json::Value) -> PermissionDecision { + self.0 + } + } + + fn write_call(id: &str, path: &str) -> ToolCall { + ToolCall { + id: id.to_string(), + name: "write_file".to_string(), + args: serde_json::json!({ "path": path, "content": "x" }), + } + } + + fn loop_with( + checker: Option>, + skills: Option>, + ) -> AgentLoop { + // `skill_registry` overrides the default builtins where needed. + let config = AgentConfig { + permission_checker: checker, + skill_registry: skills, + ..Default::default() + }; + AgentLoop::new( + Arc::new(MockLlmClient::new(vec![])), + Arc::new(ToolExecutor::new("/tmp".to_string())), + test_tool_context(), + config, + ) + } + + let calls = vec![write_call("a", "a.txt"), write_call("b", "b.txt")]; + let allow = || Some(Arc::new(Static(PermissionDecision::Allow)) as Arc); + + // Explicit Allow + no restricting skills → fast path is taken. + assert!( + loop_with(allow(), None).can_run_parallel_write_batch(&calls), + "explicit Allow with no restrictions → parallel write batch is allowed" + ); + + // No permission checker → gate resolves to Ask (a Deny without a confirmation + // manager), so the fast path must be refused. + assert!( + !loop_with(None, None).can_run_parallel_write_batch(&calls), + "missing checker resolves to Ask/Deny → fast path refused" + ); + + // Explicit Deny → refused. + assert!( + !loop_with(Some(Arc::new(Static(PermissionDecision::Deny))), None) + .can_run_parallel_write_batch(&calls), + "permission Deny → fast path refused" + ); + + // Ask → refused (sequential path would need a human round-trip). + assert!( + !loop_with(Some(Arc::new(Static(PermissionDecision::Ask))), None) + .can_run_parallel_write_batch(&calls), + "permission Ask → fast path refused" + ); + + // Allow, but an active skill restriction forbids write_file → refused. + let restricted = SkillRegistry::new(); + restricted.register_unchecked(Arc::new(Skill { + name: "read-only".to_string(), + description: String::new(), + allowed_tools: Some("read(*)".to_string()), + disable_model_invocation: false, + kind: SkillKind::Instruction, + content: String::new(), + tags: Vec::new(), + version: None, + })); + assert!( + !loop_with(allow(), Some(Arc::new(restricted))).can_run_parallel_write_batch(&calls), + "active skill restriction disallowing write_file → fast path refused" + ); + + // Default builtins do not restrict → still fast-paths with Allow. + assert!( + loop_with(allow(), Some(Arc::new(SkillRegistry::with_builtins()))) + .can_run_parallel_write_batch(&calls), + "non-restricting builtins → fast path still allowed" + ); +} diff --git a/core/src/safety_gate.rs b/core/src/safety_gate.rs index 8cd4f96..a41b714 100644 --- a/core/src/safety_gate.rs +++ b/core/src/safety_gate.rs @@ -101,7 +101,7 @@ impl<'a> ToolSafetyGate<'a> { } } - fn check_skill_restrictions(&self, tool_name: &str) -> Option { + pub(crate) fn check_skill_restrictions(&self, tool_name: &str) -> Option { let registry = self.config.skill_registry.as_ref()?; let restricting_skills = registry.global_tool_restricting_skills(); if restricting_skills.is_empty() { @@ -123,7 +123,11 @@ impl<'a> ToolSafetyGate<'a> { }) } - fn permission_decision(&self, tool_name: &str, args: &serde_json::Value) -> PermissionDecision { + pub(crate) fn permission_decision( + &self, + tool_name: &str, + args: &serde_json::Value, + ) -> PermissionDecision { self.config .permission_checker .as_ref() From 3e9d4b1cd7dd2a1ed4f49f5ba1230bd9c53ad6c4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Jun 2026 13:44:01 +0800 Subject: [PATCH 2/5] feat(orchestration): programmable Workflow facade, execute_loop, shared budget Compose the existing combinators into a runtime-driven, Claude-Code-style dynamic workflow mechanism, reusing the AgentExecutor / SessionStore / WorkflowCheckpoint / BudgetGuard seams rather than inventing parallel ones. - Workflow facade (orchestration/workflow.rs): a cheaply-clonable handle whose verbs agent/parallel/phase/pipeline each delegate to one combinator; phase is a named, resumable barrier ({root_id}/{index}:{name}) emitting WorkflowEvent milestones. Control flow lives in the host language. - execute_loop + LoopDecision (combinators.rs): bounded loop-until-dry with a mandatory max_iterations hard cap. - WorkflowBudget (orchestration/workflow_budget.rs): an aggregating BudgetGuard that sums token spend from every step into one shared ledger (soft cap). - Wire BudgetGuard through ChildRunContext so a single guard spans a fan-out (closes a gap: budget was per-session_id only). - AgentSession::workflow()/workflow_with_token_budget(): pre-wire executor, store, per-step events, session-derived root id, and the shared budget. Also fix agent_executor() to install parent_context (security/skill/workspace), so orchestrated steps are neither more nor less privileged than delegated ones. - README: document the Workflow facade, loop, and shared budget. Tests: full lib suite green; new unit + e2e coverage for every verb, resume, loop caps, budget aggregation, and a real child-agent workflow step. --- README.md | 54 ++ core/src/agent_api.rs | 92 +++- core/src/agent_api/capabilities.rs | 1 + core/src/agent_api/tests.rs | 104 ++++ core/src/child_run.rs | 8 + core/src/lib.rs | 7 +- core/src/orchestration/combinators.rs | 144 ++++- core/src/orchestration/mod.rs | 8 +- core/src/orchestration/workflow.rs | 611 ++++++++++++++++++++++ core/src/orchestration/workflow_budget.rs | 226 ++++++++ 10 files changed, 1247 insertions(+), 8 deletions(-) create mode 100644 core/src/orchestration/workflow.rs create mode 100644 core/src/orchestration/workflow_budget.rs diff --git a/README.md b/README.md index 8ef256c..63301ec 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,16 @@ Everything else is an extension of that loop. `session.parallel` / `pipeline` / `parallelResumable` (Node) and `parallel` / `pipeline` / `parallel_resumable` (Python). See [Programmable Orchestration](#programmable-orchestration) below. +- **Workflow facade, loop & shared budget** — `AgentSession::workflow()` returns + a cheaply-clonable `Workflow` that pre-wires the session's executor (inheriting + the same governance as model-driven delegation), persistence store, per-step + event stream, and a session-derived stable root id. Its verbs `agent` / + `parallel` / `phase` / `pipeline` each delegate to one combinator; `phase` is a + named resume boundary that emits `WorkflowEvent` milestones. `execute_loop` / + `LoopDecision` add a bounded loop-until-dry (with a mandatory `max_iterations` + guard). `WorkflowBudget` aggregates token spend from every step into one shared + ledger — a soft, workflow-wide cost cap installed through the existing + `BudgetGuard` seam. ### What's new in 3.3 @@ -632,6 +642,50 @@ await session.parallelResumable(specs, "nightly-audit"); // hangs past timeoutMs (default 30s) fails closed (treated as null). ``` +### The `Workflow` facade (Rust) + +`AgentSession::workflow()` returns a cheaply-clonable `Workflow` that bundles the +session's executor, persistence store, per-step event stream, and a stable, +session-derived root id. Control flow is ordinary Rust — `await` a verb, inspect +the outcomes, decide what runs next: + +```rust +let wf = session.workflow(); // or session.workflow_with_token_budget(Some(500_000)) + +// One step, then a *variable* fan-out computed from its result (the "dynamic" +// part: the shape is decided at runtime, not declared up front). +let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await; +let specs: Vec = plan.output.lines().enumerate() + .map(|(i, line)| AgentStepSpec::new(format!("impl-{i}"), "general", "impl", line)) + .collect(); + +// `phase` is a NAMED barrier and a resume boundary: with a store it journals +// progress under "{root_id}/{index}:{name}" and emits PhaseStart/PhaseEnd on the +// WorkflowEvent stream (subscribe via `wf.subscribe()`). +let done = wf.phase("implement", specs).await; +let reviews = wf.phase("review", to_review(&done)).await; // budget shared across all phases +``` + +- **Verbs** — `agent` (one step), `parallel` (barrier fan-out), `phase` (named, + resumable barrier + milestones), `pipeline` (per-item staged chains), and the + non-failing `log`. Each delegates to exactly one combinator; the facade owns no + scheduling. +- **Loop** — `execute_loop(executor, initial, max_iterations, tx, |round| …)` + returns `LoopDecision::{Continue(specs), Stop}` and runs rounds until the + predicate stops, a round has no work (dry), or `max_iterations` (a **mandatory + hard cap**) is hit — the guard that makes an LLM-driven loop safe. +- **Budget** — `workflow_with_token_budget(Some(limit))` installs a + `WorkflowBudget` as every child run's `budget_guard`, so per-turn LLM + accounting feeds **one** shared ledger; `wf.budget_snapshot()` reads it and a + `WorkflowEvent::BudgetExhausted` fires once the cap is reached. It is a *soft* + ceiling: a wide fan-out can race a few in-flight turns past the cap before the + post-call ledger catches up (the framework never force-kills a running + fan-out). +- **Safety** — every step runs through the same `AgentExecutor` seam as + model-driven delegation, so child runs inherit the session's `SecurityProvider`, + skill restrictions, confirmation, and workspace — orchestrated steps are + neither more nor less privileged than delegated ones. + --- ## Design Principles diff --git a/core/src/agent_api.rs b/core/src/agent_api.rs index 0977e57..6471bce 100644 --- a/core/src/agent_api.rs +++ b/core/src/agent_api.rs @@ -932,15 +932,101 @@ impl AgentSession { /// run against; a host can instead supply its own executor to place steps /// across a cluster. pub fn agent_executor(&self) -> Arc { - let executor = crate::tools::TaskExecutor::with_mcp( + Arc::new(self.build_task_executor(self.parent_run_context())) + } + + /// Build the in-box [`TaskExecutor`](crate::tools::TaskExecutor) for this + /// session, applying `parent` as the child-run capability context. Shared by + /// [`agent_executor`](Self::agent_executor) and [`workflow`](Self::workflow) + /// so both wire children identically. + fn build_task_executor( + &self, + parent: crate::child_run::ChildRunContext, + ) -> crate::tools::TaskExecutor { + crate::tools::TaskExecutor::with_mcp( Arc::clone(&self.agent_registry), Arc::clone(&self.llm_client), self.workspace.display().to_string(), Arc::clone(&self.mcp_manager), ) + .with_parent_context(parent) .with_subagent_tracker(Arc::clone(&self.subagent_tasks)) - .with_max_parallel_tasks(self.config.max_parallel_tasks); - Arc::new(executor) + .with_max_parallel_tasks(self.config.max_parallel_tasks) + } + + /// A programmable [`Workflow`](crate::orchestration::Workflow) bound to this + /// session. + /// + /// Pre-wired with this session's executor (inheriting the same governance as + /// model-driven delegation), persistence store (so each + /// [`phase`](crate::orchestration::Workflow::phase) is a resume boundary), + /// per-step event stream, and a session-derived stable root id. Control flow + /// is ordinary Rust: `await` a verb, inspect the outcomes, decide what runs + /// next. + pub fn workflow(&self) -> crate::orchestration::Workflow { + self.workflow_with_token_budget(None) + } + + /// Like [`workflow`](Self::workflow) but with a hard token ceiling shared + /// across every step. The cap is a best-effort *soft* cost ceiling — under a + /// wide fan-out a few in-flight turns can race past it before the shared + /// ledger catches up (see [`WorkflowBudget`](crate::orchestration::WorkflowBudget)). + pub fn workflow_with_token_budget( + &self, + limit_tokens: Option, + ) -> crate::orchestration::Workflow { + use crate::budget::BudgetGuard; + + // One shared ledger for the whole workflow, wrapping the session's own + // budget guard (if any) so a host's per-tenant accounting keeps working. + let mut budget = crate::orchestration::WorkflowBudget::new(limit_tokens); + if let Some(inner) = self.config.budget_guard.clone() { + budget = budget.with_inner(inner); + } + let budget = Arc::new(budget); + + // Install the shared ledger as the child runs' budget guard so every + // step's per-turn LLM accounting feeds it. + let mut parent = self.parent_run_context(); + parent.budget_guard = Some(Arc::clone(&budget) as Arc); + let executor: Arc = + Arc::new(self.build_task_executor(parent)); + + let mut builder = crate::orchestration::Workflow::builder(executor) + .with_root_id(format!("wf-{}", self.session_id)) + .with_budget(Arc::clone(&budget)); + if let Some(store) = self.session_store.clone() { + builder = builder.with_store(store); + } + if let Some(step_events) = self.tool_context.agent_event_tx.clone() { + builder = builder.with_step_events(step_events); + } + builder.build() + } + + /// Build the [`ChildRunContext`](crate::child_run::ChildRunContext) that + /// orchestrated / delegated child runs inherit from this session. + /// + /// Mirrors the context the model-driven `task` / `parallel_task` path + /// installs (see `register_task_capability` in `agent_api/capabilities.rs`) + /// so a step run through [`agent_executor`](Self::agent_executor) carries the + /// SAME governance — security provider, skill restrictions, confirmation, + /// the shared workspace, and the safety limits — instead of weaker, ambient + /// authority. Sourced from the session's resolved config; `hook_engine` + /// stays `None` to match the model-driven path. + pub(crate) fn parent_run_context(&self) -> crate::child_run::ChildRunContext { + crate::child_run::ChildRunContext { + security_provider: self.config.security_provider.clone(), + hook_engine: None, + skill_registry: self.config.skill_registry.clone(), + tool_timeout_ms: self.config.tool_timeout_ms, + max_parallel_tasks: Some(self.config.max_parallel_tasks), + max_execution_time_ms: self.config.max_execution_time_ms, + circuit_breaker_threshold: Some(self.config.circuit_breaker_threshold), + confirmation_manager: self.config.confirmation_manager.clone(), + workspace_services: Some(Arc::clone(&self.tool_context.workspace_services)), + budget_guard: self.config.budget_guard.clone(), + } } /// The session's persistence store, if one is configured — needed by the diff --git a/core/src/agent_api/capabilities.rs b/core/src/agent_api/capabilities.rs index d7004ee..a6ef64a 100644 --- a/core/src/agent_api/capabilities.rs +++ b/core/src/agent_api/capabilities.rs @@ -188,6 +188,7 @@ fn register_task_capability( circuit_breaker_threshold: opts.circuit_breaker_threshold, confirmation_manager: opts.confirmation_manager.clone(), workspace_services: opts.workspace_services.clone(), + budget_guard: opts.budget_guard.clone(), }; let registry = Arc::new(registry); diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index 2b42621..a8ecd85 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -3217,3 +3217,107 @@ async fn cancel_subagent_task_marks_snapshot_cancelled() { assert!(!session.cancel_subagent_task(&task_id).await); assert!(!session.cancel_subagent_task("task-unknown").await); } + +/// Regression: `agent_executor()` must install the same `ChildRunContext` the +/// model-driven `task` path uses, so orchestrated/scripted steps inherit the +/// session's governance instead of running under weaker ambient authority. +/// Before the fix the executor was built without `.with_parent_context(..)`. +#[tokio::test] +async fn test_agent_executor_inherits_parent_run_context() { + use crate::security::DefaultSecurityProvider; + use crate::skills::SkillRegistry; + + let agent = Agent::from_config(test_config()).await.unwrap(); + + let security: Arc = + Arc::new(DefaultSecurityProvider::new()); + let skills = Arc::new(SkillRegistry::new()); + let opts = SessionOptions::new() + .with_security_provider(Arc::clone(&security)) + .with_skill_registry(Arc::clone(&skills)); + + let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap(); + let ctx = session.parent_run_context(); + + assert!( + ctx.security_provider.is_some(), + "security provider must propagate to delegated/orchestrated child runs" + ); + assert!( + ctx.skill_registry.is_some(), + "skill registry (skill restrictions) must propagate to child runs" + ); + assert!( + ctx.workspace_services.is_some(), + "workspace services must propagate so child tools share the workspace" + ); + assert!( + ctx.hook_engine.is_none(), + "hook_engine stays None, matching the model-driven task path" + ); +} + +/// `AgentSession::workflow()` must pre-wire a shared budget ledger and a stable, +/// session-derived root id (so phase checkpoints resume across runs). +#[tokio::test] +async fn test_session_workflow_is_prewired_with_budget_and_stable_root_id() { + let agent = Agent::from_config(test_config()).await.unwrap(); + let session = agent.session("/tmp/test-workspace", None).unwrap(); + + let wf = session.workflow(); + // An uncapped workflow still owns a ledger (for snapshots / aggregation). + let snap = wf + .budget_snapshot() + .expect("workflow is pre-wired with a shared budget ledger"); + assert_eq!(snap.limit_tokens, None); + assert_eq!(snap.consumed_tokens, 0); + assert!( + wf.root_id().contains(session.id()), + "root id is session-derived so phase checkpoints are stable across runs" + ); + + // A capped workflow records its hard ceiling. + let capped = session.workflow_with_token_budget(Some(50_000)); + assert_eq!(capped.budget_snapshot().unwrap().limit_tokens, Some(50_000)); +} + +/// End-to-end: `session.workflow().agent(spec)` actually spawns a real child +/// agent loop through the wired executor and returns its output. Uses a static +/// mock LLM so the built-in `explore` agent finishes with that text. +#[tokio::test] +async fn test_session_workflow_runs_a_real_child_agent_step() { + let agent = Agent::from_config(test_config()).await.unwrap(); + let opts = SessionOptions::new(); + let session = agent + .build_session( + "/tmp/test-workflow-e2e".into(), + Arc::new(StaticStreamingClient::new("explored answer")), + &opts, + ) + .unwrap(); + + let wf = session.workflow(); + let outcome = wf + .agent(crate::orchestration::AgentStepSpec::new( + "t1", + "explore", + "explore", + "find the auth code", + )) + .await; + + assert!(outcome.success, "child step failed: {}", outcome.output); + assert_eq!(outcome.agent, "explore"); + assert!( + outcome.output.contains("explored answer"), + "child agent returned the mock LLM output; got: {}", + outcome.output + ); + + // The shared ledger recorded the child's token usage (proves the workflow + // budget was installed as the child's budget guard). + assert!( + wf.budget_snapshot().unwrap().consumed_tokens > 0, + "child LLM usage fed the shared workflow budget" + ); +} diff --git a/core/src/child_run.rs b/core/src/child_run.rs index a2d45af..d3c3983 100644 --- a/core/src/child_run.rs +++ b/core/src/child_run.rs @@ -17,6 +17,7 @@ //! | circuit_breaker_threshold | Yes | LLM failure handling should be consistent | //! | confirmation_manager | Depends | Governed by ConfirmationInheritance | //! | workspace_services | Yes | Child tools must operate on the same workspace | +//! | budget_guard | Yes | One shared cost ledger spans the whole fan-out | //! | memory | No | Child has isolated context | //! | queue_config | No | Child runs are synchronous within parent | //! | planning_mode | No | Child tasks are pre-planned by parent | @@ -43,6 +44,10 @@ pub struct ChildRunContext { pub circuit_breaker_threshold: Option, pub confirmation_manager: Option>, pub workspace_services: Option>, + /// Shared budget/quota guard. When inherited, every child run feeds the same + /// guard, so a single ledger can cap an entire delegated fan-out / workflow + /// rather than each child counting independently. + pub budget_guard: Option>, } impl ChildRunContext { @@ -75,5 +80,8 @@ impl ChildRunContext { if config.confirmation_manager.is_none() { config.confirmation_manager = self.confirmation_manager.clone(); } + if config.budget_guard.is_none() { + config.budget_guard = self.budget_guard.clone(); + } } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 3ee8975..4e4b927 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -136,9 +136,10 @@ pub use llm::{ Message, OpenAiClient, TokenUsage, }; pub use orchestration::{ - execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, AgentExecutor, - AgentStepSpec, PipelineStage, StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, - WORKFLOW_CHECKPOINT_SCHEMA_VERSION, + execute_loop, execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, + AgentExecutor, AgentStepSpec, BudgetSnapshot, LoopDecision, PipelineStage, StepOutcome, + Workflow, WorkflowBudget, WorkflowBuilder, WorkflowCheckpoint, WorkflowEvent, + WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION, }; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; pub use run::{ diff --git a/core/src/orchestration/combinators.rs b/core/src/orchestration/combinators.rs index e35c404..c159792 100644 --- a/core/src/orchestration/combinators.rs +++ b/core/src/orchestration/combinators.rs @@ -6,7 +6,7 @@ //! of stages independently — no barrier between stages. use super::checkpoint::WorkflowCheckpoint; -use super::executor::{AgentExecutor, AgentStepSpec, StepOutcome}; +use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; use crate::agent::AgentEvent; use crate::ordered_parallel::run_ordered_parallel_with_limit; use crate::store::SessionStore; @@ -214,6 +214,63 @@ pub async fn execute_steps_parallel_resumable( merged } +/// What an [`execute_loop`] predicate decides after seeing a round's outcomes. +pub enum LoopDecision { + /// Run another round with these specs. + Continue(Vec), + /// Stop now; the loop returns the round that just completed. + Stop, +} + +/// Run rounds until the predicate says [`Stop`](LoopDecision::Stop), a round is +/// asked to run no specs, or `max_iterations` is reached — whichever comes +/// first. Each round is a barrier ([`execute_steps_parallel`]); `next` receives +/// the just-completed round's outcomes and decides whether (and with what) to +/// continue. Returns the last round's outcomes (empty if `initial` was empty). +/// +/// `max_iterations` is **mandatory and a hard cap**: it is clamped to at least +/// 1, and once reached the loop stops even if the predicate returns +/// [`Continue`](LoopDecision::Continue). This is the guard that makes an +/// LLM-driven, unknown-length loop (e.g. loop-until-dry) safe — the predicate +/// must never be the *only* termination condition. +/// +/// This is the "loop" shape from the orchestration grammar; like the other +/// combinators it is written purely against the [`AgentExecutor`] seam and adds +/// no scheduling of its own. +pub async fn execute_loop( + executor: Arc, + initial: Vec, + max_iterations: usize, + event_tx: Option>, + mut next: F, +) -> Vec +where + F: FnMut(&[StepOutcome]) -> LoopDecision + Send, +{ + let cap = max_iterations.max(1); + let mut specs = initial; + let mut last = Vec::new(); + let mut iterations = 0; + + while !specs.is_empty() { + let round = execute_steps_parallel( + Arc::clone(&executor), + std::mem::take(&mut specs), + event_tx.clone(), + ) + .await; + iterations += 1; + let decision = next(&round); + last = round; + match decision { + LoopDecision::Continue(more) if iterations < cap => specs = more, + _ => break, + } + } + + last +} + #[cfg(test)] mod tests { use super::*; @@ -664,4 +721,89 @@ mod tests { assert_eq!(out[1].task_id, "c"); assert!(out.iter().all(|o| o.success)); } + + #[tokio::test] + async fn loop_stops_when_predicate_says_stop() { + let exec: Arc = Arc::new(EchoExecutor::new()); + let mut rounds = 0; + let out = crate::orchestration::execute_loop( + exec, + vec![AgentStepSpec::new("r0", "explore", "d", "p")], + 10, + None, + |outcomes| { + rounds += 1; + // Continue twice, then stop on the third round. + if rounds < 3 { + LoopDecision::Continue(vec![AgentStepSpec::new( + format!("r{rounds}"), + "explore", + "d", + outcomes[0].output.clone(), + )]) + } else { + LoopDecision::Stop + } + }, + ) + .await; + assert_eq!(rounds, 3, "predicate saw exactly three rounds"); + assert_eq!(out.len(), 1, "returns the last round's outcomes"); + assert!(out[0].success); + } + + #[tokio::test] + async fn loop_is_hard_capped_by_max_iterations() { + let exec: Arc = Arc::new(EchoExecutor::new()); + let mut rounds = 0; + // A predicate that NEVER stops — only max_iterations terminates it. + let _ = crate::orchestration::execute_loop( + exec, + vec![AgentStepSpec::new("r", "explore", "d", "p")], + 3, + None, + |_outcomes| { + rounds += 1; + LoopDecision::Continue(vec![AgentStepSpec::new("r", "explore", "d", "p")]) + }, + ) + .await; + assert_eq!( + rounds, 3, + "max_iterations is a hard cap on a never-stopping predicate" + ); + } + + #[tokio::test] + async fn loop_with_empty_initial_runs_nothing() { + let exec: Arc = Arc::new(EchoExecutor::new()); + let mut called = false; + let out = crate::orchestration::execute_loop(exec, vec![], 5, None, |_| { + called = true; + LoopDecision::Stop + }) + .await; + assert!(out.is_empty()); + assert!(!called, "predicate is not invoked when there is no work"); + } + + #[tokio::test] + async fn loop_stops_when_predicate_requests_no_further_specs() { + // Continue with an empty spec set ends the loop (no work left = dry). + let exec: Arc = Arc::new(EchoExecutor::new()); + let mut rounds = 0; + let out = crate::orchestration::execute_loop( + exec, + vec![AgentStepSpec::new("r0", "explore", "d", "p")], + 10, + None, + |_| { + rounds += 1; + LoopDecision::Continue(vec![]) // nothing more to do → loop ends + }, + ) + .await; + assert_eq!(rounds, 1); + assert_eq!(out.len(), 1, "the completed round is still returned"); + } } diff --git a/core/src/orchestration/mod.rs b/core/src/orchestration/mod.rs index 38ec137..f0d731a 100644 --- a/core/src/orchestration/mod.rs +++ b/core/src/orchestration/mod.rs @@ -24,7 +24,13 @@ mod checkpoint; mod combinators; mod executor; +mod workflow; +mod workflow_budget; pub use checkpoint::{WorkflowCheckpoint, WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION}; -pub use combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage}; +pub use combinators::{ + execute_loop, execute_pipeline, execute_steps_parallel_resumable, LoopDecision, PipelineStage, +}; pub use executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; +pub use workflow::{Workflow, WorkflowBuilder, WorkflowEvent}; +pub use workflow_budget::{BudgetSnapshot, WorkflowBudget}; diff --git a/core/src/orchestration/workflow.rs b/core/src/orchestration/workflow.rs new file mode 100644 index 0000000..801fc14 --- /dev/null +++ b/core/src/orchestration/workflow.rs @@ -0,0 +1,611 @@ +//! A typed, host-driven workflow facade over the [`AgentExecutor`] seam. +//! +//! [`Workflow`] is a thin orchestrator with **zero new execution machinery**: +//! each verb resolves to exactly one existing combinator call (see +//! [`executor`](super::executor) and [`combinators`](super::combinators)). The +//! genuinely new behavior is observability — workflow-level milestone events on +//! a dedicated [`WorkflowEvent`] broadcast — and a resume boundary at each +//! [`phase`](Workflow::phase). +//! +//! ## Control flow lives in the host +//! +//! Unlike the model-driven `task`/`parallel_task` tools (the LLM decides to fan +//! out) this is *programmable*: the caller `await`s a verb, inspects the +//! returned [`StepOutcome`]s, and decides what runs next with ordinary Rust +//! `if`/`for`/`while`/`?`/`match`. The host language IS the interpreter, so +//! there is no embedded scripting engine to sandbox and the grammar stays +//! minimal (4 verbs + `log`) — anything richer (retry, map/reduce, timeouts) is +//! a host concern. +//! +//! ```no_run +//! # use std::sync::Arc; +//! # use a3s_code_core::orchestration::{Workflow, AgentExecutor, AgentStepSpec}; +//! # async fn run(executor: Arc) { +//! let wf = Workflow::builder(executor).build(); +//! +//! // One step. +//! let plan = wf +//! .agent(AgentStepSpec::new("plan", "plan", "plan the work", "Plan the refactor")) +//! .await; +//! +//! // Fan a *variable* number of steps out of the prior result — this is the +//! // "dynamic" part: the shape is computed at runtime, not declared up front. +//! if plan.success { +//! let specs: Vec = plan +//! .output +//! .lines() +//! .enumerate() +//! .map(|(i, line)| AgentStepSpec::new(format!("impl-{i}"), "general", "impl", line)) +//! .collect(); +//! let results = wf.phase("implement", specs).await; // resumable barrier +//! wf.log("info", &format!("implemented {} steps", results.len()), serde_json::Value::Null); +//! } +//! # } +//! ``` + +use super::combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage}; +use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; +use super::workflow_budget::{BudgetSnapshot, WorkflowBudget}; +use crate::agent::AgentEvent; +use crate::store::SessionStore; +use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// Default capacity of the [`WorkflowEvent`] broadcast channel. Slow +/// subscribers lag (and drop) rather than block the workflow — milestones are +/// best-effort; durable audit belongs on the trace/AHP path. +const DEFAULT_WORKFLOW_EVENT_CAPACITY: usize = 256; + +/// Workflow-level milestone stream, distinct from the per-step +/// [`AgentEvent`] stream the combinators already emit. +/// +/// Bridged onto whatever consumes `AgentEvent` is *not* automatic — a host that +/// wants these in its existing UI subscribes via [`Workflow::subscribe`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WorkflowEvent { + /// A named [`phase`](Workflow::phase) began. + PhaseStart { + name: String, + index: usize, + step_count: usize, + }, + /// A named [`phase`](Workflow::phase) finished. + PhaseEnd { + name: String, + index: usize, + succeeded: usize, + failed: usize, + }, + /// A structured log line emitted by [`Workflow::log`]. + Log { + level: String, + message: String, + fields: serde_json::Value, + }, + /// The shared workflow budget reached its cap; subsequent child LLM calls + /// will be denied. Emitted at the phase boundary where exhaustion is first + /// observed. The fan-out in flight is **not** force-killed. + BudgetExhausted { resource: String, reason: String }, +} + +/// Builder for a [`Workflow`]. The executor is mandatory (it is the seam to the +/// host's placement / scheduling); everything else is optional. +pub struct WorkflowBuilder { + executor: Arc, + store: Option>, + step_events: Option>, + root_id: Option, + budget: Option>, +} + +impl WorkflowBuilder { + /// Start a builder around `executor` (the default in-box executor is + /// [`AgentSession::agent_executor`](crate::AgentSession::agent_executor)). + pub fn new(executor: Arc) -> Self { + Self { + executor, + store: None, + step_events: None, + root_id: None, + budget: None, + } + } + + /// Attach the shared budget ledger this workflow reports on. + /// + /// The SAME `Arc` must also be installed as the executor's + /// child `budget_guard` (the in-box wiring is done by + /// [`AgentSession::workflow`](crate::AgentSession::workflow)); the workflow + /// only *reads* it for [`budget_snapshot`](Workflow::budget_snapshot) and to + /// emit [`WorkflowEvent::BudgetExhausted`]. + pub fn with_budget(mut self, budget: Arc) -> Self { + self.budget = Some(budget); + self + } + + /// Persist progress so each [`phase`](Workflow::phase) becomes a resume + /// boundary. Without a store, phases run as plain (non-resumable) barriers. + pub fn with_store(mut self, store: Arc) -> Self { + self.store = Some(store); + self + } + + /// Thread the per-step [`AgentEvent`] sender into every combinator call so + /// existing subagent/AHP/trace listeners observe child-run lifecycle events. + pub fn with_step_events(mut self, step_events: broadcast::Sender) -> Self { + self.step_events = Some(step_events); + self + } + + /// Set the stable root id used to derive per-phase checkpoint keys. Supply a + /// deterministic id (e.g. session-derived) for resume to work across runs; + /// otherwise a random id is used and resume is effectively disabled. + pub fn with_root_id(mut self, root_id: impl Into) -> Self { + self.root_id = Some(root_id.into()); + self + } + + /// Finalize the [`Workflow`], creating its event channel. + pub fn build(self) -> Workflow { + let (events, _) = broadcast::channel(DEFAULT_WORKFLOW_EVENT_CAPACITY); + let root_id = self + .root_id + .unwrap_or_else(|| format!("wf-{}", uuid::Uuid::new_v4())); + Workflow { + executor: self.executor, + store: self.store, + events, + step_events: self.step_events, + root_id, + phase_seq: Arc::new(AtomicUsize::new(0)), + budget: self.budget, + } + } +} + +/// A cheaply-clonable (all-`Arc`) typed orchestration facade. +/// +/// Every verb delegates to exactly one existing combinator; [`Workflow`] owns no +/// scheduling or LLM logic. It is `Clone` so a phase can itself spawn +/// sub-workflows that share the same event bus, store, and phase numbering. +#[derive(Clone)] +pub struct Workflow { + executor: Arc, + store: Option>, + events: broadcast::Sender, + step_events: Option>, + root_id: String, + phase_seq: Arc, + budget: Option>, +} + +impl Workflow { + /// Begin building a workflow around `executor`. + pub fn builder(executor: Arc) -> WorkflowBuilder { + WorkflowBuilder::new(executor) + } + + /// The stable root id phase checkpoint keys are derived from. + pub fn root_id(&self) -> &str { + &self.root_id + } + + /// A snapshot of the shared budget ledger, if a budget is attached. + pub fn budget_snapshot(&self) -> Option { + self.budget.as_ref().map(|b| b.snapshot()) + } + + /// Subscribe to this workflow's [`WorkflowEvent`] milestones. + pub fn subscribe(&self) -> broadcast::Receiver { + self.events.subscribe() + } + + /// Run a single step. A one-element barrier — reuses the panic isolation and + /// placement of [`parallel`](Self::parallel) so one and many steps share a + /// single code path. + pub async fn agent(&self, spec: AgentStepSpec) -> StepOutcome { + let task_id = spec.task_id.clone(); + let agent = spec.agent.clone(); + execute_steps_parallel( + Arc::clone(&self.executor), + vec![spec], + self.step_events.clone(), + ) + .await + .into_iter() + .next() + .unwrap_or_else(|| StepOutcome::failed(task_id, agent, "step produced no outcome")) + } + + /// Barrier fan-out. Bounded by the executor's + /// [`concurrency_hint`](AgentExecutor::concurrency_hint); input order is + /// preserved and a panicked branch becomes a failed [`StepOutcome`]. + pub async fn parallel(&self, specs: Vec) -> Vec { + execute_steps_parallel(Arc::clone(&self.executor), specs, self.step_events.clone()).await + } + + /// A *named* barrier and a resume boundary. + /// + /// Emits [`WorkflowEvent::PhaseStart`]/[`WorkflowEvent::PhaseEnd`] around the + /// inner combinator. When a store is configured the phase runs through + /// [`execute_steps_parallel_resumable`] keyed by a deterministic + /// `"{root_id}/{index}:{name}"` id, so an interrupted run skips already + /// completed steps; otherwise it is a plain [`parallel`](Self::parallel). + pub async fn phase(&self, name: &str, specs: Vec) -> Vec { + let index = self.phase_seq.fetch_add(1, Ordering::SeqCst); + let _ = self.events.send(WorkflowEvent::PhaseStart { + name: name.to_string(), + index, + step_count: specs.len(), + }); + + let out = match &self.store { + Some(store) => { + let workflow_id = format!("{}/{index}:{name}", self.root_id); + execute_steps_parallel_resumable( + Arc::clone(&self.executor), + specs, + &workflow_id, + Arc::clone(store), + self.step_events.clone(), + ) + .await + } + None => self.parallel(specs).await, + }; + + let failed = out.iter().filter(|o| !o.success).count(); + let _ = self.events.send(WorkflowEvent::PhaseEnd { + name: name.to_string(), + index, + succeeded: out.len() - failed, + failed, + }); + + // Surface a budget cap once it is reached so the host can stop issuing + // further phases. Enforcement itself lives in the child loops (via the + // shared budget guard); this is observation only. + if let Some(budget) = &self.budget { + if budget.is_exhausted() { + let snap = budget.snapshot(); + let _ = self.events.send(WorkflowEvent::BudgetExhausted { + resource: "workflow_tokens".to_string(), + reason: format!( + "workflow token budget exhausted ({} / {} tokens)", + snap.consumed_tokens, + snap.limit_tokens.unwrap_or(0) + ), + }); + } + } + out + } + + /// Per-item staged chains with **no barrier between stages** — delegates + /// straight to [`execute_pipeline`]. Item A may be in stage 3 while item B is + /// still in stage 1. + pub async fn pipeline( + &self, + items: Vec, + stages: Vec>, + ) -> Vec> + where + I: Send + 'static, + { + execute_pipeline( + Arc::clone(&self.executor), + items, + stages, + self.step_events.clone(), + ) + .await + } + + /// Emit a best-effort structured log line on the [`WorkflowEvent`] stream. + /// Synchronous and non-failing: a full channel drops the line rather than + /// blocking the workflow. + pub fn log(&self, level: &str, message: &str, fields: serde_json::Value) { + let _ = self.events.send(WorkflowEvent::Log { + level: level.to_string(), + message: message.to_string(), + fields, + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::orchestration::WorkflowCheckpoint; + use crate::store::MemorySessionStore; + use async_trait::async_trait; + use std::collections::HashMap; + + /// Echoes the prompt into the output; records which task ids it ran. + struct EchoExecutor { + ran: Arc>>, + } + + impl EchoExecutor { + fn new() -> Self { + Self { + ran: Arc::new(tokio::sync::Mutex::new(Vec::new())), + } + } + } + + #[async_trait] + impl AgentExecutor for EchoExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + self.ran.lock().await.push(spec.task_id.clone()); + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: spec.prompt.clone(), + success: spec.agent != "fail", + structured: None, + } + } + fn concurrency_hint(&self) -> usize { + 4 + } + } + + fn spec(id: &str, agent: &str, prompt: &str) -> AgentStepSpec { + AgentStepSpec::new(id, agent, "d", prompt) + } + + #[tokio::test] + async fn agent_runs_a_single_step() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let out = wf.agent(spec("a", "explore", "hello")).await; + assert_eq!(out.task_id, "a"); + assert_eq!(out.output, "hello"); + assert!(out.success); + } + + #[tokio::test] + async fn parallel_preserves_order_and_isolates_failure() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let out = wf + .parallel(vec![ + spec("a", "explore", "pa"), + spec("b", "fail", "pb"), + spec("c", "review", "pc"), + ]) + .await; + assert_eq!( + out.iter().map(|o| o.task_id.as_str()).collect::>(), + vec!["a", "b", "c"] + ); + assert!(out[0].success); + assert!( + !out[1].success, + "the failing branch surfaces as success=false" + ); + assert!(out[2].success); + } + + #[tokio::test] + async fn phase_emits_start_and_end_milestones() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let mut rx = wf.subscribe(); + let out = wf + .phase( + "review", + vec![spec("a", "review", "p"), spec("b", "fail", "p")], + ) + .await; + assert_eq!(out.len(), 2); + + let start = rx.recv().await.unwrap(); + assert_eq!( + start, + WorkflowEvent::PhaseStart { + name: "review".to_string(), + index: 0, + step_count: 2, + } + ); + let end = rx.recv().await.unwrap(); + assert_eq!( + end, + WorkflowEvent::PhaseEnd { + name: "review".to_string(), + index: 0, + succeeded: 1, + failed: 1, + } + ); + } + + #[tokio::test] + async fn phases_get_monotonic_indices() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let mut rx = wf.subscribe(); + wf.phase("one", vec![spec("a", "explore", "p")]).await; + wf.phase("two", vec![spec("b", "explore", "p")]).await; + + let indices: Vec = { + let mut seen = Vec::new(); + while let Ok(ev) = rx.try_recv() { + if let WorkflowEvent::PhaseStart { index, .. } = ev { + seen.push(index); + } + } + seen + }; + assert_eq!(indices, vec![0, 1], "phase indices increment per call"); + } + + #[tokio::test] + async fn phase_with_store_resumes_from_checkpoint() { + let store: Arc = Arc::new(MemorySessionStore::new()); + let exec = Arc::new(EchoExecutor::new()); + let ran = Arc::clone(&exec.ran); + let wf = Workflow::builder(exec) + .with_store(Arc::clone(&store)) + .with_root_id("root") + .build(); + + // Pre-seed the checkpoint for phase 0 ("implement"): step "a" is already + // done. The deterministic id must match what phase() derives. + let mut done = HashMap::new(); + done.insert( + "a".to_string(), + StepOutcome { + task_id: "a".into(), + session_id: "task-run-a".into(), + agent: "explore".into(), + output: "cached-a".into(), + success: true, + structured: None, + }, + ); + store + .save_workflow_checkpoint( + "root/0:implement", + &WorkflowCheckpoint::from_completed("root/0:implement", &done, 1), + ) + .await + .unwrap(); + + let out = wf + .phase( + "implement", + vec![spec("a", "explore", "pa"), spec("b", "review", "pb")], + ) + .await; + + assert_eq!( + *ran.lock().await, + vec!["b".to_string()], + "only the not-yet-completed step actually runs" + ); + assert_eq!( + out[0].output, "cached-a", + "completed step reuses its cached outcome" + ); + assert!(out.iter().all(|o| o.success)); + } + + #[tokio::test] + async fn pipeline_chains_stages_per_item() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let stages: Vec> = vec![ + Arc::new(|_prev: Option<&StepOutcome>, item: &&str| { + Some(AgentStepSpec::new("s1", "explore", "d", *item)) + }), + Arc::new(|prev: Option<&StepOutcome>, _item: &&str| { + let prior = prev.map(|o| o.output.clone()).unwrap_or_default(); + Some(AgentStepSpec::new( + "s2", + "review", + "d", + format!("review of: {prior}"), + )) + }), + ]; + let out = wf.pipeline(vec!["alpha", "beta"], stages).await; + assert_eq!(out.len(), 2); + assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha"); + assert_eq!(out[1].as_ref().unwrap().output, "review of: beta"); + } + + #[tokio::test] + async fn log_emits_a_log_event() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let mut rx = wf.subscribe(); + wf.log("info", "hello", serde_json::json!({ "k": 1 })); + let ev = rx.recv().await.unwrap(); + assert_eq!( + ev, + WorkflowEvent::Log { + level: "info".to_string(), + message: "hello".to_string(), + fields: serde_json::json!({ "k": 1 }), + } + ); + } + + #[tokio::test] + async fn phase_emits_budget_exhausted_when_capped() { + use crate::budget::BudgetGuard; + use crate::llm::TokenUsage; + + let budget = Arc::new(WorkflowBudget::new(Some(10))); + // Spend past the cap before the phase runs (simulates child loops having + // already recorded usage into the shared ledger). + budget + .record_after_llm( + "s", + &TokenUsage { + total_tokens: 12, + ..Default::default() + }, + ) + .await; + + let wf = Workflow::builder(Arc::new(EchoExecutor::new())) + .with_budget(Arc::clone(&budget)) + .build(); + let mut rx = wf.subscribe(); + wf.phase("p", vec![spec("a", "explore", "p")]).await; + + let mut saw_exhausted = false; + while let Ok(ev) = rx.try_recv() { + if let WorkflowEvent::BudgetExhausted { resource, .. } = ev { + assert_eq!(resource, "workflow_tokens"); + saw_exhausted = true; + } + } + assert!( + saw_exhausted, + "phase boundary emits BudgetExhausted once capped" + ); + assert_eq!(wf.budget_snapshot().unwrap().consumed_tokens, 12); + } + + #[tokio::test] + async fn no_budget_means_no_snapshot_and_no_exhausted_event() { + let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build(); + let mut rx = wf.subscribe(); + wf.phase("p", vec![spec("a", "explore", "p")]).await; + assert!(wf.budget_snapshot().is_none()); + while let Ok(ev) = rx.try_recv() { + assert!( + !matches!(ev, WorkflowEvent::BudgetExhausted { .. }), + "no budget → never a BudgetExhausted event" + ); + } + } + + #[tokio::test] + async fn agent_returns_failed_outcome_when_executor_yields_nothing() { + // A pathological executor whose fan-out yields no result still gives the + // caller an addressable failed outcome rather than panicking. + struct Empty; + #[async_trait] + impl AgentExecutor for Empty { + async fn execute_step( + &self, + spec: AgentStepSpec, + _tx: Option>, + ) -> StepOutcome { + StepOutcome::failed(spec.task_id, spec.agent, "boom") + } + } + let wf = Workflow::builder(Arc::new(Empty)).build(); + let out = wf.agent(spec("x", "explore", "p")).await; + assert_eq!(out.task_id, "x"); + assert!(!out.success); + } +} diff --git a/core/src/orchestration/workflow_budget.rs b/core/src/orchestration/workflow_budget.rs new file mode 100644 index 0000000..94039ba --- /dev/null +++ b/core/src/orchestration/workflow_budget.rs @@ -0,0 +1,226 @@ +//! A workflow-scoped, aggregating [`BudgetGuard`]. +//! +//! [`BudgetGuard`] alone counts per child-loop (keyed by `session_id`); there is +//! no single ledger spanning a fan-out. [`WorkflowBudget`] closes that gap: it +//! wraps an optional inner guard, accumulates token spend from **every** child +//! step into one shared atomic counter, and refuses further LLM calls once an +//! optional hard `limit_tokens` is reached. +//! +//! It implements [`BudgetGuard`] itself, so it installs through the unchanged +//! seam — set it as the child runs' `budget_guard` (via the +//! [`ChildRunContext`](crate::child_run::ChildRunContext) the executor applies) +//! and the existing per-turn `check_before_llm` / `record_after_llm` decision +//! points in the agent loop feed this one ledger automatically. +//! +//! ## Honest limit +//! +//! `record_after_llm` runs *after* a call returns, while `check_before_llm` runs +//! before. Under a wide [`parallel`](super::Workflow::parallel) fan-out several +//! in-flight turns can therefore race a few calls past a hard cap before the +//! ledger catches up. This is a **soft cost ceiling**, not a hard per-token +//! guarantee — the same race the per-session [`BudgetGuard`] already documents. +//! The framework never force-kills a running fan-out; an exhausted budget +//! surfaces as a `Deny` on the *next* `check_before_llm`, which the agent loop +//! turns into a failed step the host can react to. + +use crate::budget::{BudgetDecision, BudgetGuard}; +use crate::llm::TokenUsage; +use async_trait::async_trait; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +/// An immutable view of a [`WorkflowBudget`] ledger. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BudgetSnapshot { + /// Total tokens recorded across every child step so far. + pub consumed_tokens: u64, + /// The hard ceiling, if one was configured. + pub limit_tokens: Option, +} + +impl BudgetSnapshot { + /// Tokens left before the cap, saturating at 0. `None` when uncapped. + pub fn remaining_tokens(&self) -> Option { + self.limit_tokens + .map(|limit| limit.saturating_sub(self.consumed_tokens)) + } + + /// Whether a configured cap has been reached. + pub fn is_exhausted(&self) -> bool { + matches!(self.limit_tokens, Some(limit) if self.consumed_tokens >= limit) + } +} + +/// A shared, workflow-scoped token ledger that also acts as a [`BudgetGuard`]. +pub struct WorkflowBudget { + inner: Option>, + consumed_tokens: AtomicU64, + limit_tokens: Option, +} + +impl WorkflowBudget { + /// A ledger with an optional hard token ceiling and no inner guard. + pub fn new(limit_tokens: Option) -> Self { + Self { + inner: None, + consumed_tokens: AtomicU64::new(0), + limit_tokens, + } + } + + /// Delegate `check`/`record` to `inner` in addition to maintaining the + /// shared ledger — lets a host's existing per-tenant guard keep working + /// while the workflow gets its aggregate cap. + pub fn with_inner(mut self, inner: Arc) -> Self { + self.inner = Some(inner); + self + } + + /// Total tokens recorded so far. + pub fn consumed_tokens(&self) -> u64 { + self.consumed_tokens.load(Ordering::SeqCst) + } + + /// A point-in-time view of the ledger. + pub fn snapshot(&self) -> BudgetSnapshot { + BudgetSnapshot { + consumed_tokens: self.consumed_tokens(), + limit_tokens: self.limit_tokens, + } + } + + /// Whether the configured cap has been reached. + pub fn is_exhausted(&self) -> bool { + self.snapshot().is_exhausted() + } +} + +#[async_trait] +impl BudgetGuard for WorkflowBudget { + async fn check_before_llm( + &self, + session_id: &str, + estimated_prompt_tokens: usize, + ) -> BudgetDecision { + if self.is_exhausted() { + return BudgetDecision::Deny { + resource: "workflow_tokens".to_string(), + reason: format!( + "workflow token budget exhausted ({} / {} tokens)", + self.consumed_tokens(), + self.limit_tokens.unwrap_or(0) + ), + }; + } + match &self.inner { + Some(inner) => { + inner + .check_before_llm(session_id, estimated_prompt_tokens) + .await + } + None => BudgetDecision::Allow, + } + } + + async fn record_after_llm(&self, session_id: &str, usage: &TokenUsage) { + self.consumed_tokens + .fetch_add(usage.total_tokens as u64, Ordering::SeqCst); + if let Some(inner) = &self.inner { + inner.record_after_llm(session_id, usage).await; + } + } + + async fn check_before_tool(&self, session_id: &str, tool_name: &str) -> BudgetDecision { + if self.is_exhausted() { + return BudgetDecision::Deny { + resource: "workflow_tokens".to_string(), + reason: "workflow token budget exhausted".to_string(), + }; + } + match &self.inner { + Some(inner) => inner.check_before_tool(session_id, tool_name).await, + None => BudgetDecision::Allow, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicUsize; + + fn usage(total: usize) -> TokenUsage { + TokenUsage { + total_tokens: total, + ..Default::default() + } + } + + #[tokio::test] + async fn accumulates_and_caps() { + let budget = WorkflowBudget::new(Some(100)); + assert!(matches!( + budget.check_before_llm("s", 0).await, + BudgetDecision::Allow + )); + + budget.record_after_llm("a", &usage(60)).await; + budget.record_after_llm("b", &usage(50)).await; // total 110 >= 100 + assert_eq!(budget.consumed_tokens(), 110); + assert!(budget.is_exhausted()); + + match budget.check_before_llm("s", 0).await { + BudgetDecision::Deny { resource, .. } => assert_eq!(resource, "workflow_tokens"), + other => panic!("expected Deny, got {other:?}"), + } + } + + #[tokio::test] + async fn uncapped_never_denies() { + let budget = WorkflowBudget::new(None); + budget.record_after_llm("a", &usage(1_000_000)).await; + assert!(!budget.is_exhausted()); + assert!(matches!( + budget.check_before_llm("s", 0).await, + BudgetDecision::Allow + )); + assert_eq!(budget.snapshot().remaining_tokens(), None); + } + + #[tokio::test] + async fn snapshot_reports_remaining() { + let budget = WorkflowBudget::new(Some(100)); + budget.record_after_llm("a", &usage(30)).await; + let snap = budget.snapshot(); + assert_eq!(snap.consumed_tokens, 30); + assert_eq!(snap.remaining_tokens(), Some(70)); + assert!(!snap.is_exhausted()); + } + + #[tokio::test] + async fn delegates_to_inner_guard() { + #[derive(Default)] + struct Counting { + checks: AtomicUsize, + records: AtomicUsize, + } + #[async_trait] + impl BudgetGuard for Counting { + async fn check_before_llm(&self, _: &str, _: usize) -> BudgetDecision { + self.checks.fetch_add(1, Ordering::SeqCst); + BudgetDecision::Allow + } + async fn record_after_llm(&self, _: &str, _: &TokenUsage) { + self.records.fetch_add(1, Ordering::SeqCst); + } + } + + let inner = Arc::new(Counting::default()); + let budget = WorkflowBudget::new(Some(1000)).with_inner(inner.clone()); + budget.check_before_llm("s", 0).await; + budget.record_after_llm("s", &usage(10)).await; + assert_eq!(inner.checks.load(Ordering::SeqCst), 1); + assert_eq!(inner.records.load(Ordering::SeqCst), 1); + assert_eq!(budget.consumed_tokens(), 10); + } +} From 0b616d8ff13c6f6f93a830c4a79ed94fe02cec21 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Jun 2026 13:56:02 +0800 Subject: [PATCH 3/5] feat(sdk): budgeted workflow fan-out (workflowParallel / workflow_parallel) Expose the shared workflow budget through both SDKs via a session method built on AgentSession::workflow_with_token_budget(): run a fan-out where every child agent feeds ONE token ledger and, once the cap is reached, further child LLM calls are denied (a soft cap; the in-flight fan-out is never force-killed). Returns the per-step outcomes plus the ledger snapshot. - Node (napi): session.workflowParallel(specs, budgetTokens?) -> { outcomes, budget } - Python (pyo3): session.workflow_parallel(specs, budget_tokens=None) -> dict - Regenerate node generated.d.ts; document both in the README SDK examples. Both native modules build (napi build / maturin) and pass an offline smoke test (empty fan-out takes no LLM path; correct ledger snapshot). Full orchestration behavior is covered by the core crate's test suite. --- README.md | 10 +++++ sdk/node/generated.d.ts | 93 +++++++++++++++++++++++++++++++++++++++++ sdk/node/src/lib.rs | 56 +++++++++++++++++++++++++ sdk/python/src/lib.rs | 50 ++++++++++++++++++++++ 4 files changed, 209 insertions(+) diff --git a/README.md b/README.md index 63301ec..3abc4dd 100644 --- a/README.md +++ b/README.md @@ -618,6 +618,12 @@ results = session.pipeline( # Resumable: progress is journaled under a workflow id via the session store, # so an interrupted run (or one resumed on another node) skips completed steps. outcomes = session.parallel_resumable(specs, "nightly-audit") + +# Budgeted fan-out: every child agent feeds ONE shared token ledger; once the +# cap is hit further child LLM calls are denied (a soft cap). Returns the +# outcomes plus the ledger snapshot. +res = session.workflow_parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) ``` ```javascript @@ -637,6 +643,10 @@ const results = await session.pipeline( ); await session.parallelResumable(specs, "nightly-audit"); + +// Budgeted fan-out: all child agents share one token ledger (a soft cap). +const { outcomes: out, budget } = await session.workflowParallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); // NOTE: pipeline stage callbacks MUST NOT throw — return null to stop a chain. // A throw aborts the process (same constraint as setBudgetGuard). A stage that // hangs past timeoutMs (default 30s) fails closed (treated as null). diff --git a/sdk/node/generated.d.ts b/sdk/node/generated.d.ts index a46329d..9676c7e 100644 --- a/sdk/node/generated.d.ts +++ b/sdk/node/generated.d.ts @@ -745,6 +745,52 @@ export interface McpServerStatusEntry { toolCount: number error?: string } +/** One unit of orchestrated agent work — what to run, independent of where. */ +export interface AgentStepSpecObject { + /** Stable id for this step (assigned by the caller). */ + taskId: string + /** Registry key of the agent to run (e.g. "explore", "review"). */ + agent: string + /** Short label for display/tracking. */ + description: string + /** Instruction handed to the child agent. */ + prompt: string + /** Optional per-step tool-round cap. */ + maxSteps?: number + /** Optional parent session id for event correlation. */ + parentSessionId?: string + /** + * When set, the step must return JSON conforming to this schema; the + * validated object lands in `StepOutcomeObject.structured`. + */ + outputSchema?: any +} +/** The result of running one orchestrated step. */ +export interface StepOutcomeObject { + taskId: string + sessionId: string + agent: string + output: string + success: boolean + /** Schema-validated structured output, when the step requested one. */ + structured?: any +} +/** + * A snapshot of a workflow's shared token ledger. `consumedTokens` is the total + * recorded across every step; `limitTokens` is the hard ceiling, if one was set. + */ +export interface WorkflowBudgetObject { + consumedTokens: number + limitTokens?: number +} +/** + * The result of a budgeted workflow fan-out: the per-step outcomes plus the + * shared budget ledger snapshot. + */ +export interface WorkflowParallelResult { + outcomes: Array + budget: WorkflowBudgetObject +} /** * Shape of the JS handlers object accepted by `session.setBudgetGuard`. * Each field is optional — methods that aren't provided fall back to @@ -1213,6 +1259,53 @@ export declare class Session { * when no checkpoint exists for `checkpointRunId`. */ resumeRun(checkpointRunId: string): Promise + /** + * Run `specs` as a fan-out of agent steps, bounded by the session's + * configured parallelism, and resolve with each step's outcome in input + * order. A failed step surfaces as `success: false` without failing the + * batch. + */ + parallel(specs: Array): Promise> + /** + * Like `parallel`, but resumable: progress is journaled under + * `workflowId` via the session's `sessionStore`, so an interrupted run + * skips already-completed steps. Rejects when no `sessionStore` is + * configured. + */ + parallelResumable(specs: Array, workflowId: string): Promise> + /** + * Run `specs` as a fan-out under one shared workflow token budget. + * + * Every child agent feeds a single ledger; once `budgetTokens` is reached, + * further child LLM calls are denied (a *soft* cap — a wide fan-out can race + * a few in-flight turns past it before the post-call ledger catches up; the + * in-flight fan-out is never force-killed). Omit `budgetTokens` for an + * uncapped ledger that still aggregates spend. Resolves with the outcomes + * (input order; a failed step is `success: false`) and the ledger snapshot. + */ + workflowParallel(specs: Array, budgetTokens?: number | undefined | null): Promise + /** + * Run each item through a chain of `stages`, with no barrier between + * stages — item A can be in stage 3 while item B is still in stage 1. + * + * Each stage is a function `(ctx) => spec | null` where `ctx` is + * `{ previous: StepOutcomeObject | null, item: any }`. Return an + * `AgentStepSpecObject` (camelCase keys) to run that step, or `null` to + * stop the item's chain. A chain also stops when a step fails. + * + * IMPORTANT: a stage callback MUST NOT throw — in this napi version a JS + * throw at return-conversion aborts the process (same constraint as + * `setBudgetGuard`). Wrap your logic in try/catch and return `null` on + * error. A stage that hangs past `timeoutMs` (default 30s) fails closed + * (treated as `null`, stopping that chain) rather than blocking forever. + * + * This is a *synchronous* napi method that returns a Promise via a + * deferred: the JS stage functions (which are not `Send`) are converted + * to thread-safe functions on the JS thread here, then the chains run on + * the worker runtime and resolve the Promise — so the event loop is never + * blocked and no non-`Send` value crosses the async boundary. + */ + pipeline(items: Array, stages: Array<(ctx: { previous: StepOutcomeObject | null, item: any }) => AgentStepSpecObject | null>, timeoutMs?: number): Promise> /** * Send a prompt or request and get a streaming event iterator. * diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index 0974153..f2937e8 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -3079,6 +3079,23 @@ impl From for StepOutcomeObject { } } +/// A snapshot of a workflow's shared token ledger. `consumedTokens` is the total +/// recorded across every step; `limitTokens` is the hard ceiling, if one was set. +#[napi(object)] +#[derive(Clone)] +pub struct WorkflowBudgetObject { + pub consumed_tokens: i64, + pub limit_tokens: Option, +} + +/// The result of a budgeted workflow fan-out: the per-step outcomes plus the +/// shared budget ledger snapshot. +#[napi(object)] +pub struct WorkflowParallelResult { + pub outcomes: Vec, + pub budget: WorkflowBudgetObject, +} + /// Workspace-bound session. All LLM and tool operations happen here. #[napi] pub struct Session { @@ -3191,6 +3208,45 @@ impl Session { Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) } + /// Run `specs` as a fan-out under one shared workflow token budget. + /// + /// Every child agent feeds a single ledger; once `budgetTokens` is reached, + /// further child LLM calls are denied (a *soft* cap — a wide fan-out can race + /// a few in-flight turns past it before the post-call ledger catches up; the + /// in-flight fan-out is never force-killed). Omit `budgetTokens` for an + /// uncapped ledger that still aggregates spend. Resolves with the outcomes + /// (input order; a failed step is `success: false`) and the ledger snapshot. + #[napi] + pub async fn workflow_parallel( + &self, + specs: Vec, + budget_tokens: Option, + ) -> napi::Result { + let session = self.inner.clone(); + let rust_specs: Vec = specs.into_iter().map(Into::into).collect(); + let limit = budget_tokens.map(|t| t.max(0) as u64); + let (outcomes, snapshot) = get_runtime() + .spawn(async move { + let wf = session.workflow_with_token_budget(limit); + let outcomes = wf.parallel(rust_specs).await; + (outcomes, wf.budget_snapshot()) + }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + Ok(WorkflowParallelResult { + outcomes: outcomes.into_iter().map(StepOutcomeObject::from).collect(), + budget: snapshot + .map(|b| WorkflowBudgetObject { + consumed_tokens: b.consumed_tokens as i64, + limit_tokens: b.limit_tokens.map(|l| l as i64), + }) + .unwrap_or(WorkflowBudgetObject { + consumed_tokens: 0, + limit_tokens: limit.map(|l| l as i64), + }), + }) + } + /// Run each item through a chain of `stages`, with no barrier between /// stages — item A can be in stage 3 while item B is still in stage 1. /// diff --git a/sdk/python/src/lib.rs b/sdk/python/src/lib.rs index 6859120..47146a5 100644 --- a/sdk/python/src/lib.rs +++ b/sdk/python/src/lib.rs @@ -1456,6 +1456,56 @@ impl PySession { outcomes.iter().map(|o| step_outcome_to_py(py, o)).collect() } + /// Run `specs` as a fan-out under one shared workflow token budget. + /// + /// All child agents feed a single ledger; once `budget_tokens` is reached, + /// further child LLM calls are denied (a *soft* cap — a wide fan-out can race + /// a few in-flight turns past it before the post-call ledger catches up; the + /// in-flight fan-out is never force-killed). Pass `budget_tokens=None` for an + /// uncapped ledger that still aggregates spend. Returns a dict + /// `{"outcomes": [...], "budget": {"consumed_tokens", "limit_tokens"}}`. + #[pyo3(signature = (specs, budget_tokens=None))] + fn workflow_parallel( + &self, + py: Python<'_>, + specs: Vec>, + budget_tokens: Option, + ) -> PyResult { + let rust_specs = specs + .iter() + .map(|s| py_to_step_spec(py, s)) + .collect::>>()?; + let session = self.inner.clone(); + let (outcomes, snapshot) = py.allow_threads(move || { + get_runtime().block_on(async move { + let wf = session.workflow_with_token_budget(budget_tokens); + let outcomes = wf.parallel(rust_specs).await; + (outcomes, wf.budget_snapshot()) + }) + }); + + let outcomes_py = outcomes + .iter() + .map(|o| step_outcome_to_py(py, o)) + .collect::>>()?; + let budget = PyDict::new(py); + budget.set_item( + "consumed_tokens", + snapshot.as_ref().map(|b| b.consumed_tokens).unwrap_or(0), + )?; + budget.set_item( + "limit_tokens", + snapshot + .as_ref() + .and_then(|b| b.limit_tokens) + .or(budget_tokens), + )?; + let result = PyDict::new(py); + result.set_item("outcomes", outcomes_py)?; + result.set_item("budget", budget)?; + Ok(result.into_any().unbind()) + } + /// Run each item through a chain of `stages`, with no barrier between /// stages. Each stage is a callable `stage(ctx) -> spec_dict | None`, where /// `ctx = {"previous": , "item": }`. Return a From ea025833696a150c6f7c46e701bb22532f8f79f3 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Jun 2026 14:08:28 +0800 Subject: [PATCH 4/5] test: integration coverage for the Workflow facade, loop & shared budget - core/tests/test_workflow_facade_real_llm.rs (#[ignore], real-LLM gated like test_orchestration_real_llm.rs): the Workflow facade phase + milestones, execute_loop's max_iterations hard cap, session.workflow() running a real child agent with the shared ledger accumulating spend, and sequential budget enforcement (a step started after the cap is denied). - sdk/node/test.mjs: offline workflowParallel shape check (empty fan-out, ledger snapshot) in the existing smoke run. - sdk/python/tests/test_workflow_parallel.py: the same offline check for Python. Verified: core suite green; the integration target compiles and registers its 4 ignored cases; node 'npm test' and the python smoke both pass offline. --- core/tests/test_workflow_facade_real_llm.rs | 187 ++++++++++++++++++++ sdk/node/test.mjs | 12 ++ sdk/python/tests/test_workflow_parallel.py | 65 +++++++ 3 files changed, 264 insertions(+) create mode 100644 core/tests/test_workflow_facade_real_llm.rs create mode 100644 sdk/python/tests/test_workflow_parallel.py diff --git a/core/tests/test_workflow_facade_real_llm.rs b/core/tests/test_workflow_facade_real_llm.rs new file mode 100644 index 0000000..8f0b5b4 --- /dev/null +++ b/core/tests/test_workflow_facade_real_llm.rs @@ -0,0 +1,187 @@ +//! Real-LLM integration tests for the programmable Workflow facade, the +//! `execute_loop` combinator, and the shared `WorkflowBudget` — the surface the +//! mock-backed unit tests can't fully exercise (real child-agent loops, real +//! token accounting feeding the shared ledger). +//! +//! `#[ignore]` — requires a live provider in `.a3s/config.acl`. Run: +//! +//! ```bash +//! A3S_CONFIG_FILE=/abs/path/.a3s/config.acl \ +//! cargo test -p a3s-code-core --test test_workflow_facade_real_llm -- --ignored --nocapture +//! ``` + +use std::path::PathBuf; +use std::sync::Arc; + +use a3s_code_core::config::CodeConfig; +use a3s_code_core::llm::create_client_with_config; +use a3s_code_core::orchestration::{ + execute_loop, AgentExecutor, AgentStepSpec, LoopDecision, Workflow, WorkflowEvent, +}; +use a3s_code_core::subagent::AgentRegistry; +use a3s_code_core::tools::TaskExecutor; +use a3s_code_core::{Agent, AgentSession}; + +fn repo_config_path() -> PathBuf { + std::env::var_os("A3S_CONFIG_FILE") + .map(PathBuf::from) + .unwrap_or_else(|| { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../..") + .join(".a3s/config.acl") + }) +} + +/// A bare real-LLM executor over a throwaway workspace. Keep the returned guard +/// in scope so the temp dir is cleaned up (no stray temp files). +fn real_executor() -> (Arc, tempfile::TempDir) { + let path = repo_config_path(); + let config = CodeConfig::from_file(&path) + .unwrap_or_else(|e| panic!("failed to load {}: {e}", path.display())); + let llm_client = + create_client_with_config(config.default_llm_config().expect("default llm config")); + let workspace = tempfile::tempdir().expect("temp workspace"); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + llm_client, + workspace.path().to_string_lossy().to_string(), + ); + (Arc::new(executor), workspace) +} + +/// A real-LLM session built from the repo config, over a throwaway workspace. +async fn real_session() -> (AgentSession, tempfile::TempDir) { + let path = repo_config_path(); + let config = CodeConfig::from_file(&path) + .unwrap_or_else(|e| panic!("failed to load {}: {e}", path.display())); + let agent = Agent::from_config(config) + .await + .expect("build agent from real config"); + let workspace = tempfile::tempdir().expect("temp workspace"); + let session = agent + .session(workspace.path().to_string_lossy().to_string(), None) + .expect("create session"); + (session, workspace) +} + +/// The facade's `phase` runs a real fan-out and emits PhaseStart/PhaseEnd +/// milestones on the WorkflowEvent stream. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_workflow_phase_runs_and_emits_milestones() { + let (exec, _ws) = real_executor(); + let wf = Workflow::builder(exec).build(); + let mut rx = wf.subscribe(); + + let out = wf + .phase( + "probe", + vec![AgentStepSpec::new( + "g1", + "general", + "echo", + "Reply with exactly the word READY.", + ) + .with_max_steps(2)], + ) + .await; + + assert_eq!(out.len(), 1); + assert!(out[0].success, "phase step succeeded: {}", out[0].output); + + let start = rx.recv().await.expect("PhaseStart milestone"); + assert!( + matches!(start, WorkflowEvent::PhaseStart { ref name, step_count, .. } if name == "probe" && step_count == 1), + "got: {start:?}" + ); + let end = rx.recv().await.expect("PhaseEnd milestone"); + assert!( + matches!(end, WorkflowEvent::PhaseEnd { succeeded, failed, .. } if succeeded == 1 && failed == 0), + "got: {end:?}" + ); +} + +/// `execute_loop` runs real rounds and `max_iterations` hard-caps a predicate +/// that never returns `Stop`. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_workflow_loop_is_hard_capped() { + let (exec, _ws) = real_executor(); + let mut rounds = 0; + let step = || { + AgentStepSpec::new("r", "general", "echo", "Reply with exactly the word OK.") + .with_max_steps(2) + }; + + let _ = execute_loop(exec, vec![step()], 2, None, |_outcomes| { + rounds += 1; + // Never stop voluntarily — only max_iterations terminates us. + LoopDecision::Continue(vec![step()]) + }) + .await; + + assert_eq!(rounds, 2, "max_iterations caps a never-stopping predicate"); +} + +/// `session.workflow().agent(..)` spawns a real child agent and its token usage +/// feeds the shared (uncapped) ledger. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_session_workflow_runs_child_and_accumulates_budget() { + let (session, _ws) = real_session().await; + let wf = session.workflow(); + + let outcome = wf + .agent( + AgentStepSpec::new("t1", "general", "echo", "Reply with exactly the word DONE.") + .with_max_steps(2), + ) + .await; + + assert!(outcome.success, "child step succeeded: {}", outcome.output); + let snap = wf.budget_snapshot().expect("workflow has a shared ledger"); + assert!( + snap.consumed_tokens > 0, + "the child's real LLM usage fed the shared workflow ledger" + ); + assert_eq!(snap.limit_tokens, None, "this workflow is uncapped"); +} + +/// A tiny token cap is enforced through the shared guard: the first sequential +/// step runs (ledger starts empty), and a step started after the ledger is +/// exhausted is denied (surfaces as a failed outcome). Sequential on purpose so +/// the assertion is deterministic (no in-flight overshoot race). +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_session_workflow_budget_denies_after_cap() { + let (session, _ws) = real_session().await; + let wf = session.workflow_with_token_budget(Some(1)); + + let first = wf + .agent( + AgentStepSpec::new("b1", "general", "echo", "Reply with exactly the word ONE.") + .with_max_steps(2), + ) + .await; + assert!( + first.success, + "first step runs before the ledger is exhausted: {}", + first.output + ); + assert!( + wf.budget_snapshot().unwrap().consumed_tokens >= 1, + "the first step recorded usage into the shared ledger" + ); + + let second = wf + .agent( + AgentStepSpec::new("b2", "general", "echo", "Reply with exactly the word TWO.") + .with_max_steps(2), + ) + .await; + assert!( + !second.success, + "a step started after the cap is denied (budget exhausted): {}", + second.output + ); +} diff --git a/sdk/node/test.mjs b/sdk/node/test.mjs index ee9081d..e30608e 100644 --- a/sdk/node/test.mjs +++ b/sdk/node/test.mjs @@ -117,6 +117,18 @@ assert.match(result.text, /tools=\d+$/, 'custom slash command should receive too assert.equal(cancelled, false, 'cancelling an unknown subagent task id should resolve to false') } +// --- Workflow facade: budgeted fan-out (offline shape check, no LLM) --- +{ + assert.equal(typeof session.workflowParallel, 'function', 'workflowParallel should be exposed') + // An empty fan-out takes no LLM path: outcomes empty, ledger snapshot present. + const capped = await session.workflowParallel([], 50000) + assert.deepEqual(capped.outcomes, [], 'empty specs -> empty outcomes') + assert.equal(capped.budget.consumedTokens, 0, 'no spend yet') + assert.equal(capped.budget.limitTokens, 50000, 'limit reflected in the ledger snapshot') + const uncapped = await session.workflowParallel([]) + assert.ok(uncapped.budget.limitTokens == null, 'uncapped -> no limit (null/undefined)') +} + session.close() console.log('node sdk integration ok') diff --git a/sdk/python/tests/test_workflow_parallel.py b/sdk/python/tests/test_workflow_parallel.py new file mode 100644 index 0000000..d6d7b28 --- /dev/null +++ b/sdk/python/tests/test_workflow_parallel.py @@ -0,0 +1,65 @@ +"""Smoke test for the budgeted workflow fan-out exposed on Session. + +Verifies `session.workflow_parallel(...)` is reachable from Python and returns +the expected shape (outcomes + shared budget ledger snapshot) for the empty +fan-out, which takes no LLM path. Mirrors the Node SDK smoke in +sdk/node/test.mjs. Full orchestration behavior is covered by the core crate's +real-LLM integration tests. + +Run with: A3S_CONFIG_FILE not needed — uses inline ACL. +""" + +from __future__ import annotations + +import tempfile + +from a3s_code import Agent, LocalWorkspaceBackend, PermissionPolicy, SessionOptions + + +INLINE_CONFIG = """ +default_model = "anthropic/claude-sonnet-4-20250514" + +providers "anthropic" { + api_key = "test-key" + models "claude-sonnet-4-20250514" { + name = "Claude Sonnet 4" + } +} +""".strip() + + +def main() -> None: + workspace = tempfile.mkdtemp(prefix="a3s-code-python-workflow-") + agent = Agent.create(INLINE_CONFIG) + + opts = SessionOptions() + opts.permission_policy = PermissionPolicy(default_decision="allow") + opts.workspace_backend = LocalWorkspaceBackend(workspace) + + session = agent.session(workspace, opts) + + assert hasattr( + session, "workflow_parallel" + ), "Session should expose workflow_parallel" + + # An empty fan-out takes no LLM path: outcomes empty, ledger snapshot present. + capped = session.workflow_parallel([], budget_tokens=50000) + assert capped["outcomes"] == [], f"empty specs -> empty outcomes, got {capped!r}" + assert ( + capped["budget"]["consumed_tokens"] == 0 + ), f"no spend yet, got {capped['budget']!r}" + assert ( + capped["budget"]["limit_tokens"] == 50000 + ), f"limit reflected in the ledger, got {capped['budget']!r}" + + uncapped = session.workflow_parallel([]) + assert ( + uncapped["budget"]["limit_tokens"] is None + ), f"uncapped -> None limit, got {uncapped['budget']!r}" + + session.close() + print("python sdk workflow_parallel ok") + + +if __name__ == "__main__": + main() From 59a1f8d1576cf3777b9521371fa819cb3220c423 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Jun 2026 14:32:20 +0800 Subject: [PATCH 5/5] refactor(sdk): fold the budgeted fan-out into parallel(specs, budgetTokens?) Drop the awkwardly-named workflowParallel / workflow_parallel methods. The only difference from parallel was a shared token budget, so make it an optional argument on parallel itself instead of leaking the internal 'workflow' facade name into the public API. - Node: parallel(specs, budgetTokens?) -> Array | WorkflowParallelResult (napi Either). No budget => the plain outcomes array (unchanged, non-breaking); a budget => { outcomes, budget } (ledger snapshot). - Python: parallel(specs, budget_tokens=None) -> list | dict, same split. - Update README SDK examples; node test.mjs and the renamed test_parallel_budget.py cover both the array and budgeted shapes offline. Rebuilt both native modules (napi build / maturin); npm test and the python smoke pass; clippy/fmt clean. --- README.md | 15 +-- sdk/node/generated.d.ts | 21 ++-- sdk/node/src/lib.rs | 89 +++++++++-------- sdk/node/test.mjs | 19 ++-- sdk/python/src/lib.rs | 110 ++++++++++----------- sdk/python/tests/test_parallel_budget.py | 60 +++++++++++ sdk/python/tests/test_workflow_parallel.py | 65 ------------ 7 files changed, 187 insertions(+), 192 deletions(-) create mode 100644 sdk/python/tests/test_parallel_budget.py delete mode 100644 sdk/python/tests/test_workflow_parallel.py diff --git a/README.md b/README.md index 3abc4dd..6ef5780 100644 --- a/README.md +++ b/README.md @@ -619,10 +619,11 @@ results = session.pipeline( # so an interrupted run (or one resumed on another node) skips completed steps. outcomes = session.parallel_resumable(specs, "nightly-audit") -# Budgeted fan-out: every child agent feeds ONE shared token ledger; once the -# cap is hit further child LLM calls are denied (a soft cap). Returns the -# outcomes plus the ledger snapshot. -res = session.workflow_parallel(specs, budget_tokens=500_000) +# Budgeted fan-out: pass `budget_tokens` to parallel() so every child agent feeds +# ONE shared token ledger; once the cap is hit further child LLM calls are denied +# (a soft cap). With a budget, parallel() returns {outcomes, budget} (the ledger +# snapshot) instead of the plain outcomes list. +res = session.parallel(specs, budget_tokens=500_000) print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) ``` @@ -644,8 +645,10 @@ const results = await session.pipeline( await session.parallelResumable(specs, "nightly-audit"); -// Budgeted fan-out: all child agents share one token ledger (a soft cap). -const { outcomes: out, budget } = await session.workflowParallel(specs, 500_000); +// Budgeted fan-out: pass a token budget to parallel() so all child agents share +// one ledger (a soft cap). With a budget, parallel() resolves to {outcomes, +// budget} instead of the plain outcomes array. +const { outcomes: out, budget } = await session.parallel(specs, 500_000); console.log(budget.consumedTokens, budget.limitTokens); // NOTE: pipeline stage callbacks MUST NOT throw — return null to stop a chain. // A throw aborts the process (same constraint as setBudgetGuard). A stage that diff --git a/sdk/node/generated.d.ts b/sdk/node/generated.d.ts index 9676c7e..eed87e1 100644 --- a/sdk/node/generated.d.ts +++ b/sdk/node/generated.d.ts @@ -1264,8 +1264,16 @@ export declare class Session { * configured parallelism, and resolve with each step's outcome in input * order. A failed step surfaces as `success: false` without failing the * batch. + * + * Pass `budgetTokens` to run the fan-out under one shared token budget: + * every child agent feeds a single ledger and, once the cap is reached, + * further child LLM calls are denied (a *soft* cap — a wide fan-out can race + * a few in-flight turns past it before the post-call ledger catches up; the + * in-flight fan-out is never force-killed). With a budget the result is + * `{ outcomes, budget }` (the ledger snapshot); without one it is the plain + * outcomes array, unchanged. */ - parallel(specs: Array): Promise> + parallel(specs: Array, budgetTokens?: number | undefined | null): Promise | WorkflowParallelResult> /** * Like `parallel`, but resumable: progress is journaled under * `workflowId` via the session's `sessionStore`, so an interrupted run @@ -1273,17 +1281,6 @@ export declare class Session { * configured. */ parallelResumable(specs: Array, workflowId: string): Promise> - /** - * Run `specs` as a fan-out under one shared workflow token budget. - * - * Every child agent feeds a single ledger; once `budgetTokens` is reached, - * further child LLM calls are denied (a *soft* cap — a wide fan-out can race - * a few in-flight turns past it before the post-call ledger catches up; the - * in-flight fan-out is never force-killed). Omit `budgetTokens` for an - * uncapped ledger that still aggregates spend. Resolves with the outcomes - * (input order; a failed step is `success: false`) and the ledger snapshot. - */ - workflowParallel(specs: Array, budgetTokens?: number | undefined | null): Promise /** * Run each item through a chain of `stages`, with no barrier between * stages — item A can be in stage 3 while item B is still in stage 1. diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index f2937e8..834c8f1 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -3158,21 +3158,59 @@ impl Session { /// configured parallelism, and resolve with each step's outcome in input /// order. A failed step surfaces as `success: false` without failing the /// batch. - #[napi] + /// + /// Pass `budgetTokens` to run the fan-out under one shared token budget: + /// every child agent feeds a single ledger and, once the cap is reached, + /// further child LLM calls are denied (a *soft* cap — a wide fan-out can race + /// a few in-flight turns past it before the post-call ledger catches up; the + /// in-flight fan-out is never force-killed). With a budget the result is + /// `{ outcomes, budget }` (the ledger snapshot); without one it is the plain + /// outcomes array, unchanged. + #[napi(ts_return_type = "Promise | WorkflowParallelResult>")] pub async fn parallel( &self, specs: Vec, - ) -> napi::Result> { + budget_tokens: Option, + ) -> napi::Result, WorkflowParallelResult>> { let session = self.inner.clone(); let rust_specs: Vec = specs.into_iter().map(Into::into).collect(); - let outcomes = get_runtime() + + // No budget → unchanged behavior: the plain outcomes array. + let Some(budget) = budget_tokens else { + let outcomes = get_runtime() + .spawn(async move { + let executor = session.agent_executor(); + execute_steps_parallel(executor, rust_specs, None).await + }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + return Ok(Either::A( + outcomes.into_iter().map(StepOutcomeObject::from).collect(), + )); + }; + + // Budget → shared ledger across the fan-out; return outcomes + snapshot. + let limit = budget.max(0) as u64; + let (outcomes, snapshot) = get_runtime() .spawn(async move { - let executor = session.agent_executor(); - execute_steps_parallel(executor, rust_specs, None).await + let wf = session.workflow_with_token_budget(Some(limit)); + let outcomes = wf.parallel(rust_specs).await; + (outcomes, wf.budget_snapshot()) }) .await .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; - Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) + Ok(Either::B(WorkflowParallelResult { + outcomes: outcomes.into_iter().map(StepOutcomeObject::from).collect(), + budget: snapshot + .map(|b| WorkflowBudgetObject { + consumed_tokens: b.consumed_tokens as i64, + limit_tokens: b.limit_tokens.map(|l| l as i64), + }) + .unwrap_or(WorkflowBudgetObject { + consumed_tokens: 0, + limit_tokens: Some(limit as i64), + }), + })) } /// Like `parallel`, but resumable: progress is journaled under @@ -3208,45 +3246,6 @@ impl Session { Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) } - /// Run `specs` as a fan-out under one shared workflow token budget. - /// - /// Every child agent feeds a single ledger; once `budgetTokens` is reached, - /// further child LLM calls are denied (a *soft* cap — a wide fan-out can race - /// a few in-flight turns past it before the post-call ledger catches up; the - /// in-flight fan-out is never force-killed). Omit `budgetTokens` for an - /// uncapped ledger that still aggregates spend. Resolves with the outcomes - /// (input order; a failed step is `success: false`) and the ledger snapshot. - #[napi] - pub async fn workflow_parallel( - &self, - specs: Vec, - budget_tokens: Option, - ) -> napi::Result { - let session = self.inner.clone(); - let rust_specs: Vec = specs.into_iter().map(Into::into).collect(); - let limit = budget_tokens.map(|t| t.max(0) as u64); - let (outcomes, snapshot) = get_runtime() - .spawn(async move { - let wf = session.workflow_with_token_budget(limit); - let outcomes = wf.parallel(rust_specs).await; - (outcomes, wf.budget_snapshot()) - }) - .await - .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; - Ok(WorkflowParallelResult { - outcomes: outcomes.into_iter().map(StepOutcomeObject::from).collect(), - budget: snapshot - .map(|b| WorkflowBudgetObject { - consumed_tokens: b.consumed_tokens as i64, - limit_tokens: b.limit_tokens.map(|l| l as i64), - }) - .unwrap_or(WorkflowBudgetObject { - consumed_tokens: 0, - limit_tokens: limit.map(|l| l as i64), - }), - }) - } - /// Run each item through a chain of `stages`, with no barrier between /// stages — item A can be in stage 3 while item B is still in stage 1. /// diff --git a/sdk/node/test.mjs b/sdk/node/test.mjs index e30608e..8f0306b 100644 --- a/sdk/node/test.mjs +++ b/sdk/node/test.mjs @@ -117,16 +117,17 @@ assert.match(result.text, /tools=\d+$/, 'custom slash command should receive too assert.equal(cancelled, false, 'cancelling an unknown subagent task id should resolve to false') } -// --- Workflow facade: budgeted fan-out (offline shape check, no LLM) --- +// --- parallel() budget overload (offline shape check, no LLM) --- { - assert.equal(typeof session.workflowParallel, 'function', 'workflowParallel should be exposed') - // An empty fan-out takes no LLM path: outcomes empty, ledger snapshot present. - const capped = await session.workflowParallel([], 50000) - assert.deepEqual(capped.outcomes, [], 'empty specs -> empty outcomes') - assert.equal(capped.budget.consumedTokens, 0, 'no spend yet') - assert.equal(capped.budget.limitTokens, 50000, 'limit reflected in the ledger snapshot') - const uncapped = await session.workflowParallel([]) - assert.ok(uncapped.budget.limitTokens == null, 'uncapped -> no limit (null/undefined)') + // An empty fan-out takes no LLM path. Without a budget, parallel() returns the + // plain outcomes array (unchanged behavior). + const plain = await session.parallel([]) + assert.deepEqual(plain, [], 'no budget -> plain outcomes array') + // With a budget, parallel() returns { outcomes, budget } (the ledger snapshot). + const budgeted = await session.parallel([], 50000) + assert.deepEqual(budgeted.outcomes, [], 'empty specs -> empty outcomes') + assert.equal(budgeted.budget.consumedTokens, 0, 'no spend yet') + assert.equal(budgeted.budget.limitTokens, 50000, 'limit reflected in the ledger snapshot') } session.close() diff --git a/sdk/python/src/lib.rs b/sdk/python/src/lib.rs index 47146a5..a60683e 100644 --- a/sdk/python/src/lib.rs +++ b/sdk/python/src/lib.rs @@ -1406,19 +1406,69 @@ impl PySession { /// `task_id`, `agent`, `description`, `prompt`, optional `max_steps`, /// `parent_session_id`, `output_schema`. A failed step surfaces as /// `success: False` without failing the batch. - fn parallel(&self, py: Python<'_>, specs: Vec>) -> PyResult> { + /// + /// Pass `budget_tokens` to run the fan-out under one shared token budget: + /// every child agent feeds a single ledger and, once the cap is reached, + /// further child LLM calls are denied (a soft cap; the in-flight fan-out is + /// never force-killed). With a budget the result is a dict + /// `{"outcomes": [...], "budget": {"consumed_tokens", "limit_tokens"}}`; + /// without one it is the plain list of outcome dicts, unchanged. + #[pyo3(signature = (specs, budget_tokens=None))] + fn parallel( + &self, + py: Python<'_>, + specs: Vec>, + budget_tokens: Option, + ) -> PyResult { let rust_specs = specs .iter() .map(|s| py_to_step_spec(py, s)) .collect::>>()?; let session = self.inner.clone(); - let outcomes = py.allow_threads(move || { + + // No budget → unchanged behavior: a plain list of outcome dicts. + let Some(limit) = budget_tokens else { + let outcomes = py.allow_threads(move || { + get_runtime().block_on(async move { + let executor = session.agent_executor(); + execute_steps_parallel(executor, rust_specs, None).await + }) + }); + let items = outcomes + .iter() + .map(|o| step_outcome_to_py(py, o)) + .collect::>>()?; + return Ok(PyList::new(py, items)?.into_any().unbind()); + }; + + // Budget → shared ledger across the fan-out; return {"outcomes", "budget"}. + let (outcomes, snapshot) = py.allow_threads(move || { get_runtime().block_on(async move { - let executor = session.agent_executor(); - execute_steps_parallel(executor, rust_specs, None).await + let wf = session.workflow_with_token_budget(Some(limit)); + let outcomes = wf.parallel(rust_specs).await; + (outcomes, wf.budget_snapshot()) }) }); - outcomes.iter().map(|o| step_outcome_to_py(py, o)).collect() + let outcomes_py = outcomes + .iter() + .map(|o| step_outcome_to_py(py, o)) + .collect::>>()?; + let budget = PyDict::new(py); + budget.set_item( + "consumed_tokens", + snapshot.as_ref().map(|b| b.consumed_tokens).unwrap_or(0), + )?; + budget.set_item( + "limit_tokens", + snapshot + .as_ref() + .and_then(|b| b.limit_tokens) + .or(Some(limit)), + )?; + let result = PyDict::new(py); + result.set_item("outcomes", outcomes_py)?; + result.set_item("budget", budget)?; + Ok(result.into_any().unbind()) } /// Like `parallel`, but resumable: progress is journaled under @@ -1456,56 +1506,6 @@ impl PySession { outcomes.iter().map(|o| step_outcome_to_py(py, o)).collect() } - /// Run `specs` as a fan-out under one shared workflow token budget. - /// - /// All child agents feed a single ledger; once `budget_tokens` is reached, - /// further child LLM calls are denied (a *soft* cap — a wide fan-out can race - /// a few in-flight turns past it before the post-call ledger catches up; the - /// in-flight fan-out is never force-killed). Pass `budget_tokens=None` for an - /// uncapped ledger that still aggregates spend. Returns a dict - /// `{"outcomes": [...], "budget": {"consumed_tokens", "limit_tokens"}}`. - #[pyo3(signature = (specs, budget_tokens=None))] - fn workflow_parallel( - &self, - py: Python<'_>, - specs: Vec>, - budget_tokens: Option, - ) -> PyResult { - let rust_specs = specs - .iter() - .map(|s| py_to_step_spec(py, s)) - .collect::>>()?; - let session = self.inner.clone(); - let (outcomes, snapshot) = py.allow_threads(move || { - get_runtime().block_on(async move { - let wf = session.workflow_with_token_budget(budget_tokens); - let outcomes = wf.parallel(rust_specs).await; - (outcomes, wf.budget_snapshot()) - }) - }); - - let outcomes_py = outcomes - .iter() - .map(|o| step_outcome_to_py(py, o)) - .collect::>>()?; - let budget = PyDict::new(py); - budget.set_item( - "consumed_tokens", - snapshot.as_ref().map(|b| b.consumed_tokens).unwrap_or(0), - )?; - budget.set_item( - "limit_tokens", - snapshot - .as_ref() - .and_then(|b| b.limit_tokens) - .or(budget_tokens), - )?; - let result = PyDict::new(py); - result.set_item("outcomes", outcomes_py)?; - result.set_item("budget", budget)?; - Ok(result.into_any().unbind()) - } - /// Run each item through a chain of `stages`, with no barrier between /// stages. Each stage is a callable `stage(ctx) -> spec_dict | None`, where /// `ctx = {"previous": , "item": }`. Return a diff --git a/sdk/python/tests/test_parallel_budget.py b/sdk/python/tests/test_parallel_budget.py new file mode 100644 index 0000000..06b2ac2 --- /dev/null +++ b/sdk/python/tests/test_parallel_budget.py @@ -0,0 +1,60 @@ +"""Smoke test for the budget overload on `Session.parallel(...)`. + +Verifies that `parallel(specs)` returns the plain outcomes list and +`parallel(specs, budget_tokens=...)` returns the richer +`{"outcomes": [...], "budget": {...}}` shape — checked on the empty fan-out, +which takes no LLM path. Mirrors the Node SDK smoke in sdk/node/test.mjs. Full +orchestration behavior is covered by the core crate's real-LLM integration tests. + +Run with: A3S_CONFIG_FILE not needed — uses inline ACL. +""" + +from __future__ import annotations + +import tempfile + +from a3s_code import Agent, LocalWorkspaceBackend, PermissionPolicy, SessionOptions + + +INLINE_CONFIG = """ +default_model = "anthropic/claude-sonnet-4-20250514" + +providers "anthropic" { + api_key = "test-key" + models "claude-sonnet-4-20250514" { + name = "Claude Sonnet 4" + } +} +""".strip() + + +def main() -> None: + workspace = tempfile.mkdtemp(prefix="a3s-code-python-parallel-budget-") + agent = Agent.create(INLINE_CONFIG) + + opts = SessionOptions() + opts.permission_policy = PermissionPolicy(default_decision="allow") + opts.workspace_backend = LocalWorkspaceBackend(workspace) + + session = agent.session(workspace, opts) + + # No budget -> the plain list of outcome dicts (unchanged behavior). + plain = session.parallel([]) + assert plain == [], f"no budget -> plain list, got {plain!r}" + + # With a budget -> {outcomes, budget}. Empty fan-out takes no LLM path. + budgeted = session.parallel([], budget_tokens=50000) + assert budgeted["outcomes"] == [], f"empty specs -> empty outcomes, got {budgeted!r}" + assert ( + budgeted["budget"]["consumed_tokens"] == 0 + ), f"no spend yet, got {budgeted['budget']!r}" + assert ( + budgeted["budget"]["limit_tokens"] == 50000 + ), f"limit reflected in the ledger, got {budgeted['budget']!r}" + + session.close() + print("python sdk parallel budget overload ok") + + +if __name__ == "__main__": + main() diff --git a/sdk/python/tests/test_workflow_parallel.py b/sdk/python/tests/test_workflow_parallel.py deleted file mode 100644 index d6d7b28..0000000 --- a/sdk/python/tests/test_workflow_parallel.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Smoke test for the budgeted workflow fan-out exposed on Session. - -Verifies `session.workflow_parallel(...)` is reachable from Python and returns -the expected shape (outcomes + shared budget ledger snapshot) for the empty -fan-out, which takes no LLM path. Mirrors the Node SDK smoke in -sdk/node/test.mjs. Full orchestration behavior is covered by the core crate's -real-LLM integration tests. - -Run with: A3S_CONFIG_FILE not needed — uses inline ACL. -""" - -from __future__ import annotations - -import tempfile - -from a3s_code import Agent, LocalWorkspaceBackend, PermissionPolicy, SessionOptions - - -INLINE_CONFIG = """ -default_model = "anthropic/claude-sonnet-4-20250514" - -providers "anthropic" { - api_key = "test-key" - models "claude-sonnet-4-20250514" { - name = "Claude Sonnet 4" - } -} -""".strip() - - -def main() -> None: - workspace = tempfile.mkdtemp(prefix="a3s-code-python-workflow-") - agent = Agent.create(INLINE_CONFIG) - - opts = SessionOptions() - opts.permission_policy = PermissionPolicy(default_decision="allow") - opts.workspace_backend = LocalWorkspaceBackend(workspace) - - session = agent.session(workspace, opts) - - assert hasattr( - session, "workflow_parallel" - ), "Session should expose workflow_parallel" - - # An empty fan-out takes no LLM path: outcomes empty, ledger snapshot present. - capped = session.workflow_parallel([], budget_tokens=50000) - assert capped["outcomes"] == [], f"empty specs -> empty outcomes, got {capped!r}" - assert ( - capped["budget"]["consumed_tokens"] == 0 - ), f"no spend yet, got {capped['budget']!r}" - assert ( - capped["budget"]["limit_tokens"] == 50000 - ), f"limit reflected in the ledger, got {capped['budget']!r}" - - uncapped = session.workflow_parallel([]) - assert ( - uncapped["budget"]["limit_tokens"] is None - ), f"uncapped -> None limit, got {uncapped['budget']!r}" - - session.close() - print("python sdk workflow_parallel ok") - - -if __name__ == "__main__": - main()