
fix: prevent dropped SSE events under back-to-back emission #906

Merged: kabir merged 1 commit into a2aproject:main from ehsavoie:flaky_multiple_messages_test_fix on Jun 4, 2026

Conversation

@ehsavoie
Collaborator

@ehsavoie ehsavoie commented Jun 2, 2026

With request(1) backpressure, the write-completion handler was responsible for requesting the next event. When an agent emitted two events back-to-back (e.g. artifact + completion), tube.complete() could fire while the completion event was still buffered with demand=0, causing mutiny-zero to drop it and close the stream with only one event delivered.

Switch to request(Long.MAX_VALUE) so the drain loop delivers all buffered events synchronously before tube.complete() is called. Replace the unreliable bytesWritten() == 0 header guard with a boolean headersSet flag, since multiple onNext() calls now arrive before any async write completes. Reduce BUFFER_FLUSH_DELAY_MS from 150 ms to 50 ms as it is now a secondary safety margin rather than the primary guard.
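The difference between the two demand strategies can be illustrated with a minimal sketch built only on `java.util.concurrent.Flow`. It is not the SDK code: `UnboundedDemandSketch`, `RecordingSink`, and `emitThenComplete` are hypothetical names, and the toy publisher only approximates the described behavior by discarding whatever is still buffered when it completes. The sink never re-requests, which stands in for a write-completion callback that arrives too late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class UnboundedDemandSketch {

    /** Stand-in for the HTTP response: records what would be written. */
    static final class RecordingSink implements Flow.Subscriber<String> {
        final List<String> writes = new ArrayList<>();
        private final long initialDemand;
        private boolean headersSet; // explicit flag instead of a bytesWritten() == 0 check

        RecordingSink(long initialDemand) {
            this.initialDemand = initialDemand;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(initialDemand);
        }

        @Override
        public void onNext(String event) {
            if (!headersSet) {
                headersSet = true; // set before any asynchronous write could complete
                writes.add("headers: text/event-stream");
            }
            writes.add("data: " + event);
        }

        @Override
        public void onError(Throwable failure) {
            writes.add("error: " + failure.getMessage());
        }

        @Override
        public void onComplete() {
            writes.add("end");
        }
    }

    /** Toy publisher: drains buffered events while demand remains, then completes. */
    static void emitThenComplete(List<String> buffered, Flow.Subscriber<String> subscriber) {
        long[] demand = {0};
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { demand[0] = n; } // toy: one request only
            @Override public void cancel() { demand[0] = 0; }
        });
        for (String event : buffered) {
            if (demand[0] == 0) {
                break; // no demand left: the event stays buffered and is lost on completion
            }
            if (demand[0] != Long.MAX_VALUE) {
                demand[0]--;
            }
            subscriber.onNext(event);
        }
        subscriber.onComplete();
    }

    static List<String> run(long initialDemand) {
        RecordingSink sink = new RecordingSink(initialDemand);
        emitThenComplete(List.of("artifact", "completed"), sink);
        return sink.writes;
    }

    public static void main(String[] args) {
        System.out.println("request(1):              " + run(1));
        System.out.println("request(Long.MAX_VALUE): " + run(Long.MAX_VALUE));
    }
}
```

In this model, `run(1)` records only the first event before the stream ends, while `run(Long.MAX_VALUE)` records both, and the headers line appears exactly once in either case.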

@ehsavoie ehsavoie requested a review from kabir June 2, 2026 07:37
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request changes the SSE backpressure strategy across multiple A2AServerRoutes files by requesting all events upfront (request(Long.MAX_VALUE)) instead of one by one, and introduces a headersSet flag to track when headers are written. It also reduces the buffer flush delay in EventConsumer. The reviewer identified a critical Vert.x threading model violation across all modified route files: processing reactive stream events on the background executor thread instead of the Vert.x event loop thread is unsafe because HttpServerResponse and RoutingContext are not thread-safe. To resolve this concurrency issue, the reviewer recommends wrapping Vert.x interactions in rc.vertx().runOnContext(...).
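The threading concern can be sketched without Vert.x by using a single-thread executor as a stand-in for the event loop. This is an illustration under stated assumptions, not the route code: `EventLoopHopSketch` and the thread names are hypothetical, and in a Vert.x application the hop would be `rc.vertx().runOnContext(...)` rather than `eventLoop.execute(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopHopSketch {

    static List<String> run() throws InterruptedException {
        // Stand-in for the Vert.x event loop: exactly one thread, FIFO task order.
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "event-loop"));
        // Stand-in for the response: only ever touched from the event-loop thread.
        List<String> writes = new ArrayList<>();
        CountDownLatch ended = new CountDownLatch(1);

        // Events are produced on a background thread, as the agent executor would.
        Thread producer = new Thread(() -> {
            for (String event : List.of("artifact", "completed")) {
                // Hop to the event loop instead of writing from this thread.
                eventLoop.execute(() ->
                        writes.add(Thread.currentThread().getName() + " wrote " + event));
            }
            eventLoop.execute(() -> {
                writes.add(Thread.currentThread().getName() + " ended response");
                ended.countDown();
            });
        }, "agent-executor");

        producer.start();
        producer.join();
        // The latch makes the event-loop writes visible to the calling thread.
        if (!ended.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("event loop did not finish in time");
        }
        eventLoop.shutdown();
        return writes;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Because a single producer submits tasks in order to a single-threaded executor, the writes keep their emission order and the end-of-response step runs after both of them.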

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch from c4e804f to daa12e3 Compare June 2, 2026 11:32
@ehsavoie
Collaborator Author

ehsavoie commented Jun 2, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request refactors Server-Sent Events (SSE) handling across several server route classes to ensure thread safety and prevent dropped events. It schedules response writes, errors, and completion handlers on the Vert.x event loop thread using runOnContext, and switches from single-request backpressure to requesting all events upfront (Long.MAX_VALUE). Additionally, the buffer flush delay in EventConsumer is reduced. The review feedback recommends completely removing the remaining sleep delay (BUFFER_FLUSH_DELAY_MS) in EventConsumer, as blocking threads is an anti-pattern in reactive applications and is redundant given Vert.x's event loop guarantees.


Comment thread server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java Outdated
@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch from daa12e3 to d3d7ed0 Compare June 2, 2026 13:31
@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch 2 times, most recently from 9562f66 to 52bc280 Compare June 3, 2026 06:47
@kabir
Collaborator

kabir commented Jun 3, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request refactors the Server-Sent Events (SSE) streaming implementation by consolidating the duplicate MultiSseSupport inner classes into a single shared utility class, SseResponseWriter. It also removes a temporary thread sleep workaround in EventConsumer and resolves a concurrency race condition in MainEventBusProcessorExceptionTest by synchronizing access to the log appender. The review feedback suggests marking the upstream subscription field in SseResponseWriter as volatile to guarantee cross-thread visibility, and combining the initial stream kickstart write with the first event write to reduce socket overhead and prevent duplicate failure handling.
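The `volatile` suggestion can be illustrated with a small sketch in which the subscription is stored on one thread and cancelled from another. The names here (`VolatileSubscriptionSketch`, `Writer`, `cancelUpstream`) are hypothetical and do not reflect the actual `SseResponseWriter` API. The demo's `start()`/`join()` calls already provide ordering on their own, so it shows the pattern rather than proving the need for `volatile`.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class VolatileSubscriptionSketch {

    static final class Writer implements Flow.Subscriber<String> {
        // volatile: written by the subscribing thread, read by whichever thread cancels.
        private volatile Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        @Override public void onNext(String item) { }
        @Override public void onError(Throwable failure) { }
        @Override public void onComplete() { }

        /** May run on another thread, for example when the client disconnects. */
        void cancelUpstream() {
            Flow.Subscription current = upstream; // single volatile read
            if (current != null) {
                current.cancel();
            }
        }
    }

    static boolean run() throws InterruptedException {
        AtomicBoolean cancelled = new AtomicBoolean();
        Writer writer = new Writer();
        Flow.Subscription subscription = new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { cancelled.set(true); }
        };

        Thread subscribing = new Thread(() -> writer.onSubscribe(subscription), "publisher");
        subscribing.start();
        subscribing.join();

        Thread closing = new Thread(writer::cancelUpstream, "event-loop");
        closing.start();
        closing.join();
        return cancelled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("upstream cancelled: " + run());
    }
}
```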


@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch 2 times, most recently from b87e718 to 771a6d7 Compare June 3, 2026 19:10
@kabir
Collaborator

kabir commented Jun 3, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request consolidates Server-Sent Events (SSE) streaming logic by extracting duplicate MultiSseSupport inner classes into a new shared utility, SseResponseWriter. It also updates documentation in EventConsumer and resolves a test race condition in MainEventBusProcessorExceptionTest by synchronizing log access. The review feedback points out that SseResponseWriter still uses the old request(1) backpressure model and bytesWritten check instead of the intended request(Long.MAX_VALUE) and headersSet flag, and that BUFFER_FLUSH_DELAY_MS in EventConsumer should be reduced to 50 ms.


@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch from 771a6d7 to 542b500 Compare June 4, 2026 07:54
…ng.MAX_VALUE)

- Replace the MultiSseSupport inner class with a shared SseResponseWriter utility.
  Switch from request(1) backpressure to request(Long.MAX_VALUE) with a headersSet
  flag for first-write detection, combining the SSE kickstart comment and the first
  event into a single write to avoid duplicate error callbacks.

- Restore BUFFER_FLUSH_DELAY_MS in EventConsumer: Mutiny's internal demand management
  can fire request(1) on the underlying publisher independently of the write callback,
  causing onComplete to race with a pending response.write(). The sleep ensures every
  write callback confirms delivery before tube.complete() is called.
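The first-write handling described in the first bullet can be sketched as follows. This is a minimal illustration, not the merged `SseResponseWriter`: `FirstWriteSketch`, `SseFramer`, and the `: kickstart` comment text are assumptions. It relies only on the SSE wire format, in which a line starting with `:` is a comment and a blank line terminates an event.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstWriteSketch {

    /** Builds the payload for each write; the first one also carries the kickstart comment. */
    static final class SseFramer {
        private boolean headersSet; // first-write marker

        String frame(String data) {
            String event = "data: " + data + "\n\n";
            if (!headersSet) {
                headersSet = true;
                // Hypothetical comment text; one combined payload means one write
                // and therefore one failure callback for the first event.
                return ": kickstart\n\n" + event;
            }
            return event;
        }
    }

    static List<String> run() {
        SseFramer framer = new SseFramer();
        List<String> writes = new ArrayList<>();
        for (String data : List.of("artifact", "completed")) {
            writes.add(framer.frame(data)); // each entry stands for one response.write(...)
        }
        return writes;
    }

    public static void main(String[] args) {
        run().forEach(System.out::print);
    }
}
```

Two events produce two writes in this sketch, rather than three if the kickstart comment were written separately.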

Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
@ehsavoie ehsavoie force-pushed the flaky_multiple_messages_test_fix branch from 542b500 to 14e2863 Compare June 4, 2026 09:31
@kabir kabir merged commit 5f20bf8 into a2aproject:main Jun 4, 2026
12 checks passed

2 participants