From 8a2aa3e620063ca6d8f631854e00aeefa720d89c Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Mon, 20 Nov 2023 11:18:25 +0100 Subject: [PATCH] refactor: make DataPlanePublicApiController use PipelineService --- .../manager/DataPlaneManagerImpl.java | 12 - .../manager/DataPlaneManagerImplTest.java | 26 -- .../dataplane/api/DataPlaneApiExtension.java | 14 +- .../DataPlanePublicApiController.java | 12 +- .../DataPlaneApiIntegrationTest.java | 311 ------------------ .../DataPlaneControlApiControllerTest.java | 59 ++++ ...anePublicApiControllerIntegrationTest.java | 181 ++++++++++ .../client/EmbeddedDataPlaneClient.java | 2 +- .../spi/manager/DataPlaneManager.java | 37 +-- 9 files changed, 253 insertions(+), 401 deletions(-) delete mode 100644 extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneApiIntegrationTest.java create mode 100644 extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerIntegrationTest.java diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index f6ccce60f93..c1d99f31810 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -19,10 +19,8 @@ import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; -import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; -import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; @@ -34,7 +32,6 @@ import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import static java.lang.String.format; @@ -90,15 +87,6 @@ public void initiate(DataFlowRequest dataRequest) { update(dataFlow); } - @Override - public CompletableFuture> transfer(DataFlowRequest request) { - var transferService = transferServiceRegistry.resolveTransferService(request); - if (transferService == null) { - return CompletableFuture.failedFuture(new EdcException("No TransferService available for request " + request.getProcessId())); - } - return transferService.transfer(request); - } - @Override public DataFlowStates transferState(String processId) { return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState) diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 54e23efa8db..ccfca10c407 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -39,7 +39,6 @@ import static java.util.Collections.emptyList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; @@ -113,31 +112,6 @@ void initiateDataFlow() { assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code()); } - @Test - void transfer_shouldCallTransferOnResolvedService() { - var request = dataFlowBuilder().build().toRequest(); - when(registry.resolveTransferService(any())).thenReturn(transferService); - when(transferService.transfer(any())).thenReturn(CompletableFuture.completedFuture(StreamResult.success())); - - var future = manager.transfer(request); - - assertThat(future).succeedsWithin(5, SECONDS); - verify(registry).resolveTransferService(request); - verify(transferService).transfer(request); - } - - @Test - void transfer_shouldFail_whenNoTransferServiceAvailable() { - var request = dataFlowBuilder().build().toRequest(); - when(registry.resolveTransferService(any())).thenReturn(null); - - var future = manager.transfer(request); - - assertThat(future).failsWithin(5, SECONDS); - verify(registry).resolveTransferService(request); - verifyNoInteractions(transferService); - } - @Test void terminate_shouldTerminateDataFlow() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); diff --git a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneApiExtension.java b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneApiExtension.java index 8265dd97152..4027c145719 100644 --- a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneApiExtension.java +++ b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneApiExtension.java @@ -19,11 +19,11 @@ import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController; import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Setting; import org.eclipse.edc.spi.http.EdcHttpClient; -import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; @@ -32,8 +32,6 @@ import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; import org.eclipse.edc.web.spi.configuration.WebServiceSettings; -import java.util.concurrent.Executors; - /** * This extension provides the Data Plane API: * - Control API: set of endpoints to trigger/monitor/cancel data transfers that should be accessible only from the Control Plane. @@ -67,6 +65,9 @@ public class DataPlaneApiExtension implements ServiceExtension { @Inject private DataPlaneManager dataPlaneManager; + @Inject + private PipelineService pipelineService; + @Inject private WebService webService; @@ -86,19 +87,14 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { - var monitor = context.getMonitor(); - var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT); var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper()); - var executorService = context.getService(ExecutorInstrumentation.class) - .instrument(Executors.newSingleThreadExecutor(), DataPlanePublicApiController.class.getSimpleName()); - webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneControlApiController(dataPlaneManager)); var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS); - var publicApiController = new DataPlanePublicApiController(dataPlaneManager, dataAddressResolver); + var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver); webService.registerResource(configuration.getContextAlias(), publicApiController); } } diff --git a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java index 81fe81b5855..5fe2ccd308a 100644 --- a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java +++ b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java @@ -29,7 +29,7 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; -import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.web.spi.exception.NotAuthorizedException; @@ -45,13 +45,13 @@ @Produces(MediaType.APPLICATION_JSON) public class DataPlanePublicApiController implements DataPlanePublicApi { - private final DataPlaneManager dataPlaneManager; + private final PipelineService pipelineService; private final DataAddressResolver dataAddressResolver; private final DataFlowRequestSupplier requestSupplier; - public DataPlanePublicApiController(DataPlaneManager dataPlaneManager, + public DataPlanePublicApiController(PipelineService pipelineService, DataAddressResolver dataAddressResolver) { - this.dataPlaneManager = dataPlaneManager; + this.pipelineService = pipelineService; this.dataAddressResolver = dataAddressResolver; this.requestSupplier = new DataFlowRequestSupplier(); } @@ -121,7 +121,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) { var dataAddress = extractSourceDataAddress(token); var dataFlowRequest = requestSupplier.apply(contextApi, dataAddress); - var validationResult = dataPlaneManager.validate(dataFlowRequest); + var validationResult = pipelineService.validate(dataFlowRequest); if (validationResult.failed()) { var errorMsg = validationResult.getFailureMessages().isEmpty() ? format("Failed to validate request with id: %s", dataFlowRequest.getId()) : @@ -130,7 +130,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) { return; } - dataPlaneManager.transfer(dataFlowRequest) + pipelineService.transfer(dataFlowRequest) .whenComplete((result, throwable) -> { if (throwable == null) { if (result.succeeded()) { diff --git a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneApiIntegrationTest.java b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneApiIntegrationTest.java deleted file mode 100644 index 5f9a7ebc61b..00000000000 --- a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneApiIntegrationTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright (c) 2022 Amadeus - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Amadeus - initial API and implementation - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements - * - */ - -package org.eclipse.edc.connector.dataplane.api.controller; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.restassured.http.ContentType; -import jakarta.ws.rs.core.Response; -import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; -import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; -import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSinkFactory; -import org.eclipse.edc.junit.annotations.ApiTest; -import org.eclipse.edc.junit.extensions.EdcExtension; -import org.eclipse.edc.runtime.metamodel.annotation.Provides; -import org.eclipse.edc.spi.result.Result; -import org.eclipse.edc.spi.system.ServiceExtension; -import org.eclipse.edc.spi.system.ServiceExtensionContext; -import org.eclipse.edc.spi.types.TypeManager; -import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; -import org.hamcrest.CoreMatchers; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockserver.integration.ClientAndServer; -import org.mockserver.model.HttpRequest; -import org.mockserver.model.HttpResponse; -import org.mockserver.model.MediaType; -import org.mockserver.verify.VerificationTimes; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static io.restassured.RestAssured.given; -import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.assertj.core.api.Assertions.assertThat; -import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockserver.matchers.Times.once; -import static org.mockserver.stop.Stop.stopQuietly; - -@ApiTest -@ExtendWith(EdcExtension.class) -class DataPlaneApiIntegrationTest { - - private static final ObjectMapper MAPPER = new TypeManager().getMapper(); - - private static final int PUBLIC_API_PORT = getFreePort(); - private static final int CONTROL_API_PORT = getFreePort(); - private static final int VALIDATION_API_PORT = getFreePort(); - private static final String VALIDATION_SEVER_URL = "http://localhost:" + VALIDATION_API_PORT; - - private static ClientAndServer tokenValidationServer; - private DataPlaneManager dataPlaneManager; - - @BeforeAll - public static void startServer() { - tokenValidationServer = ClientAndServer.startClientAndServer(VALIDATION_API_PORT); - } - - @AfterAll - public static void stopServer() { - stopQuietly(tokenValidationServer); - } - - @BeforeEach - void setUp(EdcExtension extension) { - dataPlaneManager = mock(DataPlaneManager.class); - extension.registerSystemExtension(ServiceExtension.class, new TestServiceExtension()); - extension.setConfiguration(Map.of( - "web.http.public.port", String.valueOf(PUBLIC_API_PORT), - "web.http.public.path", "/public", - "web.http.control.port", String.valueOf(CONTROL_API_PORT), - "web.http.control.path", "/control", - "edc.dataplane.token.validation.endpoint", VALIDATION_SEVER_URL - )); - } - - @AfterEach - public void tearDown() { - tokenValidationServer.reset(); - } - - @Test - void controlApi_should_callDataPlaneManager_if_requestIsValid() { - var flowRequest = DataFlowRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .processId(UUID.randomUUID().toString()) - .sourceDataAddress(testDestAddress()) - .destinationDataAddress(testDestAddress()) - .build(); - - when(dataPlaneManager.validate(isA(DataFlowRequest.class))).thenReturn(Result.success(Boolean.TRUE)); - - given().port(CONTROL_API_PORT) - .when() - .contentType(ContentType.JSON) - .body(flowRequest) - .post("/control/transfer") - .then() - .statusCode(Response.Status.OK.getStatusCode()); - - verify(dataPlaneManager).initiate(isA(DataFlowRequest.class)); - } - - @Test - void controlApi_should_returnBadRequest_if_requestIsInValid() { - var errorMsg = "test error message"; - var flowRequest = DataFlowRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .processId(UUID.randomUUID().toString()) - .sourceDataAddress(testDestAddress()) - .destinationDataAddress(testDestAddress()) - .build(); - - when(dataPlaneManager.validate(isA(DataFlowRequest.class))).thenReturn(Result.failure(errorMsg)); - - given().port(CONTROL_API_PORT) - .when() - .contentType(ContentType.JSON) - .body(flowRequest) - .post("/control/transfer") - .then() - .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) - .body("errors", CoreMatchers.equalTo(List.of(errorMsg))); - - verify(dataPlaneManager, never()).initiate(any()); - } - - @Test - void publicApi_should_returnBadRequest_if_missingAuthorizationHeader() { - given().port(PUBLIC_API_PORT) - .when() - .post("/public/any") - .then() - .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) - .body("errors[0]", is("Missing bearer token")); - } - - @Test - void publicApi_shouldNotReturn302_whenUrlWithoutTrailingSlash() { - given().port(PUBLIC_API_PORT) - .when() - .post("/public") - .then() - .statusCode(not(302)); - } - - @Test - void publicApi_should_returnForbidden_if_tokenValidationFails() { - var token = UUID.randomUUID().toString(); - - var validationServerRequest = new HttpRequest().withHeader(AUTHORIZATION, token); - tokenValidationServer.when(validationServerRequest, once()).respond(new HttpResponse().withStatusCode(400)); - - given().port(PUBLIC_API_PORT) - .header(AUTHORIZATION, token) - .when() - .post("/public/any") - .then() - .statusCode(Response.Status.FORBIDDEN.getStatusCode()) - .body("errors.size()", is(1)); - - tokenValidationServer.verify(validationServerRequest, VerificationTimes.once()); - } - - @Test - void publicApi_should_returnBadRequest_if_requestValidationFails() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var errorMsg = UUID.randomUUID().toString(); - tokenValidationServer.when(new HttpRequest().withHeader(AUTHORIZATION, token), once()) - .respond(new HttpResponse() - .withStatusCode(200) - .withContentType(MediaType.APPLICATION_JSON) - .withBody(MAPPER.writeValueAsString(testDestAddress())) - ); - when(dataPlaneManager.validate(any())).thenReturn(Result.failure(errorMsg)); - - given() - .port(PUBLIC_API_PORT) - .header(AUTHORIZATION, token) - .when() - .post("/public/any") - .then() - .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) - .body("errors.size()", is(1)); - } - - @Test - void publicApi_should_returnInternalServerError_if_transferFails() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var errorMsg = UUID.randomUUID().toString(); - tokenValidationServer.when(new HttpRequest().withHeader(AUTHORIZATION, token), once()) - .respond(new HttpResponse() - .withStatusCode(200) - .withContentType(MediaType.APPLICATION_JSON) - .withBody(MAPPER.writeValueAsString(testDestAddress())) - ); - when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); - when(dataPlaneManager.transfer(any())) - .thenReturn(completedFuture(StreamResult.error(errorMsg))); - - given() - .port(PUBLIC_API_PORT) - .header(AUTHORIZATION, token) - .when() - .post("/public/any") - .then() - .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) - .body("errors[0]", is(errorMsg)); - } - - @Test - void publicApi_should_returnInternalServerError_if_transferThrows() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var errorMsg = UUID.randomUUID().toString(); - tokenValidationServer.when(new HttpRequest().withHeader(AUTHORIZATION, token), once()) - .respond(new HttpResponse() - .withStatusCode(200) - .withContentType(MediaType.APPLICATION_JSON) - .withBody(MAPPER.writeValueAsString(testDestAddress())) - ); - when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); - when(dataPlaneManager.transfer(any(DataFlowRequest.class))) - .thenReturn(failedFuture(new RuntimeException(errorMsg))); - - given() - .port(PUBLIC_API_PORT) - .header(AUTHORIZATION, token) - .when() - .post("/public/any") - .then() - .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) - .body("errors[0]", is("Unhandled exception occurred during data transfer: " + errorMsg)); - } - - @Test - void publicApi_should_returnDataFromSource_if_transferSuccessful() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var address = testDestAddress(); - var requestCaptor = ArgumentCaptor.forClass(DataFlowRequest.class); - - tokenValidationServer.when(new HttpRequest().withHeader(AUTHORIZATION, token), once()) - .respond(new HttpResponse() - .withStatusCode(200) - .withContentType(MediaType.APPLICATION_JSON) - .withBody(MAPPER.writeValueAsString(address)) - ); - when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); - when(dataPlaneManager.transfer(any())) - .thenReturn(completedFuture(StreamResult.success())); - - given() - .port(PUBLIC_API_PORT) - .header(AUTHORIZATION, token) - .when() - .post("/public/any?foo=bar") - .then() - .statusCode(Response.Status.OK.getStatusCode()); - - verify(dataPlaneManager).validate(requestCaptor.capture()); - verify(dataPlaneManager).transfer(requestCaptor.capture()); - var capturedRequests = requestCaptor.getAllValues(); - assertThat(capturedRequests) - .hasSize(2) - .allSatisfy(request -> { - assertThat(request.getDestinationDataAddress().getType()).isEqualTo(OutputStreamDataSinkFactory.TYPE); - assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); - assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); - }); - } - - private DataAddress testDestAddress() { - return DataAddress.Builder.newInstance().type("test").build(); - } - - @Provides(DataPlaneManager.class) - private class TestServiceExtension implements ServiceExtension { - @Override - public void initialize(ServiceExtensionContext context) { - context.registerService(DataPlaneManager.class, dataPlaneManager); - } - } -} diff --git a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java index 66a0634b5f2..0195fe7fbb4 100644 --- a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java +++ b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java @@ -14,18 +14,29 @@ package org.eclipse.edc.connector.dataplane.api.controller; +import io.restassured.http.ContentType; import io.restassured.specification.RequestSpecification; import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.junit.annotations.ApiTest; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.Test; +import java.util.List; +import java.util.UUID; + import static io.restassured.RestAssured.given; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -34,6 +45,50 @@ class DataPlaneControlApiControllerTest extends RestControllerTestBase { private final DataPlaneManager manager = mock(); + @Test + void should_callDataPlaneManager_if_requestIsValid() { + var flowRequest = DataFlowRequest.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(testDestAddress()) + .destinationDataAddress(testDestAddress()) + .build(); + + when(manager.validate(isA(DataFlowRequest.class))).thenReturn(Result.success(Boolean.TRUE)); + + baseRequest() + .contentType(ContentType.JSON) + .body(flowRequest) + .post("/transfer") + .then() + .statusCode(Response.Status.OK.getStatusCode()); + + verify(manager).initiate(isA(DataFlowRequest.class)); + } + + @Test + void should_returnBadRequest_if_requestIsInValid() { + var errorMsg = "test error message"; + var flowRequest = DataFlowRequest.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(testDestAddress()) + .destinationDataAddress(testDestAddress()) + .build(); + + when(manager.validate(isA(DataFlowRequest.class))).thenReturn(Result.failure(errorMsg)); + + baseRequest() + .contentType(ContentType.JSON) + .body(flowRequest) + .post("/transfer") + .then() + .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .body("errors", CoreMatchers.equalTo(List.of(errorMsg))); + + verify(manager, never()).initiate(any()); + } + @Test void delete_shouldReturnOk_whenTerminationSucceeds() { when(manager.terminate(any())).thenReturn(StatusResult.success()); @@ -69,4 +124,8 @@ private RequestSpecification baseRequest() { .header(HttpHeaders.AUTHORIZATION, "auth") .when(); } + + private DataAddress testDestAddress() { + return DataAddress.Builder.newInstance().type("test").build(); + } } diff --git a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerIntegrationTest.java b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerIntegrationTest.java new file mode 100644 index 00000000000..bc91c65d4e4 --- /dev/null +++ b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerIntegrationTest.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.restassured.specification.RequestSpecification; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSinkFactory; +import org.eclipse.edc.junit.annotations.ApiTest; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.UUID; + +import static io.restassured.RestAssured.given; +import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ApiTest +class DataPlanePublicApiControllerIntegrationTest extends RestControllerTestBase { + + private final PipelineService pipelineService = mock(); + private final DataAddressResolver dataAddressResolver = mock(); + + @Test + void should_returnBadRequest_if_missingAuthorizationHeader() { + baseRequest() + .post("/any") + .then() + .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .body("errors[0]", is("Missing bearer token")); + } + + @Test + void shouldNotReturn302_whenUrlWithoutTrailingSlash() { + baseRequest() + .post("") + .then() + .statusCode(not(302)); + } + + @Test + void should_returnForbidden_if_tokenValidationFails() { + var token = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.failure("token is not value")); + + baseRequest() + .header(AUTHORIZATION, token) + .post("/any") + .then() + .statusCode(Response.Status.FORBIDDEN.getStatusCode()) + .body("errors.size()", is(1)); + + verify(dataAddressResolver).resolve(token); + } + + @Test + void should_returnBadRequest_if_requestValidationFails() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.validate(any())).thenReturn(Result.failure(errorMsg)); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .body("errors.size()", is(1)); + } + + @Test + void should_returnInternalServerError_if_transferFails() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.validate(any())).thenReturn(Result.success(true)); + when(pipelineService.transfer(any())) + .thenReturn(completedFuture(StreamResult.error(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .body("errors[0]", is(errorMsg)); + } + + @Test + void should_returnInternalServerError_if_transferThrows() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.validate(any())).thenReturn(Result.success(true)); + when(pipelineService.transfer(any(DataFlowRequest.class))) + .thenReturn(failedFuture(new RuntimeException(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .body("errors[0]", is("Unhandled exception occurred during data transfer: " + errorMsg)); + } + + @Test + void should_returnDataFromSource_if_transferSuccessful() { + var token = UUID.randomUUID().toString(); + var address = testDestAddress(); + var requestCaptor = ArgumentCaptor.forClass(DataFlowRequest.class); + + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.validate(any())).thenReturn(Result.success(true)); + when(pipelineService.transfer(any())) + .thenReturn(completedFuture(StreamResult.success())); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any?foo=bar") + .then() + .statusCode(Response.Status.OK.getStatusCode()); + + verify(pipelineService).validate(requestCaptor.capture()); + verify(pipelineService).transfer(requestCaptor.capture()); + var capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests) + .hasSize(2) + .allSatisfy(request -> { + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(OutputStreamDataSinkFactory.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); + assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); + }); + } + + private RequestSpecification baseRequest() { + return given() + .baseUri("http://localhost:" + port) + .when(); + } + + private DataAddress testDestAddress() { + return DataAddress.Builder.newInstance().type("test").build(); + } + + @Override + protected Object controller() { + return new DataPlanePublicApiController(pipelineService, dataAddressResolver); + } + +} diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java index d27e06934f4..57cdace0f8d 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java @@ -40,7 +40,7 @@ public EmbeddedDataPlaneClient(DataPlaneManager dataPlaneManager) { public StatusResult transfer(DataFlowRequest request) { var result = dataPlaneManager.validate(request); if (result.failed()) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, String.join(", ", result.getFailureMessages())); + return StatusResult.failure(ResponseStatus.FATAL_ERROR, result.getFailureDetail()); } dataPlaneManager.initiate(request); return StatusResult.success(); diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index dedad893b5e..8fe885871b5 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -15,39 +15,14 @@ package org.eclipse.edc.connector.dataplane.spi.manager; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; -import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; -import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; import org.eclipse.edc.spi.entity.StateEntityManager; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; -import java.util.concurrent.CompletableFuture; - /** - * Manages the execution of data plane requests. Methods that return {@link StreamResult} from their futures can use that value to respond to different failure conditions. - * For example: - *

- *

- * dataPlaneManager.transfer(request).whenComplete((result, throwable) -> {
- *      if (result != null && result.failed()) {
- *          switch (result.reason()) {
- *              case NOT_FOUND:
- *                  // process
- *                  break;
- *              case NOT_AUTHORIZED:
- *                  // process
- *                  break;
- *              case GENERAL_ERROR:
- *                  // process
- *                  break;
- *              }
- *      } else if (throwable != null) {
- *          reportError(response, throwable);
- *      }
- * });
- * 
+ * Manages the execution of data plane requests. */ @ExtensionPoint public interface DataPlaneManager extends StateEntityManager { @@ -62,16 +37,6 @@ public interface DataPlaneManager extends StateEntityManager { */ void initiate(DataFlowRequest dataRequest); - /** - * Transfers data from a source to a destination asynchronously using the provided DataFlowRequest. The {@link org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource} - * and {@link DataSink} will be resolved from the supplied DataFlowRequest, more specifically the source and destination address - * within. - * - * @param request The DataFlowRequest that specifies the source, destination, and other necessary details. - * @return A CompletableFuture that completes with a StreamResult indicating the success or failure of the transfer. - */ - CompletableFuture> transfer(DataFlowRequest request); - /** * Returns the transfer state for the process. */