feat: support AbortSignal in step.run for cooperative cancellation#37
Conversation
Pass an AbortSignal to step.run callbacks so long-running steps can break out early when a run is cancelled. The signal is aborted both at step boundaries (before CancelledError is thrown) and mid-step via the run:cancel event listener. Fixes #26 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Warning Rate limit exceeded
⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughThis PR makes step execution cancellation cooperative by passing an AbortSignal into Changes
Sequence Diagram(s)sequenceDiagram
participant User as User/API
participant Worker as Worker
participant Context as StepContext
participant Controller as AbortController
participant StepFn as Step Function
User->>Worker: request cancellation (cancel run)
Worker->>Worker: emit "run:cancel" event
Worker->>Context: run:cancel event delivered
Context->>Controller: controller.abort() (signal.aborted -> true)
StepFn->>Controller: observes signal (checks signal.aborted / receives AbortError)
alt Signal aborted
StepFn->>StepFn: stop/throw early
StepFn-->>Context: returns/throws cancelled result
else Signal not aborted
StepFn->>StepFn: continue normal work
StepFn-->>Context: return result
end
Context->>Worker: step result or cancellation propagated
Worker->>Worker: call dispose() to unsubscribe
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (3)
packages/durably/tests/shared/step.shared.ts (1)
459-513: Strengthen the boundary-cancellation test by assertingsignal.aborteddirectly.The test currently verifies cancellation behavior, but not the signal state it names. Adding a direct assertion will tighten intent-to-assertion alignment.
✅ Suggested test tweak
it('signal is aborted when cancellation is detected at step boundary', async () => { let step2Called = false + let step1SignalAborted = false let step1StartedResolve: () => void const step1StartedPromise = new Promise<void>((resolve) => { step1StartedResolve = resolve }) @@ const signalBoundaryTestDef = defineJob({ name: 'signal-boundary-test', input: z.object({}), run: async (step) => { - await step.run('step1', async () => { + await step.run('step1', async (signal) => { step1StartedResolve() // Wait until we are told to proceed (after cancel is issued) await proceedPromise + step1SignalAborted = signal.aborted return 'done' }) @@ // step2 callback should never have been called expect(step2Called).toBe(false) + expect(step1SignalAborted).toBe(true) })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/durably/tests/shared/step.shared.ts` around lines 459 - 513, The test should also assert the AbortSignal state directly: inside signalBoundaryTestDef's run, update the step.run('step1', ...) callback so that after awaiting proceedPromise it checks the passed-in signal.aborted is true (use the signal parameter provided to the step.run callback), then return; keep the existing cancellation flow and final run.status assertion unchanged so the test still verifies cancellation and that step2 was not executed.packages/durably/src/context.ts (1)
66-123: Avoid classifying cooperative cancellation as step failure.When the callback reacts to an aborted signal by throwing, Line [97] currently persists a failed step and emits
step:fail. Consider short-circuiting aborted executions so cancellation remains cancellation in step telemetry too.💡 Suggested adjustment
} catch (error) { + // Preserve cancellation semantics for aborted in-flight steps + if (controller.signal.aborted) { + throw error instanceof CancelledError + ? error + : new CancelledError(run.id) + } + // Save failed step const errorMessage = error instanceof Error ? error.message : String(error)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/durably/src/context.ts` around lines 66 - 123, The catch block that currently treats all thrown errors as failures should detect cooperative cancellation and record it as a cancellation instead: inside the catch (where storage.createStep and eventEmitter.emit are used), check controller.signal.aborted or error.name === 'AbortError' (or similar sentinel) and in that branch call storage.createStep with status: 'cancelled' (using run.id, name, index: stepIndex, startedAt) and emit a 'step:cancel' event (use runId, jobName, stepName: name, stepIndex, labels, duration if available), update the run.currentStepIndex as appropriate (same increment + storage.updateRun(run.id, { currentStepIndex: stepIndex })) and then rethrow the error; otherwise keep the existing failed-step logic for real errors.website/api/step.md (1)
42-58: Consider constructing the URL dynamically with the page number.The cooperative cancellation example is clear and demonstrates the key concepts well. However, the
urlvariable is static while iterating through pages. For improved clarity, consider showing dynamic URL construction:📝 Optional enhancement for the example
await step.run('fetch-all-pages', async (signal) => { const results = [] + const baseUrl = 'https://api.example.com/items' for (let page = 1; page <= totalPages; page++) { if (signal.aborted) break // Stop early on cancellation - const data = await fetch(url, { signal }) // Also aborts fetch + const data = await fetch(`${baseUrl}?page=${page}`, { signal }) // Also aborts fetch results.push(data) } return results })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@website/api/step.md` around lines 42 - 58, The example loop inside step.run uses a static url while iterating pages; update the example to construct the request URL per iteration (e.g., build url using page or page query param) so fetch receives the correct page-specific URL; modify the code inside the step.run callback (the for loop that checks signal.aborted and calls fetch) to build a dynamic URL per page before calling fetch.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/durably/src/context.ts`:
- Around line 66-123: The catch block that currently treats all thrown errors as
failures should detect cooperative cancellation and record it as a cancellation
instead: inside the catch (where storage.createStep and eventEmitter.emit are
used), check controller.signal.aborted or error.name === 'AbortError' (or
similar sentinel) and in that branch call storage.createStep with status:
'cancelled' (using run.id, name, index: stepIndex, startedAt) and emit a
'step:cancel' event (use runId, jobName, stepName: name, stepIndex, labels,
duration if available), update the run.currentStepIndex as appropriate (same
increment + storage.updateRun(run.id, { currentStepIndex: stepIndex })) and then
rethrow the error; otherwise keep the existing failed-step logic for real
errors.
In `@packages/durably/tests/shared/step.shared.ts`:
- Around line 459-513: The test should also assert the AbortSignal state
directly: inside signalBoundaryTestDef's run, update the step.run('step1', ...)
callback so that after awaiting proceedPromise it checks the passed-in
signal.aborted is true (use the signal parameter provided to the step.run
callback), then return; keep the existing cancellation flow and final run.status
assertion unchanged so the test still verifies cancellation and that step2 was
not executed.
In `@website/api/step.md`:
- Around line 42-58: The example loop inside step.run uses a static url while
iterating pages; update the example to construct the request URL per iteration
(e.g., build url using page or page query param) so fetch receives the correct
page-specific URL; modify the code inside the step.run callback (the for loop
that checks signal.aborted and calls fetch) to build a dynamic URL per page
before calling fetch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9abceb81-9ec0-487c-a992-4ab572d17125
📒 Files selected for processing (9)
packages/durably/docs/llms.mdpackages/durably/src/context.tspackages/durably/src/index.tspackages/durably/src/job.tspackages/durably/src/worker.tspackages/durably/tests/shared/step.shared.tswebsite/api/index.mdwebsite/api/step.mdwebsite/public/llms.txt
- Assert signal.aborted directly in boundary cancellation test - Use dynamic URL in docs cooperative cancellation example Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
AbortSignaltostep.run()callbacks, enabling cooperative cancellation of long-running stepsCancelledError) and mid-step viarun:canceleventdispose()in the worker'sfinallyblockRunCancelEventtype from package indexFixes #26
Test plan
cancel()during a long-running stepdurablyanddurably-react🤖 Generated with Claude Code
Summary by CodeRabbit