From ece9af820e97a91cbd5c86948c2a93c12a422095 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 3 Nov 2025 15:13:23 +0000 Subject: [PATCH 1/3] fix: Wait for agent completion and ensure all events processed in blocking calls Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state). Root Cause: - Blocking calls interrupted immediately after first event and returned to client before background event consumption completed - Agent execution and event processing happened asynchronously in background - No synchronization to ensure all events were consumed and persisted before returning Task to client Solution (4-step process): 1. Wait for agent to finish enqueueing events (5s timeout) 2. Close the queue to signal consumption can complete (breaks dependency) 3. Wait for consumption to finish processing events (2s timeout) 4. Fetch final task state from TaskStore (has all artifacts and correct state) This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events. Added unit tests: - testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern - testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included --- .../DefaultRequestHandler.java | 103 +++++++++- .../io/a2a/server/tasks/ResultAggregator.java | 41 ++-- .../AbstractA2ARequestHandlerTest.java | 3 +- .../DefaultRequestHandlerTest.java | 184 +++++++++++++++--- .../grpc/handler/GrpcHandlerTest.java | 12 +- .../jsonrpc/handler/JSONRPCHandlerTest.java | 21 +- 6 files changed, 306 insertions(+), 58 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index bb85eea6e..5f0553510 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -21,6 +21,8 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + import io.a2a.server.ServerCallContext; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; @@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class); + /** + * Timeout in seconds to wait for agent execution to complete in blocking calls. + * This allows slow agents (LLM-based, data processing, external APIs) sufficient time. + * Configurable via: a2a.blocking.agent.timeout.seconds + * Default: 30 seconds + */ + @Inject + @ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30") + int agentCompletionTimeoutSeconds; + + /** + * Timeout in seconds to wait for event consumption to complete in blocking calls. + * This ensures all events are processed and persisted before returning to client. + * Configurable via: a2a.blocking.consumption.timeout.seconds + * Default: 5 seconds + */ + @Inject + @ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5") + int consumptionCompletionTimeoutSeconds; + private final AgentExecutor agentExecutor; private final TaskStore taskStore; private final QueueManager queueManager; @@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false); } + /** + * For testing + */ + public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, + QueueManager queueManager, PushNotificationConfigStore pushConfigStore, + PushNotificationSender pushSender, Executor executor) { + DefaultRequestHandler handler = + new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor); + handler.agentCompletionTimeoutSeconds = 5; + handler.consumptionCompletionTimeoutSeconds = 2; + return handler; + } + @Override public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onGetTask {}", params.id()); @@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue); ResultAggregator.EventTypeAndInterrupt etai = null; + EventKind kind = null; // Declare outside try block so it's in scope for return try { // Create callback for push notifications during background event processing Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator); @@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // This callback must be added before we start consuming. Otherwise, // any errors thrown by the producerRunnable are not picked up by the consumer producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback()); - etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback); + + // Get agent future before consuming (for blocking calls to wait for agent completion) + CompletableFuture agentFuture = runningAgents.get(taskId); + etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback, agentFuture); if (etai == null) { LOGGER.debug("No result, throwing InternalError"); @@ -210,7 +249,63 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte interruptedOrNonBlocking = etai.interrupted(); LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking); - EventKind kind = etai.eventType(); + // For blocking calls that were interrupted (returned on first event), + // wait for agent execution and event processing BEFORE returning to client. + // This ensures the returned Task has all artifacts and current state. + // We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads + // during the consumption loop itself. + kind = etai.eventType(); + if (blocking && interruptedOrNonBlocking) { + // For blocking calls: ensure all events are processed before returning + // Order of operations is critical to avoid circular dependency: + // 1. Wait for agent to finish enqueueing events + // 2. Close the queue to signal consumption can complete + // 3. Wait for consumption to finish processing events + // 4. Fetch final task state from TaskStore + + try { + // Step 1: Wait for agent to finish (with configurable timeout) + if (agentFuture != null) { + try { + agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS); + LOGGER.debug("Agent completed for task {}", taskId); + } catch (java.util.concurrent.TimeoutException e) { + // Agent still running after timeout - that's fine, events already being processed + LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds); + } + } + + // Step 2: Close the queue to signal consumption can complete + // For fire-and-forget tasks, there's no final event, so we need to close the queue + // This allows EventConsumer.consumeAll() to exit + queue.close(false, false); // graceful close, don't notify parent yet + LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId); + + // Step 3: Wait for consumption to complete (now that queue is closed) + if (etai.consumptionFuture() != null) { + etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS); + LOGGER.debug("Consumption completed for task {}", taskId); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted waiting for task {} completion", taskId, e); + } catch (java.util.concurrent.ExecutionException e) { + LOGGER.warn("Error during task {} execution", taskId, e.getCause()); + } catch (java.util.concurrent.TimeoutException e) { + LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId); + } + + // Step 4: Fetch the final task state from TaskStore (all events have been processed) + Task updatedTask = taskStore.get(taskId); + if (updatedTask != null) { + kind = updatedTask; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Fetched final task for {} with state {} and {} artifacts", + taskId, updatedTask.getStatus().state(), + updatedTask.getArtifacts().size()); + } + } + } if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) { throw new InternalError("Task ID mismatch in agent response"); } @@ -227,8 +322,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false)); } - LOGGER.debug("Returning: {}", etai.eventType()); - return etai.eventType(); + LOGGER.debug("Returning: {}", kind); + return kind; } @Override diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index f73242491..cdc2b4010 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -11,20 +11,20 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueueItem; import io.a2a.spec.A2AServerException; import io.a2a.spec.Event; import io.a2a.spec.EventKind; +import io.a2a.spec.InternalError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.Task; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ResultAggregator { private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class); @@ -106,10 +106,14 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { } public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError { - return consumeAndBreakOnInterrupt(consumer, blocking, null); + return consumeAndBreakOnInterrupt(consumer, blocking, null, null); } public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError { + return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null); + } + + public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture agentFuture) throws JSONRPCError { Flow.Publisher allItems = consumer.consumeAll(); AtomicReference message = new AtomicReference<>(); AtomicBoolean interrupted = new AtomicBoolean(false); @@ -180,11 +184,11 @@ else if (!blocking) { shouldInterrupt = true; continueInBackground = true; } - else { - // For ALL blocking calls (both final and non-final events), use background consumption - // This ensures all events are processed and persisted to TaskStore in background - // Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer() - // which waits for BOTH agent and consumption futures before closing queues + else if (blocking) { + // For blocking calls: Interrupt to free Vert.x thread, but continue in background + // Python's async consumption doesn't block threads, but Java's does + // So we interrupt to return quickly, then rely on background consumption + // DefaultRequestHandler will fetch the final state from TaskStore shouldInterrupt = true; continueInBackground = true; if (LOGGER.isDebugEnabled()) { @@ -198,10 +202,17 @@ else if (!blocking) { interrupted.set(true); completionFuture.complete(null); - // Signal that cleanup can proceed while consumption continues in background. - // This prevents infinite hangs for fire-and-forget agents that never emit final events. - // Processing continues (return true below) and all events are still persisted to TaskStore. - consumptionCompletionFuture.complete(null); + // For blocking calls, DON'T complete consumptionCompletionFuture here. + // Let it complete naturally when subscription finishes (onComplete callback below). + // This ensures all events are processed and persisted to TaskStore before + // DefaultRequestHandler.cleanupProducer() proceeds with cleanup. + // + // For non-blocking and auth-required calls, complete immediately to allow + // cleanup to proceed while consumption continues in background. + if (!blocking) { + consumptionCompletionFuture.complete(null); + } + // else: blocking calls wait for actual consumption completion in onComplete // Continue consuming in background - keep requesting events // Note: continueInBackground is always true when shouldInterrupt is true @@ -244,8 +255,8 @@ else if (!blocking) { } } - // Background consumption continues automatically via the subscription - // returning true in the consumer function keeps the subscription alive + // Note: For blocking calls that were interrupted, the wait logic has been moved + // to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads. // Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer() Throwable error = errorRef.get(); diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 9447203e1..d654a83a6 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient); - requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor); + requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor); } @AfterEach diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index 968c7812d..4fa3a2a15 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -1,7 +1,10 @@ package io.a2a.server.requesthandlers; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -16,6 +19,7 @@ import io.a2a.server.events.EventQueue; import io.a2a.server.events.InMemoryQueueManager; import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.MessageSendConfiguration; @@ -50,7 +54,7 @@ void setUp() { queueManager = new InMemoryQueueManager(taskStore); agentExecutor = new TestAgentExecutor(); - requestHandler = new DefaultRequestHandler( + requestHandler = DefaultRequestHandler.create( agentExecutor, taskStore, queueManager, @@ -418,35 +422,99 @@ void testDisconnectPersistsFinalTaskToStore() throws Exception { } /** - * Test that blocking message calls persist all events in background. - * This test proves the bug where blocking calls stop consuming events after - * the first event is returned, causing subsequent events to be lost. + * Test that blocking message call waits for agent to finish and returns complete Task + * even when agent does fire-and-forget (emits non-final state and returns). * * Expected behavior: - * 1. Blocking call returns immediately with first event (WORKING state) + * 1. Agent emits WORKING state with artifacts + * 2. Agent's execute() method returns WITHOUT emitting final state + * 3. Blocking onMessageSend() should wait for agent execution to complete + * 4. Blocking onMessageSend() should wait for all queued events to be processed + * 5. Returned Task should have WORKING state with all artifacts included + * + * This tests fire-and-forget pattern with blocking calls. + */ + @Test + @Timeout(15) + void testBlockingFireAndForgetReturnsNonFinalTask() throws Exception { + String taskId = "blocking-fire-forget-task"; + String contextId = "blocking-fire-forget-ctx"; + + Message message = new Message.Builder() + .messageId("msg-blocking-fire-forget") + .role(Message.Role.USER) + .parts(new TextPart("test message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent that does fire-and-forget: emits WORKING with artifact but never completes + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + + // Start work (WORKING state) + updater.startWork(); + + // Add artifact + updater.addArtifact( + List.of(new TextPart("Fire and forget artifact", null)), + "artifact-1", "FireForget", null); + + // Agent returns WITHOUT calling updater.complete() + // Task stays in WORKING state (non-final) + }); + + // Call blocking onMessageSend - should wait for agent to finish + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // The returned result should be a Task in WORKING state with artifact + assertTrue(result instanceof Task, "Result should be a Task"); + Task returnedTask = (Task) result; + + // Verify task is in WORKING state (non-final, fire-and-forget) + assertEquals(TaskState.WORKING, returnedTask.getStatus().state(), + "Returned task should be WORKING (fire-and-forget), got: " + returnedTask.getStatus().state()); + + // Verify artifacts are included in the returned task + assertNotNull(returnedTask.getArtifacts(), + "Returned task should have artifacts"); + assertTrue(returnedTask.getArtifacts().size() >= 1, + "Returned task should have at least 1 artifact, got: " + + returnedTask.getArtifacts().size()); + } + + /** + * Test that non-blocking message call returns immediately and persists all events in background. + * + * Expected behavior: + * 1. Non-blocking call returns immediately with first event (WORKING state) * 2. Agent continues running in background and produces more events * 3. Background consumption continues and persists all events to TaskStore - * 4. Final task state (COMPLETED) is persisted despite blocking call having returned - * - * This test will FAIL before the fix is applied, demonstrating the bug. + * 4. Final task state (COMPLETED) is persisted in background */ @Test @Timeout(15) - void testBlockingMessagePersistsAllEventsInBackground() throws Exception { + void testNonBlockingMessagePersistsAllEventsInBackground() throws Exception { String taskId = "blocking-persist-task"; String contextId = "blocking-persist-ctx"; Message message = new Message.Builder() - .messageId("msg-blocking-persist") + .messageId("msg-nonblocking-persist") .role(Message.Role.USER) .parts(new TextPart("test message")) .taskId(taskId) .contextId(contextId) .build(); - // Explicitly set blocking=true (though it's the default) + // Set blocking=false for non-blocking behavior MessageSendConfiguration config = new MessageSendConfiguration.Builder() - .blocking(true) + .blocking(false) .build(); MessageSendParams params = new MessageSendParams(message, config, null); @@ -468,7 +536,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { queue.enqueueEvent(workingTask); firstEventEmitted.countDown(); - // Sleep to ensure the blocking call has returned before we emit more events + // Sleep to ensure the non-blocking call has returned before we emit more events try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -485,8 +553,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { } // Emit final event (COMPLETED state) - // This event will be LOST with the current bug, because the consumer - // has already stopped after returning the first event + // This event should be persisted to TaskStore in background Task completedTask = new Task.Builder() .id(taskId) .contextId(contextId) @@ -495,21 +562,21 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { queue.enqueueEvent(completedTask); }); - // Call blocking onMessageSend + // Call non-blocking onMessageSend Object result = requestHandler.onMessageSend(params, serverCallContext); // Assertion 1: The immediate result should be the first event (WORKING) assertTrue(result instanceof Task, "Result should be a Task"); Task immediateTask = (Task) result; - assertTrue(immediateTask.getStatus().state() == TaskState.WORKING, - "Immediate return should show WORKING state, got: " + immediateTask.getStatus().state()); + assertEquals(TaskState.WORKING, immediateTask.getStatus().state(), + "Non-blocking should return immediately with WORKING state, got: " + immediateTask.getStatus().state()); - // At this point, the blocking call has returned, but the agent is still running + // At this point, the non-blocking call has returned, but the agent is still running // Allow the agent to emit the final COMPLETED event allowCompletion.countDown(); - // Assertion 2: Poll for the final task state to be persisted + // Assertion 2: Poll for the final task state to be persisted in background // Use polling loop instead of fixed sleep for faster and more reliable test long timeoutMs = 5000; long startTime = System.currentTimeMillis(); @@ -530,8 +597,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { completedStateFound, "Final task state should be COMPLETED (background consumption should have processed it), got: " + (persistedTask != null ? persistedTask.getStatus().state() : "null") + - " after " + (System.currentTimeMillis() - startTime) + "ms. " + - "This failure proves the bug - events after the first are lost." + " after " + (System.currentTimeMillis() - startTime) + "ms" ); } @@ -654,6 +720,80 @@ void testMainQueueClosesForFinalizedTasks() throws Exception { "Queue for finalized task should be null or closed"); } + /** + * Test that blocking message call returns a Task with ALL artifacts included. + * This reproduces the reported bug: blocking call returns before artifacts are processed. + * + * Expected behavior: + * 1. Agent emits multiple artifacts via TaskUpdater + * 2. Blocking onMessageSend() should wait for ALL events to be processed + * 3. Returned Task should have all artifacts included in COMPLETED state + * + * Bug manifestation: + * - onMessageSend() returns after first event + * - Artifacts are still being processed in background + * - Returned Task is incomplete + */ + @Test + @Timeout(15) + void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception { + String taskId = "blocking-artifacts-task"; + String contextId = "blocking-artifacts-ctx"; + + Message message = new Message.Builder() + .messageId("msg-blocking-artifacts") + .role(Message.Role.USER) + .parts(new TextPart("test message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent that uses TaskUpdater to emit multiple artifacts (like real agents do) + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + + // Start work (WORKING state) + updater.startWork(); + + // Add first artifact + updater.addArtifact( + List.of(new TextPart("First artifact", null)), + "artifact-1", "First", null); + + // Add second artifact + updater.addArtifact( + List.of(new TextPart("Second artifact", null)), + "artifact-2", "Second", null); + + // Complete the task + updater.complete(); + }); + + // Call blocking onMessageSend - should wait for ALL events + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // The returned result should be a Task with ALL artifacts + assertTrue(result instanceof Task, "Result should be a Task"); + Task returnedTask = (Task) result; + + // Verify task is completed + assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(), + "Returned task should be COMPLETED"); + + // Verify artifacts are included in the returned task + assertNotNull(returnedTask.getArtifacts(), + "Returned task should have artifacts"); + assertTrue(returnedTask.getArtifacts().size() >= 2, + "Returned task should have at least 2 artifacts, got: " + + returnedTask.getArtifacts().size()); + } + /** * Simple test agent executor that allows controlling execution timing */ diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java index 50fa8979c..d7a2193bd 100644 --- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java @@ -280,8 +280,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception { @Test public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); @@ -655,8 +655,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception { @Test public void testListPushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -728,8 +728,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder() diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 45109cced..1c71e8b76 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -1112,8 +1112,8 @@ public void testPushNotificationsNotSupportedError() { @Test public void testOnGetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1131,8 +1131,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() { @Test public void testOnSetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1222,8 +1222,8 @@ public void testDefaultRequestHandlerWithCustomComponents() { @Test public void testOnMessageSendErrorHandling() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1245,6 +1245,7 @@ public void testOnMessageSendErrorHandling() { .when(mock).consumeAndBreakOnInterrupt( Mockito.any(EventConsumer.class), Mockito.anyBoolean(), + Mockito.any(), Mockito.any()); })){ response = handler.onMessageSend(request, callContext); @@ -1376,8 +1377,8 @@ public void testListPushNotificationConfigNotSupported() { @Test public void testListPushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -1468,8 +1469,8 @@ public void testDeletePushNotificationConfigNotSupported() { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { From fab40d7318455de494c4442c447dbccc5daab9e7 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 4 Nov 2025 14:28:31 +0000 Subject: [PATCH 2/3] Remove unused methods/parameters --- .../a2a/server/requesthandlers/DefaultRequestHandler.java | 7 +++---- .../main/java/io/a2a/server/tasks/ResultAggregator.java | 8 -------- .../a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java | 4 +--- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 5f0553510..3a5372fdb 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -21,8 +21,6 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.eclipse.microprofile.config.inject.ConfigProperty; - import io.a2a.server.ServerCallContext; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; @@ -53,11 +51,12 @@ import io.a2a.spec.Task; import io.a2a.spec.TaskIdParams; import io.a2a.spec.TaskNotCancelableError; -import io.a2a.spec.TaskState; import io.a2a.spec.TaskNotFoundError; import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; +import io.a2a.spec.TaskState; import io.a2a.spec.UnsupportedOperationError; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,7 +239,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Get agent future before consuming (for blocking calls to wait for agent completion) CompletableFuture agentFuture = runningAgents.get(taskId); - etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback, agentFuture); + etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking); if (etai == null) { LOGGER.debug("No result, throwing InternalError"); diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index cdc2b4010..26767a90b 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -106,14 +106,6 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { } public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError { - return consumeAndBreakOnInterrupt(consumer, blocking, null, null); - } - - public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError { - return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null); - } - - public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture agentFuture) throws JSONRPCError { Flow.Publisher allItems = consumer.consumeAll(); AtomicReference message = new AtomicReference<>(); AtomicBoolean interrupted = new AtomicBoolean(false); diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 1c71e8b76..c2cf1f751 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -1244,9 +1244,7 @@ public void testOnMessageSendErrorHandling() { new UnsupportedOperationError()) .when(mock).consumeAndBreakOnInterrupt( Mockito.any(EventConsumer.class), - Mockito.anyBoolean(), - Mockito.any(), - Mockito.any()); + Mockito.anyBoolean()); })){ response = handler.onMessageSend(request, callContext); } From 63a4a41d53a4ce5512137fc91ffacabc898b5ff3 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 4 Nov 2025 14:57:06 +0000 Subject: [PATCH 3/3] More cleanup --- .../requesthandlers/DefaultRequestHandler.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 3a5372fdb..de62f1a0f 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -3,6 +3,7 @@ import static io.a2a.server.util.async.AsyncUtils.convertingProcessor; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import static io.a2a.server.util.async.AsyncUtils.processor; +import static java.util.concurrent.TimeUnit.*; import java.util.ArrayList; import java.util.List; @@ -266,7 +267,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Step 1: Wait for agent to finish (with configurable timeout) if (agentFuture != null) { try { - agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS); + agentFuture.get(agentCompletionTimeoutSeconds, SECONDS); LOGGER.debug("Agent completed for task {}", taskId); } catch (java.util.concurrent.TimeoutException e) { // Agent still running after timeout - that's fine, events already being processed @@ -282,16 +283,22 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Step 3: Wait for consumption to complete (now that queue is closed) if (etai.consumptionFuture() != null) { - etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS); + etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS); LOGGER.debug("Consumption completed for task {}", taskId); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("Interrupted waiting for task {} completion", taskId, e); + String msg = String.format("Error waiting for task %s completion", taskId); + LOGGER.warn(msg, e); + throw new InternalError(msg); } catch (java.util.concurrent.ExecutionException e) { - LOGGER.warn("Error during task {} execution", taskId, e.getCause()); + String msg = String.format("Error during task %s execution", taskId); + LOGGER.warn(msg, e.getCause()); + throw new InternalError(msg); } catch (java.util.concurrent.TimeoutException e) { - LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId); + String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId); + LOGGER.warn(msg, taskId); + throw new InternalError(msg); } // Step 4: Fetch the final task state from TaskStore (all events have been processed)