
ACP agent_message_chunk frames land after end_turn RPC reply due to event-subscription / prompt-RPC race #25421

@truenorth-lj

Description


Summary

Agent.prompt() in packages/opencode/src/acp/agent.ts:1471 returns stopReason: "end_turn" as soon as await this.sdk.session.prompt(...) resolves, but at that moment the trailing message.part.delta events for the assistant's final text are still queued in the SDK event stream. runEventSubscription then processes them and forwards them to the ACP connection as agent_message_chunk frames AFTER the RPC reply has already been sent.

This violates the ACP protocol expectation that session/update frames belong to a turn that ends with the prompt RPC reply. Clients treat stopReason: end_turn as the signal that the agent is done speaking (disabling streaming indicators, re-enabling the input, etc.), but text keeps landing on the wire after that signal and races against the client's UI state.

Reproducer

Send any streaming session/prompt to the ACP server. Inspect WebSocket frames (Chrome DevTools → Network → WS → Messages). Sort by time.

Expected order:

... earlier session/update frames (chunks, tool_call_update) ...
{ "method": "session/update", ... agent_message_chunk: "...final delta..." }
{ "id": 2, "result": { "stopReason": "end_turn", "usage": {...} } }    ← turn ends here

Actual order (current behavior):

... earlier session/update frames ...
{ "id": 2, "result": { "stopReason": "end_turn", "usage": {...} } }    ← reply sent first
{ "method": "session/update", ... agent_message_chunk: "...final delta..." }    ← chunk lands AFTER

The post-reply chunk is small but real: it's the agent's last text delta (often the closing sentence of the response). Repro is reliable on tasks where the assistant emits a brief final summary, e.g. after a tool call that creates a file:

{ "type": "text", "text": "\n\nDone. Created `/path/to/file`" }

I have a wire trace from a production ACP client showing the chunk arriving 5–50 ms after the id: 2 reply with stopReason: end_turn.
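To check a captured trace for this ordering automatically, something like the following could scan the frames. It is a minimal sketch, not part of opencode: findPostReplyChunks and JsonRpcFrame are hypothetical names, and it assumes the frames were exported in wire order, already JSON-parsed, and that chunk notifications carry params.update.sessionUpdate === "agent_message_chunk" as in ACP session/update notifications.

```typescript
// Hypothetical shape of a captured JSON-RPC frame (only the fields we inspect).
type JsonRpcFrame = {
  id?: number | string
  method?: string
  params?: { update?: { sessionUpdate?: string } }
  result?: { stopReason?: string }
}

// Returns the indices of agent_message_chunk notifications that arrive after
// the JSON-RPC response for the given session/prompt request id.
function findPostReplyChunks(frames: JsonRpcFrame[], promptId: number | string): number[] {
  // The reply is the frame with a matching id and a result (the request has a method instead).
  const replyIndex = frames.findIndex((f) => f.id === promptId && f.result !== undefined)
  if (replyIndex === -1) return []

  const late: number[] = []
  for (let i = replyIndex + 1; i < frames.length; i++) {
    const f = frames[i]
    if (f.method === "session/update" && f.params?.update?.sessionUpdate === "agent_message_chunk") {
      late.push(i)
    }
  }
  return late
}

// The "actual order" from the reproducer, reduced to the fields above.
const actual: JsonRpcFrame[] = [
  { method: "session/update", params: { update: { sessionUpdate: "tool_call_update" } } },
  { id: 2, result: { stopReason: "end_turn" } },
  { method: "session/update", params: { update: { sessionUpdate: "agent_message_chunk" } } },
]
console.log(findPostReplyChunks(actual, 2)) // returns [2]: one chunk landed after the reply
```

A non-empty result flags the bug; with the expected ordering the function returns an empty array.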

Cause

Two independent async paths in Agent write to the same ACP connection without synchronization:

Path A — event subscription (agent.ts:173-188)

  • runEventSubscription() infinite-loops over sdk.global.event()
  • For each message.part.delta event for the response message, handleEvent calls connection.sessionUpdate({ sessionUpdate: "agent_message_chunk", ... }) (lines 482-525)

Path B — prompt RPC (agent.ts:1471-1490)

  • await sdk.session.prompt({...}) resolves when the LLM finishes
  • Returns { stopReason: "end_turn", ... } — becomes the JSON-RPC reply

Path B can resolve and return before Path A has consumed and forwarded the trailing deltas; nothing in prompt() waits on Path A before replying.

Importantly, runEventSubscription itself is sequential (for await (const event of events.stream) { await this.handleEvent(...) }), so events ARE processed in order; the problem is that the prompt RPC return path doesn't wait for the event queue to drain.
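The race can be reproduced in isolation with a toy model. This is an illustrative sketch, not opencode code: the wire array stands in for the ACP connection, and tick() (a setTimeout(0) yield) stands in for the async work inside handleEvent. Path A writes chunks one at a time in order; Path B replies as soon as its own promise settles, without waiting for Path A.

```typescript
// Toy model: two independent async paths writing to one shared "wire".
type Frame = { kind: "chunk"; text: string } | { kind: "reply"; stopReason: "end_turn" }

// Yield to the event loop, standing in for async work inside handleEvent.
const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0))

async function simulateRace(): Promise<Frame[]> {
  const wire: Frame[] = []

  // Path A: sequential subscription loop; each delta is written after some async work.
  const pathA = (async () => {
    for (const text of ["Done.", " Created `/path/to/file`"]) {
      await tick()
      wire.push({ kind: "chunk", text })
    }
  })()

  // Path B: the prompt RPC settles and replies immediately,
  // with no await on Path A draining its queue.
  const pathB = (async () => {
    await tick()
    wire.push({ kind: "reply", stopReason: "end_turn" })
  })()

  await Promise.all([pathA, pathB])
  return wire
}

simulateRace().then((wire) => console.log(wire.map((f) => f.kind).join(" -> ")))
// chunk -> reply -> chunk
```

Even though Path A never reorders anything, the final chunk still lands after the reply, which matches the wire trace above.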

Proposed fix

Block prompt() until the event subscription has observed message.updated for the response messageID with info.time.completed set. This is sufficient because:

  1. runEventSubscription processes events sequentially, AND
  2. message.updated (with time.completed set) is the SDK's "this assistant message is fully written" signal,

awaiting that event guarantees every prior message.part.delta chunk for the same message has already been forwarded to the ACP connection.
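The ordering argument can be checked in the same kind of toy model (again illustrative only, not opencode code). The hypothetical "completed" event stands in for message.updated with time.completed set, and it is queued after the deltas. Because the loop handles events strictly in order, resolving the completion promise from that handler means every earlier chunk is already on the wire when the reply is written.

```typescript
// Toy model of the proposed fix.
type Frame = { kind: "chunk"; text: string } | { kind: "reply"; stopReason: "end_turn" }
type SdkEvent = { type: "delta"; text: string } | { type: "completed"; messageId: string }

const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0))

async function simulateFix(): Promise<Frame[]> {
  const wire: Frame[] = []
  let resolveCompleted: () => void = () => {}
  const completed = new Promise<void>((resolve) => {
    resolveCompleted = () => resolve()
  })

  // The completion signal is queued after the deltas of the same message.
  const events: SdkEvent[] = [
    { type: "delta", text: "Done." },
    { type: "delta", text: " Created `/path/to/file`" },
    { type: "completed", messageId: "msg_1" },
  ]

  // Path A: strictly sequential handling, as in runEventSubscription.
  const pathA = (async () => {
    for (const event of events) {
      await tick()
      if (event.type === "delta") {
        wire.push({ kind: "chunk", text: event.text })
      } else {
        resolveCompleted()
      }
    }
  })()

  // Path B: the model finishes early, but the reply now waits for the completion event.
  const pathB = (async () => {
    await tick()
    await completed
    wire.push({ kind: "reply", stopReason: "end_turn" })
  })()

  await Promise.all([pathA, pathB])
  return wire
}

simulateFix().then((wire) => console.log(wire.map((f) => f.kind).join(" -> ")))
// chunk -> chunk -> reply
```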

Sketch:

// New private state on Agent:
private messageCompletionResolvers = new Map<string, () => void>()
private completedAssistantMessageIds = new Set<string>()

// New case in handleEvent:
case "message.updated": {
  const info = event.properties.info
  if (info.role === "assistant" && info.time.completed !== undefined) {
    this.completedAssistantMessageIds.add(info.id)
    const resolver = this.messageCompletionResolvers.get(info.id)
    if (resolver) {
      this.messageCompletionResolvers.delete(info.id)
      resolver()
    }
  }
  return
}

// Helper + 5s timeout fallback for safety.
private waitForMessageCompletion(messageId: string, timeoutMs: number): Promise<void> { ... }

// In prompt(), after `await sdk.session.prompt(...)`:
const msg = response.data?.info
if (msg?.id) {
  await this.waitForMessageCompletion(msg.id, 5000)
}

Both non-compact branches in prompt() need it (agent.ts:1471 and :1497). The compact path doesn't carry an assistant message id and is left as-is.
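One possible shape for the waitForMessageCompletion helper left as { ... } in the sketch is shown below as a standalone class. It is an assumption, not the PR's implementation: MessageCompletionTracker, markCompleted, and waitFor are hypothetical names, and in the issue this state lives directly on Agent as messageCompletionResolvers and completedAssistantMessageIds. The key detail is checking the completed set first, because message.updated can be handled before prompt() starts waiting; the timeout resolves rather than rejects so the RPC still replies if the event never arrives.

```typescript
// Hypothetical standalone version of the sketch's completion bookkeeping.
class MessageCompletionTracker {
  private resolvers = new Map<string, () => void>()
  private completed = new Set<string>()

  // Call from the "message.updated" handler once info.time.completed is set.
  markCompleted(messageId: string): void {
    const resolve = this.resolvers.get(messageId)
    if (resolve) {
      this.resolvers.delete(messageId)
      resolve()
    } else {
      // Nobody is waiting yet: remember the id so a later waitFor() returns at once.
      this.completed.add(messageId)
    }
  }

  // Resolves true when the message completes, false if timeoutMs elapses first.
  // Never rejects, so the caller can always go on to send its reply.
  // Assumes a single waiter per message id.
  waitFor(messageId: string, timeoutMs: number): Promise<boolean> {
    // delete() returns true if the id was present, consuming it to bound the set's growth.
    if (this.completed.delete(messageId)) return Promise.resolve(true)
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => {
        this.resolvers.delete(messageId)
        resolve(false)
      }, timeoutMs)
      this.resolvers.set(messageId, () => {
        clearTimeout(timer)
        resolve(true)
      })
    })
  }
}

// Usage: the completion event was handled before prompt() began waiting.
const tracker = new MessageCompletionTracker()
tracker.markCompleted("msg_early")
tracker.waitFor("msg_early", 5000).then((ok) => console.log("early:", ok)) // logs "early: true"
```

Ids that complete only after their waiter has timed out still end up in the set, so a production version might prune it, for example when a session ends.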

Environment

  • Repo: anomalyco/opencode
  • Branch: dev
  • File: packages/opencode/src/acp/agent.ts (line 1471 at HEAD)

Note

Will follow up with a PR. Opening this issue first per the issue-first policy that auto-closed my prior unlinked PR (#24772 → re-done as #24815/#24816).

Also happy to add a regression test using the existing createFakeAgent harness in test/acp/event-subscription.test.ts if reviewers want — I left it out of the initial PR to keep the diff focused, but the harness already supports event injection so it's straightforward.
