diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index bb85eea6e..de62f1a0f 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -3,6 +3,7 @@ import static io.a2a.server.util.async.AsyncUtils.convertingProcessor; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import static io.a2a.server.util.async.AsyncUtils.processor; +import static java.util.concurrent.TimeUnit.*; import java.util.ArrayList; import java.util.List; @@ -51,11 +52,12 @@ import io.a2a.spec.Task; import io.a2a.spec.TaskIdParams; import io.a2a.spec.TaskNotCancelableError; -import io.a2a.spec.TaskState; import io.a2a.spec.TaskNotFoundError; import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; +import io.a2a.spec.TaskState; import io.a2a.spec.UnsupportedOperationError; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class); + /** + * Timeout in seconds to wait for agent execution to complete in blocking calls. + * This allows slow agents (LLM-based, data processing, external APIs) sufficient time. + * Configurable via: a2a.blocking.agent.timeout.seconds + * Default: 30 seconds + */ + @Inject + @ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30") + int agentCompletionTimeoutSeconds; + + /** + * Timeout in seconds to wait for event consumption to complete in blocking calls. + * This ensures all events are processed and persisted before returning to client. + * Configurable via: a2a.blocking.consumption.timeout.seconds + * Default: 5 seconds + */ + @Inject + @ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5") + int consumptionCompletionTimeoutSeconds; + private final AgentExecutor agentExecutor; private final TaskStore taskStore; private final QueueManager queueManager; @@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false); } + /** + * For testing + */ + public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, + QueueManager queueManager, PushNotificationConfigStore pushConfigStore, + PushNotificationSender pushSender, Executor executor) { + DefaultRequestHandler handler = + new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor); + handler.agentCompletionTimeoutSeconds = 5; + handler.consumptionCompletionTimeoutSeconds = 2; + return handler; + } + @Override public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onGetTask {}", params.id()); @@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue); ResultAggregator.EventTypeAndInterrupt etai = null; + EventKind kind = null; // Declare outside try block so it's in scope for return try { // Create callback for push notifications during background event processing Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator); @@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // This callback must be added before we start consuming. Otherwise, // any errors thrown by the producerRunnable are not picked up by the consumer producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback()); - etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback); + + // Get agent future before consuming (for blocking calls to wait for agent completion) + CompletableFuture agentFuture = runningAgents.get(taskId); + etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking); if (etai == null) { LOGGER.debug("No result, throwing InternalError"); @@ -210,7 +249,69 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte interruptedOrNonBlocking = etai.interrupted(); LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking); - EventKind kind = etai.eventType(); + // For blocking calls that were interrupted (returned on first event), + // wait for agent execution and event processing BEFORE returning to client. + // This ensures the returned Task has all artifacts and current state. + // We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads + // during the consumption loop itself. + kind = etai.eventType(); + if (blocking && interruptedOrNonBlocking) { + // For blocking calls: ensure all events are processed before returning + // Order of operations is critical to avoid circular dependency: + // 1. Wait for agent to finish enqueueing events + // 2. Close the queue to signal consumption can complete + // 3. Wait for consumption to finish processing events + // 4. Fetch final task state from TaskStore + + try { + // Step 1: Wait for agent to finish (with configurable timeout) + if (agentFuture != null) { + try { + agentFuture.get(agentCompletionTimeoutSeconds, SECONDS); + LOGGER.debug("Agent completed for task {}", taskId); + } catch (java.util.concurrent.TimeoutException e) { + // Agent still running after timeout - that's fine, events already being processed + LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds); + } + } + + // Step 2: Close the queue to signal consumption can complete + // For fire-and-forget tasks, there's no final event, so we need to close the queue + // This allows EventConsumer.consumeAll() to exit + queue.close(false, false); // graceful close, don't notify parent yet + LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId); + + // Step 3: Wait for consumption to complete (now that queue is closed) + if (etai.consumptionFuture() != null) { + etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS); + LOGGER.debug("Consumption completed for task {}", taskId); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + String msg = String.format("Error waiting for task %s completion", taskId); + LOGGER.warn(msg, e); + throw new InternalError(msg); + } catch (java.util.concurrent.ExecutionException e) { + String msg = String.format("Error during task %s execution", taskId); + LOGGER.warn(msg, e.getCause()); + throw new InternalError(msg); + } catch (java.util.concurrent.TimeoutException e) { + String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId); + LOGGER.warn(msg, taskId); + throw new InternalError(msg); + } + + // Step 4: Fetch the final task state from TaskStore (all events have been processed) + Task updatedTask = taskStore.get(taskId); + if (updatedTask != null) { + kind = updatedTask; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Fetched final task for {} with state {} and {} artifacts", + taskId, updatedTask.getStatus().state(), + updatedTask.getArtifacts().size()); + } + } + } if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) { throw new InternalError("Task ID mismatch in agent response"); } @@ -227,8 +328,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false)); } - LOGGER.debug("Returning: {}", etai.eventType()); - return etai.eventType(); + LOGGER.debug("Returning: {}", kind); + return kind; } @Override diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index f73242491..26767a90b 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -11,20 +11,20 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueueItem; import io.a2a.spec.A2AServerException; import io.a2a.spec.Event; import io.a2a.spec.EventKind; +import io.a2a.spec.InternalError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.Task; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ResultAggregator { private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class); @@ -106,10 +106,6 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { } public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError { - return consumeAndBreakOnInterrupt(consumer, blocking, null); - } - - public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError { Flow.Publisher allItems = consumer.consumeAll(); AtomicReference message = new AtomicReference<>(); AtomicBoolean interrupted = new AtomicBoolean(false); @@ -180,11 +176,11 @@ else if (!blocking) { shouldInterrupt = true; continueInBackground = true; } - else { - // For ALL blocking calls (both final and non-final events), use background consumption - // This ensures all events are processed and persisted to TaskStore in background - // Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer() - // which waits for BOTH agent and consumption futures before closing queues + else if (blocking) { + // For blocking calls: Interrupt to free Vert.x thread, but continue in background + // Python's async consumption doesn't block threads, but Java's does + // So we interrupt to return quickly, then rely on background consumption + // DefaultRequestHandler will fetch the final state from TaskStore shouldInterrupt = true; continueInBackground = true; if (LOGGER.isDebugEnabled()) { @@ -198,10 +194,17 @@ else if (!blocking) { interrupted.set(true); completionFuture.complete(null); - // Signal that cleanup can proceed while consumption continues in background. - // This prevents infinite hangs for fire-and-forget agents that never emit final events. - // Processing continues (return true below) and all events are still persisted to TaskStore. - consumptionCompletionFuture.complete(null); + // For blocking calls, DON'T complete consumptionCompletionFuture here. + // Let it complete naturally when subscription finishes (onComplete callback below). + // This ensures all events are processed and persisted to TaskStore before + // DefaultRequestHandler.cleanupProducer() proceeds with cleanup. + // + // For non-blocking and auth-required calls, complete immediately to allow + // cleanup to proceed while consumption continues in background. + if (!blocking) { + consumptionCompletionFuture.complete(null); + } + // else: blocking calls wait for actual consumption completion in onComplete // Continue consuming in background - keep requesting events // Note: continueInBackground is always true when shouldInterrupt is true @@ -244,8 +247,8 @@ else if (!blocking) { } } - // Background consumption continues automatically via the subscription - // returning true in the consumer function keeps the subscription alive + // Note: For blocking calls that were interrupted, the wait logic has been moved + // to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads. // Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer() Throwable error = errorRef.get(); diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 9447203e1..d654a83a6 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient); - requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor); + requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor); } @AfterEach diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index 968c7812d..4fa3a2a15 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -1,7 +1,10 @@ package io.a2a.server.requesthandlers; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -16,6 +19,7 @@ import io.a2a.server.events.EventQueue; import io.a2a.server.events.InMemoryQueueManager; import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.MessageSendConfiguration; @@ -50,7 +54,7 @@ void setUp() { queueManager = new InMemoryQueueManager(taskStore); agentExecutor = new TestAgentExecutor(); - requestHandler = new DefaultRequestHandler( + requestHandler = DefaultRequestHandler.create( agentExecutor, taskStore, queueManager, @@ -418,35 +422,99 @@ void testDisconnectPersistsFinalTaskToStore() throws Exception { } /** - * Test that blocking message calls persist all events in background. - * This test proves the bug where blocking calls stop consuming events after - * the first event is returned, causing subsequent events to be lost. + * Test that blocking message call waits for agent to finish and returns complete Task + * even when agent does fire-and-forget (emits non-final state and returns). * * Expected behavior: - * 1. Blocking call returns immediately with first event (WORKING state) + * 1. Agent emits WORKING state with artifacts + * 2. Agent's execute() method returns WITHOUT emitting final state + * 3. Blocking onMessageSend() should wait for agent execution to complete + * 4. Blocking onMessageSend() should wait for all queued events to be processed + * 5. Returned Task should have WORKING state with all artifacts included + * + * This tests fire-and-forget pattern with blocking calls. + */ + @Test + @Timeout(15) + void testBlockingFireAndForgetReturnsNonFinalTask() throws Exception { + String taskId = "blocking-fire-forget-task"; + String contextId = "blocking-fire-forget-ctx"; + + Message message = new Message.Builder() + .messageId("msg-blocking-fire-forget") + .role(Message.Role.USER) + .parts(new TextPart("test message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent that does fire-and-forget: emits WORKING with artifact but never completes + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + + // Start work (WORKING state) + updater.startWork(); + + // Add artifact + updater.addArtifact( + List.of(new TextPart("Fire and forget artifact", null)), + "artifact-1", "FireForget", null); + + // Agent returns WITHOUT calling updater.complete() + // Task stays in WORKING state (non-final) + }); + + // Call blocking onMessageSend - should wait for agent to finish + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // The returned result should be a Task in WORKING state with artifact + assertTrue(result instanceof Task, "Result should be a Task"); + Task returnedTask = (Task) result; + + // Verify task is in WORKING state (non-final, fire-and-forget) + assertEquals(TaskState.WORKING, returnedTask.getStatus().state(), + "Returned task should be WORKING (fire-and-forget), got: " + returnedTask.getStatus().state()); + + // Verify artifacts are included in the returned task + assertNotNull(returnedTask.getArtifacts(), + "Returned task should have artifacts"); + assertTrue(returnedTask.getArtifacts().size() >= 1, + "Returned task should have at least 1 artifact, got: " + + returnedTask.getArtifacts().size()); + } + + /** + * Test that non-blocking message call returns immediately and persists all events in background. + * + * Expected behavior: + * 1. Non-blocking call returns immediately with first event (WORKING state) * 2. Agent continues running in background and produces more events * 3. Background consumption continues and persists all events to TaskStore - * 4. Final task state (COMPLETED) is persisted despite blocking call having returned - * - * This test will FAIL before the fix is applied, demonstrating the bug. + * 4. Final task state (COMPLETED) is persisted in background */ @Test @Timeout(15) - void testBlockingMessagePersistsAllEventsInBackground() throws Exception { + void testNonBlockingMessagePersistsAllEventsInBackground() throws Exception { String taskId = "blocking-persist-task"; String contextId = "blocking-persist-ctx"; Message message = new Message.Builder() - .messageId("msg-blocking-persist") + .messageId("msg-nonblocking-persist") .role(Message.Role.USER) .parts(new TextPart("test message")) .taskId(taskId) .contextId(contextId) .build(); - // Explicitly set blocking=true (though it's the default) + // Set blocking=false for non-blocking behavior MessageSendConfiguration config = new MessageSendConfiguration.Builder() - .blocking(true) + .blocking(false) .build(); MessageSendParams params = new MessageSendParams(message, config, null); @@ -468,7 +536,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { queue.enqueueEvent(workingTask); firstEventEmitted.countDown(); - // Sleep to ensure the blocking call has returned before we emit more events + // Sleep to ensure the non-blocking call has returned before we emit more events try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -485,8 +553,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { } // Emit final event (COMPLETED state) - // This event will be LOST with the current bug, because the consumer - // has already stopped after returning the first event + // This event should be persisted to TaskStore in background Task completedTask = new Task.Builder() .id(taskId) .contextId(contextId) @@ -495,21 +562,21 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { queue.enqueueEvent(completedTask); }); - // Call blocking onMessageSend + // Call non-blocking onMessageSend Object result = requestHandler.onMessageSend(params, serverCallContext); // Assertion 1: The immediate result should be the first event (WORKING) assertTrue(result instanceof Task, "Result should be a Task"); Task immediateTask = (Task) result; - assertTrue(immediateTask.getStatus().state() == TaskState.WORKING, - "Immediate return should show WORKING state, got: " + immediateTask.getStatus().state()); + assertEquals(TaskState.WORKING, immediateTask.getStatus().state(), + "Non-blocking should return immediately with WORKING state, got: " + immediateTask.getStatus().state()); - // At this point, the blocking call has returned, but the agent is still running + // At this point, the non-blocking call has returned, but the agent is still running // Allow the agent to emit the final COMPLETED event allowCompletion.countDown(); - // Assertion 2: Poll for the final task state to be persisted + // Assertion 2: Poll for the final task state to be persisted in background // Use polling loop instead of fixed sleep for faster and more reliable test long timeoutMs = 5000; long startTime = System.currentTimeMillis(); @@ -530,8 +597,7 @@ void testBlockingMessagePersistsAllEventsInBackground() throws Exception { completedStateFound, "Final task state should be COMPLETED (background consumption should have processed it), got: " + (persistedTask != null ? persistedTask.getStatus().state() : "null") + - " after " + (System.currentTimeMillis() - startTime) + "ms. " + - "This failure proves the bug - events after the first are lost." + " after " + (System.currentTimeMillis() - startTime) + "ms" ); } @@ -654,6 +720,80 @@ void testMainQueueClosesForFinalizedTasks() throws Exception { "Queue for finalized task should be null or closed"); } + /** + * Test that blocking message call returns a Task with ALL artifacts included. + * This reproduces the reported bug: blocking call returns before artifacts are processed. + * + * Expected behavior: + * 1. Agent emits multiple artifacts via TaskUpdater + * 2. Blocking onMessageSend() should wait for ALL events to be processed + * 3. Returned Task should have all artifacts included in COMPLETED state + * + * Bug manifestation: + * - onMessageSend() returns after first event + * - Artifacts are still being processed in background + * - Returned Task is incomplete + */ + @Test + @Timeout(15) + void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception { + String taskId = "blocking-artifacts-task"; + String contextId = "blocking-artifacts-ctx"; + + Message message = new Message.Builder() + .messageId("msg-blocking-artifacts") + .role(Message.Role.USER) + .parts(new TextPart("test message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent that uses TaskUpdater to emit multiple artifacts (like real agents do) + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + + // Start work (WORKING state) + updater.startWork(); + + // Add first artifact + updater.addArtifact( + List.of(new TextPart("First artifact", null)), + "artifact-1", "First", null); + + // Add second artifact + updater.addArtifact( + List.of(new TextPart("Second artifact", null)), + "artifact-2", "Second", null); + + // Complete the task + updater.complete(); + }); + + // Call blocking onMessageSend - should wait for ALL events + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // The returned result should be a Task with ALL artifacts + assertTrue(result instanceof Task, "Result should be a Task"); + Task returnedTask = (Task) result; + + // Verify task is completed + assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(), + "Returned task should be COMPLETED"); + + // Verify artifacts are included in the returned task + assertNotNull(returnedTask.getArtifacts(), + "Returned task should have artifacts"); + assertTrue(returnedTask.getArtifacts().size() >= 2, + "Returned task should have at least 2 artifacts, got: " + + returnedTask.getArtifacts().size()); + } + /** * Simple test agent executor that allows controlling execution timing */ diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java index 50fa8979c..d7a2193bd 100644 --- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java @@ -280,8 +280,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception { @Test public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); @@ -655,8 +655,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception { @Test public void testListPushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -728,8 +728,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder() diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 45109cced..c2cf1f751 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -1112,8 +1112,8 @@ public void testPushNotificationsNotSupportedError() { @Test public void testOnGetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1131,8 +1131,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() { @Test public void testOnSetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1222,8 +1222,8 @@ public void testDefaultRequestHandlerWithCustomComponents() { @Test public void testOnMessageSendErrorHandling() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1244,8 +1244,7 @@ public void testOnMessageSendErrorHandling() { new UnsupportedOperationError()) .when(mock).consumeAndBreakOnInterrupt( Mockito.any(EventConsumer.class), - Mockito.anyBoolean(), - Mockito.any()); + Mockito.anyBoolean()); })){ response = handler.onMessageSend(request, callContext); } @@ -1376,8 +1375,8 @@ public void testListPushNotificationConfigNotSupported() { @Test public void testListPushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -1468,8 +1467,8 @@ public void testDeletePushNotificationConfigNotSupported() { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { - DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + DefaultRequestHandler requestHandler = DefaultRequestHandler.create( + executor, taskStore, queueManager, null, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> {