diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java index 0bf244b12e4..d7426b9fc02 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java @@ -54,9 +54,9 @@ public void register(int priority, DataFlowController controller) { @WithSpan @Override - public @NotNull StatusResult initiate(TransferProcess transferProcess, Policy policy) { + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { try { - return chooseControllerAndApply(transferProcess, controller -> controller.initiateFlow(transferProcess, policy)); + return chooseControllerAndApply(transferProcess, controller -> controller.start(transferProcess, policy)); } catch (Exception e) { return StatusResult.failure(FATAL_ERROR, runtimeException(transferProcess.getId(), e.getLocalizedMessage())); } @@ -95,6 +95,7 @@ private String controllerNotFound(String id) { return format("Unable to process transfer %s. No data flow controller found", id); } - record PrioritizedDataFlowController(int priority, DataFlowController controller) { } + record PrioritizedDataFlowController(int priority, DataFlowController controller) { + } } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java index d809cb54f25..0c1ef817304 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java @@ -307,7 +307,7 @@ private boolean processRequesting(TransferProcess process) { private boolean processStarting(TransferProcess process) { var policy = policyArchive.findPolicyForContract(process.getContractId()); - return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.initiate(process, policy)) + return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.start(process, policy)) .onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy)) .onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail())) .onFailure((t, failure) -> transitionToStarting(t)) @@ -388,7 +388,7 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse .dataAddress(dataFlowResponse.getDataAddress()); dispatch(messageBuilder, process, policy, Object.class) - .onSuccess((t, content) -> transitionToStarted(t)) + .onSuccess((t, content) -> transitionToStarted(t, dataFlowResponse.getDataPlaneId())) .onFailure((t, throwable) -> transitionToStarting(t)) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) @@ -513,8 +513,8 @@ private void transitionToStarting(TransferProcess transferProcess) { update(transferProcess); } - private void transitionToStarted(TransferProcess process) { - process.transitionStarted(); + private void transitionToStarted(TransferProcess process, String dataPlaneId) { + process.transitionStarted(dataPlaneId); observable.invokeForEach(l -> l.preStarted(process)); update(process); observable.invokeForEach(l -> l.started(process, TransferProcessStartedData.Builder.newInstance().build())); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java index 09ad7c2a01e..b7034180f4d 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java @@ -52,10 +52,10 @@ void shouldInitiateFlowOnCorrectController() { var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); when(controller.canHandle(any())).thenReturn(true); - when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); + when(controller.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); manager.register(controller); - var response = manager.initiate(transferProcess, policy); + var response = manager.start(transferProcess, policy); assertThat(response.succeeded()).isTrue(); } @@ -71,7 +71,7 @@ void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { when(controller.canHandle(any())).thenReturn(false); manager.register(controller); - var response = manager.initiate(transferProcess, policy); + var response = manager.start(transferProcess, policy); assertThat(response.succeeded()).isFalse(); assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); @@ -87,10 +87,10 @@ void shouldCatchExceptionsAndReturnFatalError() { var errorMsg = "Test Error Message"; when(controller.canHandle(any())).thenReturn(true); - when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg)); + when(controller.start(any(), any())).thenThrow(new EdcException(errorMsg)); manager.register(controller); - var response = manager.initiate(transferProcess, policy); + var response = manager.start(transferProcess, policy); assertThat(response.succeeded()).isFalse(); assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); @@ -104,16 +104,16 @@ void shouldChooseHighestPriorityController() { manager.register(1, lowPriority); manager.register(2, highPriority); - manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build()); + manager.start(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build()); - verify(highPriority).initiateFlow(any(), any()); + verify(highPriority).start(any(), any()); verifyNoInteractions(lowPriority); } private DataFlowController createDataFlowController() { var dataFlowController = mock(DataFlowController.class); when(dataFlowController.canHandle(any())).thenReturn(true); - when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); + when(dataFlowController.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); return dataFlowController; } } diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java index e9e6652a7dd..212b2a2f88f 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java @@ -166,6 +166,27 @@ void verifyProvision_shouldNotStarve() { } + private ProvisionedResourceSet provisionedResourceSet() { + return ProvisionedResourceSet.Builder.newInstance() + .resources(List.of(new TestProvisionedDataDestinationResource("test-resource", "1"))) + .build(); + } + + private TransferProcess.Builder transferProcessBuilder() { + var processId = UUID.randomUUID().toString(); + var dataRequest = DataRequest.Builder.newInstance() + .id(processId) + .destinationType("test-type") + .contractId(UUID.randomUUID().toString()) + .build(); + + return TransferProcess.Builder.newInstance() + .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) + .type(CONSUMER) + .id("test-process-" + processId) + .dataRequest(dataRequest); + } + @Nested class IdempotencyProcessStateReplication { @@ -176,7 +197,7 @@ void shouldSentMessageWithTheSameId_whenFirstDispatchFailed(TransferProcess.Type when(dispatcherRegistry.dispatch(any(), isA(messageType))) .thenReturn(completedFuture(StatusResult.failure(ERROR_RETRY))) .thenReturn(completedFuture(StatusResult.success(TransferProcessAck.Builder.newInstance().build()))); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); var transfer = transferProcessBuilder().type(type).state(state.code()).build(); @@ -205,36 +226,15 @@ private static class EgressMessages implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext extensionContext) { return Stream.of( - arguments(CONSUMER, REQUESTING, TransferRequestMessage.class), - arguments(CONSUMER, COMPLETING, TransferCompletionMessage.class), - arguments(CONSUMER, TERMINATING, TransferTerminationMessage.class), - arguments(PROVIDER, STARTING, TransferStartMessage.class), - arguments(PROVIDER, COMPLETING, TransferCompletionMessage.class), - arguments(PROVIDER, TERMINATING, TransferTerminationMessage.class) + arguments(CONSUMER, REQUESTING, TransferRequestMessage.class), + arguments(CONSUMER, COMPLETING, TransferCompletionMessage.class), + arguments(CONSUMER, TERMINATING, TransferTerminationMessage.class), + arguments(PROVIDER, STARTING, TransferStartMessage.class), + arguments(PROVIDER, COMPLETING, TransferCompletionMessage.class), + arguments(PROVIDER, TERMINATING, TransferTerminationMessage.class) ); } } } - - private ProvisionedResourceSet provisionedResourceSet() { - return ProvisionedResourceSet.Builder.newInstance() - .resources(List.of(new TestProvisionedDataDestinationResource("test-resource", "1"))) - .build(); - } - - private TransferProcess.Builder transferProcessBuilder() { - var processId = UUID.randomUUID().toString(); - var dataRequest = DataRequest.Builder.newInstance() - .id(processId) - .destinationType("test-type") - .contractId(UUID.randomUUID().toString()) - .build(); - - return TransferProcess.Builder.newInstance() - .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) - .type(CONSUMER) - .id("test-process-" + processId) - .dataRequest(dataRequest); - } } diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java index 195cc544fe4..73a77807cc7 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java @@ -146,7 +146,7 @@ class TransferProcessManagerImplTest { @BeforeEach void setup() { when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse())); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse())); when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().build()); var observable = new TransferProcessObservableImpl(); observable.registerListener(listener); @@ -497,7 +497,7 @@ void starting_shouldStartDataTransferAndSendMessageToConsumer() { when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); when(dispatcherRegistry.dispatch(any(), isA(TransferStartMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); manager.start(); @@ -519,7 +519,7 @@ void starting_shouldStartDataTransferAndSendMessageToConsumer() { @Test void starting_onFailureAndRetriesNotExhausted_updatesStateCountForRetry() { var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build(); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY)); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY)); when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(STARTING.code()).build()); @@ -535,7 +535,7 @@ void starting_shouldTransitionToTerminatingIfFatalFailure() { var process = createTransferProcess(STARTING).toBuilder().type(PROVIDER).build(); when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR)); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR)); manager.start(); @@ -547,7 +547,7 @@ void starting_shouldTransitionToTerminatingIfFatalFailure() { @Test void starting_onFailureAndRetriesExhausted_transitToTerminating() { var process = createTransferProcessBuilder(STARTING).type(PROVIDER).stateCount(RETRY_EXHAUSTED).build(); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY)); + when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY)); when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index be3f8236837..275d019b186 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -54,14 +54,6 @@ private DataPlaneManagerImpl() { } - @Override - protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { - return builder - .processor(processDataFlowInState(RECEIVED, this::processReceived)) - .processor(processDataFlowInState(COMPLETED, this::processCompleted)) - .processor(processDataFlowInState(FAILED, this::processFailed)); - } - @Override public Result validate(DataFlowStartMessage dataRequest) { var transferService = transferServiceRegistry.resolveTransferService(dataRequest); @@ -79,7 +71,6 @@ public void initiate(DataFlowStartMessage dataRequest) { .destination(dataRequest.getDestinationDataAddress()) .callbackAddress(dataRequest.getCallbackAddress()) .traceContext(telemetry.getCurrentTraceContext()) - .trackable(dataRequest.isTrackable()) .properties(dataRequest.getProperties()) .state(RECEIVED.code()) .build(); @@ -116,6 +107,14 @@ public StatusResult terminate(String dataFlowId) { } } + @Override + protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { + return builder + .processor(processDataFlowInState(RECEIVED, this::processReceived)) + .processor(processDataFlowInState(COMPLETED, this::processCompleted)) + .processor(processDataFlowInState(FAILED, this::processFailed)); + } + private boolean processReceived(DataFlow dataFlow) { var request = dataFlow.toRequest(); var transferService = transferServiceRegistry.resolveTransferService(request); @@ -188,19 +187,24 @@ private Processor processDataFlowInState(DataFlowStates state, Function { - public static Builder newInstance() { - return new Builder(); - } - private Builder() { super(new DataPlaneManagerImpl()); } + public static Builder newInstance() { + return new Builder(); + } + @Override public Builder self() { return this; } + public DataPlaneManagerImpl build() { + Objects.requireNonNull(manager.transferProcessClient); + return manager; + } + public Builder transferServiceRegistry(TransferServiceRegistry transferServiceRegistry) { manager.transferServiceRegistry = transferServiceRegistry; return this; @@ -210,11 +214,6 @@ public Builder transferProcessClient(TransferProcessApiClient transferProcessCli manager.transferProcessClient = transferProcessClient; return this; } - - public DataPlaneManagerImpl build() { - Objects.requireNonNull(manager.transferProcessClient); - return manager; - } } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index dc65b1d9561..a9b5f9bb0aa 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -95,7 +95,6 @@ void initiateDataFlow() { .destinationDataAddress(DataAddress.Builder.newInstance().type("type").build()) .callbackAddress(URI.create("http://any")) .properties(Map.of("key", "value")) - .trackable(true) .build(); manager.initiate(request); @@ -108,7 +107,6 @@ void initiateDataFlow() { assertThat(dataFlow.getDestination()).isSameAs(request.getDestinationDataAddress()); assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any")); assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties()); - assertThat(dataFlow.isTrackable()).isEqualTo(request.isTrackable()); assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code()); } @@ -340,7 +338,6 @@ private DataFlow.Builder dataFlowBuilder() { .source(DataAddress.Builder.newInstance().type("source").build()) .destination(DataAddress.Builder.newInstance().type("destination").build()) .callbackAddress(URI.create("http://any")) - .trackable(true) .properties(Map.of("key", "value")); } diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql index ba7d38ee04f..2a91b187d3f 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql @@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS edc_transfer_process pending BOOLEAN DEFAULT FALSE, transfer_type VARCHAR, protocol_messages JSON, + data_plane_id VARCHAR, lease_id VARCHAR CONSTRAINT transfer_process_lease_lease_id_fk REFERENCES edc_lease diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java index bf62d1a9215..492c5068413 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java @@ -257,6 +257,7 @@ private void update(Connection conn, TransferProcess process, String existingDat process.isPending(), process.getTransferType(), toJson(process.getProtocolMessages()), + process.getDataPlaneId(), process.getId()); var newDr = process.getDataRequest(); @@ -313,7 +314,8 @@ private void insert(Connection conn, TransferProcess process) { toJson(process.getCallbackAddresses()), process.isPending(), process.getTransferType(), - toJson(process.getProtocolMessages())); + toJson(process.getProtocolMessages()), + process.getDataPlaneId()); //insert DataRequest var dr = process.getDataRequest(); @@ -358,6 +360,7 @@ private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLExcept .pending(resultSet.getBoolean(statements.getPendingColumn())) .transferType(resultSet.getString(statements.getTransferTypeColumn())) .protocolMessages(fromJson(resultSet.getString(statements.getProtocolMessagesColumn()), ProtocolMessages.class)) + .dataPlaneId(resultSet.getString(statements.getDataPlaneIdColumn())) .build(); } diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java index dcbd0caeec1..5557a68b4fe 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java @@ -82,6 +82,7 @@ public String getInsertStatement() { .column(getPendingColumn()) .column(getTransferTypeColumn()) .jsonColumn(getProtocolMessagesColumn()) + .column(getDataPlaneIdColumn()) .insertInto(getTransferProcessTableName()); } @@ -107,6 +108,7 @@ public String getUpdateTransferProcessTemplate() { .column(getPendingColumn()) .column(getTransferTypeColumn()) .jsonColumn(getProtocolMessagesColumn()) + .column(getDataPlaneIdColumn()) .update(getTransferProcessTableName(), getIdColumn()); } diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java index c53b17f8ee8..1a734eaa969 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java @@ -79,6 +79,10 @@ default String getTransferTypeColumn() { return "transfer_type"; } + default String getDataPlaneIdColumn() { + return "data_plane_id"; + } + default String getContractIdColumn() { return "contract_id"; } diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java index 0aefd22a962..1c58071630f 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java @@ -38,6 +38,7 @@ public class TransferProcessMapping extends StatefulEntityMapping { private static final String FIELD_PRIVATE_PROPERTIES = "privateProperties"; private static final String FIELD_PENDING = "pending"; private static final String FIELD_TRANSFER_TYPE = "transferType"; + private static final String FIELD_DATA_PLANE_ID = "dataPlaneId"; public TransferProcessMapping(TransferProcessStoreStatements statements) { @@ -54,5 +55,6 @@ public TransferProcessMapping(TransferProcessStoreStatements statements) { add(FIELD_DEPROVISIONED_RESOURCES, new JsonFieldTranslator(PostgresDialectStatements.DEPROVISIONED_RESOURCES_ALIAS)); add(FIELD_PENDING, statements.getPendingColumn()); add(FIELD_TRANSFER_TYPE, statements.getTransferTypeColumn()); + add(FIELD_DATA_PLANE_ID, statements.getDataPlaneIdColumn()); } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index 1cb9de48652..1b02d666db8 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -55,7 +55,7 @@ public boolean canHandle(TransferProcess transferProcess) { } @Override - public @NotNull StatusResult initiateFlow(TransferProcess transferProcess, Policy policy) { + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { var contentAddress = transferProcess.getContentDataAddress(); var dataRequest = transferProcess.getDataRequest(); @@ -67,6 +67,16 @@ public boolean canHandle(TransferProcess transferProcess) { .orElse(failure(FATAL_ERROR, format("Failed to find DataPlaneInstance for source/destination: %s/%s", contentAddress.getType(), HTTP_PROXY))); } + @Override + public StatusResult terminate(TransferProcess transferProcess) { + return StatusResult.success(); + } + + @Override + public Set transferTypesFor(Asset asset) { + return transferTypes; + } + // Shim translation from "Http-PULL" to HttpProxy dataAddress private DataAddress destinationAddress(DataRequest dataRequest) { if (transferTypes.contains(dataRequest.getDestinationType())) { @@ -78,16 +88,6 @@ private DataAddress destinationAddress(DataRequest dataRequest) { } } - @Override - public StatusResult terminate(TransferProcess transferProcess) { - return StatusResult.success(); - } - - @Override - public Set transferTypesFor(Asset asset) { - return transferTypes; - } - private DataFlowResponse toResponse(DataAddress address) { return DataFlowResponse.Builder.newInstance().dataAddress(address).build(); } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java index fd4575ba195..9bff21a3af2 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java @@ -22,6 +22,7 @@ import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; @@ -31,6 +32,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Predicate; import static java.util.stream.Collectors.toSet; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; @@ -60,11 +62,10 @@ public boolean canHandle(TransferProcess transferProcess) { } @Override - public @NotNull StatusResult initiateFlow(TransferProcess transferProcess, Policy policy) { + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { var dataFlowRequest = DataFlowStartMessage.Builder.newInstance() .id(UUID.randomUUID().toString()) .processId(transferProcess.getId()) - .trackable(true) .sourceDataAddress(transferProcess.getContentDataAddress()) .destinationDataAddress(transferProcess.getDataDestination()) .transferType(transferProcess.getTransferType()) @@ -73,16 +74,18 @@ public boolean canHandle(TransferProcess transferProcess) { var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination()); return clientFactory.createClient(dataPlaneInstance) - .transfer(dataFlowRequest) - .map(it -> DataFlowResponse.Builder.newInstance().build()); + .start(dataFlowRequest) + .map(it -> DataFlowResponse.Builder.newInstance().dataPlaneId(dataPlaneInstance.getId()).build()); } @Override public StatusResult terminate(TransferProcess transferProcess) { - return selectorClient.getAll().stream().map(clientFactory::createClient) + return selectorClient.getAll().stream() + .filter(dataPlaneInstanceFilter(transferProcess)) + .map(clientFactory::createClient) .map(client -> client.terminate(transferProcess.getId())) .reduce(StatusResult::merge) - .orElse(StatusResult.success()); + .orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to select the data plane for terminating the transfer process %s".formatted(transferProcess.getId()))); } @Override @@ -95,4 +98,11 @@ public Set transferTypesFor(Asset asset) { .collect(toSet()); } + private Predicate dataPlaneInstanceFilter(TransferProcess transferProcess) { + if (transferProcess.getDataPlaneId() != null) { + return (dataPlaneInstance -> dataPlaneInstance.getId().equals(transferProcess.getDataPlaneId())); + } else { + return (d) -> true; + } + } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java index 794e9466f00..81fba72a3ed 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java @@ -65,7 +65,7 @@ void initiateFlow_success() { when(selectorService.select(any(), argThat(destination -> destination.getType().equals(HTTP_PROXY)))).thenReturn(instance); when(resolver.toDataAddress(any(), any(), any())).thenReturn(Result.success(proxyAddress)); - var result = flowController.initiateFlow(transferProcess, null); + var result = flowController.start(transferProcess, null); assertThat(result).isSucceeded().satisfies(response -> { assertThat(response.getDataAddress()).isEqualTo(proxyAddress); @@ -85,7 +85,7 @@ void initiateFlow_success_withTransferType() { when(selectorService.select(any(), argThat(destination -> destination.getType().equals(HTTP_PROXY)))).thenReturn(instance); when(resolver.toDataAddress(any(), any(), any())).thenReturn(Result.success(proxyAddress)); - var result = flowController.initiateFlow(transferProcess, null); + var result = flowController.start(transferProcess, null); assertThat(result).isSucceeded().satisfies(response -> { assertThat(response.getDataAddress()).isEqualTo(proxyAddress); @@ -99,7 +99,7 @@ void initiateFlow_returnsFailureIfNoDataPlaneInstance() { .contentDataAddress(dataAddress()) .build(); - var result = flowController.initiateFlow(transferProcess, null); + var result = flowController.start(transferProcess, null); assertThat(result).isFailed().extracting(Failure::getFailureDetail).asString() @@ -118,7 +118,7 @@ void initiateFlow_returnsFailureIfAddressResolutionFails() { when(selectorService.select(any(), argThat(destination -> destination.getType().equals(HTTP_PROXY)))).thenReturn(instance); when(resolver.toDataAddress(any(), any(), any())).thenReturn(Result.failure(errorMsg)); - var result = flowController.initiateFlow(transferProcess, Policy.Builder.newInstance().build()); + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); assertThat(result).isFailed().extracting(Failure::getFailureDetail).asString().contains(errorMsg); } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java index 3bfd24ad89f..e0fa710b8f2 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java @@ -25,6 +25,7 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -44,15 +45,18 @@ class ProviderPushTransferDataFlowControllerTest { + private static final String HTTP_DATA_PULL = "HttpData-PULL"; private final DataPlaneClient dataPlaneClient = mock(); private final DataPlaneClientFactory dataPlaneClientFactory = mock(); private final DataPlaneSelectorService selectorService = mock(); - - private static final String HTTP_DATA_PULL = "HttpData-PULL"; - private final ProviderPushTransferDataFlowController flowController = new ProviderPushTransferDataFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory); + @NotNull + private static DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); + } + @Test void canHandle() { assertThat(flowController.canHandle(transferProcess(HTTP_PROXY))).isFalse(); @@ -70,18 +74,17 @@ void initiateFlow_transferSuccess() { .contentDataAddress(testDataAddress()) .build(); - when(dataPlaneClient.transfer(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success()); + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); var dataPlaneInstance = createDataPlaneInstance(); when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance); when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); - var result = flowController.initiateFlow(transferProcess, Policy.Builder.newInstance().build()); + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); assertThat(result.succeeded()).isTrue(); var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class); - verify(dataPlaneClient).transfer(captor.capture()); + verify(dataPlaneClient).start(captor.capture()); var captured = captor.getValue(); - assertThat(captured.isTrackable()).isTrue(); assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); @@ -97,14 +100,14 @@ void initiateFlow_returnFailedResultIfTransferFails() { .contentDataAddress(testDataAddress()) .build(); - when(dataPlaneClient.transfer(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); var dataPlaneInstance = createDataPlaneInstance(); when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance); when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); - var result = flowController.initiateFlow(transferProcess, Policy.Builder.newInstance().build()); + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); - verify(dataPlaneClient).transfer(any()); + verify(dataPlaneClient).start(any()); assertThat(result.failed()).isTrue(); assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); @@ -128,6 +131,46 @@ void terminate_shouldCallTerminate() { verify(dataPlaneClient).terminate("transferProcessId"); } + @Test + void terminate_shouldCallTerminateOnTheRightDataPlane() { + var dataPlaneInstance = createDataPlaneInstance(); + var mockedDataPlane = mock(DataPlaneInstance.class); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .dataPlaneId(dataPlaneInstance.getId()) + .build(); + when(mockedDataPlane.getId()).thenReturn("notValidId"); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane)); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + verify(mockedDataPlane).getId(); + } + + @Test + void terminate_shouldFail_withInvalidDataPlaneId() { + var dataPlaneInstance = createDataPlaneInstance(); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isFailed().detail().contains("Failed to select the data plane for terminating the transfer process"); + } + @Test void transferTypes_shouldReturnTypesForSpecifiedAsset() { when(selectorService.getAll()).thenReturn(List.of( @@ -146,11 +189,6 @@ private DataPlaneInstance createDataPlaneInstance() { return dataPlaneInstanceBuilder().build(); } - @NotNull - private static DataPlaneInstance.Builder dataPlaneInstanceBuilder() { - return DataPlaneInstance.Builder.newInstance().url("http://any"); - } - private DataAddress testDataAddress() { return DataAddress.Builder.newInstance().type("test-type").build(); } diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java index 146d0bb5098..7064391e26b 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java @@ -19,6 +19,7 @@ import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import java.util.Objects; @@ -37,13 +38,13 @@ public EmbeddedDataPlaneClient(DataPlaneManager dataPlaneManager) { @WithSpan @Override - public StatusResult transfer(DataFlowStartMessage request) { + public StatusResult start(DataFlowStartMessage request) { var result = dataPlaneManager.validate(request); if (result.failed()) { return StatusResult.failure(ResponseStatus.FATAL_ERROR, result.getFailureDetail()); } dataPlaneManager.initiate(request); - return StatusResult.success(); + return StatusResult.success(DataFlowResponseMessage.Builder.newInstance().build()); } @Override diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java index 6487c225b7d..8028e954e25 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java @@ -29,6 +29,7 @@ import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import java.io.IOException; @@ -54,7 +55,7 @@ public RemoteDataPlaneClient(EdcHttpClient httpClient, ObjectMapper mapper, Data @WithSpan @Override - public StatusResult transfer(DataFlowStartMessage dataFlowStartMessage) { + public StatusResult start(DataFlowStartMessage dataFlowStartMessage) { RequestBody body; try { body = RequestBody.create(mapper.writeValueAsString(dataFlowStartMessage), TYPE_JSON); @@ -64,7 +65,13 @@ public StatusResult transfer(DataFlowStartMessage dataFlowStartMessage) { var request = new Request.Builder().post(body).url(dataPlane.getUrl()).build(); try (var response = httpClient.execute(request)) { - return handleResponse(response, dataFlowStartMessage.getId()); + var result = handleResponse(response, dataFlowStartMessage.getId()); + + if (result.failed()) { + return StatusResult.failure(result.getFailure().status(), result.getFailureDetail()); + } else { + return StatusResult.success(DataFlowResponseMessage.Builder.newInstance().build()); + } } catch (IOException e) { return StatusResult.failure(FATAL_ERROR, e.getMessage()); } diff --git a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java index 10694c8cc2b..c3eb3e096e0 100644 --- a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java +++ b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java @@ -36,13 +36,22 @@ class EmbeddedDataPlaneClientTest { private final DataPlaneManager dataPlaneManager = mock(); private final DataPlaneClient client = new EmbeddedDataPlaneClient(dataPlaneManager); + private static DataFlowStartMessage createDataFlowRequest() { + return DataFlowStartMessage.Builder.newInstance() + .id("123") + .processId("456") + .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .build(); + } + @Test void transfer_shouldSucceed_whenTransferInitiatedCorrectly() { var request = createDataFlowRequest(); when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); doNothing().when(dataPlaneManager).initiate(any()); - var result = client.transfer(request); + var result = client.start(request); verify(dataPlaneManager).validate(request); verify(dataPlaneManager).initiate(request); @@ -57,7 +66,7 @@ void transfer_shouldReturnFailedResult_whenValidationFailure() { when(dataPlaneManager.validate(any())).thenReturn(Result.failure(errorMsg)); doNothing().when(dataPlaneManager).initiate(any()); - var result = client.transfer(request); + var result = client.start(request); verify(dataPlaneManager).validate(request); verify(dataPlaneManager, never()).initiate(any()); @@ -74,14 +83,4 @@ void terminate_shouldProxyCallToManager() { assertThat(result).isSucceeded(); verify(dataPlaneManager).terminate("dataFlowId"); } - - private static DataFlowStartMessage createDataFlowRequest() { - return DataFlowStartMessage.Builder.newInstance() - .trackable(true) - .id("123") - .processId("456") - .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .build(); - } } diff --git a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java index ce5a162c61b..2d68f2e7602 100644 --- a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java +++ b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java @@ -70,6 +70,20 @@ public static void tearDown() { stopQuietly(dataPlane); } + private static HttpResponse withResponse(String errorMsg) throws JsonProcessingException { + return response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code()) + .withBody(MAPPER.writeValueAsString(new TransferErrorResponse(List.of(errorMsg))), MediaType.APPLICATION_JSON); + } + + private static DataFlowStartMessage createDataFlowRequest() { + return DataFlowStartMessage.Builder.newInstance() + .id("123") + .processId("456") + .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .build(); + } + @AfterEach public void resetMockServer() { dataPlane.reset(); @@ -82,7 +96,7 @@ void transfer_verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonP var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(flowRequest)); dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code())); - var result = dataPlaneClient.transfer(flowRequest); + var result = dataPlaneClient.start(flowRequest); dataPlane.verify(httpRequest, VerificationTimes.once()); @@ -102,7 +116,7 @@ void transfer_verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProces var errorMsg = UUID.randomUUID().toString(); dataPlane.when(httpRequest, once()).respond(withResponse(errorMsg)); - var result = dataPlaneClient.transfer(flowRequest); + var result = dataPlaneClient.start(flowRequest); dataPlane.verify(httpRequest, VerificationTimes.once()); @@ -121,7 +135,7 @@ void transfer_verifyTransferSuccess() throws JsonProcessingException { var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(flowRequest)); dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.OK_200.code())); - var result = dataPlaneClient.transfer(flowRequest); + var result = dataPlaneClient.start(flowRequest); dataPlane.verify(httpRequest, VerificationTimes.once()); @@ -148,19 +162,4 @@ void terminate_shouldFail_whenConflictResponse() { assertThat(result).isFailed(); } - - private static HttpResponse withResponse(String errorMsg) throws JsonProcessingException { - return response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code()) - .withBody(MAPPER.writeValueAsString(new TransferErrorResponse(List.of(errorMsg))), MediaType.APPLICATION_JSON); - } - - private static DataFlowStartMessage createDataFlowRequest() { - return DataFlowStartMessage.Builder.newInstance() - .trackable(true) - .id("123") - .processId("456") - .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .build(); - } } diff --git a/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java b/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java index 0c3ec93c164..64d07a7ec1e 100644 --- a/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java +++ b/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java @@ -52,8 +52,7 @@ public static DataFlowStartMessage.Builder createRequest(Map pro .processId(UUID.randomUUID().toString()) .properties(properties) .sourceDataAddress(source) - .destinationDataAddress(destination) - .trackable(true); + .destinationDataAddress(destination); } public static Response.Builder createHttpResponse() { diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java index 1c0a9982824..b1af7e8159c 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java @@ -32,6 +32,22 @@ public class DataFlowRequestSupplier implements BiFunction { + /** + * Put all properties of the incoming request (method, request body, query params...) into a map. + */ + private static Map createProps(ContainerRequestContextApi contextApi) { + var props = new HashMap(); + props.put(METHOD, contextApi.method()); + props.put(QUERY_PARAMS, contextApi.queryParams()); + props.put(PATH, contextApi.path()); + Optional.ofNullable(contextApi.mediaType()) + .ifPresent(mediaType -> { + props.put(MEDIA_TYPE, mediaType); + props.put(BODY, contextApi.body()); + }); + return props; + } + /** * Create a {@link DataFlowStartMessage} based on incoming request and claims decoded from the access token. * @@ -48,25 +64,8 @@ public DataFlowStartMessage apply(ContainerRequestContextApi contextApi, DataAdd .destinationDataAddress(DataAddress.Builder.newInstance() .type(AsyncStreamingDataSink.TYPE) .build()) - .trackable(false) .id(UUID.randomUUID().toString()) .properties(props) .build(); } - - /** - * Put all properties of the incoming request (method, request body, query params...) into a map. - */ - private static Map createProps(ContainerRequestContextApi contextApi) { - var props = new HashMap(); - props.put(METHOD, contextApi.method()); - props.put(QUERY_PARAMS, contextApi.queryParams()); - props.put(PATH, contextApi.path()); - Optional.ofNullable(contextApi.mediaType()) - .ifPresent(mediaType -> { - props.put(MEDIA_TYPE, mediaType); - props.put(BODY, contextApi.body()); - }); - return props; - } } diff --git a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java index 39698b44a2b..8961e2e42f4 100644 --- a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java +++ b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java @@ -51,7 +51,6 @@ void verifyMapping_noInputBody() { var request = supplier.apply(contextApi, address); - assertThat(request.isTrackable()).isFalse(); assertThat(request.getId()).isNotBlank(); assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); @@ -81,7 +80,6 @@ void verifyMapping_withInputBody() { var request = supplier.apply(contextApi, address); - assertThat(request.isTrackable()).isFalse(); assertThat(request.getId()).isNotBlank(); assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql index 91769de6a3d..0073ae461da 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql @@ -24,7 +24,6 @@ CREATE TABLE IF NOT EXISTS edc_data_plane trace_context JSON, error_detail VARCHAR, callback_address VARCHAR, - trackable BOOLEAN, lease_id VARCHAR CONSTRAINT data_plane_lease_lease_id_fk REFERENCES edc_lease diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index 65486cff9e8..8283044dda0 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -145,7 +145,6 @@ private void insert(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getTraceContext()), dataFlow.getErrorDetail(), Optional.ofNullable(dataFlow.getCallbackAddress()).map(URI::toString).orElse(null), - dataFlow.isTrackable(), toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()) @@ -162,7 +161,6 @@ private void update(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getTraceContext()), dataFlow.getErrorDetail(), Optional.ofNullable(dataFlow.getCallbackAddress()).map(URI::toString).orElse(null), - dataFlow.isTrackable(), toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()), @@ -180,7 +178,6 @@ private DataFlow mapDataFlow(ResultSet resultSet) throws SQLException { .traceContext(fromJson(resultSet.getString(statements.getTraceContextColumn()), getTypeRef())) .errorDetail(resultSet.getString(statements.getErrorDetailColumn())) .callbackAddress(Optional.ofNullable(resultSet.getString(statements.getCallbackAddressColumn())).map(URI::create).orElse(null)) - .trackable(resultSet.getBoolean(statements.getTrackableColumn())) .source(fromJson(resultSet.getString(statements.getSourceColumn()), DataAddress.class)) .destination(fromJson(resultSet.getString(statements.getDestinationColumn()), DataAddress.class)) .properties(fromJson(resultSet.getString(statements.getPropertiesColumn()), getTypeRef())) diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java index eee6ccd4f39..07f9c15bac5 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java @@ -41,7 +41,6 @@ public String getInsertTemplate() { .jsonColumn(getTraceContextColumn()) .column(getErrorDetailColumn()) .column(getCallbackAddressColumn()) - .column(getTrackableColumn()) .jsonColumn(getSourceColumn()) .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) @@ -58,7 +57,6 @@ public String getUpdateTemplate() { .jsonColumn(getTraceContextColumn()) .column(getErrorDetailColumn()) .column(getCallbackAddressColumn()) - .column(getTrackableColumn()) .jsonColumn(getSourceColumn()) .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java index 2d75f3126fd..ba7fb3026cc 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java @@ -36,10 +36,6 @@ default String getCallbackAddressColumn() { return "callback_address"; } - default String getTrackableColumn() { - return "trackable"; - } - default String getSourceColumn() { return "source"; } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowResponseMessage.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowResponseMessage.java index fe46e6599a3..a4cf678d12e 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowResponseMessage.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowResponseMessage.java @@ -21,10 +21,40 @@ /** * A response message from the data plane upon receiving a {@link DataFlowStartMessage} */ -public record DataFlowResponseMessage(DataAddress dataAddress) { +public class DataFlowResponseMessage { public static final String DATA_FLOW_RESPONSE_MESSAGE_SIMPLE_TYPE = "DataFlowResponseMessage"; public static final String DATA_FLOW_RESPONSE_MESSAGE_TYPE = EDC_NAMESPACE + DATA_FLOW_RESPONSE_MESSAGE_SIMPLE_TYPE; public static final String DATA_FLOW_RESPONSE_MESSAGE_DATA_ADDRESS = EDC_NAMESPACE + "dataAddress"; + private DataAddress dataAddress; + + private DataFlowResponseMessage() { + } + + public DataAddress getDataAddress() { + return dataAddress; + } + + public static class Builder { + + DataFlowResponseMessage response; + + private Builder() { + response = new DataFlowResponseMessage(); + } + + public static Builder newInstance() { + return new Builder(); + } + + public Builder dataAddress(DataAddress dataAddress) { + response.dataAddress = dataAddress; + return this; + } + + public DataFlowResponseMessage build() { + return response; + } + } } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java index 42d8d1285df..85f91bf7d2f 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java @@ -60,7 +60,6 @@ public class DataFlowStartMessage implements Polymorphic, TraceCarrier { private DataAddress destinationDataAddress; private String transferType; private URI callbackAddress; - private boolean trackable; private Map properties = Map.of(); private Map traceContext = Map.of(); // TODO: should this stay in the DataFlow class? @@ -126,13 +125,6 @@ public String getParticipantId() { return participantId; } - /** - * Returns true if the request must be tracked for delivery guarantees. - */ - public boolean isTrackable() { - return trackable; - } - /** * Custom properties that are passed to the provider connector. */ @@ -228,12 +220,6 @@ public Builder assetId(String assetId) { return this; } - - public Builder trackable(boolean value) { - request.trackable = value; - return this; - } - public Builder properties(Map value) { request.properties = value == null ? null : Map.copyOf(value); return this; diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java index 1555f67fbbb..254d1c14bb7 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java @@ -32,7 +32,7 @@ public interface DataFlowController { /** * Returns true if the manager can handle the Transfer Process. * - * @param transferProcess the TransferProcess + * @param transferProcess the TransferProcess * @return true if it can handle the TransferProcess, false otherwise. */ boolean canHandle(TransferProcess transferProcess); @@ -43,11 +43,11 @@ public interface DataFlowController { *

Implementations should not throw exceptions. If an unexpected exception occurs and the flow should be re-attempted, set {@link ResponseStatus#ERROR_RETRY} in the * response. If an exception occurs and re-tries should not be re-attempted, set {@link ResponseStatus#FATAL_ERROR} in the response.

* - * @param transferProcess the transfer process - * @param policy the contract agreement usage policy for the asset being transferred + * @param transferProcess the transfer process + * @param policy the contract agreement usage policy for the asset being transferred */ @NotNull - StatusResult initiateFlow(TransferProcess transferProcess, Policy policy); + StatusResult start(TransferProcess transferProcess, Policy policy); /** * Terminate a data flow. diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java index 6afc019dfde..c4d5a3ee9a9 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java @@ -40,7 +40,7 @@ public interface DataFlowManager { /** * Register the controller with a specific priority. * - * @param priority the priority. + * @param priority the priority. * @param controller the controller. */ void register(int priority, DataFlowController controller); @@ -53,7 +53,7 @@ public interface DataFlowManager { * @return succeeded StatusResult if flow has been initiated correctly, failed one otherwise. */ @NotNull - StatusResult initiate(TransferProcess transferProcess, Policy policy); + StatusResult start(TransferProcess transferProcess, Policy policy); /** * Terminates a data flow. diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/DataFlowResponse.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/DataFlowResponse.java index f41f8eba863..991af219665 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/DataFlowResponse.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/DataFlowResponse.java @@ -18,11 +18,12 @@ import org.eclipse.edc.spi.types.domain.DataAddress; /** - * A Response for {@link DataFlowManager#initiate} operation + * A Response for {@link DataFlowManager#start} operation */ public class DataFlowResponse { private DataAddress dataAddress; + private String dataPlaneId; private DataFlowResponse() { } @@ -31,6 +32,10 @@ public DataAddress getDataAddress() { return dataAddress; } + public String getDataPlaneId() { + return dataPlaneId; + } + public static class Builder { DataFlowResponse response; @@ -48,6 +53,11 @@ public Builder dataAddress(DataAddress dataAddress) { return this; } + public Builder dataPlaneId(String dataPlaneId) { + response.dataPlaneId = dataPlaneId; + return this; + } + public DataFlowResponse build() { return response; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/TransferProcess.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/TransferProcess.java index f227859026f..c9386acc0fa 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/TransferProcess.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/TransferProcess.java @@ -132,6 +132,8 @@ public class TransferProcess extends StatefulEntity { private String transferType; + private String dataPlaneId; + private TransferProcess() { } @@ -292,9 +294,14 @@ public boolean canBeStartedConsumer() { } public void transitionStarted() { + transitionStarted(null); + } + + public void transitionStarted(String dataPlaneId) { if (type == CONSUMER) { transition(STARTED, state -> canBeStartedConsumer()); } else { + this.dataPlaneId = dataPlaneId; transition(STARTED, STARTED, STARTING); } } @@ -363,6 +370,16 @@ public String getCorrelationId() { return dataRequest.getId(); } + /** + * Set the correlationId, operation that's needed on the consumer side when it receives the first message with the + * provider process id. + * + * @param correlationId the correlation id. + */ + public void setCorrelationId(String correlationId) { + dataRequest.setId(correlationId); + } + @JsonIgnore public List getProvisionedResources() { return Optional.ofNullable(getProvisionedResourceSet()).map(ProvisionedResourceSet::getResources).orElse(emptyList()); @@ -373,7 +390,6 @@ public String getConnectorAddress() { return dataRequest.getConnectorAddress(); } - /** * The transfer type to use for the requested data */ @@ -412,6 +428,10 @@ public String getDestinationType() { return dataRequest.getDestinationType(); } + public String getDataPlaneId() { + return dataPlaneId; + } + @Override public TransferProcess copy() { var builder = Builder.newInstance() @@ -424,7 +444,8 @@ public TransferProcess copy() { .callbackAddresses(callbackAddresses) .transferType(transferType) .type(type) - .protocolMessages(protocolMessages); + .protocolMessages(protocolMessages) + .dataPlaneId(dataPlaneId); return copy(builder); } @@ -487,16 +508,6 @@ private void transition(TransferProcessStates end, Predicate transfer(DataFlowStartMessage request); + StatusResult start(DataFlowStartMessage request); /** * Terminate the transfer. diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index d82788d635b..47673c2546e 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -40,7 +40,6 @@ public class DataFlow extends StatefulEntity { private DataAddress source; private DataAddress destination; private URI callbackAddress; - private boolean trackable; private Map properties = Map.of(); @Override @@ -49,7 +48,6 @@ public DataFlow copy() { .source(source) .destination(destination) .callbackAddress(callbackAddress) - .trackable(trackable) .properties(properties); return copy(builder); @@ -72,10 +70,6 @@ public URI getCallbackAddress() { return callbackAddress; } - public boolean isTrackable() { - return trackable; - } - public Map getProperties() { return properties; } @@ -88,7 +82,6 @@ public DataFlowStartMessage toRequest() { .processId(getId()) .callbackAddress(getCallbackAddress()) .traceContext(traceContext) - .trackable(isTrackable()) .properties(getProperties()) .build(); } @@ -161,11 +154,6 @@ public Builder callbackAddress(URI callbackAddress) { return this; } - public Builder trackable(boolean trackable) { - entity.trackable = trackable; - return this; - } - public Builder properties(Map properties) { entity.properties = properties; return this; diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index 415ac0018e9..854b97da345 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -44,6 +44,26 @@ public abstract class DataPlaneStoreTestBase { protected static final String CONNECTOR_NAME = "test-connector"; + protected abstract DataPlaneStore getStore(); + + protected abstract void leaseEntity(String entityId, String owner, Duration duration); + + protected void leaseEntity(String entityId, String owner) { + leaseEntity(entityId, owner, Duration.ofSeconds(60)); + } + + protected abstract boolean isLeasedBy(String entityId, String owner); + + private DataFlow createDataFlow(String id, DataFlowStates state) { + return DataFlow.Builder.newInstance() + .id(id) + .callbackAddress(URI.create("http://any")) + .source(DataAddress.Builder.newInstance().type("src-type").build()) + .destination(DataAddress.Builder.newInstance().type("dest-type").build()) + .state(state.code()) + .build(); + } + @Nested class Create { @@ -189,25 +209,4 @@ void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { assertThat(result).isFailed().extracting(StoreFailure::getReason).isEqualTo(ALREADY_LEASED); } } - - private DataFlow createDataFlow(String id, DataFlowStates state) { - return DataFlow.Builder.newInstance() - .id(id) - .callbackAddress(URI.create("http://any")) - .source(DataAddress.Builder.newInstance().type("src-type").build()) - .destination(DataAddress.Builder.newInstance().type("dest-type").build()) - .trackable(true) - .state(state.code()) - .build(); - } - - protected abstract DataPlaneStore getStore(); - - protected abstract void leaseEntity(String entityId, String owner, Duration duration); - - protected void leaseEntity(String entityId, String owner) { - leaseEntity(entityId, owner, Duration.ofSeconds(60)); - } - - protected abstract boolean isLeasedBy(String entityId, String owner); }