Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DataFlowManager and DataFlowController refactoring #3917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void register(int priority, DataFlowController controller) {

@WithSpan
@Override
public @NotNull StatusResult<DataFlowResponse> initiate(TransferProcess transferProcess, Policy policy) {
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
try {
return chooseControllerAndApply(transferProcess, controller -> controller.initiateFlow(transferProcess, policy));
return chooseControllerAndApply(transferProcess, controller -> controller.start(transferProcess, policy));
} catch (Exception e) {
return StatusResult.failure(FATAL_ERROR, runtimeException(transferProcess.getId(), e.getLocalizedMessage()));
}
Expand Down Expand Up @@ -95,6 +95,7 @@ private String controllerNotFound(String id) {
return format("Unable to process transfer %s. No data flow controller found", id);
}

record PrioritizedDataFlowController(int priority, DataFlowController controller) { }
record PrioritizedDataFlowController(int priority, DataFlowController controller) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private boolean processRequesting(TransferProcess process) {
private boolean processStarting(TransferProcess process) {
var policy = policyArchive.findPolicyForContract(process.getContractId());

return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.initiate(process, policy))
return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.start(process, policy))
.onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy))
.onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.onFailure((t, failure) -> transitionToStarting(t))
Expand Down Expand Up @@ -388,7 +388,7 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse
.dataAddress(dataFlowResponse.getDataAddress());

dispatch(messageBuilder, process, policy, Object.class)
.onSuccess((t, content) -> transitionToStarted(t))
.onSuccess((t, content) -> transitionToStarted(t, dataFlowResponse.getDataPlaneId()))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
Expand Down Expand Up @@ -513,8 +513,8 @@ private void transitionToStarting(TransferProcess transferProcess) {
update(transferProcess);
}

private void transitionToStarted(TransferProcess process) {
process.transitionStarted();
private void transitionToStarted(TransferProcess process, String dataPlaneId) {
process.transitionStarted(dataPlaneId);
observable.invokeForEach(l -> l.preStarted(process));
update(process);
observable.invokeForEach(l -> l.started(process, TransferProcessStartedData.Builder.newInstance().build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ void shouldInitiateFlowOnCorrectController() {
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
when(controller.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
manager.register(controller);

var response = manager.initiate(transferProcess, policy);
var response = manager.start(transferProcess, policy);

assertThat(response.succeeded()).isTrue();
}
Expand All @@ -71,7 +71,7 @@ void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() {
when(controller.canHandle(any())).thenReturn(false);
manager.register(controller);

var response = manager.initiate(transferProcess, policy);
var response = manager.start(transferProcess, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
Expand All @@ -87,10 +87,10 @@ void shouldCatchExceptionsAndReturnFatalError() {

var errorMsg = "Test Error Message";
when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg));
when(controller.start(any(), any())).thenThrow(new EdcException(errorMsg));
manager.register(controller);

var response = manager.initiate(transferProcess, policy);
var response = manager.start(transferProcess, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
Expand All @@ -104,16 +104,16 @@ void shouldChooseHighestPriorityController() {
manager.register(1, lowPriority);
manager.register(2, highPriority);

manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build());
manager.start(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build());

verify(highPriority).initiateFlow(any(), any());
verify(highPriority).start(any(), any());
verifyNoInteractions(lowPriority);
}

private DataFlowController createDataFlowController() {
var dataFlowController = mock(DataFlowController.class);
when(dataFlowController.canHandle(any())).thenReturn(true);
when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
when(dataFlowController.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
return dataFlowController;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@ void verifyProvision_shouldNotStarve() {

}

private ProvisionedResourceSet provisionedResourceSet() {
return ProvisionedResourceSet.Builder.newInstance()
.resources(List.of(new TestProvisionedDataDestinationResource("test-resource", "1")))
.build();
}

private TransferProcess.Builder transferProcessBuilder() {
var processId = UUID.randomUUID().toString();
var dataRequest = DataRequest.Builder.newInstance()
.id(processId)
.destinationType("test-type")
.contractId(UUID.randomUUID().toString())
.build();

return TransferProcess.Builder.newInstance()
.provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build())
.type(CONSUMER)
.id("test-process-" + processId)
.dataRequest(dataRequest);
}

@Nested
class IdempotencyProcessStateReplication {

Expand All @@ -176,7 +197,7 @@ void shouldSentMessageWithTheSameId_whenFirstDispatchFailed(TransferProcess.Type
when(dispatcherRegistry.dispatch(any(), isA(messageType)))
.thenReturn(completedFuture(StatusResult.failure(ERROR_RETRY)))
.thenReturn(completedFuture(StatusResult.success(TransferProcessAck.Builder.newInstance().build())));
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());

var transfer = transferProcessBuilder().type(type).state(state.code()).build();
Expand Down Expand Up @@ -205,36 +226,15 @@ private static class EgressMessages implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
return Stream.of(
arguments(CONSUMER, REQUESTING, TransferRequestMessage.class),
arguments(CONSUMER, COMPLETING, TransferCompletionMessage.class),
arguments(CONSUMER, TERMINATING, TransferTerminationMessage.class),
arguments(PROVIDER, STARTING, TransferStartMessage.class),
arguments(PROVIDER, COMPLETING, TransferCompletionMessage.class),
arguments(PROVIDER, TERMINATING, TransferTerminationMessage.class)
arguments(CONSUMER, REQUESTING, TransferRequestMessage.class),
arguments(CONSUMER, COMPLETING, TransferCompletionMessage.class),
arguments(CONSUMER, TERMINATING, TransferTerminationMessage.class),
arguments(PROVIDER, STARTING, TransferStartMessage.class),
arguments(PROVIDER, COMPLETING, TransferCompletionMessage.class),
arguments(PROVIDER, TERMINATING, TransferTerminationMessage.class)
);
}
}
}

private ProvisionedResourceSet provisionedResourceSet() {
return ProvisionedResourceSet.Builder.newInstance()
.resources(List.of(new TestProvisionedDataDestinationResource("test-resource", "1")))
.build();
}

private TransferProcess.Builder transferProcessBuilder() {
var processId = UUID.randomUUID().toString();
var dataRequest = DataRequest.Builder.newInstance()
.id(processId)
.destinationType("test-type")
.contractId(UUID.randomUUID().toString())
.build();

return TransferProcess.Builder.newInstance()
.provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build())
.type(CONSUMER)
.id("test-process-" + processId)
.dataRequest(dataRequest);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class TransferProcessManagerImplTest {
@BeforeEach
void setup() {
when(protocolWebhook.url()).thenReturn(protocolWebhookUrl);
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse()));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse()));
when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().build());
var observable = new TransferProcessObservableImpl();
observable.registerListener(listener);
Expand Down Expand Up @@ -497,7 +497,7 @@ void starting_shouldStartDataTransferAndSendMessageToConsumer() {
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process);
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(dataFlowResponse));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse));
when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));

manager.start();
Expand All @@ -519,7 +519,7 @@ void starting_shouldStartDataTransferAndSendMessageToConsumer() {
@Test
void starting_onFailureAndRetriesNotExhausted_updatesStateCountForRetry() {
var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build();
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY));
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(STARTING.code()).build());

Expand All @@ -535,7 +535,7 @@ void starting_shouldTransitionToTerminatingIfFatalFailure() {
var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build();
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));

manager.start();

Expand All @@ -547,7 +547,7 @@ void starting_shouldTransitionToTerminatingIfFatalFailure() {
@Test
void starting_onFailureAndRetriesExhausted_transitToTerminating() {
var process = createTransferProcessBuilder(STARTING).type(PROVIDER).stateCount(RETRY_EXHAUSTED).build();
when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY));
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY));
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@

}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processDataFlowInState(RECEIVED, this::processReceived))
.processor(processDataFlowInState(COMPLETED, this::processCompleted))
.processor(processDataFlowInState(FAILED, this::processFailed));
}

@Override
public Result<Boolean> validate(DataFlowStartMessage dataRequest) {
var transferService = transferServiceRegistry.resolveTransferService(dataRequest);
Expand All @@ -79,7 +71,6 @@
.destination(dataRequest.getDestinationDataAddress())
.callbackAddress(dataRequest.getCallbackAddress())
.traceContext(telemetry.getCurrentTraceContext())
.trackable(dataRequest.isTrackable())
.properties(dataRequest.getProperties())
.state(RECEIVED.code())
.build();
Expand Down Expand Up @@ -116,6 +107,14 @@
}
}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processDataFlowInState(RECEIVED, this::processReceived))
.processor(processDataFlowInState(COMPLETED, this::processCompleted))
.processor(processDataFlowInState(FAILED, this::processFailed));
}

private boolean processReceived(DataFlow dataFlow) {
var request = dataFlow.toRequest();
var transferService = transferServiceRegistry.resolveTransferService(request);
Expand Down Expand Up @@ -188,19 +187,24 @@

public static class Builder extends AbstractStateEntityManager.Builder<DataFlow, DataPlaneStore, DataPlaneManagerImpl, Builder> {

public static Builder newInstance() {
return new Builder();
}

private Builder() {
super(new DataPlaneManagerImpl());
}

public static Builder newInstance() {
return new Builder();
}

@Override
public Builder self() {
return this;
}

public DataPlaneManagerImpl build() {

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Builder<DataFlow,DataPlaneStore,DataPlaneManagerImpl,Builder>.build
; it is advisable to add an Override annotation.
Objects.requireNonNull(manager.transferProcessClient);
return manager;
}

public Builder transferServiceRegistry(TransferServiceRegistry transferServiceRegistry) {
manager.transferServiceRegistry = transferServiceRegistry;
return this;
Expand All @@ -210,11 +214,6 @@
manager.transferProcessClient = transferProcessClient;
return this;
}

public DataPlaneManagerImpl build() {
Objects.requireNonNull(manager.transferProcessClient);
return manager;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ void initiateDataFlow() {
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.callbackAddress(URI.create("http://any"))
.properties(Map.of("key", "value"))
.trackable(true)
.build();

manager.initiate(request);
Expand All @@ -108,7 +107,6 @@ void initiateDataFlow() {
assertThat(dataFlow.getDestination()).isSameAs(request.getDestinationDataAddress());
assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any"));
assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties());
assertThat(dataFlow.isTrackable()).isEqualTo(request.isTrackable());
assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code());
}

Expand Down Expand Up @@ -340,7 +338,6 @@ private DataFlow.Builder dataFlowBuilder() {
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.callbackAddress(URI.create("http://any"))
.trackable(true)
.properties(Map.of("key", "value"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS edc_transfer_process
pending BOOLEAN DEFAULT FALSE,
transfer_type VARCHAR,
protocol_messages JSON,
data_plane_id VARCHAR,
lease_id VARCHAR
CONSTRAINT transfer_process_lease_lease_id_fk
REFERENCES edc_lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ private void update(Connection conn, TransferProcess process, String existingDat
process.isPending(),
process.getTransferType(),
toJson(process.getProtocolMessages()),
process.getDataPlaneId(),
process.getId());

var newDr = process.getDataRequest();
Expand Down Expand Up @@ -313,7 +314,8 @@ private void insert(Connection conn, TransferProcess process) {
toJson(process.getCallbackAddresses()),
process.isPending(),
process.getTransferType(),
toJson(process.getProtocolMessages()));
toJson(process.getProtocolMessages()),
process.getDataPlaneId());

//insert DataRequest
var dr = process.getDataRequest();
Expand Down Expand Up @@ -358,6 +360,7 @@ private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLExcept
.pending(resultSet.getBoolean(statements.getPendingColumn()))
.transferType(resultSet.getString(statements.getTransferTypeColumn()))
.protocolMessages(fromJson(resultSet.getString(statements.getProtocolMessagesColumn()), ProtocolMessages.class))
.dataPlaneId(resultSet.getString(statements.getDataPlaneIdColumn()))
.build();
}

Expand Down