Skip to content

Commit

Permalink
refactor: make DataPlanePublicApiController use PipelineService (#3630)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Nov 20, 2023
1 parent a22278d commit 8fb17e3
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.response.StatusResult;
Expand All @@ -34,7 +32,6 @@

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static java.lang.String.format;
Expand Down Expand Up @@ -90,15 +87,6 @@ public void initiate(DataFlowRequest dataRequest) {
update(dataFlow);
}

@Override
public CompletableFuture<StreamResult<Object>> transfer(DataFlowRequest request) {
var transferService = transferServiceRegistry.resolveTransferService(request);
if (transferService == null) {
return CompletableFuture.failedFuture(new EdcException("No TransferService available for request " + request.getProcessId()));
}
return transferService.transfer(request);
}

@Override
public DataFlowStates transferState(String processId) {
return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
Expand Down Expand Up @@ -113,31 +112,6 @@ void initiateDataFlow() {
assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code());
}

@Test
void transfer_shouldCallTransferOnResolvedService() {
var request = dataFlowBuilder().build().toRequest();
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.transfer(any())).thenReturn(CompletableFuture.completedFuture(StreamResult.success()));

var future = manager.transfer(request);

assertThat(future).succeedsWithin(5, SECONDS);
verify(registry).resolveTransferService(request);
verify(transferService).transfer(request);
}

@Test
void transfer_shouldFail_whenNoTransferServiceAvailable() {
var request = dataFlowBuilder().build().toRequest();
when(registry.resolveTransferService(any())).thenReturn(null);

var future = manager.transfer(request);

assertThat(future).failsWithin(5, SECONDS);
verify(registry).resolveTransferService(request);
verifyNoInteractions(transferService);
}

@Test
void terminate_shouldTerminateDataFlow() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController;
import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
Expand All @@ -32,8 +32,6 @@
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;

import java.util.concurrent.Executors;

/**
* This extension provides the Data Plane API:
* - Control API: set of endpoints to trigger/monitor/cancel data transfers that should be accessible only from the Control Plane.
Expand Down Expand Up @@ -67,6 +65,9 @@ public class DataPlaneApiExtension implements ServiceExtension {
@Inject
private DataPlaneManager dataPlaneManager;

@Inject
private PipelineService pipelineService;

@Inject
private WebService webService;

Expand All @@ -86,19 +87,14 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();

var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT);

var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper());

var executorService = context.getService(ExecutorInstrumentation.class)
.instrument(Executors.newSingleThreadExecutor(), DataPlanePublicApiController.class.getSimpleName());

webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneControlApiController(dataPlaneManager));

var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS);
var publicApiController = new DataPlanePublicApiController(dataPlaneManager, dataAddressResolver);
var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver);
webService.registerResource(configuration.getContextAlias(), publicApiController);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.web.spi.exception.NotAuthorizedException;
Expand All @@ -45,13 +45,13 @@
@Produces(MediaType.APPLICATION_JSON)
public class DataPlanePublicApiController implements DataPlanePublicApi {

private final DataPlaneManager dataPlaneManager;
private final PipelineService pipelineService;
private final DataAddressResolver dataAddressResolver;
private final DataFlowRequestSupplier requestSupplier;

public DataPlanePublicApiController(DataPlaneManager dataPlaneManager,
public DataPlanePublicApiController(PipelineService pipelineService,
DataAddressResolver dataAddressResolver) {
this.dataPlaneManager = dataPlaneManager;
this.pipelineService = pipelineService;
this.dataAddressResolver = dataAddressResolver;
this.requestSupplier = new DataFlowRequestSupplier();
}
Expand Down Expand Up @@ -121,7 +121,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) {
var dataAddress = extractSourceDataAddress(token);
var dataFlowRequest = requestSupplier.apply(contextApi, dataAddress);

var validationResult = dataPlaneManager.validate(dataFlowRequest);
var validationResult = pipelineService.validate(dataFlowRequest);
if (validationResult.failed()) {
var errorMsg = validationResult.getFailureMessages().isEmpty() ?
format("Failed to validate request with id: %s", dataFlowRequest.getId()) :
Expand All @@ -130,7 +130,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) {
return;
}

dataPlaneManager.transfer(dataFlowRequest)
pipelineService.transfer(dataFlowRequest)
.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result.succeeded()) {
Expand Down
Loading

0 comments on commit 8fb17e3

Please sign in to comment.