Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make DataPlanePublicApiController use PipelineService #3630

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.response.StatusResult;
Expand All @@ -34,7 +32,6 @@

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static java.lang.String.format;
Expand Down Expand Up @@ -90,15 +87,6 @@ public void initiate(DataFlowRequest dataRequest) {
update(dataFlow);
}

@Override
public CompletableFuture<StreamResult<Object>> transfer(DataFlowRequest request) {
var transferService = transferServiceRegistry.resolveTransferService(request);
if (transferService == null) {
return CompletableFuture.failedFuture(new EdcException("No TransferService available for request " + request.getProcessId()));
}
return transferService.transfer(request);
}

@Override
public DataFlowStates transferState(String processId) {
return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
Expand Down Expand Up @@ -113,31 +112,6 @@ void initiateDataFlow() {
assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code());
}

@Test
void transfer_shouldCallTransferOnResolvedService() {
var request = dataFlowBuilder().build().toRequest();
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.transfer(any())).thenReturn(CompletableFuture.completedFuture(StreamResult.success()));

var future = manager.transfer(request);

assertThat(future).succeedsWithin(5, SECONDS);
verify(registry).resolveTransferService(request);
verify(transferService).transfer(request);
}

@Test
void transfer_shouldFail_whenNoTransferServiceAvailable() {
var request = dataFlowBuilder().build().toRequest();
when(registry.resolveTransferService(any())).thenReturn(null);

var future = manager.transfer(request);

assertThat(future).failsWithin(5, SECONDS);
verify(registry).resolveTransferService(request);
verifyNoInteractions(transferService);
}

@Test
void terminate_shouldTerminateDataFlow() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController;
import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
Expand All @@ -32,8 +32,6 @@
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;

import java.util.concurrent.Executors;

/**
* This extension provides the Data Plane API:
* - Control API: set of endpoints to trigger/monitor/cancel data transfers that should be accessible only from the Control Plane.
Expand Down Expand Up @@ -67,6 +65,9 @@ public class DataPlaneApiExtension implements ServiceExtension {
@Inject
private DataPlaneManager dataPlaneManager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, because it is used by DataPlaneControlApiController


@Inject
private PipelineService pipelineService;

@Inject
private WebService webService;

Expand All @@ -86,19 +87,14 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();

var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT);

var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper());

var executorService = context.getService(ExecutorInstrumentation.class)
.instrument(Executors.newSingleThreadExecutor(), DataPlanePublicApiController.class.getSimpleName());

webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneControlApiController(dataPlaneManager));

var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS);
var publicApiController = new DataPlanePublicApiController(dataPlaneManager, dataAddressResolver);
var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver);
webService.registerResource(configuration.getContextAlias(), publicApiController);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.web.spi.exception.NotAuthorizedException;
Expand All @@ -45,13 +45,13 @@
@Produces(MediaType.APPLICATION_JSON)
public class DataPlanePublicApiController implements DataPlanePublicApi {

private final DataPlaneManager dataPlaneManager;
private final PipelineService pipelineService;
private final DataAddressResolver dataAddressResolver;
private final DataFlowRequestSupplier requestSupplier;

public DataPlanePublicApiController(DataPlaneManager dataPlaneManager,
public DataPlanePublicApiController(PipelineService pipelineService,
DataAddressResolver dataAddressResolver) {
this.dataPlaneManager = dataPlaneManager;
this.pipelineService = pipelineService;
this.dataAddressResolver = dataAddressResolver;
this.requestSupplier = new DataFlowRequestSupplier();
}
Expand Down Expand Up @@ -121,7 +121,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) {
var dataAddress = extractSourceDataAddress(token);
var dataFlowRequest = requestSupplier.apply(contextApi, dataAddress);

var validationResult = dataPlaneManager.validate(dataFlowRequest);
var validationResult = pipelineService.validate(dataFlowRequest);
if (validationResult.failed()) {
var errorMsg = validationResult.getFailureMessages().isEmpty() ?
format("Failed to validate request with id: %s", dataFlowRequest.getId()) :
Expand All @@ -130,7 +130,7 @@ private void handle(ContainerRequestContext context, AsyncResponse response) {
return;
}

dataPlaneManager.transfer(dataFlowRequest)
pipelineService.transfer(dataFlowRequest)
.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result.succeeded()) {
Expand Down
Loading
Loading