feat(drawer): parallelism inside for_each#155
Merged
Conversation
Adds Parallelism int to kind=for_each. 0 or 1 keeps the existing
serial fail-fast semantics. N > 1 spawns a worker pool so N
iterations run concurrently:
{
"kind": "for_each",
"over": "${scrape-list.output.urls}",
"as": "url",
"parallelism": 10,
"steps": [ ... ]
}
## Correctness story
- **Ordering:** results[i] maps to items[i] regardless of
completion order. Downstream refs like
${step.output.results.0.item} stay deterministic.
- **Stop-on-failure:** the parallel path allocates a sub-context
(workerCtx). First failure cancels it so in-flight iterations
abort promptly instead of finishing work after the step has
already been declared failed. The recorded failure is the
lowest-indexed one so traces are reproducible.
- **Continue-on-failure:** wg.Wait() still runs every scheduled
iteration to completion; failures are marked per-item; the
overall step completes ok with partial failure markers.
- **Race-free:** each worker owns its slot in results[]/errs[]
(preallocated by index); only first-failure bookkeeping is
mutex-protected. `go test ./... -race` clean.
- **Resource bounds:** buffered semaphore of size Parallelism caps
inflight goroutines + child processes. No implicit 1:1 mapping
between iterations and processes running simultaneously.
## Implementation
- Serial path was inlined for its fast-return case (avoids
channel/goroutine overhead for small N). Parallel path uses
chan+WaitGroup (no x/sync dependency).
- Per-iter body extracted into runForEachIter so serial and
parallel share one implementation of per-iteration semantics.
## set extended
New STEP.parallelism=N path addressable from the CLI.
## Tests
- TestForEach_ParallelismSpeedsUpRuns — wall-clock assertion:
4×400ms iterations with parallelism=4 completes in <1.2s
(serial baseline is ~1.6s). Catches regressions to serial.
- TestForEach_ParallelismStopOnFirstFailure — cancels in-flight
workers on first failure; step reports FOREACH_ITEM_FAILED.
- TestForEach_ParallelismContinueCollectsAll — all 4 iterations
run; 2 odd-indexed failures marked per-item; overall step ok.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds `parallelism: N` to `kind=for_each`. 0 or 1 keeps the existing serial fail-fast semantics; N > 1 spawns a worker pool so N iterations run concurrently.
```json
{
"kind": "for_each",
"over": "${scrape-list.output.urls}",
"as": "url",
"parallelism": 10,
"steps": [ ... ]
}
```
Correctness story
`set` extended
New `STEP.parallelism=N` path — `buttons drawer X set each.parallelism=10`.
Test plan
🤖 Generated with Claude Code