Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ Everything else is an extension of that loop.
`session.parallel` / `pipeline` / `parallelResumable` (Node) and
`parallel` / `pipeline` / `parallel_resumable` (Python). See
[Programmable Orchestration](#programmable-orchestration) below.
- **Workflow facade, loop & shared budget** — `AgentSession::workflow()` returns
a cheaply-clonable `Workflow` that pre-wires the session's executor (inheriting
the same governance as model-driven delegation), persistence store, per-step
event stream, and a session-derived stable root id. Its verbs `agent` /
`parallel` / `phase` / `pipeline` each delegate to one combinator; `phase` is a
named resume boundary that emits `WorkflowEvent` milestones. `execute_loop` /
`LoopDecision` add a bounded loop-until-dry (with a mandatory `max_iterations`
guard). `WorkflowBudget` aggregates token spend from every step into one shared
ledger — a soft, workflow-wide cost cap installed through the existing
`BudgetGuard` seam.

### What's new in 3.3

Expand Down Expand Up @@ -608,6 +618,13 @@ results = session.pipeline(
# Resumable: progress is journaled under a workflow id via the session store,
# so an interrupted run (or one resumed on another node) skips completed steps.
outcomes = session.parallel_resumable(specs, "nightly-audit")

# Budgeted fan-out: pass `budget_tokens` to parallel() so every child agent feeds
# ONE shared token ledger; once the cap is hit further child LLM calls are denied
# (a soft cap). With a budget, parallel() returns {outcomes, budget} (the ledger
# snapshot) instead of the plain outcomes list.
res = session.parallel(specs, budget_tokens=500_000)
print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"])
```

```javascript
Expand All @@ -627,11 +644,61 @@ const results = await session.pipeline(
);

await session.parallelResumable(specs, "nightly-audit");

// Budgeted fan-out: pass a token budget to parallel() so all child agents share
// one ledger (a soft cap). With a budget, parallel() resolves to {outcomes,
// budget} instead of the plain outcomes array.
const { outcomes: out, budget } = await session.parallel(specs, 500_000);
console.log(budget.consumedTokens, budget.limitTokens);
// NOTE: pipeline stage callbacks MUST NOT throw — return null to stop a chain.
// A throw aborts the process (same constraint as setBudgetGuard). A stage that
// hangs past timeoutMs (default 30s) fails closed (treated as null).
```

### The `Workflow` facade (Rust)

`AgentSession::workflow()` returns a cheaply-clonable `Workflow` that bundles the
session's executor, persistence store, per-step event stream, and a stable,
session-derived root id. Control flow is ordinary Rust — `await` a verb, inspect
the outcomes, decide what runs next:

```rust
let wf = session.workflow(); // or session.workflow_with_token_budget(Some(500_000))

// One step, then a *variable* fan-out computed from its result (the "dynamic"
// part: the shape is decided at runtime, not declared up front).
let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await;
let specs: Vec<AgentStepSpec> = plan.output.lines().enumerate()
.map(|(i, line)| AgentStepSpec::new(format!("impl-{i}"), "general", "impl", line))
.collect();

// `phase` is a NAMED barrier and a resume boundary: with a store it journals
// progress under "{root_id}/{index}:{name}" and emits PhaseStart/PhaseEnd on the
// WorkflowEvent stream (subscribe via `wf.subscribe()`).
let done = wf.phase("implement", specs).await;
let reviews = wf.phase("review", to_review(&done)).await; // budget shared across all phases
```

- **Verbs** — `agent` (one step), `parallel` (barrier fan-out), `phase` (named,
resumable barrier + milestones), `pipeline` (per-item staged chains), and the
non-failing `log`. Each delegates to exactly one combinator; the facade owns no
scheduling.
- **Loop** — `execute_loop(executor, initial, max_iterations, tx, |round| …)`
returns `LoopDecision::{Continue(specs), Stop}` and runs rounds until the
predicate stops, a round has no work (dry), or `max_iterations` (a **mandatory
hard cap**) is hit — the guard that makes an LLM-driven loop safe.
- **Budget** — `workflow_with_token_budget(Some(limit))` installs a
`WorkflowBudget` as every child run's `budget_guard`, so per-turn LLM
accounting feeds **one** shared ledger; `wf.budget_snapshot()` reads it and a
`WorkflowEvent::BudgetExhausted` fires once the cap is reached. It is a *soft*
ceiling: a wide fan-out can race a few in-flight turns past the cap before the
post-call ledger catches up (the framework never force-kills a running
fan-out).
- **Safety** — every step runs through the same `AgentExecutor` seam as
model-driven delegation, so child runs inherit the session's `SecurityProvider`,
skill restrictions, confirmation, and workspace — orchestrated steps are
neither more nor less privileged than delegated ones.

---

## Design Principles
Expand Down
25 changes: 25 additions & 0 deletions core/src/agent/parallel_tool_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,38 @@ impl AgentLoop {
}

pub(super) fn can_run_parallel_write_batch(&self, tool_calls: &[ToolCall]) -> bool {
// The parallel fast path executes tools directly via the ToolExecutor
// (see `execute_parallel_write_batch`), bypassing `ToolSafetyGate`. It is
// only safe when the gate would unconditionally EXECUTE every call.
//
// Hooks and HITL confirmation make the decision stateful/interactive
// (pre-tool hooks have side effects; an `Ask` must round-trip to a
// human), so never fast-path when either is configured.
if self.config.hook_engine.is_some()
|| self.config.confirmation_manager.is_some()
|| tool_calls.len() <= 1
{
return false;
}

// Skill restrictions and permission checks live in `ToolSafetyGate`,
// which the parallel path skips entirely. Consult the gate itself (the
// single source of truth) and only fast-path when, for EVERY call, no
// active skill restriction forbids the tool AND the permission checker
// explicitly Allows it. A missing checker resolves to `Ask`, which with
// no confirmation manager is a Deny — so it correctly refuses here.
let gate = crate::safety_gate::ToolSafetyGate::new(&self.config);
let all_allowed = tool_calls.iter().all(|tc| {
gate.check_skill_restrictions(&tc.name).is_none()
&& matches!(
gate.permission_decision(&tc.name, &tc.args),
crate::permissions::PermissionDecision::Allow
)
});
if !all_allowed {
return false;
}

if !tool_calls
.iter()
.all(|tc| is_parallel_safe_write(&tc.name, &tc.args))
Expand Down
98 changes: 98 additions & 0 deletions core/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2003,3 +2003,101 @@ async fn test_agent_end_event_contains_final_text() {
assert_eq!(usage.total_tokens, 15);
}
}

/// Regression: the parallel write fast path bypasses `ToolSafetyGate`, so it
/// may run only when the gate would unconditionally EXECUTE every call. Before
/// the fix it ignored the permission checker and skill restrictions entirely,
/// letting denied / ask / skill-restricted writes land ungated.
#[test]
fn parallel_write_batch_only_fast_paths_when_gate_would_execute() {
use crate::llm::ToolCall;
use crate::permissions::{PermissionChecker, PermissionDecision};
use crate::skills::{Skill, SkillKind, SkillRegistry};

struct Static(PermissionDecision);
impl PermissionChecker for Static {
fn check(&self, _tool: &str, _args: &serde_json::Value) -> PermissionDecision {
self.0
}
}

fn write_call(id: &str, path: &str) -> ToolCall {
ToolCall {
id: id.to_string(),
name: "write_file".to_string(),
args: serde_json::json!({ "path": path, "content": "x" }),
}
}

fn loop_with(
checker: Option<Arc<dyn PermissionChecker>>,
skills: Option<Arc<SkillRegistry>>,
) -> AgentLoop {
// `skill_registry` overrides the default builtins where needed.
let config = AgentConfig {
permission_checker: checker,
skill_registry: skills,
..Default::default()
};
AgentLoop::new(
Arc::new(MockLlmClient::new(vec![])),
Arc::new(ToolExecutor::new("/tmp".to_string())),
test_tool_context(),
config,
)
}

let calls = vec![write_call("a", "a.txt"), write_call("b", "b.txt")];
let allow = || Some(Arc::new(Static(PermissionDecision::Allow)) as Arc<dyn PermissionChecker>);

// Explicit Allow + no restricting skills → fast path is taken.
assert!(
loop_with(allow(), None).can_run_parallel_write_batch(&calls),
"explicit Allow with no restrictions → parallel write batch is allowed"
);

// No permission checker → gate resolves to Ask (a Deny without a confirmation
// manager), so the fast path must be refused.
assert!(
!loop_with(None, None).can_run_parallel_write_batch(&calls),
"missing checker resolves to Ask/Deny → fast path refused"
);

// Explicit Deny → refused.
assert!(
!loop_with(Some(Arc::new(Static(PermissionDecision::Deny))), None)
.can_run_parallel_write_batch(&calls),
"permission Deny → fast path refused"
);

// Ask → refused (sequential path would need a human round-trip).
assert!(
!loop_with(Some(Arc::new(Static(PermissionDecision::Ask))), None)
.can_run_parallel_write_batch(&calls),
"permission Ask → fast path refused"
);

// Allow, but an active skill restriction forbids write_file → refused.
let restricted = SkillRegistry::new();
restricted.register_unchecked(Arc::new(Skill {
name: "read-only".to_string(),
description: String::new(),
allowed_tools: Some("read(*)".to_string()),
disable_model_invocation: false,
kind: SkillKind::Instruction,
content: String::new(),
tags: Vec::new(),
version: None,
}));
assert!(
!loop_with(allow(), Some(Arc::new(restricted))).can_run_parallel_write_batch(&calls),
"active skill restriction disallowing write_file → fast path refused"
);

// Default builtins do not restrict → still fast-paths with Allow.
assert!(
loop_with(allow(), Some(Arc::new(SkillRegistry::with_builtins())))
.can_run_parallel_write_batch(&calls),
"non-restricting builtins → fast path still allowed"
);
}
92 changes: 89 additions & 3 deletions core/src/agent_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,15 +932,101 @@ impl AgentSession {
/// run against; a host can instead supply its own executor to place steps
/// across a cluster.
pub fn agent_executor(&self) -> Arc<dyn crate::orchestration::AgentExecutor> {
let executor = crate::tools::TaskExecutor::with_mcp(
Arc::new(self.build_task_executor(self.parent_run_context()))
}

/// Build the in-box [`TaskExecutor`](crate::tools::TaskExecutor) for this
/// session, applying `parent` as the child-run capability context. Shared by
/// [`agent_executor`](Self::agent_executor) and [`workflow`](Self::workflow)
/// so both wire children identically.
fn build_task_executor(
&self,
parent: crate::child_run::ChildRunContext,
) -> crate::tools::TaskExecutor {
crate::tools::TaskExecutor::with_mcp(
Arc::clone(&self.agent_registry),
Arc::clone(&self.llm_client),
self.workspace.display().to_string(),
Arc::clone(&self.mcp_manager),
)
.with_parent_context(parent)
.with_subagent_tracker(Arc::clone(&self.subagent_tasks))
.with_max_parallel_tasks(self.config.max_parallel_tasks);
Arc::new(executor)
.with_max_parallel_tasks(self.config.max_parallel_tasks)
}

/// A programmable [`Workflow`](crate::orchestration::Workflow) bound to this
/// session.
///
/// Pre-wired with this session's executor (inheriting the same governance as
/// model-driven delegation), persistence store (so each
/// [`phase`](crate::orchestration::Workflow::phase) is a resume boundary),
/// per-step event stream, and a session-derived stable root id. Control flow
/// is ordinary Rust: `await` a verb, inspect the outcomes, decide what runs
/// next.
pub fn workflow(&self) -> crate::orchestration::Workflow {
self.workflow_with_token_budget(None)
}

/// Like [`workflow`](Self::workflow) but with a hard token ceiling shared
/// across every step. The cap is a best-effort *soft* cost ceiling — under a
/// wide fan-out a few in-flight turns can race past it before the shared
/// ledger catches up (see [`WorkflowBudget`](crate::orchestration::WorkflowBudget)).
pub fn workflow_with_token_budget(
&self,
limit_tokens: Option<u64>,
) -> crate::orchestration::Workflow {
use crate::budget::BudgetGuard;

// One shared ledger for the whole workflow, wrapping the session's own
// budget guard (if any) so a host's per-tenant accounting keeps working.
let mut budget = crate::orchestration::WorkflowBudget::new(limit_tokens);
if let Some(inner) = self.config.budget_guard.clone() {
budget = budget.with_inner(inner);
}
let budget = Arc::new(budget);

// Install the shared ledger as the child runs' budget guard so every
// step's per-turn LLM accounting feeds it.
let mut parent = self.parent_run_context();
parent.budget_guard = Some(Arc::clone(&budget) as Arc<dyn BudgetGuard>);
let executor: Arc<dyn crate::orchestration::AgentExecutor> =
Arc::new(self.build_task_executor(parent));

let mut builder = crate::orchestration::Workflow::builder(executor)
.with_root_id(format!("wf-{}", self.session_id))
.with_budget(Arc::clone(&budget));
if let Some(store) = self.session_store.clone() {
builder = builder.with_store(store);
}
if let Some(step_events) = self.tool_context.agent_event_tx.clone() {
builder = builder.with_step_events(step_events);
}
builder.build()
}

/// Build the [`ChildRunContext`](crate::child_run::ChildRunContext) that
/// orchestrated / delegated child runs inherit from this session.
///
/// Mirrors the context the model-driven `task` / `parallel_task` path
/// installs (see `register_task_capability` in `agent_api/capabilities.rs`)
/// so a step run through [`agent_executor`](Self::agent_executor) carries the
/// SAME governance — security provider, skill restrictions, confirmation,
/// the shared workspace, and the safety limits — instead of weaker, ambient
/// authority. Sourced from the session's resolved config; `hook_engine`
/// stays `None` to match the model-driven path.
pub(crate) fn parent_run_context(&self) -> crate::child_run::ChildRunContext {
crate::child_run::ChildRunContext {
security_provider: self.config.security_provider.clone(),
hook_engine: None,
skill_registry: self.config.skill_registry.clone(),
tool_timeout_ms: self.config.tool_timeout_ms,
max_parallel_tasks: Some(self.config.max_parallel_tasks),
max_execution_time_ms: self.config.max_execution_time_ms,
circuit_breaker_threshold: Some(self.config.circuit_breaker_threshold),
confirmation_manager: self.config.confirmation_manager.clone(),
workspace_services: Some(Arc::clone(&self.tool_context.workspace_services)),
budget_guard: self.config.budget_guard.clone(),
}
}

/// The session's persistence store, if one is configured — needed by the
Expand Down
1 change: 1 addition & 0 deletions core/src/agent_api/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ fn register_task_capability(
circuit_breaker_threshold: opts.circuit_breaker_threshold,
confirmation_manager: opts.confirmation_manager.clone(),
workspace_services: opts.workspace_services.clone(),
budget_guard: opts.budget_guard.clone(),
};

let registry = Arc::new(registry);
Expand Down
Loading
Loading