From 9441a669e65b8986b306bb10e681fa51b1de57c7 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Mon, 25 Mar 2024 14:25:17 +0100 Subject: [PATCH] feat: introduces properties provider for DataFlowStartMessage --- .../TransferDataPlaneSignalingExtension.java | 13 +- .../DataPlaneSignalingFlowController.java | 11 +- .../DataPlaneSignalingFlowControllerTest.java | 153 +++++++++++------- .../spi/flow/DataFlowPropertiesProvider.java | 32 ++++ 4 files changed, 145 insertions(+), 64 deletions(-) create mode 100644 spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowPropertiesProvider.java diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java index 8d2ab138cff..4dd45ded381 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java @@ -19,12 +19,16 @@ import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController; import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import java.util.Map; + import static org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension.NAME; @Extension(NAME) @@ -47,10 +51,17 @@ public class TransferDataPlaneSignalingExtension implements ServiceExtension { @Inject private DataPlaneClientFactory clientFactory; + @Inject(required = false) + private DataFlowPropertiesProvider propertiesProvider; @Override public void initialize(ServiceExtensionContext context) { var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY); - dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, clientFactory, selectionStrategy)); + dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), clientFactory, selectionStrategy)); } + + private DataFlowPropertiesProvider getPropertiesProvider() { + return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider; + } + } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java index 26988af6f24..31d73c21a10 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java @@ -19,6 +19,7 @@ import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; @@ -52,11 +53,13 @@ public class DataPlaneSignalingFlowController implements DataFlowController { private final DataPlaneSelectorService selectorClient; private final DataPlaneClientFactory clientFactory; + private final DataFlowPropertiesProvider propertiesProvider; private final String selectionStrategy; - public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataPlaneClientFactory clientFactory, String selectionStrategy) { + public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, String selectionStrategy) { this.callbackUrl = callbackUrl; this.selectorClient = selectorClient; + this.propertiesProvider = propertiesProvider; this.clientFactory = clientFactory; this.selectionStrategy = selectionStrategy; } @@ -73,6 +76,11 @@ public boolean canHandle(TransferProcess transferProcess) { return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail()); } + var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy); + if (propertiesResult.failed()) { + return StatusResult.failure(ResponseStatus.FATAL_ERROR, propertiesResult.getFailureDetail()); + } + var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination(), selectionStrategy, transferProcess.getTransferType()); var dataFlowRequest = DataFlowStartMessage.Builder.newInstance() .id(UUID.randomUUID().toString()) @@ -84,6 +92,7 @@ public boolean canHandle(TransferProcess transferProcess) { .assetId(transferProcess.getAssetId()) .flowType(flowType.getContent()) .callbackAddress(callbackUrl != null ? callbackUrl.get() : null) + .properties(propertiesResult.getContent()) .build(); var dataPlaneInstanceId = dataPlaneInstance != null ? dataPlaneInstance.getId() : null; diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java index 0816efe5ca4..d1e1fc69d57 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java @@ -18,6 +18,7 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; @@ -28,6 +29,7 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -36,6 +38,7 @@ import java.net.URI; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -53,8 +56,15 @@ public class DataPlaneSignalingFlowControllerTest { private final DataPlaneClient dataPlaneClient = mock(); private final DataPlaneClientFactory dataPlaneClientFactory = mock(); private final DataPlaneSelectorService selectorService = mock(); + + private final DataFlowPropertiesProvider propertiesProvider = mock(); private final DataPlaneSignalingFlowController flowController = - new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory, "random"); + new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, "random"); + + + @BeforeEach + void setup() { + } @Test void canHandle() { @@ -80,6 +90,8 @@ void initiateFlow_transferSuccess(String transferType) { .contentDataAddress(testDataAddress()) .build(); + var customProperties = Map.of("foo", "bar"); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(customProperties)); when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); var dataPlaneInstance = createDataPlaneInstance(); when(selectorService.select(any(), any(), any(), eq(transferType))).thenReturn(dataPlaneInstance); @@ -98,7 +110,7 @@ void initiateFlow_transferSuccess(String transferType) { assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId()); assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId()); assertThat(transferType).contains(captured.getFlowType().toString()); - assertThat(captured.getProperties()).isEmpty(); + assertThat(captured.getProperties()).containsAllEntriesOf(customProperties); assertThat(captured.getCallbackAddress()).isNotNull(); } @@ -112,6 +124,7 @@ void initiateFlow_transferSuccess_withReturnedDataAddress() { var response = mock(DataFlowResponseMessage.class); when(response.getDataAddress()).thenReturn(DataAddress.Builder.newInstance().type("type").build()); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(response)); var dataPlaneInstance = createDataPlaneInstance(); when(selectorService.select(any(), any(), any(), eq(HTTP_DATA_PULL))).thenReturn(dataPlaneInstance); @@ -134,6 +147,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() { .transferType(HTTP_DATA_PULL) .build(); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); when(selectorService.select(any(), any())).thenReturn(null); when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); @@ -171,6 +185,21 @@ void initiateFlow_invalidTransferType(String transferType) { assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains("Failed to extract flow type from transferType %s".formatted(transferType))); } + @Test + void initiateFlow_returnFailedResult_whenPropertiesResolveFails() { + var errorMsg = "error"; + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + @Test void initiateFlow_returnFailedResultIfTransferFails() { var errorMsg = "error"; @@ -179,6 +208,7 @@ void initiateFlow_returnFailedResultIfTransferFails() { .transferType(HTTP_DATA_PULL) .build(); + when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of())); when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); var dataPlaneInstance = createDataPlaneInstance(); when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance); @@ -192,65 +222,6 @@ void initiateFlow_returnFailedResultIfTransferFails() { assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); } - @Nested - class Suspend { - - @Test - void shouldCallTerminate() { - var transferProcess = TransferProcess.Builder.newInstance() - .id("transferProcessId") - .contentDataAddress(testDataAddress()) - .build(); - when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); - var dataPlaneInstance = createDataPlaneInstance(); - when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); - - var result = flowController.suspend(transferProcess); - - assertThat(result).isSucceeded(); - verify(dataPlaneClient).suspend("transferProcessId"); - } - - @Test - void shouldCallTerminateOnTheRightDataPlane() { - var dataPlaneInstance = createDataPlaneInstance(); - var mockedDataPlane = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() - .id("transferProcessId") - .contentDataAddress(testDataAddress()) - .dataPlaneId(dataPlaneInstance.getId()) - .build(); - when(mockedDataPlane.getId()).thenReturn("notValidId"); - when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); - when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane)); - - var result = flowController.suspend(transferProcess); - - assertThat(result).isSucceeded(); - verify(dataPlaneClient).suspend("transferProcessId"); - verify(mockedDataPlane).getId(); - } - - @Test - void shouldFail_withInvalidDataPlaneId() { - var dataPlaneInstance = createDataPlaneInstance(); - var transferProcess = TransferProcess.Builder.newInstance() - .id("transferProcessId") - .contentDataAddress(testDataAddress()) - .dataPlaneId("invalid") - .build(); - when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); - when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); - - var result = flowController.suspend(transferProcess); - - assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process"); - } - } - @Test void terminate_shouldCallTerminate() { var transferProcess = transferProcessBuilder() @@ -325,7 +296,6 @@ private DataPlaneInstance.Builder dataPlaneInstanceBuilder() { return DataPlaneInstance.Builder.newInstance().url("http://any"); } - private DataPlaneInstance createDataPlaneInstance() { return dataPlaneInstanceBuilder().build(); } @@ -350,4 +320,63 @@ private TransferProcess.Builder transferProcessBuilder() { .counterPartyAddress("test.connector.address") .dataDestination(DataAddress.Builder.newInstance().type("test").build()); } + + @Nested + class Suspend { + + @Test + void shouldCallTerminate() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .build(); + when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); + var dataPlaneInstance = createDataPlaneInstance(); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).suspend("transferProcessId"); + } + + @Test + void shouldCallTerminateOnTheRightDataPlane() { + var dataPlaneInstance = createDataPlaneInstance(); + var mockedDataPlane = mock(DataPlaneInstance.class); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId(dataPlaneInstance.getId()) + .build(); + when(mockedDataPlane.getId()).thenReturn("notValidId"); + when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane)); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).suspend("transferProcessId"); + verify(mockedDataPlane).getId(); + } + + @Test + void shouldFail_withInvalidDataPlaneId() { + var dataPlaneInstance = createDataPlaneInstance(); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); + + var result = flowController.suspend(transferProcess); + + assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process"); + } + } } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowPropertiesProvider.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowPropertiesProvider.java new file mode 100644 index 00000000000..d30cac47780 --- /dev/null +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowPropertiesProvider.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.spi.flow; + +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; + +import java.util.Map; + +/** + * Extension point allows additional properties to be included in a {@link DataFlowStartMessage} + */ +@FunctionalInterface +@ExtensionPoint +public interface DataFlowPropertiesProvider { + StatusResult> propertiesFor(TransferProcess transferProcess, Policy policy); +}