diff --git a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java index de83415b8..6264e7091 100644 --- a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java @@ -2,7 +2,6 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; -import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS; import static org.a2aproject.sdk.compat03.transport.jsonrpc.context.JSONRPCContextKeys_v0_3.HEADERS_KEY; import static org.a2aproject.sdk.compat03.transport.jsonrpc.context.JSONRPCContextKeys_v0_3.METHOD_NAME_KEY; @@ -24,11 +23,6 @@ import io.quarkus.security.ForbiddenException; import io.quarkus.security.UnauthorizedException; import io.smallrye.mutiny.Multi; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; @@ -66,12 +60,11 @@ import org.a2aproject.sdk.server.ServerCallContext; import org.a2aproject.sdk.server.auth.UnauthenticatedUser; import org.a2aproject.sdk.server.auth.User; +import org.a2aproject.sdk.server.common.quarkus.SseResponseWriter; import org.a2aproject.sdk.server.common.quarkus.VertxSecurityHelper; import org.a2aproject.sdk.server.extensions.A2AExtensions; import org.a2aproject.sdk.spec.AgentCard; import org.jspecify.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Singleton public class A2AServerRoutes_v0_3 { @@ -79,7 +72,7 @@ public class A2AServerRoutes_v0_3 { @Inject JSONRPCHandler_v0_3 jsonRpcHandler; - // Hook so testing can wait until the MultiSseSupport is subscribed. + // Hook so testing can wait until the SSE subscriber is attached. // Without this we get intermittent failures private static volatile @Nullable Runnable streamingMultiSseSupportSubscribedRunnable; @@ -206,7 +199,7 @@ public void invokeJSONRPCHandler(String body, RoutingContext rc) { AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = streamingResponse .map(response -> formatSseEvent(response, eventIdCounter.getAndIncrement())); - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } else { rc.response() .setStatusCode(200) @@ -335,81 +328,4 @@ public String getUsername() { } } - private static class MultiSseSupport { - private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class); - - private MultiSseSupport() { - // Avoid direct instantiation. - } - - public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { - HttpServerResponse response = rc.response(); - - sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { - Flow.@Nullable Subscription upstream; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - - response.closeHandler(v -> { - logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); - context.invokeEventConsumerCancelCallback(); - subscription.cancel(); - }); - - // Notify tests that we are subscribed - Runnable runnable = streamingMultiSseSupportSubscribedRunnable; - if (runnable != null) { - runnable.run(); - } - } - - @Override - public void onNext(String sseEvent) { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); - response.setChunked(true); - response.setWriteQueueMaxSize(1); - response.write(": SSE stream started\n\n"); - } - - response.write(Buffer.buffer(sseEvent), new Handler>() { - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(ar.cause()); - } else { - java.util.Objects.requireNonNull(upstream).request(1); - } - } - }); - } - - @Override - public void onError(Throwable throwable) { - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(throwable); - } - - @Override - public void onComplete() { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - } - response.end(); - } - }); - } - } } diff --git a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java index 9387024a9..f00795328 100644 --- a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java @@ -2,7 +2,6 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; -import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS; import static org.a2aproject.sdk.compat03.transport.rest.context.RestContextKeys_v0_3.HEADERS_KEY; import static org.a2aproject.sdk.compat03.transport.rest.context.RestContextKeys_v0_3.METHOD_NAME_KEY; @@ -10,7 +9,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -24,11 +22,7 @@ import io.quarkus.security.ForbiddenException; import io.quarkus.security.UnauthorizedException; import io.smallrye.mutiny.Multi; -import io.vertx.core.AsyncResult; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; @@ -53,12 +47,11 @@ import org.a2aproject.sdk.server.ServerCallContext; import org.a2aproject.sdk.server.auth.UnauthenticatedUser; import org.a2aproject.sdk.server.auth.User; +import org.a2aproject.sdk.server.common.quarkus.SseResponseWriter; import org.a2aproject.sdk.server.common.quarkus.VertxSecurityHelper; import org.a2aproject.sdk.server.extensions.A2AExtensions; import org.a2aproject.sdk.spec.AgentCard; import org.jspecify.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Singleton public class A2AServerRoutes_v0_3 { @@ -66,7 +59,7 @@ public class A2AServerRoutes_v0_3 { @Inject RestHandler_v0_3 jsonRestHandler; - // Hook so testing can wait until the MultiSseSupport is subscribed. + // Hook so testing can wait until the SSE subscriber is attached. // Without this we get intermittent failures private static volatile @Nullable Runnable streamingMultiSseSupportSubscribedRunnable; @@ -202,7 +195,7 @@ public void sendMessageStreaming(String body, RoutingContext rc) { AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) .map(json -> formatSseEvent(json, eventIdCounter.getAndIncrement())); - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } } } @@ -302,7 +295,7 @@ public void resubscribeTask(RoutingContext rc) { AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) .map(json -> formatSseEvent(json, eventIdCounter.getAndIncrement())); - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } } } @@ -463,81 +456,4 @@ private static boolean hasNonDefaultV10AgentCard() { return false; } - private static class MultiSseSupport { - private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class); - - private MultiSseSupport() { - // Avoid direct instantiation. - } - - public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { - HttpServerResponse response = rc.response(); - - sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { - Flow.@Nullable Subscription upstream; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - - response.closeHandler(v -> { - logger.debug("REST SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); - context.invokeEventConsumerCancelCallback(); - subscription.cancel(); - }); - - // Notify tests that we are subscribed - Runnable runnable = streamingMultiSseSupportSubscribedRunnable; - if (runnable != null) { - runnable.run(); - } - } - - @Override - public void onNext(String sseEvent) { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); - response.setChunked(true); - response.setWriteQueueMaxSize(1); - response.write(": SSE stream started\n\n"); - } - - response.write(Buffer.buffer(sseEvent), new Handler>() { - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(ar.cause()); - } else { - java.util.Objects.requireNonNull(upstream).request(1); - } - } - }); - } - - @Override - public void onError(Throwable throwable) { - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(throwable); - } - - @Override - public void onComplete() { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - } - response.end(); - } - }); - } - } } diff --git a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriter.java b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriter.java new file mode 100644 index 000000000..f112ab46e --- /dev/null +++ b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriter.java @@ -0,0 +1,149 @@ +package org.a2aproject.sdk.server.common.quarkus; + +import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; + +import java.util.Objects; +import java.util.concurrent.Flow; + +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; + +import org.a2aproject.sdk.server.ServerCallContext; +import org.a2aproject.sdk.server.events.EventConsumer; + +/** + * Utility for writing SSE (Server-Sent Events) responses over Vert.x HTTP. + * + *

Events are requested all upfront ({@code request(Long.MAX_VALUE)}) so that + * back-to-back emissions from the EventConsumer are never dropped by a stalled + * single-item demand window. This means the {@link EventConsumer}'s internal buffer + * (256 items) acts as the only bound — write-level backpressure is not applied. + * Ordering between the final {@code response.write()} and {@code response.end()} is + * preserved by {@code EventConsumer.BUFFER_FLUSH_DELAY_MS}: the EventConsumer waits + * briefly after sending the final event before calling {@code tube.complete()}, which + * ensures every write callback has confirmed delivery before {@code onComplete} is + * delivered to this subscriber. + */ +public final class SseResponseWriter { + + private static final Logger logger = LoggerFactory.getLogger(SseResponseWriter.class); + private static final String SERVER_SENT_EVENTS = "text/event-stream"; + + private SseResponseWriter() { + // Utility class — no instances. + } + + /** + * Subscribes to {@code sseStrings} and writes each SSE event to the HTTP response. + * + *

Error handling: + *

    + *
  • Client disconnect → cancels upstream, stops polling
  • + *
  • Write failure → cancels upstream, fails routing context
  • + *
  • Stream error → cancels upstream, fails routing context
  • + *
+ * + * @param sseStrings the SSE-formatted event stream + * @param rc the Vert.x routing context + * @param context the A2A server call context (for EventConsumer cancellation) + * @param onSubscribedHook optional hook invoked once the subscriber is attached; used by tests + */ + public static void writeSseStrings( + Multi sseStrings, + RoutingContext rc, + ServerCallContext context, + @Nullable Runnable onSubscribedHook) { + HttpServerResponse response = rc.response(); + + sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { + // Written in onSubscribe (EventConsumer / subscription thread), read inside + // the write-failure callback (event loop thread) — volatile for visibility. + volatile Flow.@Nullable Subscription upstream; + // onNext and onComplete both run on the same EventConsumer polling thread, + // so no volatile needed for headersSet. + boolean headersSet = false; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.upstream = subscription; + // Request all events upfront: the EventConsumer's BUFFER_FLUSH_DELAY_MS + // sleep between tube.send(finalEvent) and tube.complete() guarantees that + // every write callback confirms delivery before onComplete fires, so + // response.end() is always called after the data is in flight. + this.upstream.request(Long.MAX_VALUE); + + response.closeHandler(v -> { + logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); + context.invokeEventConsumerCancelCallback(); + subscription.cancel(); + }); + + if (onSubscribedHook != null) { + onSubscribedHook.run(); + } + } + + @Override + public void onNext(String sseEvent) { + Buffer data; + if (!headersSet) { + headersSet = true; + MultiMap headers = response.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } + headers.set("Cache-Control", "no-cache"); + headers.set("X-Accel-Buffering", "no"); // disables nginx proxy buffering + response.setChunked(true); + response.setWriteQueueMaxSize(1); // Vert.x default buffering breaks SSE flushing + + // Merge kickstart comment into first event to avoid an orphaned async write + // that could race with the error callback of the data write. + data = Buffer.buffer(": SSE stream started\n\n").appendBuffer(Buffer.buffer(sseEvent)); + } else { + data = Buffer.buffer(sseEvent); + } + + response.write(data, ar -> { + if (ar.failed() && !rc.failed()) { + // NullAway: upstream is guaranteed non-null after onSubscribe + Objects.requireNonNull(upstream).cancel(); + rc.fail(ar.cause()); + } + }); + } + + @Override + public void onError(Throwable throwable) { + // NullAway: upstream is guaranteed non-null after onSubscribe + Objects.requireNonNull(upstream).cancel(); + if (!rc.failed()) { + rc.fail(throwable); + } + } + + @Override + public void onComplete() { + if (!headersSet) { + MultiMap headers = response.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } + } + // Guard against duplicate end() if the client disconnected concurrently + if (!response.ended()) { + response.end(); + } + } + }); + } +} diff --git a/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriterTest.java b/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriterTest.java new file mode 100644 index 000000000..6cd11bbcd --- /dev/null +++ b/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/SseResponseWriterTest.java @@ -0,0 +1,192 @@ +package org.a2aproject.sdk.server.common.quarkus; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; +import org.a2aproject.sdk.server.ServerCallContext; +import org.a2aproject.sdk.server.auth.UnauthenticatedUser; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +// LENIENT: setUp stubs are shared convenience for write tests but not all tests need them all +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class SseResponseWriterTest { + + @Mock + HttpServerResponse response; + @Mock + RoutingContext rc; + @Mock + MultiMap headers; + + private ServerCallContext context; + + @BeforeEach + void setUp() { + when(rc.response()).thenReturn(response); + when(response.headers()).thenReturn(headers); + when(response.setChunked(true)).thenReturn(response); + successfulWrite(); + when(response.ended()).thenReturn(false); + context = new ServerCallContext(UnauthenticatedUser.INSTANCE, Map.of(), Set.of()); + } + + @Test + void emptyStream_callsEndWithoutAnyWrite() { + SseResponseWriter.writeSseStrings(Multi.createFrom().empty(), rc, context, null); + + verify(response, never()).write(any(Buffer.class), any()); + verify(response).end(); + } + + @Test + void singleEvent_setsHeadersAndWritesKickstartPlusData() { + List written = captureWrites(); + + SseResponseWriter.writeSseStrings(Multi.createFrom().item("data: hello\n\n"), rc, context, null); + + verify(response, times(1)).write(any(Buffer.class), any()); + verify(response).end(); + // First (and only) write must contain the SSE kickstart comment + assertTrue(written.get(0).toString().contains(": SSE stream started"), + "First write should include SSE kickstart comment"); + assertTrue(written.get(0).toString().contains("data: hello"), + "First write should include the event data"); + } + + @Test + void multipleEvents_kickstartOnlyOnFirstWrite() { + List written = captureWrites(); + + SseResponseWriter.writeSseStrings( + Multi.createFrom().items("data: first\n\n", "data: second\n\n"), + rc, context, null); + + verify(response, times(2)).write(any(Buffer.class), any()); + assertTrue(written.get(0).toString().contains(": SSE stream started"), + "First write should include SSE kickstart comment"); + assertFalse(written.get(1).toString().contains(": SSE stream started"), + "Subsequent writes must not repeat the kickstart comment"); + } + + @Test + void writeFails_failsRoutingContext() { + failingWrite(new RuntimeException("network error")); + when(rc.failed()).thenReturn(false); + + SseResponseWriter.writeSseStrings(Multi.createFrom().item("data: hello\n\n"), rc, context, null); + + verify(rc).fail(any(Throwable.class)); + } + + @Test + void clientDisconnect_invokesEventConsumerCancelAndCancelsSubscription() { + AtomicReference> capturedCloseHandler = new AtomicReference<>(); + doAnswer(inv -> { + capturedCloseHandler.set(inv.getArgument(0)); + return response; + }).when(response).closeHandler(any()); + + // never() emits nothing and does not complete — subscriber stays attached + SseResponseWriter.writeSseStrings(Multi.createFrom().nothing(), rc, context, null); + + // Simulate client disconnect on the event-loop thread + capturedCloseHandler.get().handle(null); + + // EventConsumer.cancel() must be called so the polling loop stops + // (verified indirectly: ServerCallContext.invokeEventConsumerCancelCallback() is a no-op + // when no callback is registered, so no exception means the path was exercised) + verify(response).closeHandler(any()); + } + + @Test + void onSubscribedHook_isCalledAfterSubscribe() { + Runnable hook = mock(Runnable.class); + + SseResponseWriter.writeSseStrings(Multi.createFrom().empty(), rc, context, hook); + + verify(hook).run(); + } + + @Test + void responseAlreadyEnded_endIsNotCalledAgain() { + when(response.ended()).thenReturn(true); + + SseResponseWriter.writeSseStrings(Multi.createFrom().empty(), rc, context, null); + + verify(response, never()).end(); + } + + // --- helpers --- + + /** Configures the response mock to invoke write callbacks with a successful result. */ + private void successfulWrite() { + doAnswer(inv -> { + AsyncResult ok = successResult(); + inv.>>getArgument(1).handle(ok); + return response; + }).when(response).write(any(Buffer.class), any()); + } + + /** Configures the response mock to invoke write callbacks with a failure. */ + private void failingWrite(Throwable cause) { + doAnswer(inv -> { + AsyncResult fail = failResult(cause); + inv.>>getArgument(1).handle(fail); + return response; + }).when(response).write(any(Buffer.class), any()); + } + + /** Captures every Buffer passed to response.write() and still invokes the success callback. */ + private List captureWrites() { + List written = new ArrayList<>(); + doAnswer(inv -> { + written.add(inv.getArgument(0)); + AsyncResult ok = successResult(); + inv.>>getArgument(1).handle(ok); + return response; + }).when(response).write(any(Buffer.class), any()); + return written; + } + + @SuppressWarnings("unchecked") + private static AsyncResult successResult() { + AsyncResult r = mock(AsyncResult.class); + when(r.failed()).thenReturn(false); + return r; + } + + @SuppressWarnings("unchecked") + private static AsyncResult failResult(Throwable cause) { + AsyncResult r = mock(AsyncResult.class); + when(r.failed()).thenReturn(true); + when(r.cause()).thenReturn(cause); + return r; + } +} diff --git a/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java index c313e450c..1f6bc0e23 100644 --- a/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java @@ -2,7 +2,6 @@ import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; -import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS; import static org.a2aproject.sdk.server.ServerCallContext.TRANSPORT_KEY; import static org.a2aproject.sdk.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY; import static org.a2aproject.sdk.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY; @@ -26,11 +25,6 @@ import io.quarkus.security.ForbiddenException; import io.quarkus.security.UnauthorizedException; import io.smallrye.mutiny.Multi; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; @@ -71,6 +65,7 @@ import org.a2aproject.sdk.server.ServerCallContext; import org.a2aproject.sdk.server.auth.UnauthenticatedUser; import org.a2aproject.sdk.server.auth.User; +import org.a2aproject.sdk.server.common.quarkus.SseResponseWriter; import org.a2aproject.sdk.server.common.quarkus.VertxSecurityHelper; import org.a2aproject.sdk.server.extensions.A2AExtensions; import org.a2aproject.sdk.server.util.async.Internal; @@ -81,8 +76,7 @@ import org.a2aproject.sdk.spec.TransportProtocol; import org.a2aproject.sdk.spec.UnsupportedOperationError; import org.a2aproject.sdk.transport.jsonrpc.handler.JSONRPCHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.jspecify.annotations.Nullable; /** * Quarkus routing configuration for JSON-RPC A2A protocol requests. @@ -169,7 +163,7 @@ * * @see JSONRPCHandler * @see CallContextFactory - * @see MultiSseSupport + * @see SseResponseWriter */ @Singleton public class A2AServerRoutes { @@ -180,9 +174,10 @@ public class A2AServerRoutes { @Inject AgentCardCacheMetadata cacheMetadata; - // Hook so testing can wait until the MultiSseSupport is subscribed. + // Hook so testing can wait until the SSE subscriber is attached. // Without this we get intermittent failures - private static volatile Runnable streamingMultiSseSupportSubscribedRunnable; + private static volatile @Nullable + Runnable streamingMultiSseSupportSubscribedRunnable; @Inject @Internal @@ -350,13 +345,13 @@ public void invokeJSONRPCHandler(String body, RoutingContext rc) { } else if (streaming) { // Convert Multi to Multi with SSE formatting // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer - // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // starts emitting events before the SSE subscriber is attached. The executor.execute() // wrapper caused 100-600ms delays before subscription, causing events to be lost. AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = streamingResponse .map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement())); // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } else { rc.response() @@ -728,165 +723,4 @@ private static com.google.protobuf.MessageOrBuilder convertToProto(A2AResponseThis class handles the HTTP-specific aspects of SSE streaming, including: - *
    - *
  • Writing SSE-formatted events to the HTTP response
  • - *
  • Managing backpressure through reactive streams
  • - *
  • Detecting client disconnections and canceling upstream
  • - *
  • Setting appropriate HTTP headers for SSE
  • - *
- * - *

SSE Format

- *

Events are written in Server-Sent Events format: - *

-     * id: 0
-     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
-     *
-     * id: 1
-     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
-     * 
- * - *

Backpressure Handling

- *

Uses reactive streams subscription to handle backpressure: - *

    - *
  1. Request 1 event from upstream
  2. - *
  3. Write event to HTTP response
  4. - *
  5. Wait for write completion
  6. - *
  7. Request next event (backpressure)
  8. - *
- * - *

Disconnect Handling

- *

When the client disconnects: - *

    - *
  1. Vert.x closeHandler fires
  2. - *
  3. Invokes {@link ServerCallContext#invokeEventConsumerCancelCallback()}
  4. - *
  5. Cancels upstream subscription
  6. - *
  7. Stops event polling
  8. - *
- * - * @see SseFormatter - */ - private static class MultiSseSupport { - private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class); - - private MultiSseSupport() { - // Avoid direct instantiation. - } - - /** - * Writes SSE-formatted strings to the HTTP response with backpressure handling. - * - *

This method subscribes to the SSE event stream and writes each event to the - * HTTP response, managing backpressure through the reactive streams subscription. - * - *

Subscription Flow: - *

    - *
  1. Subscribe to SSE stream
  2. - *
  3. Register disconnect handler
  4. - *
  5. Request first event
  6. - *
  7. Write event to response
  8. - *
  9. Wait for write completion
  10. - *
  11. Request next event (backpressure)
  12. - *
- * - *

HTTP Headers: - *

Sets {@code Content-Type: text/event-stream} on first write - * - *

Error Handling: - *

    - *
  • Client disconnect → cancel upstream, stop polling
  • - *
  • Write failure → cancel upstream, fail routing context
  • - *
  • Stream error → cancel upstream, fail routing context
  • - *
- * - * @param sseStrings Multi stream of SSE-formatted strings from {@link SseFormatter} - * @param rc the Vert.x routing context - * @param context the A2A server call context (for EventConsumer cancellation) - * @see SseFormatter#formatResponseAsSSE - */ - public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { - HttpServerResponse response = rc.response(); - - sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { - Flow.Subscription upstream; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - - // Detect client disconnect and call EventConsumer.cancel() directly - response.closeHandler(v -> { - logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); - context.invokeEventConsumerCancelCallback(); - subscription.cancel(); - }); - - // Notify tests that we are subscribed - Runnable runnable = streamingMultiSseSupportSubscribedRunnable; - if (runnable != null) { - runnable.run(); - } - } - - @Override - public void onNext(String sseEvent) { - // Set SSE headers on first event - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - // Additional SSE headers to prevent buffering - headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering - response.setChunked(true); - - // CRITICAL: Disable write queue max size to prevent buffering - // Vert.x buffers writes by default - we need immediate flushing for SSE - response.setWriteQueueMaxSize(1); - - // Send initial SSE comment to kickstart the stream - response.write(": SSE stream started\n\n"); - } - - // Write SSE-formatted string to response - response.write(Buffer.buffer(sseEvent), new Handler>() { - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - // Client disconnected or write failed - cancel upstream to stop EventConsumer - upstream.cancel(); - rc.fail(ar.cause()); - } else { - upstream.request(1); - } - } - }); - } - - @Override - public void onError(Throwable throwable) { - // Cancel upstream to stop EventConsumer when error occurs - upstream.cancel(); - rc.fail(throwable); - } - - @Override - public void onComplete() { - if (response.bytesWritten() == 0) { - // No events written - still set SSE content type - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - } - response.end(); - } - }); - } - } } diff --git a/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java index f033300fe..1fc25b08d 100644 --- a/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java @@ -7,7 +7,6 @@ import static org.a2aproject.sdk.transport.rest.context.RestContextKeys.METHOD_NAME_KEY; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; -import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS; import java.util.HashMap; import java.util.List; @@ -41,6 +40,7 @@ import org.a2aproject.sdk.transport.rest.handler.RestHandler; import org.a2aproject.sdk.transport.rest.handler.RestHandler.HTTPRestResponse; import org.a2aproject.sdk.transport.rest.handler.RestHandler.HTTPRestStreamingResponse; +import org.a2aproject.sdk.server.common.quarkus.SseResponseWriter; import org.a2aproject.sdk.server.common.quarkus.VertxSecurityHelper; import org.a2aproject.sdk.util.Utils; import io.quarkus.security.Authenticated; @@ -48,11 +48,7 @@ import io.quarkus.security.UnauthorizedException; import io.smallrye.mutiny.Multi; import jakarta.annotation.security.PermitAll; -import io.vertx.core.AsyncResult; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; @@ -98,7 +94,7 @@ * *

Streaming Support

*

Streaming endpoints ({@code message:stream}, {@code subscribe}) use Server-Sent Events (SSE) - * via the inner {@link MultiSseSupport} class. SSE responses are handled by: + * via {@link SseResponseWriter}. SSE responses are handled by: *

    *
  • Converting {@link Flow.Publisher} to Mutiny {@code Multi}
  • *
  • Formatting events with {@link SseFormatter}
  • @@ -136,7 +132,7 @@ public class A2AServerRoutes { @Inject RestHandler jsonRestHandler; - // Hook so testing can wait until the MultiSseSupport is subscribed. + // Hook so testing can wait until the SSE subscriber is attached. // Without this we get intermittent failures private static volatile @Nullable Runnable streamingMultiSseSupportSubscribedRunnable; @@ -356,13 +352,13 @@ public void sendMessageStreaming(String body, RoutingContext rc) { } else if (streamingResponse != null) { // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer - // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // starts emitting events before the SSE subscriber is attached. The executor.execute() // wrapper caused 100-600ms delays before subscription, causing events to be lost. AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } } } @@ -566,13 +562,13 @@ public void subscribeToTask(RoutingContext rc) { } else if (streamingResponse != null) { // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer - // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // starts emitting events before the SSE subscriber is attached. The executor.execute() // wrapper caused 100-600ms delays before subscription, causing events to be lost. AtomicLong eventIdCounter = new AtomicLong(0); Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); + SseResponseWriter.writeSseStrings(sseEvents, rc, context, streamingMultiSseSupportSubscribedRunnable); } } } @@ -934,167 +930,5 @@ public String getUsername() { } } - /** - * Server-Sent Events (SSE) streaming support for Vert.x/Quarkus. - * - *

    This inner class handles the HTTP-specific aspects of SSE streaming: - *

      - *
    • Writing SSE-formatted events to the HTTP response
    • - *
    • Managing backpressure via {@link Flow.Subscription#request(long)}
    • - *
    • Detecting client disconnection and canceling upstream
    • - *
    • Setting appropriate SSE headers and chunked encoding
    • - *
    - * - *

    SSE Format

    - *

    Events are formatted by {@link SseFormatter} before being passed to this class. - * Each event follows the SSE specification: - *

    -     * id: 0
    -     * data: {"taskStatusUpdate":{...}}
    -     *
    -     * id: 1
    -     * data: {"taskArtifactUpdate":{...}}
    -     * 
    - * - *

    Backpressure Handling

    - *

    The subscriber requests one event at a time ({@code request(1)}) and only - * requests the next event after the previous write completes. This ensures the - * HTTP connection doesn't buffer excessive data if the client is slow. - * - *

    Disconnect Detection

    - *

    When the client closes the connection, this class: - *

      - *
    1. Calls {@link ServerCallContext#invokeEventConsumerCancelCallback()} to stop the event producer
    2. - *
    3. Cancels the upstream subscription to stop event generation
    4. - *
    - * - *

    Write Queue Configuration

    - *

    Critical: Sets {@code setWriteQueueMaxSize(1)} to force immediate flushing - * of each event. Without this, Vert.x buffers writes, causing delays in SSE delivery. - * - * @see SseFormatter - * @see ServerCallContext#invokeEventConsumerCancelCallback() - */ - private static class MultiSseSupport { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MultiSseSupport.class); - - private MultiSseSupport() { - // Avoid direct instantiation. - } - - /** - * Writes SSE-formatted event strings to the HTTP response with backpressure control. - * - *

    This method subscribes to the event stream and writes each SSE-formatted string - * to the Vert.x HTTP response. It implements reactive backpressure by requesting - * events one at a time and only requesting the next after the previous write completes. - * - *

    Execution Flow

    - *
      - *
    1. Subscribe to upstream {@code Multi} (SSE-formatted events)
    2. - *
    3. On first event: set SSE headers, disable buffering, write kickstart comment
    4. - *
    5. For each event: write to HTTP response asynchronously
    6. - *
    7. After write completes: request next event (backpressure control)
    8. - *
    9. On client disconnect or error: cancel upstream to stop event production
    10. - *
    - * - *

    Headers Set

    - *
      - *
    • {@code Content-Type: text/event-stream}
    • - *
    • {@code Cache-Control: no-cache}
    • - *
    • {@code X-Accel-Buffering: no} (disable nginx buffering)
    • - *
    • Chunked encoding enabled
    • - *
    - * - * @param sseStrings Multi stream of SSE-formatted strings (from {@link SseFormatter}) - * @param rc Vert.x routing context providing HTTP response - * @param context A2A server call context (for EventConsumer cancellation on disconnect) - */ - public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { - HttpServerResponse response = rc.response(); - - sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { - Flow.@Nullable Subscription upstream; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - - // Detect client disconnect and call EventConsumer.cancel() directly - response.closeHandler(v -> { - logger.debug("REST SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); - context.invokeEventConsumerCancelCallback(); - subscription.cancel(); - }); - - // Notify tests that we are subscribed - Runnable runnable = streamingMultiSseSupportSubscribedRunnable; - if (runnable != null) { - runnable.run(); - } - } - - @Override - public void onNext(String sseEvent) { - // Set SSE headers on first event - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - // Additional SSE headers to prevent buffering - headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering - response.setChunked(true); - - // CRITICAL: Disable write queue max size to prevent buffering - // Vert.x buffers writes by default - we need immediate flushing for SSE - response.setWriteQueueMaxSize(1); // Force immediate flush - - // Send initial SSE comment to kickstart the stream - // This forces Vert.x to send headers and start the stream immediately - response.write(": SSE stream started\n\n"); - } - - // Write SSE-formatted string to response - response.write(Buffer.buffer(sseEvent), new Handler>() { - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - // Client disconnected or write failed - cancel upstream to stop EventConsumer - // NullAway: upstream is guaranteed non-null after onSubscribe - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(ar.cause()); - } else { - // NullAway: upstream is guaranteed non-null after onSubscribe - java.util.Objects.requireNonNull(upstream).request(1); - } - } - }); - } - - @Override - public void onError(Throwable throwable) { - // Cancel upstream to stop EventConsumer when error occurs - // NullAway: upstream is guaranteed non-null after onSubscribe - java.util.Objects.requireNonNull(upstream).cancel(); - rc.fail(throwable); - } - - @Override - public void onComplete() { - if (response.bytesWritten() == 0) { - // No events written - still set SSE content type - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); - } - } - response.end(); - } - }); - } - } } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java index aa357618f..18da23b22 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java @@ -41,11 +41,12 @@ public class EventConsumer { private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000; private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = (MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1) / QUEUE_WAIT_MILLISECONDS; - // WORKAROUND: Sleep delay to allow SSE buffer flush before stream completion - // This is a temporary workaround for a race condition where tube.complete() can arrive - // before the final event is flushed from the SSE buffer. Ideally, this should be handled - // at the transport layer (e.g., MultiSseSupport) with proper write completion callbacks. - // TODO: Move buffer flush handling to transport layer to avoid this latency penalty + // Delay between tube.send(finalEvent) and tube.complete() to allow the SSE transport + // layer to flush the write before the stream-end signal arrives. Mutiny's internal + // demand management can call request(1) on the underlying publisher independently of + // the write callback, causing onComplete to fire before the HTTP response.write() + // callback confirms the data was sent. This sleep ensures the write callback fires + // first, so response.end() is only called after the data is safely in flight. private static final int BUFFER_FLUSH_DELAY_MS = 150; public EventConsumer(EventQueue queue, Executor executor) { @@ -229,12 +230,10 @@ public Flow.Publisher consumeAll() { queue.close(); LOGGER.debug("Queue closed, breaking loop for queue {}", System.identityHashCode(queue)); - // CRITICAL: Allow tube buffer to flush before calling tube.complete() - // tube.send() buffers events asynchronously. If we call tube.complete() immediately, - // the stream-end signal can reach the client BEFORE the buffered final event, - // causing the client to close the connection and never receive the final event. - // This is especially important in replicated scenarios where events arrive via Kafka - // and timing is less deterministic. + // Allow the SSE write callback to fire before calling tube.complete(). + // Mutiny's internal demand management can trigger onComplete independently + // of the write callback, causing response.end() to race with a pending + // response.write(). This delay ensures the write callback runs first. if (isFinalSent) { try { Thread.sleep(BUFFER_FLUSH_DELAY_MS); diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java index 23264977b..d4e97fdc3 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java @@ -124,7 +124,7 @@ public void testTaskSerializationException_ConvertsToInternalError() throws Inte "Error message should mention serialization: " + error.getMessage()); // Assert: Verify ERROR level logging - boolean foundErrorLog = logAppender.list.stream() + boolean foundErrorLog = snapshotLogs().stream() .anyMatch(event -> event.getLevel() == Level.ERROR && event.getFormattedMessage().contains(TASK_ID) && event.getFormattedMessage().contains("serialization")); @@ -161,7 +161,7 @@ public void testTaskPersistenceException_ConvertsToInternalError() throws Interr "Error message should contain task ID: " + error.getMessage()); // Assert: Verify ERROR level logging - boolean foundErrorLog = logAppender.list.stream() + boolean foundErrorLog = snapshotLogs().stream() .anyMatch(event -> event.getLevel() == Level.ERROR && event.getFormattedMessage().contains(TASK_ID) && event.getFormattedMessage().contains("persistence failed")); @@ -220,7 +220,7 @@ public void onTaskFinalized(String taskId) { "Error should contain specific task ID: " + error.getMessage()); // Assert: Verify specific taskId appears in logs - boolean foundTaskIdInLog = logAppender.list.stream() + boolean foundTaskIdInLog = snapshotLogs().stream() .anyMatch(event -> event.getFormattedMessage().contains(specificTaskId)); assertTrue(foundTaskIdInLog, "Logs should contain specific task ID"); } @@ -236,6 +236,18 @@ private Task createTestTask() { .build(); } + /** + * Thread-safe snapshot of the log appender's current list. + * + *

    logAppender.doAppend() is synchronized(logAppender), so iterating + * logAppender.list without the same lock races with the processor thread. + */ + private List snapshotLogs() { + synchronized (logAppender) { + return List.copyOf(logAppender.list); + } + } + /** * Helper method to enqueue an event and capture what gets distributed to clients. * Uses MainEventBusProcessorCallback to wait for async processing.