refactor(decopilot): JetStream source of truth + subscribe model#3353
Merged
Conversation
Decouples stream production from the HTTP response. The producer (`streamCore`) pumps `uiStream` chunks into JetStream via `streamBuffer.pump()`; every HTTP response — initial `/stream` and any `/attach` — is a JetStream live-tail subscriber. The producer's lifetime is bound to `registrySignal`, never to any consumer. The previous `relay()` was a `pipeThrough` TransformStream gated on the HTTP response staying connected: when a proxy or tab-close cancelled the response mid-stream, backpressure stopped pulling the relay and subsequent `writer.write(...)` calls from tools (e.g. the `data-web-search` progress chunks from `web_search`) never reached JetStream. The deferred-FINISH fix kept the registry alive but the stream output still vanished, so `/attach` replayed a prefix and hung. The pump is a detached async reader, so chunks are persisted regardless of consumer state. With UI-stream `onFinish` now always firing (the pump drains it to completion), the deferred-FINISH machinery is no longer needed: - drop `httpSignal` from `StreamCoreInput` and all call sites - drop `deferRegistryFinish` flag and its branches in `streamText` and UI-stream `onFinish` - revert `resolveThreadStatus` to UI parts shape only; drop the five AI-SDK-content tests added for the recovery path Test/dev mode with the no-op `streamBuffer` stub continues to work: when `createTailStream` returns null, `streamCore` falls back to serving `uiStream` directly — same degraded behavior as before the refactor when NATS was unavailable. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Contributor
Release OptionsSuggested: Patch ( React with an emoji to override the release type:
Current version:
|
Contributor
🧪 BenchmarkShould we run the Virtual MCP strategy benchmark for this PR? React with 👍 to run the benchmark.
Benchmark will run on the next push after you react. |
Adds the subscribe-model surface on top of the JetStream-source-of-truth
refactor: posting a message becomes a fire-and-forget command, and a
single long-lived /attach connection covers every run in the thread.
- POST /:org/decopilot/threads/:threadId/messages: claims the run,
starts the JetStream pump, returns `202 { taskId }` in milliseconds.
No SSE body. Mirrors the validation of POST /stream but takes the
threadId from the URL (the addressable resource in this model) and
rejects a body thread_id that disagrees.
- GET /:org/decopilot/attach/:threadId now accepts `?persistent=true`.
When set, the subscription stays open across multiple runs in the
thread instead of closing on the `{done}` sentinel — clients detect
run boundaries from the AI-SDK "finish" parts already in the chunk
stream. The default (no query param) preserves legacy reconnect
semantics for clients that haven't migrated yet.
- Persistent mode also tolerates idle threads: instead of returning
204 when no run is in progress, it subscribes from "now"
(DeliverPolicy.New) and waits for a future POST /messages to start
publishing. This is the "subscribe on thread open, then send
messages" pattern.
Stream-core changes:
- `fireAndForget?: boolean` on StreamCoreInput. When true, streamCore
starts the pump and returns `{ taskId }` (no stream). Used by both
POST /messages and /attach's orphan-resume branch (which now creates
the tail itself after kicking off the resume).
- The pump is started unconditionally when a streamBuffer is present,
before createTailStream is called. With JetStream's
`ordered: true` + DeliverPolicy.All, the subscription replays any
chunks published before it subscribed.
Other:
- Test-mode StreamBuffer stub's `pump` now drains uiStream so
`createUIMessageStream.execute` runs to completion in tests that
exercise streamCore without NATS. createTailStream still returns
null, so the legacy /stream route in test mode returns 503.
- `StreamCoreFn` type and `consumeStreamCore` updated for the
now-optional `stream` field.
- Unit test for persistent mode skipping the `{done}` sentinel.
Frontend migration is out of scope. The existing chat hook continues
using POST /stream + reconnect /attach. A follow-up will switch to
POST /messages + /attach?persistent=true with a custom ChatTransport.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…tream
Two pieces of cleanup that fall out of the subscribe-model refactor:
1. Replace the `fireAndForget` flag on streamCore with two explicit
public functions:
- `streamCore(...)` — always fire-and-forget. Requires
`deps.streamBuffer`. Starts the JetStream pump and returns
`{ taskId }`. HTTP routes that need SSE call
`streamBuffer.createTailStream(taskId)` themselves afterwards.
- `executeRun(...)` — drains `uiStream` internally to completion
and resolves `{ taskId }` once the run terminates. Used by
automations and the pod-death recovery flow in app.ts that need
to await the workflow step finishing.
Both share a single internal `streamCoreInner` with a `mode`
parameter. The dual-shape result type and `consumeStreamCore`
helper are gone.
2. Delete `POST /:org/decopilot/runtime/stream`. Confirmed zero
callers via repo search — defined in routes.ts and referenced as
a URL string in two packages/runtime files that don't appear to
actually invoke it. The endpoint is dead code.
The remaining `POST /:org/decopilot/stream` is marked DEPRECATED with
a comment pointing at the subscribe-model endpoints. It now uses the
same path under the hood — streamCore starts the pump, then the route
creates a one-shot tail subscription to serve as SSE — so legacy
clients survive HTTP cuts the same way new clients do.
`POST /:org/decopilot/threads/:threadId/messages` and `/attach`'s
orphan-resume drop their `fireAndForget: true` line since it's
implicit now.
`app.ts` pod-death recovery switches from
`streamCore + consumeStreamCore` to `executeRun`. The DBOS workflow's
`streamCoreFn` wires to `executeRun` instead of `streamCore`.
Frontend is still on `POST /stream` + reconnect `/attach`; migration
to the new endpoints is the focused follow-up PR.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… model Frontend switches from the AI SDK's useChat (POST + response-bound stream) to a hand-rolled useThreadChat that fits the subscribe-model backend: sendMessage POSTs to /messages and a persistent /attach delivers assistant chunks to every observer of the thread. useStreamManager loses its resume gymnastics — the persistent connection lives inside useThreadChat — and keeps only the SSE-driven cache invalidations. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…r_ask responses
Four issues found while testing the subscribe model in the UI:
- Dev server hangs after a few mounts. The persistent /attach was kept
alive via a bare useRef and never aborted, so React's StrictMode
double-mount and every HMR cycle leaked one SSE connection. The dev
server's HTTP pool fills up and new requests (including hard refresh)
block waiting for a free slot. Move the lifecycle into
useSyncExternalStore so React aborts the fetch on unmount.
- user_ask / approval-required runs hang at end of stream. The backend
pump's chunk publishes are fire-and-forget; the reactor's purge on
`requires_action` can race ahead of the trailing AI-SDK
`{type: "finish"}` chunk, which then never reaches the client and
leaves my per-finish demux waiting forever. Subscribe to the
`decopilot.finish` SSE event inside the hook and force-close the
current sub-stream as a backstop.
- Approval / tool-output responses had no visible effect. The patched
assistant lived in localMessages but mergeWithServer dropped it the
moment the server refetch produced an unpatched copy with the same
id, so the continuation POST body carried the stale assistant. Make
mergeWithServer prefer local over server for overlapping ids — local
is always the more-recent version (eager-patched), server is just
what the DB had a moment ago.
- Submit button on the user_ask popup no-op'd after a refresh. The
assistant message was loaded from the server snapshot only;
localMessages was empty, so patchLastAssistant updated nothing. Fall
back to initialMessages when local has no assistant tail and promote
a patched copy into localMessages.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…tchRunAndWait The pair `streamCore` (fire-and-forget) + `executeRun` (drain to completion) named the same operation differently: "core" said nothing about behavior and "stream" stopped matching once the fire-and-forget half stopped returning a stream. Both functions claim a run, start the agent loop, and publish chunks to JetStream — the only difference is whether the caller awaits completion. streamCore → dispatchRun executeRun → dispatchRunAndWait StreamCoreInput → DispatchRunInput StreamCoreDeps → DispatchRunDeps StreamCoreResult → DispatchRunResult StreamCoreFn → DispatchRunFn StreamCoreMode → DispatchRunMode stream-core.ts → dispatch-run.ts The shared `dispatch` prefix signals the symmetry; the `AndWait` suffix says exactly what the second variant adds. Aligns with the surrounding "run" vocabulary already used by `runRegistry`, `run-reactor`, and the `RUN_*` event taxonomy. No behavior change. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…iant tail
dispatchRunInner mixed setup (claim run, load conversation, build tools,
construct uiStream) with a `mode`-dependent tail that either handed
uiStream to the JetStream pump or drained it inline. Now:
- `prepareRun` (private): does only the setup, returns
`{ taskId, uiStream, registrySignal }`. Setup-phase errors still trip
the catch block that force-FINISHes the run to "failed".
- `dispatchRun` (public, fire-and-forget): awaits `prepareRun`, then
`buffer.pump(uiStream, taskId, registrySignal)`. Returns immediately.
- `dispatchRunAndWait` (public, drain-to-completion): awaits
`prepareRun`, drains `uiStream` with a reader loop, returns when done.
Drops the `mode: "fire-and-forget" | "drain"` parameter and the runtime
`if (mode === ...)` branch at the bottom of the 1.4k-line function. Each
public function's body is now ~5 lines and tells you exactly what it
does without having to grep for the mode flag.
Also hoists the OTel span-attribute object into `dispatchRunSpanAttrs`
to dedupe the identical six-line blob both call sites had.
No behavior change.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…am-source-of-truth
…ttach flag
Now that the frontend is fully on POST /messages + GET /attach, the
legacy surfaces are dead:
- `POST /:org/decopilot/stream` had no callers (route handler + one
stale doc comment were the only references). Internally it was just
`dispatchRun` + a one-shot tail on the same JetStream subject — i.e.
exactly what the /messages + /attach pair does in two requests
instead of one. Removed entirely (~140 lines).
- `GET /:org/decopilot/attach/:threadId?persistent=true` is the only
shape we ever hit. Drop the `persistent` query parameter and the
`closeOnDone` option on `streamBuffer.createTailStream` — the JetStream
`{done}` sentinel is now always swallowed server-side, run boundaries
are detected client-side from the AI-SDK `{type: "finish"}` chunk.
One open connection per (tab, thread) covers every run.
- Update the corresponding tests in nats-stream-buffer.test.ts to drop
the now-irrelevant `closeOnDone: false` option and the stale
`push({done: true})` expectations that relied on the legacy
close-on-done behavior.
- Frontend's /attach URL drops `?persistent=true`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…te + dispatchAndTrack
Two functions instead of one wrapper:
validate(c, threadIdParam?) → DispatchRunInput
Parse body, resolve models, check permissions. No side effects
beyond reading the request. Throws HTTPException for caller-visible
failures.
dispatchAndTrack(input, ctx, deps) → { taskId }
Calls dispatchRun and emits the chat_message_started posthog event.
Kept separate from dispatchRun itself so orphan-resume / automation
paths (which call dispatchRun without a fresh user message) don't
double-count the event.
The /messages route handler now reads top-to-bottom as three lines of
business logic:
const input = await validate(c, c.req.param("threadId"));
const { taskId } = await dispatchAndTrack(input, ctx, deps);
return c.json({ taskId }, 202);
If we ever add a /stream compat shim, it'd swap line 3 for a tail
subscription with the same first two lines unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
pedrofrxncx
approved these changes
May 14, 2026
…eadChat onToolCall
`useInvalidateCollectionsOnToolCall` was written for useChat's
`ChatOnToolCallCallback` signature (`event.toolCall.toolName`). When I
swapped in useThreadChat I passed a flat `{ toolCallId, toolName, input }`
payload instead, so `event.toolCall` was undefined and every tool call
threw "Cannot read properties of undefined (reading 'toolName')" mid-stream.
Wrap the payload as `{ toolCall: {...} }` and align the option type so the
existing handler keeps working unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
useThreadChat's onFinish payload was missing finishReason, so consumers
landing on `setFinishReason(payload.finishReason ?? null)` always got
null. The warning banner in chat/highlight/index.tsx gates on
`!!finishReason && finishReason !== "stop"`, so non-stop reasons
(length, content-filter, error, tool-calls-without-client-tools) never
showed the StatusHighlight warning card.
`readUIMessageStream` folds chunks into a UIMessage snapshot but drops
the top-level `finishReason` field from `{type: "finish"}` chunks. We
already see every raw chunk in `handleChunkFanOut`, so capture it there
into `demuxRef.pendingFinishReason`, and read it back in the
sub-stream's async drain loop right before invoking `onFinish`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…unce SSE finish backstop
Three review fixes for the subscribe-model frontend.
1. Reconnect on transient failures (the important one)
The persistent /attach is the only delivery path for assistant
chunks now. Pre-refactor `useStreamManager` handled mid-stream
drops by calling `chat.resumeStream()`; that logic was dropped
when the resume code was removed but never replaced.
Some self-hosted proxies hard-cut TCP after a fixed duration
regardless of SSE keepalive (~2 min seen in one customer setup),
so a single drop today silently kills updates until reload.
Wrap `runPersistentLoop` in a reconnect loop with exponential
backoff (1s → 2s → 4s → … capped at 30s). Distinguish transient
(TypeError, server-side `done` mid-run, network unreachable)
from terminal (4xx/5xx HTTP response, schema parse error,
`signal.aborted`). Reconnect on transient.
The new /attach uses `DeliverPolicy.All` and replays every chunk
for the in-flight run from JetStream's start. Caller's
`onReconnect` hook discards the in-flight sub-stream's partial
fold (`forceCloseCurrentSubStream(true)`) and resets
`streamingStore` so the replay re-folds cleanly without
duplicating deltas into the previous state. `discardOnClose`
flag on `demuxRef` carries the intent into the async drain so
the partial doesn't get half-committed to `localMessages`.
2. SSE `decopilot.finish` backstop debounced ~1.5s
The watch SSE and the JetStream tail are independent transports.
Under load the SSE event commonly arrives ahead of the buffered
`{type:"finish"}` chunk on /attach; closing the sub-stream then
would orphan the in-flight chunks (`ensureSubStream` would open
a fresh sub for them, surfacing as a duplicate partial assistant
message). Wait ~1.5s so the actual finish chunk has time to
land — if it does, the sub is already closed and the backstop
is a no-op. If it doesn't (the original race we added the
backstop for: chunk lost to the JetStream purge), the timer
forces close as before.
3. Stale comment removed from chat-context.tsx — `useStreamManager`
no longer "owns every resume decision" and no longer takes an
`onResumeSuccess` callback.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…close the next one
The 1.5s SSE-finish backstop timer was scheduled blind — when run N's
chunk-finish arrived normally inside the window and the user (or
`maybeAutoSend` after addToolOutput / addToolApprovalResponse) kicked
off run N+1, the timer closed N+1's freshly-opened sub-stream instead.
The partial got promoted to localMessages and the remaining chunks
opened a second sub via `ensureSubStream`, fragmenting a single run
into two assistant bubbles.
Track `pendingSseBackstops` on `demuxRef`:
- SSE `decopilot.finish` → ++ ; only schedule a timer when positive
- AI-SDK `{type:"finish"}` chunk → -- (and let it go negative; a
later SSE finish for the same run will balance back to 0)
- timer fires → only act when still positive; consume one slot
Trace:
- normal flow: chunk--, then SSE++ → 0 → no schedule
- lost-chunk (purge race): SSE++ → 1, no chunk, timer fires, --, close
- reviewer's race: chunk--, SSE++ → 0, no schedule; the user
starts N+1 with no pending timer
- SSE delayed past N+1: chunk for N--, chunk for N+1--, SSE for N++,
SSE for N+1++ → counter stays in lockstep, no
spurious fires
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
pedrofrxncx
pushed a commit
that referenced
this pull request
May 15, 2026
* refactor(decopilot): jetstream as source of truth for UI stream
Decouples stream production from the HTTP response. The producer
(`streamCore`) pumps `uiStream` chunks into JetStream via
`streamBuffer.pump()`; every HTTP response — initial `/stream` and
any `/attach` — is a JetStream live-tail subscriber. The producer's
lifetime is bound to `registrySignal`, never to any consumer.
The previous `relay()` was a `pipeThrough` TransformStream gated on
the HTTP response staying connected: when a proxy or tab-close
cancelled the response mid-stream, backpressure stopped pulling the
relay and subsequent `writer.write(...)` calls from tools (e.g. the
`data-web-search` progress chunks from `web_search`) never reached
JetStream. The deferred-FINISH fix kept the registry alive but the
stream output still vanished, so `/attach` replayed a prefix and
hung. The pump is a detached async reader, so chunks are persisted
regardless of consumer state.
With UI-stream `onFinish` now always firing (the pump drains it to
completion), the deferred-FINISH machinery is no longer needed:
- drop `httpSignal` from `StreamCoreInput` and all call sites
- drop `deferRegistryFinish` flag and its branches in `streamText`
and UI-stream `onFinish`
- revert `resolveThreadStatus` to UI parts shape only; drop the five
AI-SDK-content tests added for the recovery path
Test/dev mode with the no-op `streamBuffer` stub continues to work:
when `createTailStream` returns null, `streamCore` falls back to
serving `uiStream` directly — same degraded behavior as before the
refactor when NATS was unavailable.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(decopilot): subscribe model — POST /messages + persistent /attach
Adds the subscribe-model surface on top of the JetStream-source-of-truth
refactor: posting a message becomes a fire-and-forget command, and a
single long-lived /attach connection covers every run in the thread.
- POST /:org/decopilot/threads/:threadId/messages: claims the run,
starts the JetStream pump, returns `202 { taskId }` in milliseconds.
No SSE body. Mirrors the validation of POST /stream but takes the
threadId from the URL (the addressable resource in this model) and
rejects a body thread_id that disagrees.
- GET /:org/decopilot/attach/:threadId now accepts `?persistent=true`.
When set, the subscription stays open across multiple runs in the
thread instead of closing on the `{done}` sentinel — clients detect
run boundaries from the AI-SDK "finish" parts already in the chunk
stream. The default (no query param) preserves legacy reconnect
semantics for clients that haven't migrated yet.
- Persistent mode also tolerates idle threads: instead of returning
204 when no run is in progress, it subscribes from "now"
(DeliverPolicy.New) and waits for a future POST /messages to start
publishing. This is the "subscribe on thread open, then send
messages" pattern.
Stream-core changes:
- `fireAndForget?: boolean` on StreamCoreInput. When true, streamCore
starts the pump and returns `{ taskId }` (no stream). Used by both
POST /messages and /attach's orphan-resume branch (which now creates
the tail itself after kicking off the resume).
- The pump is started unconditionally when a streamBuffer is present,
before createTailStream is called. With JetStream's
`ordered: true` + DeliverPolicy.All, the subscription replays any
chunks published before it subscribed.
Other:
- Test-mode StreamBuffer stub's `pump` now drains uiStream so
`createUIMessageStream.execute` runs to completion in tests that
exercise streamCore without NATS. createTailStream still returns
null, so the legacy /stream route in test mode returns 503.
- `StreamCoreFn` type and `consumeStreamCore` updated for the
now-optional `stream` field.
- Unit test for persistent mode skipping the `{done}` sentinel.
Frontend migration is out of scope. The existing chat hook continues
using POST /stream + reconnect /attach. A follow-up will switch to
POST /messages + /attach?persistent=true with a custom ChatTransport.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): split streamCore + executeRun; delete /runtime/stream
Two pieces of cleanup that fall out of the subscribe-model refactor:
1. Replace the `fireAndForget` flag on streamCore with two explicit
public functions:
- `streamCore(...)` — always fire-and-forget. Requires
`deps.streamBuffer`. Starts the JetStream pump and returns
`{ taskId }`. HTTP routes that need SSE call
`streamBuffer.createTailStream(taskId)` themselves afterwards.
- `executeRun(...)` — drains `uiStream` internally to completion
and resolves `{ taskId }` once the run terminates. Used by
automations and the pod-death recovery flow in app.ts that need
to await the workflow step finishing.
Both share a single internal `streamCoreInner` with a `mode`
parameter. The dual-shape result type and `consumeStreamCore`
helper are gone.
2. Delete `POST /:org/decopilot/runtime/stream`. Confirmed zero
callers via repo search — defined in routes.ts and referenced as
a URL string in two packages/runtime files that don't appear to
actually invoke it. The endpoint is dead code.
The remaining `POST /:org/decopilot/stream` is marked DEPRECATED with
a comment pointing at the subscribe-model endpoints. It now uses the
same path under the hood — streamCore starts the pump, then the route
creates a one-shot tail subscription to serve as SSE — so legacy
clients survive HTTP cuts the same way new clients do.
`POST /:org/decopilot/threads/:threadId/messages` and `/attach`'s
orphan-resume drop their `fireAndForget: true` line since it's
implicit now.
`app.ts` pod-death recovery switches from
`streamCore + consumeStreamCore` to `executeRun`. The DBOS workflow's
`streamCoreFn` wires to `executeRun` instead of `streamCore`.
Frontend is still on `POST /stream` + reconnect `/attach`; migration
to the new endpoints is the focused follow-up PR.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): replace useChat with useThreadChat for subscribe model
Frontend switches from the AI SDK's useChat (POST + response-bound stream)
to a hand-rolled useThreadChat that fits the subscribe-model backend:
sendMessage POSTs to /messages and a persistent /attach delivers assistant
chunks to every observer of the thread. useStreamManager loses its resume
gymnastics — the persistent connection lives inside useThreadChat — and
keeps only the SSE-driven cache invalidations.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(decopilot): make useThreadChat survive StrictMode and pick up user_ask responses
Four issues found while testing the subscribe model in the UI:
- Dev server hangs after a few mounts. The persistent /attach was kept
alive via a bare useRef and never aborted, so React's StrictMode
double-mount and every HMR cycle leaked one SSE connection. The dev
server's HTTP pool fills up and new requests (including hard refresh)
block waiting for a free slot. Move the lifecycle into
useSyncExternalStore so React aborts the fetch on unmount.
- user_ask / approval-required runs hang at end of stream. The backend
pump's chunk publishes are fire-and-forget; the reactor's purge on
`requires_action` can race ahead of the trailing AI-SDK
`{type: "finish"}` chunk, which then never reaches the client and
leaves my per-finish demux waiting forever. Subscribe to the
`decopilot.finish` SSE event inside the hook and force-close the
current sub-stream as a backstop.
- Approval / tool-output responses had no visible effect. The patched
assistant lived in localMessages but mergeWithServer dropped it the
moment the server refetch produced an unpatched copy with the same
id, so the continuation POST body carried the stale assistant. Make
mergeWithServer prefer local over server for overlapping ids — local
is always the more-recent version (eager-patched), server is just
what the DB had a moment ago.
- Submit button on the user_ask popup no-op'd after a refresh. The
assistant message was loaded from the server snapshot only;
localMessages was empty, so patchLastAssistant updated nothing. Fall
back to initialMessages when local has no assistant tail and promote
a patched copy into localMessages.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): rename streamCore/executeRun → dispatchRun/dispatchRunAndWait
The pair `streamCore` (fire-and-forget) + `executeRun` (drain to completion)
named the same operation differently: "core" said nothing about behavior
and "stream" stopped matching once the fire-and-forget half stopped
returning a stream. Both functions claim a run, start the agent loop, and
publish chunks to JetStream — the only difference is whether the caller
awaits completion.
streamCore → dispatchRun
executeRun → dispatchRunAndWait
StreamCoreInput → DispatchRunInput
StreamCoreDeps → DispatchRunDeps
StreamCoreResult → DispatchRunResult
StreamCoreFn → DispatchRunFn
StreamCoreMode → DispatchRunMode
stream-core.ts → dispatch-run.ts
The shared `dispatch` prefix signals the symmetry; the `AndWait` suffix
says exactly what the second variant adds. Aligns with the surrounding
"run" vocabulary already used by `runRegistry`, `run-reactor`, and the
`RUN_*` event taxonomy.
No behavior change.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): split dispatchRunInner into prepareRun + per-variant tail
dispatchRunInner mixed setup (claim run, load conversation, build tools,
construct uiStream) with a `mode`-dependent tail that either handed
uiStream to the JetStream pump or drained it inline. Now:
- `prepareRun` (private): does only the setup, returns
`{ taskId, uiStream, registrySignal }`. Setup-phase errors still trip
the catch block that force-FINISHes the run to "failed".
- `dispatchRun` (public, fire-and-forget): awaits `prepareRun`, then
`buffer.pump(uiStream, taskId, registrySignal)`. Returns immediately.
- `dispatchRunAndWait` (public, drain-to-completion): awaits
`prepareRun`, drains `uiStream` with a reader loop, returns when done.
Drops the `mode: "fire-and-forget" | "drain"` parameter and the runtime
`if (mode === ...)` branch at the bottom of the 1.4k-line function. Each
public function's body is now ~5 lines and tells you exactly what it
does without having to grep for the mode flag.
Also hoists the OTel span-attribute object into `dispatchRunSpanAttrs`
to dedupe the identical six-line blob both call sites had.
No behavior change.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): remove legacy POST /stream and the persistent /attach flag
Now that the frontend is fully on POST /messages + GET /attach, the
legacy surfaces are dead:
- `POST /:org/decopilot/stream` had no callers (route handler + one
stale doc comment were the only references). Internally it was just
`dispatchRun` + a one-shot tail on the same JetStream subject — i.e.
exactly what the /messages + /attach pair does in two requests
instead of one. Removed entirely (~140 lines).
- `GET /:org/decopilot/attach/:threadId?persistent=true` is the only
shape we ever hit. Drop the `persistent` query parameter and the
`closeOnDone` option on `streamBuffer.createTailStream` — the JetStream
`{done}` sentinel is now always swallowed server-side, run boundaries
are detected client-side from the AI-SDK `{type: "finish"}` chunk.
One open connection per (tab, thread) covers every run.
- Update the corresponding tests in nats-stream-buffer.test.ts to drop
the now-irrelevant `closeOnDone: false` option and the stale
`push({done: true})` expectations that relied on the legacy
close-on-done behavior.
- Frontend's /attach URL drops `?persistent=true`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(decopilot): split validateAndDispatch into composable validate + dispatchAndTrack
Two functions instead of one wrapper:
validate(c, threadIdParam?) → DispatchRunInput
Parse body, resolve models, check permissions. No side effects
beyond reading the request. Throws HTTPException for caller-visible
failures.
dispatchAndTrack(input, ctx, deps) → { taskId }
Calls dispatchRun and emits the chat_message_started posthog event.
Kept separate from dispatchRun itself so orphan-resume / automation
paths (which call dispatchRun without a fresh user message) don't
double-count the event.
The /messages route handler now reads top-to-bottom as three lines of
business logic:
const input = await validate(c, c.req.param("threadId"));
const { taskId } = await dispatchAndTrack(input, ctx, deps);
return c.json({ taskId }, 202);
If we ever add a /stream compat shim, it'd swap line 3 for a tail
subscription with the same first two lines unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(decopilot): match useChat's `{ toolCall: {...} }` shape in useThreadChat onToolCall
`useInvalidateCollectionsOnToolCall` was written for useChat's
`ChatOnToolCallCallback` signature (`event.toolCall.toolName`). When I
swapped in useThreadChat I passed a flat `{ toolCallId, toolName, input }`
payload instead, so `event.toolCall` was undefined and every tool call
threw "Cannot read properties of undefined (reading 'toolName')" mid-stream.
Wrap the payload as `{ toolCall: {...} }` and align the option type so the
existing handler keeps working unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(decopilot): forward finishReason from finish chunk through onFinish
useThreadChat's onFinish payload was missing finishReason, so consumers
landing on `setFinishReason(payload.finishReason ?? null)` always got
null. The warning banner in chat/highlight/index.tsx gates on
`!!finishReason && finishReason !== "stop"`, so non-stop reasons
(length, content-filter, error, tool-calls-without-client-tools) never
showed the StatusHighlight warning card.
`readUIMessageStream` folds chunks into a UIMessage snapshot but drops
the top-level `finishReason` field from `{type: "finish"}` chunks. We
already see every raw chunk in `handleChunkFanOut`, so capture it there
into `demuxRef.pendingFinishReason`, and read it back in the
sub-stream's async drain loop right before invoking `onFinish`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(decopilot): reconnect persistent /attach on transient drops; debounce SSE finish backstop
Three review fixes for the subscribe-model frontend.
1. Reconnect on transient failures (the important one)
The persistent /attach is the only delivery path for assistant
chunks now. Pre-refactor `useStreamManager` handled mid-stream
drops by calling `chat.resumeStream()`; that logic was dropped
when the resume code was removed but never replaced.
Some self-hosted proxies hard-cut TCP after a fixed duration
regardless of SSE keepalive (~2 min seen in one customer setup),
so a single drop today silently kills updates until reload.
Wrap `runPersistentLoop` in a reconnect loop with exponential
backoff (1s → 2s → 4s → … capped at 30s). Distinguish transient
(TypeError, server-side `done` mid-run, network unreachable)
from terminal (4xx/5xx HTTP response, schema parse error,
`signal.aborted`). Reconnect on transient.
The new /attach uses `DeliverPolicy.All` and replays every chunk
for the in-flight run from JetStream's start. Caller's
`onReconnect` hook discards the in-flight sub-stream's partial
fold (`forceCloseCurrentSubStream(true)`) and resets
`streamingStore` so the replay re-folds cleanly without
duplicating deltas into the previous state. `discardOnClose`
flag on `demuxRef` carries the intent into the async drain so
the partial doesn't get half-committed to `localMessages`.
2. SSE `decopilot.finish` backstop debounced ~1.5s
The watch SSE and the JetStream tail are independent transports.
Under load the SSE event commonly arrives ahead of the buffered
`{type:"finish"}` chunk on /attach; closing the sub-stream then
would orphan the in-flight chunks (`ensureSubStream` would open
a fresh sub for them, surfacing as a duplicate partial assistant
message). Wait ~1.5s so the actual finish chunk has time to
land — if it does, the sub is already closed and the backstop
is a no-op. If it doesn't (the original race we added the
backstop for: chunk lost to the JetStream purge), the timer
forces close as before.
3. Stale comment removed from chat-context.tsx — `useStreamManager`
no longer "owns every resume decision" and no longer takes an
`onResumeSuccess` callback.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(decopilot): match SSE finish backstop to its own run so it can't close the next one
The 1.5s SSE-finish backstop timer was scheduled blind — when run N's
chunk-finish arrived normally inside the window and the user (or
`maybeAutoSend` after addToolOutput / addToolApprovalResponse) kicked
off run N+1, the timer closed N+1's freshly-opened sub-stream instead.
The partial got promoted to localMessages and the remaining chunks
opened a second sub via `ensureSubStream`, fragmenting a single run
into two assistant bubbles.
Track `pendingSseBackstops` on `demuxRef`:
- SSE `decopilot.finish` → ++ ; only schedule a timer when positive
- AI-SDK `{type:"finish"}` chunk → -- (and let it go negative; a
later SSE finish for the same run will balance back to 0)
- timer fires → only act when still positive; consume one slot
Trace:
- normal flow: chunk--, then SSE++ → 0 → no schedule
- lost-chunk (purge race): SSE++ → 1, no chunk, timer fires, --, close
- reviewer's race: chunk--, SSE++ → 0, no schedule; the user
starts N+1 with no pending timer
- SSE delayed past N+1: chunk for N--, chunk for N+1--, SSE for N++,
SSE for N+1++ → counter stays in lockstep, no
spurious fires
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Replaces the request-response decopilot streaming model with a subscribe model backed by NATS JetStream as the source of truth. Both backend and frontend are migrated.
JetStream as source of truth
Decouples chunk production from the HTTP response.
dispatchRunpumpsuiStreamchunks into JetStream viastreamBuffer.pump(); every HTTP response is a JetStream live-tail subscriber. The producer's lifetime is bound toregistrySignal, never to any consumer.The bug this fixed: after #3334 / #3341 the registry survived an HTTP cut but tool output (web_search etc.) still hung. The previous
streamBuffer.relay()was apipeThroughTransformStream gated on the HTTP response staying connected; when a proxy or tab-close cancelled the response mid-stream, the relay stopped being pulled and subsequentwriter.write(...)calls from tools never reached JetStream. The newpump()is a detached async reader, so JetStream is decoupled from any consumer's lifecycle.Subscribe model
POST /:org/decopilot/threads/:threadId/messages— claims the run, starts the pump, returns202 { taskId }. No SSE body.GET /:org/decopilot/attach/:threadId— long-lived SSE. Stays open across runs in the thread; clients detect run boundaries from the AI-SDK{type: "finish"}chunk in the stream. When the thread is idle (no run in progress) the subscription waits withDeliverPolicy.Newfor a futurePOST /messagesrather than 204-ing.dispatchRun / dispatchRunAndWait split
The agent loop is exposed as two public functions sharing a private
prepareRunhelper that does all the setup (claim run, build prompt, constructuiStream):dispatchRun(input, ctx, deps)— fire-and-forget. HandsuiStreamtostreamBuffer.pump()and returns{ taskId }. Used by HTTP routes.dispatchRunAndWait(input, ctx, deps)— drainsuiStreaminternally and resolves once the run finishes. Used by automations (DBOS workflow steps) and pod-death orphan recovery that need to await completion.The previous
fireAndForgetflag,consumeStreamCorehelper, and dual-shape result are gone.POST /:org/decopilot/runtime/stream(0 callers) is deleted.Route composition
/messagesis wired from two composable helpers inroutes.ts:validate— pure-ish: parse body, resolve models, check permissions, returnDispatchRunInput. ThrowsHTTPExceptionfor caller-visible failures.dispatchAndTrack— callsdispatchRunand emits thechat_message_startedposthog event. Kept separate fromdispatchRunitself so orphan-resume and automation paths don't double-count the event.Frontend migration
The chat hook is rewritten to fit the subscribe model.
useChatfrom@ai-sdk/reactis gone for the decopilot panel:useThreadChat(new) — holds one persistentGET /attach/:threadIdSSE per(orgSlug, threadId)pair. Pipes the body throughparseJsonEventStreamand folds chunks into UIMessage state viareadUIMessageStream, splitting on{type: "finish"}so each fold yields exactly one assistant message. Lifecycle is driven byuseSyncExternalStoreso React aborts the fetch cleanly on unmount (StrictMode-safe).sendMessageis a fire-and-forgetPOST /messageswith optimistic user-message append.addToolOutput/addToolApprovalResponsepatch local state and auto-fire a continuation POST whensendAutomaticallyWhenmatches.useStreamManageris stripped down to SSE-driven cache invalidations — no more resume gymnastics since the subscribe stream is always open.SubscribeChatTransport(the intermediate fix that keptuseChat) is deleted.Subscribe model also fixes cross-tab streaming: open the same thread in two tabs, send from one, the other streams live without refresh.
Several rounds of fix commits on top of the initial frontend migration cover edge cases discovered during testing: StrictMode SSE leaks,
THREAD_STATUS=in_progress-triggered cache refetch, server-vs-local message merge precedence (local wins so eager tool-output patches survive a server refetch), and a JetStream-purge-vs-finish-chunk race backstop using the SSEdecopilot.finishevent.Endpoint matrix (final)
POST /:org/decopilot/threads/:threadId/messages202 { taskId }.GET /:org/decopilot/attach/:threadIdPOST /:org/decopilot/streamGET /:org/decopilot/attach/:threadId?persistent=truePOST /:org/decopilot/runtime/streamTest plan
bun run check/lint/fmtcleanbun test apps/mesh/src/api/routes/decopilot— 349 pass, 0 failuser_ask/ approval-required tool round-trip🤖 Generated with Claude Code