diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 1e8e1af94..b59a9aedb 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -306,6 +306,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads // during the consumption loop itself. kind = etai.eventType(); + + // Store push notification config for newly created tasks (mirrors streaming logic) + // Only for NEW tasks - existing tasks are handled by initMessageSend() + if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) { + LOGGER.debug("Storing push notification config for new task {}", createdTask.getId()); + pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig()); + } + if (blocking && interruptedOrNonBlocking) { // For blocking calls: ensure all events are processed before returning // Order of operations is critical to avoid circular dependency: diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index 4fa3a2a15..acaa531ad 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -4,9 +4,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -18,12 +20,14 @@ import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.events.EventQueue; import io.a2a.server.events.InMemoryQueueManager; +import io.a2a.server.tasks.InMemoryPushNotificationConfigStore; import io.a2a.server.tasks.InMemoryTaskStore; import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.MessageSendConfiguration; import io.a2a.spec.MessageSendParams; +import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; @@ -794,6 +798,160 @@ void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception { returnedTask.getArtifacts().size()); } + /** + * Test that pushNotificationConfig from SendMessageConfiguration is stored for NEW tasks + * in non-streaming (blocking) mode. This reproduces the bug from issue #84. + * + * Expected behavior: + * 1. Client sends message with pushNotificationConfig in SendMessageConfiguration + * 2. Agent creates a new task + * 3. pushNotificationConfig should be stored in PushNotificationConfigStore + * 4. Config should be retrievable via getInfo() + */ + @Test + @Timeout(10) + void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exception { + String taskId = "push-config-blocking-new-task"; + String contextId = "push-config-ctx"; + + // Create test config store + InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); + + // Re-create request handler with pushConfigStore + requestHandler = DefaultRequestHandler.create( + agentExecutor, + taskStore, + queueManager, + pushConfigStore, // Add push config store + null, // pushSender + Executors.newCachedThreadPool() + ); + + // Create push notification config + PushNotificationConfig pushConfig = new PushNotificationConfig.Builder() + .id("config-1") + .url("https://example.com/webhook") + .token("test-token-123") + .build(); + + // Create message with pushNotificationConfig + Message message = new Message.Builder() + .messageId("msg-push-config") + .role(Message.Role.USER) + .parts(new TextPart("test message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .pushNotificationConfig(pushConfig) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent creates a new task + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + updater.submit(); // Creates new task in SUBMITTED state + updater.complete(); + }); + + // Call blocking onMessageSend + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // Verify result is a task + assertTrue(result instanceof Task, "Result should be a Task"); + Task returnedTask = (Task) result; + assertEquals(taskId, returnedTask.getId()); + + // THE KEY ASSERTION: Verify pushNotificationConfig was stored + List storedConfigs = pushConfigStore.getInfo(taskId); + assertNotNull(storedConfigs, "Push notification config should be stored for new task"); + assertEquals(1, storedConfigs.size(), + "Should have exactly 1 push config stored"); + assertEquals("config-1", storedConfigs.get(0).id()); + assertEquals("https://example.com/webhook", storedConfigs.get(0).url()); + } + + /** + * Test that pushNotificationConfig is stored for EXISTING tasks. + * This verifies the initMessageSend logic works correctly. + */ + @Test + @Timeout(10) + void testMessageStoresPushNotificationConfigForExistingTask() throws Exception { + String taskId = "push-config-existing-task"; + String contextId = "push-config-existing-ctx"; + + // Create test config store + InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); + + // Re-create request handler with pushConfigStore + requestHandler = DefaultRequestHandler.create( + agentExecutor, + taskStore, + queueManager, + pushConfigStore, // Add push config store + null, // pushSender + Executors.newCachedThreadPool() + ); + + // Create EXISTING task in store + Task existingTask = new Task.Builder() + .id(taskId) + .contextId(contextId) + .status(new TaskStatus(TaskState.WORKING)) + .build(); + taskStore.save(existingTask); + + // Create push notification config + PushNotificationConfig pushConfig = new PushNotificationConfig.Builder() + .id("config-existing-1") + .url("https://example.com/existing-webhook") + .token("existing-token-789") + .build(); + + Message message = new Message.Builder() + .messageId("msg-push-existing") + .role(Message.Role.USER) + .parts(new TextPart("update existing task")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendConfiguration config = new MessageSendConfiguration.Builder() + .blocking(true) + .pushNotificationConfig(pushConfig) + .build(); + + MessageSendParams params = new MessageSendParams(message, config, null); + + // Agent updates the existing task + agentExecutor.setExecuteCallback((context, queue) -> { + TaskUpdater updater = new TaskUpdater(context, queue); + updater.addArtifact( + List.of(new TextPart("update artifact", null)), + "artifact-1", "Update", null); + updater.complete(); + }); + + // Call blocking onMessageSend + Object result = requestHandler.onMessageSend(params, serverCallContext); + + // Verify result + assertTrue(result instanceof Task, "Result should be a Task"); + + // Verify pushNotificationConfig was stored (initMessageSend path) + List storedConfigs = pushConfigStore.getInfo(taskId); + assertNotNull(storedConfigs, + "Push notification config should be stored for existing task"); + assertEquals(1, storedConfigs.size(), + "Should have exactly 1 push config stored"); + assertEquals("config-existing-1", storedConfigs.get(0).id()); + assertEquals("https://example.com/existing-webhook", storedConfigs.get(0).url()); + } + /** * Simple test agent executor that allows controlling execution timing */