Skip to content

Commit

Permalink
refactor: cleanup DataPlaneSignalingFlowController (#4209)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed May 24, 2024
1 parent 4f36218 commit 824c862
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.ResponseStatus;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
* Implementation of {@link DataFlowController} that is compliant with the data plane signaling.
Expand Down Expand Up @@ -78,12 +76,17 @@ public boolean canHandle(TransferProcess transferProcess) {
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
var flowType = flowTypeExtractor.extract(transferProcess.getTransferType());
if (flowType.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, flowType.getFailureDetail());
}

var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy);
if (propertiesResult.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, propertiesResult.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, propertiesResult.getFailureDetail());
}

var selection = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getTransferType(), selectionStrategy);
if (!selection.succeeded()) {
return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail());
}

var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
Expand All @@ -99,36 +102,37 @@ public boolean canHandle(TransferProcess transferProcess) {
.properties(propertiesResult.getContent())
.build();

var selection = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getTransferType(), selectionStrategy);
if (selection.succeeded()) {
var dataPlaneInstance = selection.getContent();
return clientFactory.createClient(dataPlaneInstance)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build()
);
} else {
// TODO: this branch works for embedded data plane but it is a potential false positive when the dataplane is not found, needs to be refactored
return clientFactory.createClient(null)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(null)
.build()
);
}
var dataPlaneInstance = selection.getContent();
return clientFactory.createClient(dataPlaneInstance)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build()
);

}

@Override
public StatusResult<Void> suspend(TransferProcess transferProcess) {
return onDataplaneInstancesDo("suspending", transferProcess, DataPlaneClient::suspend);
return getClientForDataplane(transferProcess.getDataPlaneId())
.map(client -> client.suspend(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for suspending the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
public StatusResult<Void> terminate(TransferProcess transferProcess) {
return onDataplaneInstancesDo("terminating", transferProcess, DataPlaneClient::terminate);
return getClientForDataplane(transferProcess.getDataPlaneId())
.map(client -> client.terminate(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for terminating the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
Expand All @@ -145,25 +149,18 @@ public Set<String> transferTypesFor(Asset asset) {
.collect(toSet());
}

private StatusResult<Void> onDataplaneInstancesDo(String action, TransferProcess transferProcess, BiFunction<DataPlaneClient, String, StatusResult<Void>> clientAction) {
private StatusResult<DataPlaneClient> getClientForDataplane(String id) {
var result = selectorClient.getAll();
if (result.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, result.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, result.getFailureDetail());
}

return result.getContent().stream()
.filter(dataPlaneInstanceFilter(transferProcess))
.filter(instance -> instance.getId().equals(id))
.findFirst()
.map(clientFactory::createClient)
.map(client -> clientAction.apply(client, transferProcess.getId()))
.reduce(StatusResult::merge)
.orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to select the data plane for %s the transfer process %s".formatted(action, transferProcess.getId())));
.map(StatusResult::success)
.orElseGet(() -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s".formatted(id)));
}

private Predicate<DataPlaneInstance> dataPlaneInstanceFilter(TransferProcess transferProcess) {
if (transferProcess.getDataPlaneId() != null) {
return dataPlaneInstance -> dataPlaneInstance.getId().equals(transferProcess.getDataPlaneId());
} else {
return d -> true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,19 @@ void transferSuccess_withReturnedDataAddress() {
}

@Test
void transferSuccess_withoutDataPlane() {
void shouldFail_whenNoDataplaneSelected() {
when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL));
var source = testDataAddress();
var transferProcess = transferProcessBuilder()
.contentDataAddress(testDataAddress())
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class)));
when(selectorService.select(any(), anyString(), any())).thenReturn(ServiceResult.notFound("no dataplane found"));
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);

var result = flowController.start(transferProcess, Policy.Builder.newInstance().build());

assertThat(result).isSucceeded().extracting(DataFlowResponse::getDataPlaneId).isNull();
var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class);
verify(dataPlaneClient).start(captor.capture());
var captured = captor.getValue();
assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId());
assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source);
assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination());
assertThat(captured.getProperties()).isEmpty();
assertThat(captured.getCallbackAddress()).isNotNull();
assertThat(result).isFailed();
}

@Test
Expand Down Expand Up @@ -230,42 +219,25 @@ void returnFailedResultIfTransferFails() {

@Nested
class Terminate {
@Test
void shouldCallTerminate() {
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.build();
when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance)));

var result = flowController.terminate(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).terminate("transferProcessId");
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
var anotherDataPlane = dataPlaneInstanceBuilder().id("anotherId").build();
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.dataPlaneId("dataPlaneId")
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, mockedDataPlane)));
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, anotherDataPlane)));

var result = flowController.terminate(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).terminate("transferProcessId");
verify(mockedDataPlane).getId();
verify(dataPlaneClientFactory).createClient(dataPlaneInstance);
}

@Test
Expand Down Expand Up @@ -308,9 +280,10 @@ void shouldCallTerminate() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("dataPlaneId")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance)));

Expand All @@ -321,24 +294,23 @@ void shouldCallTerminate() {
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
void shouldCallSuspendOnTheRightDataPlane() {
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
var anotherDataPlane = dataPlaneInstanceBuilder().id("anotherDataPlaneId").build();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, mockedDataPlane)));
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, anotherDataPlane)));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
verify(mockedDataPlane).getId();
verify(dataPlaneClientFactory).createClient(dataPlaneInstance);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ class EmbeddedDataPlane extends Tests {

private static final EdcRuntimeExtension CONSUMER_RUNTIME = Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration());
private static final EdcRuntimeExtension PROVIDER_RUNTIME = Runtimes.InMemory.controlPlaneEmbeddedDataPlane("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration());

@RegisterExtension
static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension(
CONSUMER_RUNTIME,
Expand Down

0 comments on commit 824c862

Please sign in to comment.