feat(decopilot): per-thread DBOS gate for user messages#3376
Merged
viktormarinho merged 5 commits intoMay 15, 2026
Conversation
Contributor
🧪 BenchmarkShould we run the Virtual MCP strategy benchmark for this PR? React with 👍 to run the benchmark.
Benchmark will run on the next push after you react. |
Contributor
Release OptionsSuggested: Minor ( React with an emoji to override the release type:
Current version:
|
3 tasks
Contributor
There was a problem hiding this comment.
2 issues found across 5 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="apps/mesh/src/dispatch-queue/thread-gate-workflow.ts">
<violation number="1" location="apps/mesh/src/dispatch-queue/thread-gate-workflow.ts:127">
P1: Do not swallow dispatch exceptions in the workflow step; rethrow so failed runs are recorded as workflow failures (and can follow normal retry/failure handling) instead of succeeding with an ignored `{ error }` payload.</violation>
</file>
<file name="apps/mesh/src/api/routes/decopilot/routes.ts">
<violation number="1" location="apps/mesh/src/api/routes/decopilot/routes.ts:328">
P2: `chat_message_started` is emitted unconditionally after enqueue, so idempotent POST retries can be double-counted even when they collapse to an existing workflow.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Re-trigger cubic
f2f6d16 to
96b817a
Compare
17fd03f to
7b0d010
Compare
Adds `threadGateWorkflow` — a DBOS workflow with partitioned concurrency=1 per threadId — and cuts `POST /messages` over to enqueue on it instead of calling `dispatchRun` directly. Holding the partition slot until `dispatchRunAndWait` returns is what serializes messages on the same thread: a second POST while a run is in-flight queues behind it and dispatches once the active run finishes. Idempotency: when the client supplies a request message id, the DBOS workflow ID is derived from `<threadId>:<messageId>`, so a retried POST collapses onto the existing handle. Other paths unchanged: - Orphan-resume in `/attach` still calls `dispatchRun` directly (recovery path; can't go through the queue because the run is already in flight). - Automations still go through `fireAutomationWorkflow` and its existing per-automation / global gates (rerouting them through `threadGateWorkflow` is a follow-up PR). Inbox UI and cancel endpoints land in a follow-up PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
7b0d010 to
32139cb
Compare
Phase 5 of the dispatch-queue unification. Automation fires now hand off
to `threadGateWorkflow` via `awaitThreadRun` instead of calling
`dispatchRunAndWait` directly. Practical effects:
- A user-message run and an automation run on the same thread now
serialize through the same per-thread gate (concurrency=1). Previously
they could race.
- Automation chunks still publish to the per-thread JetStream subject
(already true after Phase 1), so `/attach` keeps surfacing them live.
- The per-automation (concurrency=3) and global (concurrency=5) gates
remain — they layer above the new per-thread gate, not replaced.
API:
- New `awaitThreadRun(ctx, opts)` helper alongside `enqueueThreadRun`.
Returns the workflow result; used by callers that hold an outer queue
slot and need the outcome to advance (the automation fire step).
- `ThreadGateContext.source: "user-message" | "automation"` so the
workflow body suppresses `chat_message_started` for automation fires —
they reuse the gate but don't count as user message sends.
Cleanups:
- `AutomationRuntime.dispatchRunFn`, `.deps`, and the standalone
`DispatchRunFn` type are gone — the thread-gate runtime owns dispatch
now, automations only need `storage` + `meshContextFactory`.
- The automation workflow body still resolves with `{taskId, error}` on
failure (preserves the `FireAutomationOutcome` contract callers rely
on). The inner dispatch step *throws* so DBOS records step-level
failure for observability.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cept idempotency header Three review findings on the per-thread gate: 1. `chat_message_started` was emitted before `dispatchRunAndWaitStep`, so setup failures from `prepareRun` (model-permission, agent-not-found, thread-ownership, async-research model-slot guard) produced orphan started events — `streamText.onError` only covers in-flight errors, not the pre-stream gap. Workflow now wraps the dispatch step in try/catch and emits `chat_message_failed` (in its own DBOS step, so replay-safe) when the step throws. `error_category: "setup"` keeps it distinguishable from runtime stream errors. 2. The 5-minute hard timeout was lifted from the automations gate and regressed user-chat runs (Claude Code, deep research, multi-step tool loops routinely exceed 5 min; the legacy fire-and-forget HTTP path had no timeout at all). Timeout is now opt-in: automations still pass an explicit cap (so cron runs are bounded), user messages leave it unset and no abort timer is installed. 3. Idempotency only worked when the client happened to send a message id, which is optional in the schema. Route now also accepts an explicit `X-Idempotency-Key` header (preferred over message id); docstring spells out the fallback chain and notes at-least-once semantics when neither is supplied. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
There was a problem hiding this comment.
1 issue found across 2 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="apps/mesh/src/api/routes/decopilot/routes.ts">
<violation number="1" location="apps/mesh/src/api/routes/decopilot/routes.ts:303">
P2: Empty `X-Idempotency-Key` values block fallback to message id, which can silently disable idempotency on retries.</violation>
</file>
Tip: Review your code locally with the cubic CLI to iterate faster.
Re-trigger cubic
DBOS forbids invoking workflows from inside a step, so the previous dispatchRunAndWaitStep crashed with "Invalid call to a workflow function from within a step or transaction" on every fire. Split into a buildDispatchRequest step (membership pre-check + request assembly, journaled) and call awaitThreadRun directly from fireAutomationWorkflowFn. markRunFailed moves into its own step so the side effect is recorded. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
`??` only falls back on null/undefined, so a client sending the header with an empty or whitespace-only value kept it as "", failed the truthy check below, and silently dropped to at-least-once semantics. Trim the header and treat an empty result as missing so the fallback to the last message's id still applies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
4 tasks
viktormarinho
added a commit
that referenced
this pull request
May 17, 2026
#3387) * refactor(decopilot): drop /attach orphan-resume; collapse to pure tail Pre-PR-#3376, POST /messages dispatched runs directly, so a pod death mid-run left orphan threads that only the next /attach could resurrect. Now every user message lives inside a thread-gate DBOS workflow step, and the recovery executor replays it on a healthy pod with the streamBuffer wired in — chunks land back on the per-thread JetStream subject and the existing /attach tail picks them up. The heartbeat watcher in app.ts remains as a backstop. Also removes the now-dead fire-and-forget dispatchRun export and threadStorage from DecopilotDeps. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore(decopilot): fix stale dispatchRun reference in prepareRun error Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(decopilot): use DB thread status for /attach deliverPolicy isRunning() is pod-local; a client attached to a non-owner pod (multi-pod deployment, mid-deploy, or post-DBOS-replay rehome) would silently miss chunks the owner had already pumped to the shared JetStream subject. thread.status is set synchronously by run-reactor's claimRunStart, so it's a cluster-wide signal. The buffer purges on terminal events, so "all" only ever replays the current in-flight run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore(decopilot): fix stale doc references missed by rename Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a per-thread DBOS gate so agent runs on the same thread execute one at a time, while runs on different threads progress in parallel. Both user messages and automation fires now funnel through the same gate.
POST /:org/decopilot/threads/:threadId/messagesenqueues onto a partitioned DBOS workflow (threadGateWorkflow, partition key =threadId, concurrency = 1) and returns202 { taskId }in milliseconds. A second POST while a run is in flight queues behind it and dispatches only after the active run completes. Streaming continues throughGET /:org/decopilot/attach/:threadId.Changes
apps/mesh/src/dispatch-queue/(new)threadGateWorkflow— single DBOS workflow that owns the per-thread serialization. Body is two steps:trackMessageStarted— emitschat_message_startedPostHog event (skipped for automation sources). Wrapped in a step so idempotent retries that collapse onto an existing workflow ID don't double-count.dispatchRunAndWait— invokes the configured dispatch fn; constructs its ownAbortController(sinceAbortSignalisn't serializable across workflow boundaries). If the step throws,trackMessageFailedemits witherror_category: "setup"to balance the started event.enqueueThreadRun(ctx, { workflowID? })— fire-and-forget. Used by the user-message route.awaitThreadRun(ctx, { workflowID? })— enqueue +getResult(). Used by automation fires that need to block on the inner run's outcome.setThreadGateRuntime()registry wiresdispatchRunFn,meshContextFactory, and dispatch deps beforeDBOS.launch().THREAD_GATE_QUEUE+THREAD_GATE_PARTITION_CONCURRENCYconstants; queue registered increateApp.apps/mesh/src/api/routes/decopilot/routes.tsPOST /messagesreplaces the inlinedispatchAndTrackcall withenqueueThreadRun. Returns202 { taskId }.X-Idempotency-Keyheader, falls back to the last message'sid. Combined asworkflowID = thread-run:<threadId>:<key>. Without either, retries get a fresh workflow ID (at-least-once).dispatchAndTrackdeleted; PostHogchat_message_startedis now emitted from the workflow step./attachcontinues to calldispatchRundirectly — going through the gate would deadlock against itself.apps/mesh/src/automations/dbos-workflow.tsfireAutomationWorkflowFnhands off to the thread gate viaawaitThreadRuninstead of callingdispatchRunFndirectly. Existing per-automation (concurrency=3) and global (concurrency=5) gates remain layered above; the per-thread gate adds the third layer.buildDispatchRequeststep — membership pre-check +buildStreamRequest, journaled so replays reuse the samecrypto.randomUUID()message ids.awaitThreadRunfrom the workflow body — DBOS forbids invoking a workflow from inside a step, so this lives in the workflow context. Failures are caught at the workflow level to preserve theFireAutomationOutcome{taskId, error}shape.markRunFailedextracted into its own step so the side effect is recorded on the journal.AutomationRuntimelosesdispatchRunFnanddeps(now owned by the thread-gate runtime).apps/mesh/src/api/app.tssetThreadGateRuntime,THREAD_GATE_QUEUE,THREAD_GATE_PARTITION_CONCURRENCYfrom@/dispatch-queue.setThreadGateRuntime({ dispatchRunFn: dispatchRunAndWait, meshContextFactory, deps })beforeDBOS.launch().thread-gatequeue withpartitionQueue: true, concurrency: 1.apps/mesh/src/automations/fire.tsDispatchRunFntype andDispatchRunInput/DispatchRunDepsimports.Telemetry semantics
chat_message_started— fires from inside the workflow body (post queue-admission), once per workflow. Idempotent retries collapse onto the same workflowID and do not double-count.chat_message_failed— emitted from one of three places, never two:prepareRunbeforestreamTextstarts → workflow'strackMessageFailedstep witherror_category: "setup".createUIMessageStream.onErrorinsidedispatchRunwithclassifyStreamError(error)category.Idempotency contract
X-Idempotency-Key: <stable-key>(recommended for retrying clients).id. Most chat clients already send a UUID per message.thread-run:<threadId>:<key>. A redelivered POST gets202andgetResult()-equivalent semantics inside DBOS.What's NOT in this PR
GET/DELETE /threads/:id/queueendpoints — follow-up.Bug fixes included on the branch
ff8914bb0): the initial automation routing crashed every fire withInvalid call to a 'workflow' function from within a 'step' or 'transaction'becauseawaitThreadRunwas called from insideDBOS.runStep. Fixed by movingawaitThreadRunto the workflow body and splitting the prep work into a journaled step.Test plan
bun run --cwd=apps/mesh checkclean (TypeScript).bun run fmtclean.thread-gate-workflow.test.tscovers queue plumbing (constant exports +setThreadGateRuntimeruntime shape).202 { taskId }and stream attaches via/attach.X-Idempotency-Key; verify no duplicate run / no duplicatechat_message_started.🤖 Generated with Claude Code