diff --git a/client/base/src/main/java/io/a2a/client/AbstractClient.java b/client/base/src/main/java/io/a2a/client/AbstractClient.java index 37749533e..414c25f73 100644 --- a/client/base/src/main/java/io/a2a/client/AbstractClient.java +++ b/client/base/src/main/java/io/a2a/client/AbstractClient.java @@ -13,6 +13,7 @@ import io.a2a.spec.DeleteTaskPushNotificationConfigParams; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.Message; @@ -271,26 +272,26 @@ public abstract TaskPushNotificationConfig getTaskPushNotificationConfiguration( @Nullable ClientCallContext context) throws A2AClientException; /** - * Retrieve the list of push notification configurations for a specific task. + * Retrieve the list of push notification configurations for a specific task with pagination support. * * @param request the parameters specifying which task's notification configs to retrieve - * @return the list of task push notification configs + * @return the result containing the list of task push notification configs and pagination information * @throws A2AClientException if getting the task push notification configs fails for any reason */ - public List listTaskPushNotificationConfigurations( + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request) throws A2AClientException { return listTaskPushNotificationConfigurations(request, null); } /** - * Retrieve the list of push notification configurations for a specific task. + * Retrieve the list of push notification configurations for a specific task with pagination support. * * @param request the parameters specifying which task's notification configs to retrieve * @param context optional client call context for the request (may be {@code null}) - * @return the list of task push notification configs + * @return the result containing the list of task push notification configs and pagination information * @throws A2AClientException if getting the task push notification configs fails for any reason */ - public abstract List listTaskPushNotificationConfigurations( + public abstract ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException; diff --git a/client/base/src/main/java/io/a2a/client/Client.java b/client/base/src/main/java/io/a2a/client/Client.java index 0ca73446d..cb8ef954b 100644 --- a/client/base/src/main/java/io/a2a/client/Client.java +++ b/client/base/src/main/java/io/a2a/client/Client.java @@ -16,6 +16,7 @@ import io.a2a.spec.EventKind; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.Message; @@ -108,7 +109,7 @@ public TaskPushNotificationConfig getTaskPushNotificationConfiguration( } @Override - public List listTaskPushNotificationConfigurations( + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { return clientTransport.listTaskPushNotificationConfigurations(request, context); } diff --git a/client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java b/client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java index 101c599d1..a846210d9 100644 --- a/client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java +++ b/client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java @@ -29,6 +29,7 @@ import io.a2a.spec.GetTaskRequest; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigRequest; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksRequest; import io.a2a.spec.ListTasksResult; @@ -265,7 +266,7 @@ public TaskPushNotificationConfig getTaskPushNotificationConfiguration( } @Override - public List listTaskPushNotificationConfigurations( + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { checkNotNullParam("request", request); @@ -273,15 +274,16 @@ public List listTaskPushNotificationConfigurations( io.a2a.grpc.ListTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.ListTaskPushNotificationConfigRequest.newBuilder() .setParent("tasks/" + request.id()) .setTenant(resolveTenant(request.tenant())) + .setPageSize(request.pageSize()) + .setPageToken(request.pageToken()) .build(); PayloadAndHeaders payloadAndHeaders = applyInterceptors(ListTaskPushNotificationConfigRequest.METHOD, grpcRequest, agentCard, context); try { A2AServiceBlockingV2Stub stubWithMetadata = createBlockingStubWithMetadata(context, payloadAndHeaders); - return stubWithMetadata.listTaskPushNotificationConfig(grpcRequest).getConfigsList().stream() - .map(FromProto::taskPushNotificationConfig) - .collect(Collectors.toList()); + io.a2a.grpc.ListTaskPushNotificationConfigResponse grpcResponse = stubWithMetadata.listTaskPushNotificationConfig(grpcRequest); + return FromProto.listTaskPushNotificationConfigResult(grpcResponse); } catch (StatusRuntimeException | StatusException e) { throw GrpcErrorMapper.mapGrpcError(e, "Failed to list task push notification config: "); } diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java index 0a453bff4..250503f96 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java @@ -45,6 +45,7 @@ import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigRequest; import io.a2a.spec.ListTaskPushNotificationConfigResponse; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksRequest; import io.a2a.spec.ListTasksResponse; @@ -222,7 +223,7 @@ public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPu } @Override - public List listTaskPushNotificationConfigurations( + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { checkNotNullParam("request", request); diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java index 41614744c..500daf6f8 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java @@ -41,6 +41,7 @@ import io.a2a.spec.GetTaskRequest; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigRequest; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksRequest; import io.a2a.spec.ListTasksResult; @@ -334,7 +335,7 @@ public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPu } @Override - public List listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { checkNotNullParam("request", request); io.a2a.grpc.ListTaskPushNotificationConfigRequest.Builder builder = io.a2a.grpc.ListTaskPushNotificationConfigRequest.newBuilder(); @@ -356,7 +357,7 @@ public List listTaskPushNotificationConfigurations(L String httpResponseBody = response.body(); io.a2a.grpc.ListTaskPushNotificationConfigResponse.Builder responseBuilder = io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(); JsonFormat.parser().merge(httpResponseBody, responseBuilder); - return ProtoUtils.FromProto.listTaskPushNotificationConfigParams(responseBuilder); + return ProtoUtils.FromProto.listTaskPushNotificationConfigResult(responseBuilder); } catch (A2AClientException e) { throw e; } catch (IOException | InterruptedException e) { diff --git a/client/transport/rest/src/test/java/io/a2a/client/transport/rest/RestTransportTest.java b/client/transport/rest/src/test/java/io/a2a/client/transport/rest/RestTransportTest.java index 784ddb7e2..954fafcae 100644 --- a/client/transport/rest/src/test/java/io/a2a/client/transport/rest/RestTransportTest.java +++ b/client/transport/rest/src/test/java/io/a2a/client/transport/rest/RestTransportTest.java @@ -44,6 +44,7 @@ import io.a2a.spec.FileWithUri; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.Message; import io.a2a.spec.MessageSendConfiguration; import io.a2a.spec.MessageSendParams; @@ -361,10 +362,10 @@ public void testListTaskPushNotificationConfigurations() throws Exception { ); RestTransport client = new RestTransport(CARD); - List taskPushNotificationConfigs = client.listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = client.listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams("de38c76d-d54c-436c-8b9f-4c2703648d64"), null); - assertEquals(2, taskPushNotificationConfigs.size()); - PushNotificationConfig pushNotificationConfig = taskPushNotificationConfigs.get(0).pushNotificationConfig(); + assertEquals(2, result.configs().size()); + PushNotificationConfig pushNotificationConfig = result.configs().get(0).pushNotificationConfig(); assertNotNull(pushNotificationConfig); assertEquals("https://example.com/callback", pushNotificationConfig.url()); assertEquals("10", pushNotificationConfig.id()); @@ -372,7 +373,7 @@ public void testListTaskPushNotificationConfigurations() throws Exception { assertTrue(authenticationInfo.schemes().size() == 1); assertEquals("jwt", authenticationInfo.schemes().get(0)); assertEquals("", authenticationInfo.credentials()); - pushNotificationConfig = taskPushNotificationConfigs.get(1).pushNotificationConfig(); + pushNotificationConfig = result.configs().get(1).pushNotificationConfig(); assertNotNull(pushNotificationConfig); assertEquals("https://test.com/callback", pushNotificationConfig.url()); assertEquals("5", pushNotificationConfig.id()); diff --git a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransport.java b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransport.java index 202e80492..4b978c717 100644 --- a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransport.java +++ b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransport.java @@ -10,6 +10,7 @@ import io.a2a.spec.EventKind; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.MessageSendParams; @@ -102,14 +103,14 @@ TaskPushNotificationConfig getTaskPushNotificationConfiguration( @Nullable ClientCallContext context) throws A2AClientException; /** - * Retrieve the list of push notification configurations for a specific task. + * Retrieve the list of push notification configurations for a specific task with pagination support. * * @param request the parameters specifying which task's notification configs to retrieve * @param context optional client call context for the request (may be {@code null}) - * @return the list of task push notification configs + * @return the result containing the list of task push notification configs and pagination information * @throws A2AClientException if getting the task push notification configs fails for any reason */ - List listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations( ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException; diff --git a/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java b/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java index b2eb7ba18..357cf42e1 100644 --- a/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java +++ b/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java @@ -1,5 +1,7 @@ package io.a2a.extras.pushnotificationconfigstore.database.jpa; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import jakarta.annotation.Priority; @@ -11,7 +13,10 @@ import io.a2a.json.JsonProcessingException; import io.a2a.server.tasks.PushNotificationConfigStore; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.PushNotificationConfig; +import io.a2a.spec.TaskPushNotificationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +70,8 @@ public PushNotificationConfig setInfo(String taskId, PushNotificationConfig noti @Transactional @Override - public List getInfo(String taskId) { + public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConfigParams params) { + String taskId = params.id(); LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}'", taskId); try { List jpaConfigs = em.createQuery( @@ -88,13 +94,58 @@ public List getInfo(String taskId) { .toList(); LOGGER.debug("Successfully retrieved {} PushNotificationConfigs for Task '{}'", configs.size(), taskId); - return configs; + + // Handle pagination + if (configs.isEmpty()) { + return new ListTaskPushNotificationConfigResult(Collections.emptyList()); + } + + if (params.pageSize() <= 0) { + return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(configs, params), null); + } + + // Apply pageToken filtering if provided + List paginatedConfigs = configs; + if (params.pageToken() != null && !params.pageToken().isBlank()) { + int index = findFirstIndex(configs, params.pageToken()); + if (index < configs.size()) { + paginatedConfigs = configs.subList(index, configs.size()); + } + } + + // Apply page size limit + if (paginatedConfigs.size() <= params.pageSize()) { + return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(paginatedConfigs, params), null); + } + + String nextToken = paginatedConfigs.get(params.pageSize()).token(); + return new ListTaskPushNotificationConfigResult( + convertPushNotificationConfig(paginatedConfigs.subList(0, params.pageSize()), params), + nextToken); } catch (Exception e) { LOGGER.error("Failed to retrieve PushNotificationConfigs for Task '{}'", taskId, e); throw e; } } + private int findFirstIndex(List configs, String token) { + for (int i = 0; i < configs.size(); i++) { + if (token.equals(configs.get(i).token())) { + return i; + } + } + return configs.size(); + } + + private List convertPushNotificationConfig(List pushNotificationConfigList, ListTaskPushNotificationConfigParams params) { + List taskPushNotificationConfigList = new ArrayList<>(pushNotificationConfigList.size()); + for (PushNotificationConfig pushNotificationConfig : pushNotificationConfigList) { + TaskPushNotificationConfig taskPushNotificationConfig = new TaskPushNotificationConfig(params.id(), pushNotificationConfig, params.tenant()); + taskPushNotificationConfigList.add(taskPushNotificationConfig); + } + return taskPushNotificationConfigList; + } + @Transactional @Override public void deleteInfo(String taskId, String configId) { diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java index e92a19b06..6871a0f09 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java @@ -3,10 +3,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import jakarta.inject.Inject; +import jakarta.transaction.Transactional; import java.util.List; import java.util.Queue; @@ -23,6 +25,8 @@ import io.a2a.spec.AgentCard; import io.a2a.spec.DeleteTaskPushNotificationConfigParams; import io.a2a.spec.GetTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.Message; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; @@ -182,4 +186,235 @@ public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Excep client.getTaskPushNotificationConfiguration(new GetTaskPushNotificationConfigParams(taskId)); }, "Getting a deleted config should throw an A2AClientException"); } + + private PushNotificationConfig createSamplePushConfig(String url, String configId, String token) { + return PushNotificationConfig.builder() + .url(url) + .id(configId) + .token(token) + .build(); + } + + @Test + @Transactional + public void testPaginationWithPageSize() { + String taskId = "task_pagination_" + System.currentTimeMillis(); + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + pushNotificationConfigStore.setInfo(taskId, config); + } + + // Request first page with pageSize=2 + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertEquals(2, result.configs().size(), "Should return 2 configs"); + assertNotNull(result.nextPageToken(), "Should have nextPageToken when more items exist"); + } + + @Test + @Transactional + public void testPaginationWithPageToken() { + String taskId = "task_pagination_token_" + System.currentTimeMillis(); + // Create 5 configs + createSamples(taskId, 5); + + // Get first page + ListTaskPushNotificationConfigParams firstPageParams = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult firstPage = pushNotificationConfigStore.getInfo(firstPageParams); + assertNotNull(firstPage.nextPageToken()); + + // Get second page using nextPageToken + ListTaskPushNotificationConfigParams secondPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, firstPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult secondPage = pushNotificationConfigStore.getInfo(secondPageParams); + + assertNotNull(secondPage); + assertEquals(2, secondPage.configs().size(), "Should return 2 configs for second page"); + assertNotNull(secondPage.nextPageToken(), "Should have nextPageToken when more items exist"); + + // Verify NO overlap between pages - collect all IDs from both pages + List firstPageIds = firstPage.configs().stream() + .map(c -> c.pushNotificationConfig().id()) + .toList(); + List secondPageIds = secondPage.configs().stream() + .map(c -> c.pushNotificationConfig().id()) + .toList(); + + // Check that no ID from first page appears in second page + for (String id : firstPageIds) { + assertTrue(!secondPageIds.contains(id), + "Config " + id + " appears in both pages - overlap detected!"); + } + + // Also verify the pages are sequential (first page ends before second page starts) + // Since configs are created in order, we can verify the IDs + assertEquals("cfg0", firstPageIds.get(0)); + assertEquals("cfg1", firstPageIds.get(1)); + assertEquals("cfg2", secondPageIds.get(0)); + assertEquals("cfg3", secondPageIds.get(1)); + } + + @Test + @Transactional + public void testPaginationLastPage() { + String taskId = "task_pagination_last_" + System.currentTimeMillis(); + // Create 5 configs + createSamples(taskId, 5); + + // Get first page (2 items) + ListTaskPushNotificationConfigParams firstPageParams = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult firstPage = pushNotificationConfigStore.getInfo(firstPageParams); + + // Get second page (2 items) + ListTaskPushNotificationConfigParams secondPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, firstPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult secondPage = pushNotificationConfigStore.getInfo(secondPageParams); + + // Get last page (1 item remaining) + ListTaskPushNotificationConfigParams lastPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, secondPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult lastPage = pushNotificationConfigStore.getInfo(lastPageParams); + + assertNotNull(lastPage); + assertEquals(1, lastPage.configs().size(), "Last page should have 1 remaining config"); + assertNull(lastPage.nextPageToken(), "Last page should not have nextPageToken"); + } + + @Test + @Transactional + public void testPaginationWithZeroPageSize() { + String taskId = "task_pagination_zero_" + System.currentTimeMillis(); + // Create 5 configs + createSamples(taskId, 5); + + // Request with pageSize=0 should return all configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 0, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertEquals(5, result.configs().size(), "Should return all 5 configs when pageSize=0"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when returning all"); + } + + @Test + @Transactional + public void testPaginationWithNegativePageSize() { + String taskId = "task_pagination_negative_" + System.currentTimeMillis(); + // Create 3 configs + createSamples(taskId, 3); + + // Request with negative pageSize should return all configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, -1, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all configs when pageSize is negative"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when returning all"); + } + + @Test + @Transactional + public void testPaginationPageSizeLargerThanConfigs() { + String taskId = "task_pagination_large_" + System.currentTimeMillis(); + // Create 3 configs + createSamples(taskId, 3); + + // Request with pageSize larger than available configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 10, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all 3 configs"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when all configs fit in one page"); + } + + @Test + @Transactional + public void testPaginationExactlyPageSize() { + String taskId = "task_pagination_exact_" + System.currentTimeMillis(); + // Create exactly 3 configs + createSamples(taskId, 3); + + // Request with pageSize equal to number of configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 3, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all 3 configs"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when configs exactly match pageSize"); + } + + @Test + @Transactional + public void testPaginationWithInvalidToken() { + String taskId = "task_pagination_invalid_token_" + System.currentTimeMillis(); + // Create 5 configs + createSamples(taskId, 5); + + // Request with invalid pageToken - JPA implementation behavior is to start from beginning + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams( + taskId, 2, "invalid_token_that_does_not_exist", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + // When token is not found, implementation starts from beginning + assertEquals(2, result.configs().size(), "Should return first page when token is not found"); + assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist"); + } + + @Test + @Transactional + public void testPaginationEmptyTaskWithPageSize() { + String taskId = "task_pagination_empty_" + System.currentTimeMillis(); + // No configs created + + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + assertNotNull(result); + assertTrue(result.configs().isEmpty(), "Should return empty list for non-existent task"); + assertNull(result.nextPageToken(), "Should not have nextPageToken for empty result"); + } + + @Test + @Transactional + public void testPaginationFullIteration() { + String taskId = "task_pagination_full_" + System.currentTimeMillis(); + // Create 7 configs + createSamples(taskId, 7); + + // Iterate through all pages with pageSize=3 + int totalCollected = 0; + String pageToken = ""; + int pageCount = 0; + + do { + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 3, pageToken, ""); + ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params); + + totalCollected += result.configs().size(); + pageToken = result.nextPageToken(); + pageCount++; + + // Safety check to prevent infinite loop + assertTrue(pageCount <= 7, "Should not have more than 7 pages for 7 configs"); + + } while (pageToken != null); + + assertEquals(7, totalCollected, "Should collect all 7 configs across all pages"); + assertEquals(3, pageCount, "Should have exactly 3 pages (3+3+1)"); + } + + private void createSamples(String taskId, int size) { + // Create 7 configs + for (int i = 0; i < size; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + pushNotificationConfigStore.setInfo(taskId, config); + } + } } diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java index 2648cba87..1074b7848 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java @@ -28,8 +28,11 @@ import io.a2a.client.http.A2AHttpResponse; import io.a2a.server.tasks.BasePushNotificationSender; import io.a2a.server.tasks.PushNotificationConfigStore; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; +import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.quarkus.test.junit.QuarkusTest; @@ -90,11 +93,11 @@ public void testSetInfoAddsNewConfig() { assertEquals(config.url(), result.url()); assertEquals(config.id(), result.id()); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); - assertEquals(config.url(), configs.get(0).url()); - assertEquals(config.id(), configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); + assertEquals(config.id(), configResult.configs().get(0).pushNotificationConfig().id()); } @Test @@ -109,11 +112,14 @@ public void testSetInfoAppendsToExistingConfig() { "http://updated.url/callback", "cfg_updated", null); configStore.setInfo(taskId, updatedConfig); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(2, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(2, configResult.configs().size()); // Find the configs by ID since order might vary + List configs = configResult.configs().stream() + .map(TaskPushNotificationConfig::pushNotificationConfig) + .toList(); PushNotificationConfig foundInitial = configs.stream() .filter(c -> "cfg_initial".equals(c.id())) .findFirst() @@ -140,9 +146,9 @@ public void testSetInfoWithoutConfigId() { PushNotificationConfig result = configStore.setInfo(taskId, initialConfig); assertEquals(taskId, result.id(), "Config ID should default to taskId when not provided"); - List configs = configStore.getInfo(taskId); - assertEquals(1, configs.size()); - assertEquals(taskId, configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size()); + assertEquals(taskId, configResult.configs().get(0).pushNotificationConfig().id()); PushNotificationConfig updatedConfig = PushNotificationConfig.builder() .url("http://initial.url/callback_new") @@ -151,9 +157,9 @@ public void testSetInfoWithoutConfigId() { PushNotificationConfig updatedResult = configStore.setInfo(taskId, updatedConfig); assertEquals(taskId, updatedResult.id()); - configs = configStore.getInfo(taskId); - assertEquals(1, configs.size(), "Should replace existing config with same ID rather than adding new one"); - assertEquals(updatedConfig.url(), configs.get(0).url()); + configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size(), "Should replace existing config with same ID rather than adding new one"); + assertEquals(updatedConfig.url(), configResult.configs().get(0).pushNotificationConfig().url()); } @Test @@ -163,20 +169,20 @@ public void testGetInfoExistingConfig() { PushNotificationConfig config = createSamplePushConfig("http://get.this/callback", "cfg1", null); configStore.setInfo(taskId, config); - List retrievedConfigs = configStore.getInfo(taskId); - assertNotNull(retrievedConfigs); - assertEquals(1, retrievedConfigs.size()); - assertEquals(config.url(), retrievedConfigs.get(0).url()); - assertEquals(config.id(), retrievedConfigs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); + assertEquals(config.id(), configResult.configs().get(0).pushNotificationConfig().id()); } @Test @Transactional public void testGetInfoNonExistentConfig() { String taskId = "task_get_non_exist"; - List retrievedConfigs = configStore.getInfo(taskId); - assertNotNull(retrievedConfigs); - assertTrue(retrievedConfigs.isEmpty(), "Should return empty list for non-existent task ID"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list for non-existent task ID"); } @Test @@ -186,15 +192,15 @@ public void testDeleteInfoExistingConfig() { PushNotificationConfig config = createSamplePushConfig("http://delete.this/callback", "cfg1", null); configStore.setInfo(taskId, config); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); configStore.deleteInfo(taskId, config.id()); - List configsAfterDelete = configStore.getInfo(taskId); + ListTaskPushNotificationConfigResult configsAfterDelete = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); assertNotNull(configsAfterDelete); - assertTrue(configsAfterDelete.isEmpty(), "Should return empty list when no configs remain after deletion"); + assertTrue(configsAfterDelete.configs().isEmpty(), "Should return empty list when no configs remain after deletion"); } @Test @@ -204,9 +210,9 @@ public void testDeleteInfoNonExistentConfig() { // Should not throw an error configStore.deleteInfo(taskId, "non_existent_id"); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertTrue(configs.isEmpty(), "Should return empty list for non-existent task ID"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list for non-existent task ID"); } @Test @@ -221,9 +227,9 @@ public void testDeleteInfoWithNullConfigId() { // Delete with null configId should use taskId configStore.deleteInfo(taskId, null); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertTrue(configs.isEmpty(), "Should return empty list after deletion when using taskId as configId"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list after deletion when using taskId as configId"); } @Test @@ -316,11 +322,14 @@ public void testMultipleConfigsForSameTask() { configStore.setInfo(taskId, config1); configStore.setInfo(taskId, config2); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(2, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(2, configResult.configs().size()); // Verify both configs are present + List configs = configResult.configs().stream() + .map(TaskPushNotificationConfig::pushNotificationConfig) + .toList(); assertTrue(configs.stream().anyMatch(c -> "cfg1".equals(c.id()))); assertTrue(configs.stream().anyMatch(c -> "cfg2".equals(c.id()))); } @@ -338,10 +347,10 @@ public void testDeleteSpecificConfigFromMultiple() { // Delete only config1 configStore.deleteInfo(taskId, "cfg1"); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); - assertEquals("cfg2", configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals("cfg2", configResult.configs().get(0).pushNotificationConfig().id()); } @Test @@ -355,13 +364,13 @@ public void testConfigStoreIntegration() { assertEquals(config.url(), storedConfig.url()); assertEquals(config.token(), storedConfig.token()); - List retrievedConfigs = configStore.getInfo(taskId); - assertEquals(1, retrievedConfigs.size()); - assertEquals(config.url(), retrievedConfigs.get(0).url()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); // Test deletion configStore.deleteInfo(taskId, storedConfig.id()); - List afterDeletion = configStore.getInfo(taskId); - assertTrue(afterDeletion.isEmpty()); + ListTaskPushNotificationConfigResult afterDeletion = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertTrue(afterDeletion.configs().isEmpty()); } } diff --git a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2AServerRoutesTest.java b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2AServerRoutesTest.java index 8de9dd893..5c7582da5 100644 --- a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2AServerRoutesTest.java +++ b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2AServerRoutesTest.java @@ -2,6 +2,7 @@ import static io.a2a.spec.AgentCard.CURRENT_PROTOCOL_VERSION; import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY; +import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; @@ -33,6 +34,7 @@ import io.a2a.spec.GetTaskResponse; import io.a2a.spec.ListTaskPushNotificationConfigRequest; import io.a2a.spec.ListTaskPushNotificationConfigResponse; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.SendMessageRequest; import io.a2a.spec.SendMessageResponse; @@ -421,7 +423,7 @@ public void testListTaskPushNotificationConfig_MethodNameSetInContext() { .build(), null ); - ListTaskPushNotificationConfigResponse realResponse = new ListTaskPushNotificationConfigResponse("1", Collections.singletonList(config)); + ListTaskPushNotificationConfigResponse realResponse = new ListTaskPushNotificationConfigResponse("1", new ListTaskPushNotificationConfigResult(singletonList(config))); when(mockJsonRpcHandler.listPushNotificationConfig(any(ListTaskPushNotificationConfigRequest.class), any(ServerCallContext.class))).thenReturn(realResponse); diff --git a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java index 29f150943..f3b5f076c 100644 --- a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java @@ -55,12 +55,17 @@ import io.a2a.spec.SendStreamingMessageRequest; import io.a2a.spec.SetTaskPushNotificationConfigRequest; import io.a2a.spec.SubscribeToTaskRequest; +import io.a2a.util.Utils; import org.jspecify.annotations.Nullable; @Singleton @Authenticated public class A2AServerRoutes { + private static final String HISTORY_LENGTH_PARAM = "historyLength"; + private static final String PAGE_SIZE_PARAM = "pageSize"; + private static final String PAGE_TOKEN_PARAM = "pageToken"; + @Inject RestHandler jsonRestHandler; @@ -125,9 +130,9 @@ public void listTasks(RoutingContext rc) { if (statusStr != null && !statusStr.isEmpty()) { statusStr = statusStr.toUpperCase(); } - String pageSizeStr = rc.request().params().get("pageSize"); - String pageToken = rc.request().params().get("pageToken"); - String historyLengthStr = rc.request().params().get("historyLength"); + String pageSizeStr = rc.request().params().get(PAGE_SIZE_PARAM); + String pageToken = rc.request().params().get(PAGE_TOKEN_PARAM); + String historyLengthStr = rc.request().params().get(HISTORY_LENGTH_PARAM); String lastUpdatedAfter = rc.request().params().get("lastUpdatedAfter"); String includeArtifactsStr = rc.request().params().get("includeArtifacts"); @@ -170,13 +175,13 @@ public void getTask(RoutingContext rc) { response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad task id")); } else { Integer historyLength = null; - if (rc.request().params().contains("history_length")) { - historyLength = Integer.valueOf(rc.request().params().get("history_length")); + if (rc.request().params().contains(HISTORY_LENGTH_PARAM)) { + historyLength = Integer.valueOf(rc.request().params().get(HISTORY_LENGTH_PARAM)); } response = jsonRestHandler.getTask(taskId, historyLength, extractTenant(rc), context); } } catch (NumberFormatException e) { - response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad history_length")); + response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad historyLength")); } catch (Throwable t) { response = jsonRestHandler.createErrorResponse(new InternalError(t.getMessage())); } finally { @@ -312,8 +317,18 @@ public void listTaskPushNotificationConfigurations(RoutingContext rc) { if (taskId == null || taskId.isEmpty()) { response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad task id")); } else { - response = jsonRestHandler.listTaskPushNotificationConfigurations(taskId, extractTenant(rc), context); + int pageSize = 0; + if (rc.request().params().contains(PAGE_SIZE_PARAM)) { + pageSize = Integer.parseInt(rc.request().params().get(PAGE_SIZE_PARAM)); + } + String pageToken = ""; + if (rc.request().params().contains(PAGE_TOKEN_PARAM)) { + pageToken = Utils.defaultIfNull(rc.request().params().get(PAGE_TOKEN_PARAM), ""); + } + response = jsonRestHandler.listTaskPushNotificationConfigurations(taskId, pageSize, pageToken, extractTenant(rc), context); } + } catch (NumberFormatException e) { + response = jsonRestHandler.createErrorResponse(new InvalidParamsError("bad " + PAGE_SIZE_PARAM)); } catch (Throwable t) { response = jsonRestHandler.createErrorResponse(new InternalError(t.getMessage())); } finally { @@ -355,6 +370,7 @@ private String extractTenant(RoutingContext rc) { } return tenantPath; } + /** * /** * Handles incoming GET requests to the agent card endpoint. diff --git a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2AServerRoutesTest.java b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2AServerRoutesTest.java index 84c7401e4..6ff3792c2 100644 --- a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2AServerRoutesTest.java +++ b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2AServerRoutesTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -252,7 +253,7 @@ public void testListTaskPushNotificationConfigurations_MethodNameSetInContext() when(mockHttpResponse.getStatusCode()).thenReturn(200); when(mockHttpResponse.getContentType()).thenReturn("application/json"); when(mockHttpResponse.getBody()).thenReturn("{}"); - when(mockRestHandler.listTaskPushNotificationConfigurations(anyString(), anyString(), any(ServerCallContext.class))) + when(mockRestHandler.listTaskPushNotificationConfigurations(anyString(), anyInt(), anyString(), anyString(), any(ServerCallContext.class))) .thenReturn(mockHttpResponse); ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(ServerCallContext.class); @@ -261,7 +262,7 @@ public void testListTaskPushNotificationConfigurations_MethodNameSetInContext() routes.listTaskPushNotificationConfigurations(mockRoutingContext); // Assert - verify(mockRestHandler).listTaskPushNotificationConfigurations(eq("task123"), anyString(), contextCaptor.capture()); + verify(mockRestHandler).listTaskPushNotificationConfigurations(eq("task123"), anyInt(), anyString(), anyString(), contextCaptor.capture()); ServerCallContext capturedContext = contextCaptor.getValue(); assertNotNull(capturedContext); assertEquals(ListTaskPushNotificationConfigRequest.METHOD, capturedContext.getState().get(METHOD_NAME_KEY)); diff --git a/server-common/src/main/java/io/a2a/server/JSONRPCException.java b/server-common/src/main/java/io/a2a/server/JSONRPCException.java index d7d7d2c8a..cf7416b91 100644 --- a/server-common/src/main/java/io/a2a/server/JSONRPCException.java +++ b/server-common/src/main/java/io/a2a/server/JSONRPCException.java @@ -2,13 +2,30 @@ import io.a2a.spec.JSONRPCError; +/** + * Exception wrapper for JSON-RPC protocol errors. + *

+ * This exception encapsulates a {@link JSONRPCError} for handling + * protocol-level errors during JSON-RPC request processing. + *

+ */ public class JSONRPCException extends Exception{ private final JSONRPCError error; + /** + * Creates a JSONRPCException wrapping the specified error. + * + * @param error the JSON-RPC error + */ public JSONRPCException(JSONRPCError error) { this.error = error; } + /** + * Returns the wrapped JSON-RPC error. + * + * @return the JSON-RPC error + */ public JSONRPCError getError() { return error; } diff --git a/server-common/src/main/java/io/a2a/server/events/EventEnqueueHook.java b/server-common/src/main/java/io/a2a/server/events/EventEnqueueHook.java index 7bc19bf71..e61284917 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventEnqueueHook.java +++ b/server-common/src/main/java/io/a2a/server/events/EventEnqueueHook.java @@ -1,5 +1,15 @@ package io.a2a.server.events; +/** + * Hook interface for event queue enqueue operations. + * Implementations can be notified when items are enqueued to the event queue, + * allowing for custom behavior such as event replication or logging. + */ public interface EventEnqueueHook { + /** + * Called when an item is enqueued to the event queue. + * + * @param item the event queue item being enqueued + */ void onEnqueue(EventQueueItem item); } \ No newline at end of file diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 92cc015d9..7ce8ad3aa 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -16,21 +16,50 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Abstract base class for event queues that manage task event streaming. + *

+ * An EventQueue provides a thread-safe mechanism for enqueueing and dequeueing events + * related to task execution. It supports backpressure through semaphore-based throttling + * and hierarchical queue structures via MainQueue and ChildQueue implementations. + *

+ *

+ * Use {@link #builder()} to create configured instances or extend MainQueue/ChildQueue directly. + *

+ */ public abstract class EventQueue implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(EventQueue.class); + /** + * Default maximum queue size for event queues. + */ public static final int DEFAULT_QUEUE_SIZE = 1000; private final int queueSize; + /** + * Internal blocking queue for storing event queue items. + */ protected final BlockingQueue queue = new LinkedBlockingDeque<>(); + /** + * Semaphore for backpressure control, limiting the number of pending events. + */ protected final Semaphore semaphore; private volatile boolean closed = false; + /** + * Creates an EventQueue with the default queue size. + */ protected EventQueue() { this(DEFAULT_QUEUE_SIZE); } + /** + * Creates an EventQueue with the specified queue size. + * + * @param queueSize the maximum number of events that can be queued + * @throws IllegalArgumentException if queueSize is less than or equal to 0 + */ protected EventQueue(int queueSize) { if (queueSize <= 0) { throw new IllegalArgumentException("Queue size must be greater than 0"); @@ -40,6 +69,11 @@ protected EventQueue(int queueSize) { LOGGER.trace("Creating {} with queue size: {}", this, queueSize); } + /** + * Creates an EventQueue as a child of the specified parent queue. + * + * @param parent the parent event queue + */ protected EventQueue(EventQueue parent) { this(DEFAULT_QUEUE_SIZE); LOGGER.trace("Creating {}, parent: {}", this, parent); @@ -49,6 +83,13 @@ static EventQueueBuilder builder() { return new EventQueueBuilder(); } + /** + * Builder for creating configured EventQueue instances. + *

+ * Supports configuration of queue size, enqueue hooks, task association, + * close callbacks, and task state providers. + *

+ */ public static class EventQueueBuilder { private int queueSize = DEFAULT_QUEUE_SIZE; private @Nullable EventEnqueueHook hook; @@ -56,21 +97,45 @@ public static class EventQueueBuilder { private List onCloseCallbacks = new java.util.ArrayList<>(); private @Nullable TaskStateProvider taskStateProvider; + /** + * Sets the maximum queue size. + * + * @param queueSize the maximum number of events that can be queued + * @return this builder + */ public EventQueueBuilder queueSize(int queueSize) { this.queueSize = queueSize; return this; } + /** + * Sets the enqueue hook for event replication or logging. + * + * @param hook the hook to be invoked when items are enqueued + * @return this builder + */ public EventQueueBuilder hook(EventEnqueueHook hook) { this.hook = hook; return this; } + /** + * Associates this queue with a specific task ID. + * + * @param taskId the task identifier + * @return this builder + */ public EventQueueBuilder taskId(String taskId) { this.taskId = taskId; return this; } + /** + * Adds a callback to be executed when the queue is closed. + * + * @param onCloseCallback the callback to execute on close + * @return this builder + */ public EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) { if (onCloseCallback != null) { this.onCloseCallbacks.add(onCloseCallback); @@ -78,11 +143,22 @@ public EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) { return this; } + /** + * Sets the task state provider for tracking task finalization. + * + * @param taskStateProvider the task state provider + * @return this builder + */ public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) { this.taskStateProvider = taskStateProvider; return this; } + /** + * Builds and returns the configured EventQueue. + * + * @return a new MainQueue instance + */ public EventQueue build() { if (hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null) { return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider); @@ -92,18 +168,48 @@ public EventQueue build() { } } + /** + * Returns the configured queue size. + * + * @return the maximum number of events that can be queued + */ public int getQueueSize() { return queueSize; } + /** + * Waits for the queue poller to start consuming events. + * This method blocks until signaled by {@link #signalQueuePollerStarted()}. + * + * @throws InterruptedException if the thread is interrupted while waiting + */ public abstract void awaitQueuePollerStart() throws InterruptedException ; + /** + * Signals that the queue poller has started consuming events. + * This unblocks any threads waiting in {@link #awaitQueuePollerStart()}. + */ public abstract void signalQueuePollerStarted(); + /** + * Enqueues an event for processing. + * + * @param event the event to enqueue + */ public void enqueueEvent(Event event) { enqueueItem(new LocalEventQueueItem(event)); } + /** + * Enqueues an event queue item for processing. + *

+ * This method will block if the queue is full, waiting to acquire a semaphore permit. + * If the queue is closed, the event will not be enqueued and a warning will be logged. + *

+ * + * @param item the event queue item to enqueue + * @throws RuntimeException if interrupted while waiting to acquire the semaphore + */ public void enqueueItem(EventQueueItem item) { Event event = item.getEvent(); if (closed) { @@ -121,6 +227,16 @@ public void enqueueItem(EventQueueItem item) { LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); } + /** + * Creates a child queue that shares events with this queue. + *

+ * For MainQueue: creates a ChildQueue that receives all events enqueued to the parent. + * For ChildQueue: throws IllegalStateException (only MainQueue can be tapped). + *

+ * + * @return a new ChildQueue instance + * @throws IllegalStateException if called on a ChildQueue + */ public abstract EventQueue tap(); /** @@ -172,12 +288,24 @@ public void enqueueItem(EventQueueItem item) { } } + /** + * Placeholder method for task completion notification. + * Currently not used as BlockingQueue.poll()/take() automatically remove events. + */ public void taskDone() { // TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events. } + /** + * Closes this event queue gracefully, allowing pending events to be consumed. + */ public abstract void close(); + /** + * Closes this event queue with control over immediate shutdown. + * + * @param immediate if true, clears all pending events immediately; if false, allows graceful drain + */ public abstract void close(boolean immediate); /** @@ -191,14 +319,28 @@ public void taskDone() { */ public abstract void close(boolean immediate, boolean notifyParent); + /** + * Checks if this queue has been closed. + * + * @return true if the queue is closed, false otherwise + */ public boolean isClosed() { return closed; } + /** + * Internal method to close the queue gracefully. + * Delegates to {@link #doClose(boolean)} with immediate=false. + */ protected void doClose() { doClose(false); } + /** + * Internal method to close the queue with control over immediate shutdown. + * + * @param immediate if true, clears all pending events immediately; if false, allows graceful drain + */ protected void doClose(boolean immediate) { synchronized (this) { if (closed) { diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueueClosedException.java b/server-common/src/main/java/io/a2a/server/events/EventQueueClosedException.java index f8feaa824..5fe80637d 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueueClosedException.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueueClosedException.java @@ -1,4 +1,8 @@ package io.a2a.server.events; +/** + * Exception thrown when attempting to dequeue from a closed and empty event queue. + * This signals to consumers that no more events will be available from the queue. + */ public class EventQueueClosedException extends Exception { } diff --git a/server-common/src/main/java/io/a2a/server/events/NoTaskQueueException.java b/server-common/src/main/java/io/a2a/server/events/NoTaskQueueException.java index d97c3a261..5783dd214 100644 --- a/server-common/src/main/java/io/a2a/server/events/NoTaskQueueException.java +++ b/server-common/src/main/java/io/a2a/server/events/NoTaskQueueException.java @@ -1,21 +1,56 @@ package io.a2a.server.events; +/** + * Exception thrown when attempting to access a task queue that does not exist. + *

+ * This exception is typically thrown when trying to retrieve or operate on + * an event queue for a task that has not been created or has been removed. + *

+ */ public class NoTaskQueueException extends RuntimeException { + /** + * Creates a NoTaskQueueException with no message. + */ public NoTaskQueueException() { } + /** + * Creates a NoTaskQueueException with the specified detail message. + * + * @param message the detail message + */ public NoTaskQueueException(String message) { super(message); } + /** + * Creates a NoTaskQueueException with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause + */ public NoTaskQueueException(String message, Throwable cause) { super(message, cause); } + /** + * Creates a NoTaskQueueException with the specified cause. + * + * @param cause the cause + */ public NoTaskQueueException(Throwable cause) { super(cause); } + /** + * Creates a NoTaskQueueException with the specified message, cause, + * suppression enabled or disabled, and writable stack trace enabled or disabled. + * + * @param message the detail message + * @param cause the cause + * @param enableSuppression whether suppression is enabled or disabled + * @param writableStackTrace whether the stack trace should be writable + */ public NoTaskQueueException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 00f4c3ad9..39fb01104 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -6,7 +6,6 @@ import static java.util.concurrent.TimeUnit.*; import java.time.Instant; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,6 +51,7 @@ import io.a2a.spec.InvalidParamsError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.Message; @@ -584,25 +584,25 @@ public TaskPushNotificationConfig onGetTaskPushNotificationConfig( throw new TaskNotFoundError(); } - List pushNotificationConfigList = pushConfigStore.getInfo(params.id()); - if (pushNotificationConfigList == null || pushNotificationConfigList.isEmpty()) { + ListTaskPushNotificationConfigResult listTaskPushNotificationConfigResult = pushConfigStore.getInfo(new ListTaskPushNotificationConfigParams(params.id())); + if (listTaskPushNotificationConfigResult == null || listTaskPushNotificationConfigResult.isEmpty()) { throw new InternalError("No push notification config found"); } @Nullable String configId = params.pushNotificationConfigId(); - return new TaskPushNotificationConfig(params.id(), getPushNotificationConfig(pushNotificationConfigList, configId), params.tenant()); + return new TaskPushNotificationConfig(params.id(), getPushNotificationConfig(listTaskPushNotificationConfigResult, configId), params.tenant()); } - private PushNotificationConfig getPushNotificationConfig(List notificationConfigList, + private PushNotificationConfig getPushNotificationConfig(ListTaskPushNotificationConfigResult notificationConfigList, @Nullable String configId) { if (configId != null) { - for (PushNotificationConfig notificationConfig : notificationConfigList) { - if (configId.equals(notificationConfig.id())) { - return notificationConfig; + for (TaskPushNotificationConfig notificationConfig : notificationConfigList.configs()) { + if (configId.equals(notificationConfig.pushNotificationConfig().id())) { + return notificationConfig.pushNotificationConfig(); } } } - return notificationConfigList.get(0); + return notificationConfigList.configs().get(0).pushNotificationConfig(); } @Override @@ -637,26 +637,16 @@ public Flow.Publisher onResubscribeToTask( } @Override - public List onListTaskPushNotificationConfig( + public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig( ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError { if (pushConfigStore == null) { throw new UnsupportedOperationError(); } - Task task = taskStore.get(params.id()); if (task == null) { throw new TaskNotFoundError(); } - - List pushNotificationConfigList = pushConfigStore.getInfo(params.id()); - List taskPushNotificationConfigList = new ArrayList<>(); - if (pushNotificationConfigList != null) { - for (PushNotificationConfig pushNotificationConfig : pushNotificationConfigList) { - TaskPushNotificationConfig taskPushNotificationConfig = new TaskPushNotificationConfig(params.id(), pushNotificationConfig, params.tenant()); - taskPushNotificationConfigList.add(taskPushNotificationConfig); - } - } - return taskPushNotificationConfigList; + return pushConfigStore.getInfo(params); } @Override diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandler.java index 64ba8eea1..ff7384e75 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandler.java @@ -9,6 +9,7 @@ import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.MessageSendParams; @@ -51,7 +52,7 @@ Flow.Publisher onResubscribeToTask( TaskIdParams params, ServerCallContext context) throws JSONRPCError; - List onListTaskPushNotificationConfig( + ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig( ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError; diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java index 8c07d904c..c6c887577 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java @@ -12,11 +12,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import io.a2a.json.JsonProcessingException; import io.a2a.client.http.A2AHttpClient; import io.a2a.client.http.JdkA2AHttpClient; import io.a2a.json.JsonUtil; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; @@ -44,14 +45,14 @@ public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHt @Override public void sendNotification(Task task) { - List pushConfigs = configStore.getInfo(task.id()); + ListTaskPushNotificationConfigResult pushConfigs = configStore.getInfo(new ListTaskPushNotificationConfigParams(task.id())); if (pushConfigs == null || pushConfigs.isEmpty()) { return; } - List> dispatchResults = pushConfigs + List> dispatchResults = pushConfigs.configs() .stream() - .map(pushConfig -> dispatch(task, pushConfig)) + .map(pushConfig -> dispatch(task, pushConfig.pushNotificationConfig())) .toList(); CompletableFuture allFutures = CompletableFuture.allOf(dispatchResults.toArray(new CompletableFuture[0])); CompletableFuture dispatchResult = allFutures.thenApply(v -> dispatchResults.stream() diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStore.java index 69ec5e15d..b278d32d6 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStore.java @@ -1,5 +1,7 @@ package io.a2a.server.tasks; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -10,14 +12,13 @@ import java.util.List; import java.util.Map; -import org.jspecify.annotations.Nullable; - import io.a2a.spec.PushNotificationConfig; +import io.a2a.spec.TaskPushNotificationConfig; /** * In-memory implementation of the PushNotificationConfigStore interface. * - * Stores push notification configurations in memory + * Stores push notification configurations in memory */ @ApplicationScoped public class InMemoryPushNotificationConfigStore implements PushNotificationConfigStore { @@ -51,8 +52,48 @@ public PushNotificationConfig setInfo(String taskId, PushNotificationConfig noti } @Override - public @Nullable List getInfo(String taskId) { - return pushNotificationInfos.get(taskId); + public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConfigParams params) { + List configs = pushNotificationInfos.get(params.id()); + if (configs == null) { + return new ListTaskPushNotificationConfigResult(Collections.emptyList()); + } + if (params.pageSize() <= 0) { + return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(configs, params), null); + } + if (params.pageToken() != null && !params.pageToken().isBlank()) { + //find first index + int index = findFirstIndex(configs, params.pageToken()); + if (index < configs.size()) { + configs = configs.subList(index, configs.size()); + } + } + if (configs.size() <= params.pageSize()) { + return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(configs, params), null); + } + String newToken = configs.get(params.pageSize()).token(); + return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(configs.subList(0, params.pageSize()), params), newToken); + } + + private int findFirstIndex(List configs, String token) { + //find first index + Iterator iter = configs.iterator(); + int index = 0; + while (iter.hasNext()) { + if (token.equals(iter.next().token())) { + return index; + } + index++; + } + return index; + } + + private List convertPushNotificationConfig(List pushNotificationConfigList, ListTaskPushNotificationConfigParams params) { + List taskPushNotificationConfigList = new ArrayList<>(pushNotificationConfigList.size()); + for (PushNotificationConfig pushNotificationConfig : pushNotificationConfigList) { + TaskPushNotificationConfig taskPushNotificationConfig = new TaskPushNotificationConfig(params.id(), pushNotificationConfig, params.tenant()); + taskPushNotificationConfigList.add(taskPushNotificationConfig); + } + return taskPushNotificationConfigList; } @Override @@ -77,4 +118,4 @@ public void deleteInfo(String taskId, String configId) { pushNotificationInfos.remove(taskId); } } -} \ No newline at end of file +} diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index bb853e297..669fee0a8 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -14,6 +14,17 @@ import io.a2a.spec.Task; import org.jspecify.annotations.Nullable; +/** + * In-memory implementation of {@link TaskStore} and {@link TaskStateProvider}. + *

+ * This implementation uses a {@link ConcurrentHashMap} to store tasks in memory. + * Tasks are lost on application restart. For persistent storage, use a database-backed + * implementation such as the JPA TaskStore in the extras module. + *

+ *

+ * This is the default TaskStore used when no other implementation is provided. + *

+ */ @ApplicationScoped public class InMemoryTaskStore implements TaskStore, TaskStateProvider { diff --git a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationConfigStore.java b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationConfigStore.java index 6e721a6ec..16a658f0e 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationConfigStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationConfigStore.java @@ -1,8 +1,8 @@ package io.a2a.server.tasks; -import java.util.List; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; -import org.jspecify.annotations.Nullable; import io.a2a.spec.PushNotificationConfig; @@ -21,10 +21,10 @@ public interface PushNotificationConfigStore { /** * Retrieves the push notification configuration for a task. - * @param taskId the task ID - * @return the push notification configurations for a task, or null if not found + * @param params the parameters for listing push notification configurations + * @return the push notification configurations for a task, or with empty list if not found */ - @Nullable List getInfo(String taskId); + ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConfigParams params); /** * Deletes the push notification configuration for a task. diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java index 6d4df5dbd..7d15e032a 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java @@ -6,11 +6,34 @@ import io.a2a.spec.ListTasksResult; import io.a2a.spec.Task; +/** + * Storage interface for managing task persistence. + *

+ * Implementations can use in-memory storage, databases, or other persistence mechanisms. + * Default implementation is {@code InMemoryTaskStore}. + *

+ */ public interface TaskStore { + /** + * Saves or updates a task. + * + * @param task the task to save + */ void save(Task task); + /** + * Retrieves a task by its ID. + * + * @param taskId the task identifier + * @return the task if found, null otherwise + */ @Nullable Task get(String taskId); + /** + * Deletes a task by its ID. + * + * @param taskId the task identifier + */ void delete(String taskId); /** diff --git a/server-common/src/main/java/io/a2a/server/util/async/Internal.java b/server-common/src/main/java/io/a2a/server/util/async/Internal.java index 2b8cd100e..7774d4838 100644 --- a/server-common/src/main/java/io/a2a/server/util/async/Internal.java +++ b/server-common/src/main/java/io/a2a/server/util/async/Internal.java @@ -5,6 +5,13 @@ import jakarta.inject.Qualifier; +/** + * CDI qualifier for internal async executor beans. + *

+ * This qualifier is used to distinguish internal async executors from + * application-level executors when multiple executor beans exist. + *

+ */ @Qualifier @Retention(RetentionPolicy.RUNTIME) public @interface Internal { diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index abf881752..31a57f137 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -22,6 +22,8 @@ import io.a2a.server.tasks.InMemoryTaskStore; import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.Message; import io.a2a.spec.MessageSendConfiguration; import io.a2a.spec.MessageSendParams; @@ -863,12 +865,13 @@ void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exceptio assertEquals(taskId, returnedTask.id()); // THE KEY ASSERTION: Verify pushNotificationConfig was stored - List storedConfigs = pushConfigStore.getInfo(taskId); + ListTaskPushNotificationConfigResult storedConfigs = pushConfigStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); assertNotNull(storedConfigs, "Push notification config should be stored for new task"); assertEquals(1, storedConfigs.size(), "Should have exactly 1 push config stored"); - assertEquals("config-1", storedConfigs.get(0).id()); - assertEquals("https://example.com/webhook", storedConfigs.get(0).url()); + PushNotificationConfig storedConfig = storedConfigs.configs().get(0).pushNotificationConfig(); + assertEquals("config-1", storedConfig.id()); + assertEquals("https://example.com/webhook", storedConfig.url()); } /** @@ -940,13 +943,12 @@ void testBlockingMessageStoresPushNotificationConfigForExistingTask() throws Exc assertTrue(result instanceof Task, "Result should be a Task"); // Verify pushNotificationConfig was stored (initMessageSend path) - List storedConfigs = pushConfigStore.getInfo(taskId); - assertNotNull(storedConfigs, - "Push notification config should be stored for existing task"); - assertEquals(1, storedConfigs.size(), - "Should have exactly 1 push config stored"); - assertEquals("config-existing-1", storedConfigs.get(0).id()); - assertEquals("https://example.com/existing-webhook", storedConfigs.get(0).url()); + ListTaskPushNotificationConfigResult storedConfigs = pushConfigStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(storedConfigs,"Push notification config should be stored for existing task"); + assertEquals(1, storedConfigs.size(),"Should have exactly 1 push config stored"); + PushNotificationConfig storedConfig = storedConfigs.configs().get(0).pushNotificationConfig(); + assertEquals("config-existing-1", storedConfig.id()); + assertEquals("https://example.com/existing-webhook", storedConfig.url()); } /** diff --git a/server-common/src/test/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStoreTest.java b/server-common/src/test/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStoreTest.java index d90629b09..c8ccd669f 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStoreTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStoreTest.java @@ -16,8 +16,11 @@ import io.a2a.client.http.A2AHttpClient; import io.a2a.client.http.A2AHttpResponse; import io.a2a.common.A2AHeaders; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; +import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import org.junit.jupiter.api.BeforeEach; @@ -98,11 +101,11 @@ public void testSetInfoAddsNewConfig() { assertEquals(config.url(), result.url()); assertEquals(config.id(), result.id()); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); - assertEquals(config.url(), configs.get(0).url()); - assertEquals(config.id(), configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); + assertEquals(config.id(), configResult.configs().get(0).pushNotificationConfig().id()); } @Test @@ -116,11 +119,14 @@ public void testSetInfoAppendsToExistingConfig() { "http://updated.url/callback", "cfg_updated", null); configStore.setInfo(taskId, updatedConfig); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(2, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(2, configResult.configs().size()); // Find the configs by ID since order might vary + List configs = configResult.configs().stream() + .map(TaskPushNotificationConfig::pushNotificationConfig) + .toList(); PushNotificationConfig foundInitial = configs.stream() .filter(c -> "cfg_initial".equals(c.id())) .findFirst() @@ -146,9 +152,9 @@ public void testSetInfoWithoutConfigId() { PushNotificationConfig result = configStore.setInfo(taskId, initialConfig); assertEquals(taskId, result.id(), "Config ID should default to taskId when not provided"); - List configs = configStore.getInfo(taskId); - assertEquals(1, configs.size()); - assertEquals(taskId, configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size()); + assertEquals(taskId, configResult.configs().get(0).pushNotificationConfig().id()); PushNotificationConfig updatedConfig = PushNotificationConfig.builder() .url("http://initial.url/callback_new") @@ -157,9 +163,9 @@ public void testSetInfoWithoutConfigId() { PushNotificationConfig updatedResult = configStore.setInfo(taskId, updatedConfig); assertEquals(taskId, updatedResult.id()); - configs = configStore.getInfo(taskId); - assertEquals(1, configs.size(), "Should replace existing config with same ID rather than adding new one"); - assertEquals(updatedConfig.url(), configs.get(0).url()); + configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size(), "Should replace existing config with same ID rather than adding new one"); + assertEquals(updatedConfig.url(), configResult.configs().get(0).pushNotificationConfig().url()); } @Test @@ -168,18 +174,19 @@ public void testGetInfoExistingConfig() { PushNotificationConfig config = createSamplePushConfig("http://get.this/callback", "cfg1", null); configStore.setInfo(taskId, config); - List retrievedConfigs = configStore.getInfo(taskId); - assertNotNull(retrievedConfigs); - assertEquals(1, retrievedConfigs.size()); - assertEquals(config.url(), retrievedConfigs.get(0).url()); - assertEquals(config.id(), retrievedConfigs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); + assertEquals(config.id(), configResult.configs().get(0).pushNotificationConfig().id()); } @Test public void testGetInfoNonExistentConfig() { String taskId = "task_get_non_exist"; - List retrievedConfigs = configStore.getInfo(taskId); - assertNull(retrievedConfigs, "Should return null for non-existent task ID"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list for non-existent task ID"); } @Test @@ -188,14 +195,15 @@ public void testDeleteInfoExistingConfig() { PushNotificationConfig config = createSamplePushConfig("http://delete.this/callback", "cfg1", null); configStore.setInfo(taskId, config); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); configStore.deleteInfo(taskId, config.id()); - List configsAfterDelete = configStore.getInfo(taskId); - assertNull(configsAfterDelete, "Should return null when no configs remain after deletion"); + ListTaskPushNotificationConfigResult configsAfterDelete = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configsAfterDelete); + assertTrue(configsAfterDelete.configs().isEmpty(), "Should return empty list when no configs remain after deletion"); } @Test @@ -204,8 +212,9 @@ public void testDeleteInfoNonExistentConfig() { // Should not throw an error configStore.deleteInfo(taskId, "non_existent_id"); - List configs = configStore.getInfo(taskId); - assertNull(configs, "Should still return null for non-existent task ID"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list for non-existent task ID"); } @Test @@ -219,8 +228,9 @@ public void testDeleteInfoWithNullConfigId() { // Delete with null configId should use taskId configStore.deleteInfo(taskId, null); - List configs = configStore.getInfo(taskId); - assertNull(configs, "Should return null after deletion when using taskId as configId"); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertTrue(configResult.configs().isEmpty(), "Should return empty list after deletion when using taskId as configId"); } @Test @@ -324,11 +334,14 @@ public void testMultipleConfigsForSameTask() { configStore.setInfo(taskId, config1); configStore.setInfo(taskId, config2); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(2, configs.size()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(2, configResult.configs().size()); // Verify both configs are present + List configs = configResult.configs().stream() + .map(TaskPushNotificationConfig::pushNotificationConfig) + .toList(); assertTrue(configs.stream().anyMatch(c -> "cfg1".equals(c.id()))); assertTrue(configs.stream().anyMatch(c -> "cfg2".equals(c.id()))); } @@ -345,10 +358,10 @@ public void testDeleteSpecificConfigFromMultiple() { // Delete only config1 configStore.deleteInfo(taskId, "cfg1"); - List configs = configStore.getInfo(taskId); - assertNotNull(configs); - assertEquals(1, configs.size()); - assertEquals("cfg2", configs.get(0).id()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(configResult); + assertEquals(1, configResult.configs().size()); + assertEquals("cfg2", configResult.configs().get(0).pushNotificationConfig().id()); } @Test @@ -361,14 +374,251 @@ public void testConfigStoreIntegration() { assertEquals(config.url(), storedConfig.url()); assertEquals(config.token(), storedConfig.token()); - List retrievedConfigs = configStore.getInfo(taskId); - assertEquals(1, retrievedConfigs.size()); - assertEquals(config.url(), retrievedConfigs.get(0).url()); + ListTaskPushNotificationConfigResult configResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertEquals(1, configResult.configs().size()); + assertEquals(config.url(), configResult.configs().get(0).pushNotificationConfig().url()); // Test deletion configStore.deleteInfo(taskId, storedConfig.id()); - List afterDeletion = configStore.getInfo(taskId); - assertNull(afterDeletion); + ListTaskPushNotificationConfigResult afterDeletion = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId)); + assertNotNull(afterDeletion); + assertTrue(afterDeletion.configs().isEmpty()); + } + + @Test + public void testPaginationWithPageSize() { + String taskId = "task_pagination"; + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request first page with pageSize=2 + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertEquals(2, result.configs().size(), "Should return 2 configs"); + assertNotNull(result.nextPageToken(), "Should have nextPageToken when more items exist"); + } + + @Test + public void testPaginationWithPageToken() { + String taskId = "task_pagination_token"; + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Get first page + ListTaskPushNotificationConfigParams firstPageParams = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult firstPage = configStore.getInfo(firstPageParams); + assertNotNull(firstPage.nextPageToken()); + + // Get second page using nextPageToken + ListTaskPushNotificationConfigParams secondPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, firstPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult secondPage = configStore.getInfo(secondPageParams); + + assertNotNull(secondPage); + assertEquals(2, secondPage.configs().size(), "Should return 2 configs for second page"); + assertNotNull(secondPage.nextPageToken(), "Should have nextPageToken when more items exist"); + + // Verify NO overlap between pages - collect all IDs from both pages + List firstPageIds = firstPage.configs().stream() + .map(c -> c.pushNotificationConfig().id()) + .toList(); + List secondPageIds = secondPage.configs().stream() + .map(c -> c.pushNotificationConfig().id()) + .toList(); + + // Check that no ID from first page appears in second page + for (String id : firstPageIds) { + assertTrue(!secondPageIds.contains(id), + "Config " + id + " appears in both pages - overlap detected!"); + } + + // Also verify the pages are sequential (first page ends before second page starts) + // Since configs are created in order, we can verify the IDs + assertEquals("cfg0", firstPageIds.get(0)); + assertEquals("cfg1", firstPageIds.get(1)); + assertEquals("cfg2", secondPageIds.get(0)); + assertEquals("cfg3", secondPageIds.get(1)); + } + + @Test + public void testPaginationLastPage() { + String taskId = "task_pagination_last"; + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Get first page (2 items) + ListTaskPushNotificationConfigParams firstPageParams = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult firstPage = configStore.getInfo(firstPageParams); + + // Get second page (2 items) + ListTaskPushNotificationConfigParams secondPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, firstPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult secondPage = configStore.getInfo(secondPageParams); + + // Get last page (1 item remaining) + ListTaskPushNotificationConfigParams lastPageParams = new ListTaskPushNotificationConfigParams( + taskId, 2, secondPage.nextPageToken(), ""); + ListTaskPushNotificationConfigResult lastPage = configStore.getInfo(lastPageParams); + + assertNotNull(lastPage); + assertEquals(1, lastPage.configs().size(), "Last page should have 1 remaining config"); + assertNull(lastPage.nextPageToken(), "Last page should not have nextPageToken"); + } + + @Test + public void testPaginationWithZeroPageSize() { + String taskId = "task_pagination_zero"; + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request with pageSize=0 should return all configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 0, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertEquals(5, result.configs().size(), "Should return all 5 configs when pageSize=0"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when returning all"); + } + + @Test + public void testPaginationWithNegativePageSize() { + String taskId = "task_pagination_negative"; + // Create 3 configs + for (int i = 0; i < 3; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request with negative pageSize should return all configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, -1, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all configs when pageSize is negative"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when returning all"); + } + + @Test + public void testPaginationPageSizeLargerThanConfigs() { + String taskId = "task_pagination_large"; + // Create 3 configs + for (int i = 0; i < 3; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request with pageSize larger than available configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 10, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all 3 configs"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when all configs fit in one page"); + } + + @Test + public void testPaginationExactlyPageSize() { + String taskId = "task_pagination_exact"; + // Create exactly 3 configs + for (int i = 0; i < 3; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request with pageSize equal to number of configs + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 3, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertEquals(3, result.configs().size(), "Should return all 3 configs"); + assertNull(result.nextPageToken(), "Should not have nextPageToken when configs exactly match pageSize"); + } + + @Test + public void testPaginationWithInvalidToken() { + String taskId = "task_pagination_invalid_token"; + // Create 5 configs + for (int i = 0; i < 5; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Request with invalid pageToken - implementation behavior is to start from beginning + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams( + taskId, 2, "invalid_token_that_does_not_exist", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + // When token is not found, implementation starts from beginning + assertEquals(2, result.configs().size(), "Should return first page when token is not found"); + assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist"); + } + + @Test + public void testPaginationEmptyTaskWithPageSize() { + String taskId = "task_pagination_empty"; + // No configs created + + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 2, "", ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + assertNotNull(result); + assertTrue(result.configs().isEmpty(), "Should return empty list for non-existent task"); + assertNull(result.nextPageToken(), "Should not have nextPageToken for empty result"); + } + + @Test + public void testPaginationFullIteration() { + String taskId = "task_pagination_full"; + // Create 7 configs + for (int i = 0; i < 7; i++) { + PushNotificationConfig config = createSamplePushConfig( + "http://url" + i + ".com/callback", "cfg" + i, "token" + i); + configStore.setInfo(taskId, config); + } + + // Iterate through all pages with pageSize=3 + int totalCollected = 0; + String pageToken = ""; + int pageCount = 0; + + do { + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, 3, pageToken, ""); + ListTaskPushNotificationConfigResult result = configStore.getInfo(params); + + totalCollected += result.configs().size(); + pageToken = result.nextPageToken(); + pageCount++; + + // Safety check to prevent infinite loop + assertTrue(pageCount <= 10, "Should not have more than 10 pages for 7 configs"); + + } while (pageToken != null); + + assertEquals(7, totalCollected, "Should collect all 7 configs across all pages"); + assertEquals(3, pageCount, "Should have exactly 3 pages (3+3+1)"); } } diff --git a/spec-grpc/src/main/java/io/a2a/grpc/mapper/ListTaskPushNotificationConfigParamsMapper.java b/spec-grpc/src/main/java/io/a2a/grpc/mapper/ListTaskPushNotificationConfigParamsMapper.java index f0cf356ab..1e653eea8 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/mapper/ListTaskPushNotificationConfigParamsMapper.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/mapper/ListTaskPushNotificationConfigParamsMapper.java @@ -22,7 +22,6 @@ public interface ListTaskPushNotificationConfigParamsMapper { */ @BeanMapping(builder = @Builder(buildMethod = "build")) @Mapping(target = "id", expression = "java(ResourceNameParser.extractParentId(proto.getParent()))") - @Mapping(target = "tenant", source = "tenant") ListTaskPushNotificationConfigParams fromProto(io.a2a.grpc.ListTaskPushNotificationConfigRequest proto); /** @@ -30,7 +29,5 @@ public interface ListTaskPushNotificationConfigParamsMapper { * Constructs the parent field from task ID. */ @Mapping(target = "parent", expression = "java(ResourceNameParser.defineTaskName(domain.id()))") - @Mapping(target = "pageSize", ignore = true) - @Mapping(target = "pageToken", ignore = true) io.a2a.grpc.ListTaskPushNotificationConfigRequest toProto(ListTaskPushNotificationConfigParams domain); } diff --git a/spec-grpc/src/main/java/io/a2a/grpc/utils/JSONRPCUtils.java b/spec-grpc/src/main/java/io/a2a/grpc/utils/JSONRPCUtils.java index 6075c5316..85d5b9f2e 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/utils/JSONRPCUtils.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/utils/JSONRPCUtils.java @@ -304,7 +304,7 @@ public static JSONRPCResponse parseResponseBody(String body, String method) t case ListTaskPushNotificationConfigRequest.METHOD -> { io.a2a.grpc.ListTaskPushNotificationConfigResponse.Builder builder = io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(); parseRequestBody(paramsNode, builder, id); - return new ListTaskPushNotificationConfigResponse(id, ProtoUtils.FromProto.listTaskPushNotificationConfigParams(builder)); + return new ListTaskPushNotificationConfigResponse(id, ProtoUtils.FromProto.listTaskPushNotificationConfigResult(builder)); } case DeleteTaskPushNotificationConfigRequest.METHOD -> { return new DeleteTaskPushNotificationConfigResponse(id); diff --git a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java index 55be58228..6a0ac5409 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java @@ -29,6 +29,7 @@ import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.Message; @@ -126,12 +127,16 @@ public static io.a2a.grpc.SendMessageRequest sendMessageRequest(MessageSendParam return MessageSendParamsMapper.INSTANCE.toProto(request); } - public static io.a2a.grpc.ListTaskPushNotificationConfigResponse listTaskPushNotificationConfigResponse(List configs) { - List confs = new ArrayList<>(configs.size()); - for (TaskPushNotificationConfig config : configs) { + public static io.a2a.grpc.ListTaskPushNotificationConfigResponse listTaskPushNotificationConfigResponse(ListTaskPushNotificationConfigResult result) { + List confs = new ArrayList<>(result.configs().size()); + for (TaskPushNotificationConfig config : result.configs()) { confs.add(taskPushNotificationConfig(config)); } - return io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder().addAllConfigs(confs).build(); + io.a2a.grpc.ListTaskPushNotificationConfigResponse.Builder builder = io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder().addAllConfigs(confs); + if (result.nextPageToken() != null) { + builder.setNextPageToken(result.nextPageToken()); + } + return builder.build(); } public static StreamResponse streamResponse(StreamingEventKind streamingEventKind) { @@ -254,13 +259,17 @@ public static TaskIdParams taskIdParams(io.a2a.grpc.SubscribeToTaskRequestOrBuil return convert(() -> TaskIdParamsMapper.INSTANCE.fromProtoSubscribeToTaskRequest(reqProto)); } - public static List listTaskPushNotificationConfigParams(io.a2a.grpc.ListTaskPushNotificationConfigResponseOrBuilder response) { + public static ListTaskPushNotificationConfigResult listTaskPushNotificationConfigResult(io.a2a.grpc.ListTaskPushNotificationConfigResponseOrBuilder response) { List configs = response.getConfigsList(); List result = new ArrayList<>(configs.size()); for (io.a2a.grpc.TaskPushNotificationConfig config : configs) { result.add(taskPushNotificationConfig(config)); } - return result; + String nextPageToken = response.getNextPageToken(); + if (nextPageToken != null && nextPageToken.isEmpty()) { + nextPageToken = null; + } + return new ListTaskPushNotificationConfigResult(result, nextPageToken); } public static ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams(io.a2a.grpc.ListTaskPushNotificationConfigRequestOrBuilder request) { diff --git a/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigParams.java b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigParams.java index 2f71bcceb..09967e378 100644 --- a/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigParams.java +++ b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigParams.java @@ -1,6 +1,5 @@ package io.a2a.spec; - import io.a2a.util.Assert; /** @@ -15,7 +14,7 @@ * @see TaskPushNotificationConfig for the configuration structure * @see A2A Protocol Specification */ -public record ListTaskPushNotificationConfigParams(String id, String tenant) { +public record ListTaskPushNotificationConfigParams(String id, int pageSize, String pageToken, String tenant) { /** * Compact constructor for validation. @@ -35,6 +34,6 @@ public record ListTaskPushNotificationConfigParams(String id, String tenant) { * @param id the task identifier (required) */ public ListTaskPushNotificationConfigParams(String id) { - this(id, ""); + this(id, 0, "", ""); } } diff --git a/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResponse.java b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResponse.java index 2e66b03c9..098fabe1c 100644 --- a/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResponse.java +++ b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResponse.java @@ -1,31 +1,31 @@ package io.a2a.spec; -import java.util.List; - /** - * JSON-RPC response containing all push notification configurations for a task. + * JSON-RPC response containing all push notification configurations for a task with pagination support. *

- * This response returns a list of all {@link TaskPushNotificationConfig} entries - * configured for the requested task, showing all active notification endpoints. + * This response returns a {@link ListTaskPushNotificationConfigResult} containing + * {@link TaskPushNotificationConfig} entries configured for the requested task, + * showing all active notification endpoints with optional pagination information. *

* If an error occurs, the error field will contain a {@link JSONRPCError}. * * @see ListTaskPushNotificationConfigRequest for the corresponding request + * @see ListTaskPushNotificationConfigResult for the result structure * @see TaskPushNotificationConfig for the configuration structure * @see A2A Protocol Specification */ -public final class ListTaskPushNotificationConfigResponse extends JSONRPCResponse> { +public final class ListTaskPushNotificationConfigResponse extends JSONRPCResponse { /** * Constructs response with all parameters. * * @param jsonrpc the JSON-RPC version * @param id the request ID - * @param result the list of push notification configurations + * @param result the result containing list of push notification configurations and pagination info * @param error the error (if any) */ - public ListTaskPushNotificationConfigResponse(String jsonrpc, Object id, List result, JSONRPCError error) { - super(jsonrpc, id, result, error, (Class>) (Class) List.class); + public ListTaskPushNotificationConfigResponse(String jsonrpc, Object id, ListTaskPushNotificationConfigResult result, JSONRPCError error) { + super(jsonrpc, id, result, error, ListTaskPushNotificationConfigResult.class); } /** @@ -42,10 +42,10 @@ public ListTaskPushNotificationConfigResponse(Object id, JSONRPCError error) { * Constructs success response. * * @param id the request ID - * @param result the list of push notification configurations + * @param result the result containing list of push notification configurations and pagination info */ - public ListTaskPushNotificationConfigResponse(Object id, List result) { - this(null, id, result, null); + public ListTaskPushNotificationConfigResponse(Object id, ListTaskPushNotificationConfigResult result) { + this(null, id, result, null); } } diff --git a/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResult.java b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResult.java new file mode 100644 index 000000000..193b3ea9b --- /dev/null +++ b/spec/src/main/java/io/a2a/spec/ListTaskPushNotificationConfigResult.java @@ -0,0 +1,63 @@ +package io.a2a.spec; + +import io.a2a.util.Assert; +import org.jspecify.annotations.Nullable; + +import java.util.List; + +/** + * Result of listing push notification configurations for a task with pagination support. + * + * @param configs List of push notification configurations for the task + * @param nextPageToken Token for retrieving the next page of results (null if no more results) + */ +public record ListTaskPushNotificationConfigResult(List configs, + @Nullable String nextPageToken) { + /** + * Compact constructor for validation. + * Validates parameters and creates a defensive copy of the configs list. + * + * @param configs the list of push notification configurations + * @param nextPageToken token for next page + * @throws IllegalArgumentException if validation fails + */ + public ListTaskPushNotificationConfigResult { + Assert.checkNotNullParam("configs", configs); + // Make defensive copy + configs = List.copyOf(configs); + } + + /** + * Constructor for results without pagination. + * + * @param configs the list of push notification configurations + */ + public ListTaskPushNotificationConfigResult(List configs) { + this(configs, null); + } + + /** + * Returns whether there are more results available. + * + * @return true if there are more pages of results + */ + public boolean hasMoreResults() { + return nextPageToken != null && !nextPageToken.isEmpty(); + } + + /** + * Return the size of the configs. + * @return the size of the configs. + */ + public int size() { + return configs.size(); + } + + /** + * Return if the configs is empty or not. + * @return true if the configs is empty - false otherwise. + */ + public boolean isEmpty() { + return configs.isEmpty(); + } +} diff --git a/spec/src/main/java/io/a2a/util/Utils.java b/spec/src/main/java/io/a2a/util/Utils.java index 038629397..858737375 100644 --- a/spec/src/main/java/io/a2a/util/Utils.java +++ b/spec/src/main/java/io/a2a/util/Utils.java @@ -72,7 +72,7 @@ public static T unmarshalFrom(String data, Class typeRef) throws JsonProc * @param defaultValue the default value to return if value is null * @return value if non-null, otherwise defaultValue */ - public static T defaultIfNull(T value, T defaultValue) { + public static T defaultIfNull(@Nullable T value, T defaultValue) { if (value == null) { return defaultValue; } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 938f2d3ad..8d52b61b1 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -55,6 +55,7 @@ import io.a2a.spec.JSONParseError; import io.a2a.spec.JSONRPCErrorResponse; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.Message; import io.a2a.spec.MessageSendParams; @@ -1070,11 +1071,11 @@ public void testListPushNotificationConfigWithConfigId() throws Exception { savePushNotificationConfigInStore(MINIMAL_TASK.id(), notificationConfig2); try { - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id())); assertEquals(2, result.size()); - assertEquals(new TaskPushNotificationConfig(MINIMAL_TASK.id(), notificationConfig1, null), result.get(0)); - assertEquals(new TaskPushNotificationConfig(MINIMAL_TASK.id(), notificationConfig2, null), result.get(1)); + assertEquals(new TaskPushNotificationConfig(MINIMAL_TASK.id(), notificationConfig1, null), result.configs().get(0)); + assertEquals(new TaskPushNotificationConfig(MINIMAL_TASK.id(), notificationConfig2, null), result.configs().get(1)); } catch (Exception e) { fail(); } finally { @@ -1100,7 +1101,7 @@ public void testListPushNotificationConfigWithoutConfigId() throws Exception { // will overwrite the previous one savePushNotificationConfigInStore(MINIMAL_TASK.id(), notificationConfig2); try { - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id())); assertEquals(1, result.size()); @@ -1109,7 +1110,7 @@ public void testListPushNotificationConfigWithoutConfigId() throws Exception { .id(MINIMAL_TASK.id()) .build(); assertEquals(new TaskPushNotificationConfig(MINIMAL_TASK.id(), expectedNotificationConfig, null), - result.get(0)); + result.configs().get(0)); } catch (Exception e) { fail(); } finally { @@ -1121,7 +1122,7 @@ public void testListPushNotificationConfigWithoutConfigId() throws Exception { @Test public void testListPushNotificationConfigTaskNotFound() { try { - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams("non-existent-task")); fail(); } catch (A2AClientException e) { @@ -1133,7 +1134,7 @@ public void testListPushNotificationConfigTaskNotFound() { public void testListPushNotificationConfigEmptyList() throws Exception { saveTaskInTaskStore(MINIMAL_TASK); try { - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id())); assertEquals(0, result.size()); } catch (Exception e) { @@ -1172,7 +1173,7 @@ public void testDeletePushNotificationConfigWithValidConfigId() throws Exception new DeleteTaskPushNotificationConfigParams(MINIMAL_TASK.id(), "config1")); // should now be 1 left - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id())); assertEquals(1, result.size()); @@ -1212,7 +1213,7 @@ public void testDeletePushNotificationConfigWithNonExistingConfigId() throws Exc new DeleteTaskPushNotificationConfigParams(MINIMAL_TASK.id(), "non-existent-config-id")); // should remain unchanged - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id())); assertEquals(2, result.size()); } catch (Exception e) { @@ -1257,7 +1258,7 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio new DeleteTaskPushNotificationConfigParams(MINIMAL_TASK.id(), MINIMAL_TASK.id())); // should now be 0 - List result = getClient().listTaskPushNotificationConfigurations( + ListTaskPushNotificationConfigResult result = getClient().listTaskPushNotificationConfigurations( new ListTaskPushNotificationConfigParams(MINIMAL_TASK.id()), null); assertEquals(0, result.size()); } catch (Exception e) { diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java index ea14783bd..1df1409f3 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java @@ -41,6 +41,7 @@ import io.a2a.spec.JSONParseError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.MessageSendParams; import io.a2a.spec.MethodNotFoundError; import io.a2a.spec.PushNotificationNotSupportedError; @@ -208,13 +209,9 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC try { ServerCallContext context = createCallContext(responseObserver); ListTaskPushNotificationConfigParams params = FromProto.listTaskPushNotificationConfigParams(request); - List configList = getRequestHandler().onListTaskPushNotificationConfig(params, context); - io.a2a.grpc.ListTaskPushNotificationConfigResponse.Builder responseBuilder = - io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(); - for (TaskPushNotificationConfig config : configList) { - responseBuilder.addConfigs(ToProto.taskPushNotificationConfig(config)); - } - responseObserver.onNext(responseBuilder.build()); + ListTaskPushNotificationConfigResult result = getRequestHandler().onListTaskPushNotificationConfig(params, context); + io.a2a.grpc.ListTaskPushNotificationConfigResponse response = ToProto.listTaskPushNotificationConfigResponse(result); + responseObserver.onNext(response); responseObserver.onCompleted(); } catch (JSONRPCError e) { handleError(responseObserver, e); diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index 8c62a81d6..5c097c04c 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -34,6 +34,7 @@ import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigRequest; import io.a2a.spec.ListTaskPushNotificationConfigResponse; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksRequest; import io.a2a.spec.ListTasksResponse; import io.a2a.spec.ListTasksResult; @@ -210,9 +211,9 @@ public ListTaskPushNotificationConfigResponse listPushNotificationConfig( new PushNotificationNotSupportedError()); } try { - List pushNotificationConfigList = + ListTaskPushNotificationConfigResult result = requestHandler.onListTaskPushNotificationConfig(request.getParams(), context); - return new ListTaskPushNotificationConfigResponse(request.getId(), pushNotificationConfigList); + return new ListTaskPushNotificationConfigResponse(request.getId(), result); } catch (JSONRPCError e) { return new ListTaskPushNotificationConfigResponse(request.getId(), e); } catch (Throwable t) { diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index b2f447d4b..acf62dcb7 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -1361,7 +1361,7 @@ public void testListPushNotificationConfig() { assertEquals("111", listResponse.getId()); assertEquals(1, listResponse.getResult().size()); - assertEquals(result, listResponse.getResult().get(0)); + assertEquals(result, listResponse.getResult().configs().get(0)); } @Test diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index bd07b27f5..ea4d62ec7 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -36,6 +36,7 @@ import io.a2a.spec.JSONParseError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; import io.a2a.spec.MethodNotFoundError; @@ -251,14 +252,14 @@ public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nul } } - public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, String tenant, ServerCallContext context) { + public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, int pageSize, String pageToken, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { throw new PushNotificationNotSupportedError(); } - ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId,tenant); - List configs = requestHandler.onListTaskPushNotificationConfig(params, context); - return createSuccessResponse(200, io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(ProtoUtils.ToProto.listTaskPushNotificationConfigResponse(configs))); + ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(taskId, pageSize, pageToken, tenant); + ListTaskPushNotificationConfigResult result = requestHandler.onListTaskPushNotificationConfig(params, context); + return createSuccessResponse(200, io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(ProtoUtils.ToProto.listTaskPushNotificationConfigResponse(result))); } catch (JSONRPCError e) { return createErrorResponse(e); } catch (Throwable throwable) { diff --git a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java index d4b326f17..5b309116d 100644 --- a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java +++ b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java @@ -301,7 +301,7 @@ public void testListPushNotificationConfigs() { RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); - RestHandler.HTTPRestResponse response = handler.listTaskPushNotificationConfigurations(MINIMAL_TASK.id(), "", callContext); + RestHandler.HTTPRestResponse response = handler.listTaskPushNotificationConfigurations(MINIMAL_TASK.id(), 0, "", "", callContext); Assertions.assertEquals(200, response.getStatusCode()); Assertions.assertEquals("application/json", response.getContentType());