feat(§29): add parallel execution primitives — DTO, domain, validation, turn log, materialize#151
Merged
AlexChesser merged 1 commit intomainfrom Apr 14, 2026
Merged
Conversation
…n, turn log, materialize Implement the foundational layers for SPEC §29 Parallel Step Execution: - DTO: add `async` (serde-renamed), `depends_on` to StepDto; `max_concurrency` to DefaultsDto - Domain: add `async_step`, `depends_on` to Step; `max_concurrency` to Pipeline; `ActionKind::Join` with `JoinErrorMode` (FailFast/WaitForAll) - Validation: parse `action: join` with specialized `on_error` semantics; add `validate_parallel_constraints()` enforcing orphan detection, forward-ref checks, cycle detection, concurrent session constraint, structured join compat - Turn Log: add `concurrent_group`, `launched_at`, `completed_at` fields to TurnEntry; add `StepCancelledEvent` and `record_step_cancelled()` method - Materialize: round-trip `async`, `depends_on`, `max_concurrency`, `action: join` - Test fixtures: add parallel_basic, parallel_structured, and invalid YAML fixtures All 470 existing tests pass. Executor parallel dispatch (Phase 4) is next. https://claude.ai/code/session_01RYo2Rp8t2RkV5R8rfJrd3W
12 tasks
AlexChesser
pushed a commit
that referenced
this pull request
Apr 15, 2026
Completes the §29 implementation on top of the Phase 1-3+6 foundations already on main (PR #151). This adds the actual parallel dispatch so async steps run concurrently instead of being treated as sequential. New capabilities: - `async: true` steps launch on scoped threads and run concurrently with subsequent sequential steps. Forked Session with cloned turn log and isolated http_session_store (when resume: false). - `action: join` synchronizes branches, merges results. Two modes: - String join (default): labelled `[step_id]:\n<response>` concatenation - Structured join: JSON merge when join declares output_schema. Dep responses parsed as JSON, namespaced by step id; output validated against the join's schema if declared. - `on_error: fail_fast` (default) / `wait_for_all` on the join step. fail_fast surfaces the first branch error; wait_for_all collects error envelopes into the merged JSON. - `defaults.max_concurrency` enforced via Mutex+Condvar semaphore — no external dependency, no async runtime. - Turn log entries tagged with `concurrent_group`, `launched_at`, `completed_at` ISO 8601 timestamps (§29.8). - Dotted-path template resolution: `{{ step.<join>.<dep>.<field> }}` walks JSON paths through structured join responses (§29.6). Implementation details: - New module `executor/parallel.rs` with ConcurrencySemaphore, BranchResult, merge_join_results, and timestamp helpers (civil-from-days, no chrono). - `execute_core` now branches to `execute_core_with_parallel` when any step declares `async`. The scoped-thread variant wraps the main step loop in std::thread::scope so async launches coexist with sequential step execution on the main thread. - Runner signatures updated to `&(dyn Runner + Sync)` on the parallel path. All concrete runners are already Sync; RunnerFactory now returns `Box<dyn Runner + Send + Sync>`. - TurnEntry gains `Clone` derive for session forking. - Session gains `fork_for_branch(isolated_http: bool)` — clones entries into a NullProvider-backed TurnLog; mints a fresh http_session_store when resume:false (SPEC §29.9 opt-out of context inheritance). - `execute_single_step` no longer mishandles `action: join` — joins are now fully coordinated in `execute_core_with_parallel`. Tests: - `ail-core/tests/spec/s29_parallel.rs` — 23 tests covering parse-time validation (orphan detection, forward refs, cycles, concurrent resume conflict, structured-join compatibility, join without depends_on, max_concurrency), runtime execution (two-async-plus-join end-to-end, branch invocation count, string join ordering, sequential step after async sees join result, condition:never on async unblocks join, on_result on join step fires abort/continue, regression check for non-async pipelines, max_concurrency serialization, shared concurrent_group across branches), and fixture round-trips. - Full test suite: 504 passed (previous 481 + 23 new), 0 failed. Docs: - `spec/core/s29-parallel-execution.md` status → implemented. - `spec/README.md` entry updated. - `CLAUDE.md` and `ail-core/CLAUDE.md` updated with new template vars, module responsibilities, and Known Constraints entry. - `CHANGELOG.md` v0.3 in-progress entry. Deferred (spec-authorized): - Mid-flight runner-level cancellation for fail_fast — branches complete on their own; first error still propagates (SPEC §29.7 "best effort"). - Controlled-mode executor events for async launches. https://claude.ai/code/session_01RYo2Rp8t2RkV5R8rfJrd3W
6 tasks
AlexChesser
added a commit
that referenced
this pull request
Apr 15, 2026
…154) Completes the §29 implementation on top of the Phase 1-3+6 foundations already on main (PR #151). This adds the actual parallel dispatch so async steps run concurrently instead of being treated as sequential. New capabilities: - `async: true` steps launch on scoped threads and run concurrently with subsequent sequential steps. Forked Session with cloned turn log and isolated http_session_store (when resume: false). - `action: join` synchronizes branches, merges results. Two modes: - String join (default): labelled `[step_id]:\n<response>` concatenation - Structured join: JSON merge when join declares output_schema. Dep responses parsed as JSON, namespaced by step id; output validated against the join's schema if declared. - `on_error: fail_fast` (default) / `wait_for_all` on the join step. fail_fast surfaces the first branch error; wait_for_all collects error envelopes into the merged JSON. - `defaults.max_concurrency` enforced via Mutex+Condvar semaphore — no external dependency, no async runtime. - Turn log entries tagged with `concurrent_group`, `launched_at`, `completed_at` ISO 8601 timestamps (§29.8). - Dotted-path template resolution: `{{ step.<join>.<dep>.<field> }}` walks JSON paths through structured join responses (§29.6). Implementation details: - New module `executor/parallel.rs` with ConcurrencySemaphore, BranchResult, merge_join_results, and timestamp helpers (civil-from-days, no chrono). - `execute_core` now branches to `execute_core_with_parallel` when any step declares `async`. The scoped-thread variant wraps the main step loop in std::thread::scope so async launches coexist with sequential step execution on the main thread. - Runner signatures updated to `&(dyn Runner + Sync)` on the parallel path. All concrete runners are already Sync; RunnerFactory now returns `Box<dyn Runner + Send + Sync>`. - TurnEntry gains `Clone` derive for session forking. - Session gains `fork_for_branch(isolated_http: bool)` — clones entries into a NullProvider-backed TurnLog; mints a fresh http_session_store when resume:false (SPEC §29.9 opt-out of context inheritance). - `execute_single_step` no longer mishandles `action: join` — joins are now fully coordinated in `execute_core_with_parallel`. Tests: - `ail-core/tests/spec/s29_parallel.rs` — 23 tests covering parse-time validation (orphan detection, forward refs, cycles, concurrent resume conflict, structured-join compatibility, join without depends_on, max_concurrency), runtime execution (two-async-plus-join end-to-end, branch invocation count, string join ordering, sequential step after async sees join result, condition:never on async unblocks join, on_result on join step fires abort/continue, regression check for non-async pipelines, max_concurrency serialization, shared concurrent_group across branches), and fixture round-trips. - Full test suite: 504 passed (previous 481 + 23 new), 0 failed. Docs: - `spec/core/s29-parallel-execution.md` status → implemented. - `spec/README.md` entry updated. - `CLAUDE.md` and `ail-core/CLAUDE.md` updated with new template vars, module responsibilities, and Known Constraints entry. - `CHANGELOG.md` v0.3 in-progress entry. Deferred (spec-authorized): - Mid-flight runner-level cancellation for fail_fast — branches complete on their own; first error still propagates (SPEC §29.7 "best effort"). - Controlled-mode executor events for async launches. https://claude.ai/code/session_01RYo2Rp8t2RkV5R8rfJrd3W Co-authored-by: Claude <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implement the foundational layers for SPEC §29 Parallel Step Execution:
async(serde-renamed),depends_onto StepDto;max_concurrencyto DefaultsDtoasync_step,depends_onto Step;max_concurrencyto Pipeline;ActionKind::JoinwithJoinErrorMode(FailFast/WaitForAll)action: joinwith specializedon_errorsemantics;add
validate_parallel_constraints()enforcing orphan detection, forward-refchecks, cycle detection, concurrent session constraint, structured join compat
concurrent_group,launched_at,completed_atfields toTurnEntry; add
StepCancelledEventandrecord_step_cancelled()methodasync,depends_on,max_concurrency,action: joinAll 470 existing tests pass. Executor parallel dispatch (Phase 4) is next.
https://claude.ai/code/session_01RYo2Rp8t2RkV5R8rfJrd3W