Skip to content

Commit

Permalink
feat: introduces properties provider for DataFlowStartMessage (#4044)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Mar 26, 2024
1 parent 73317e3 commit 8725718
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import java.util.Map;

import static org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension.NAME;

@Extension(NAME)
Expand All @@ -47,10 +51,17 @@ public class TransferDataPlaneSignalingExtension implements ServiceExtension {
@Inject
private DataPlaneClientFactory clientFactory;

@Inject(required = false)
private DataFlowPropertiesProvider propertiesProvider;

@Override
public void initialize(ServiceExtensionContext context) {
var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY);
dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, clientFactory, selectionStrategy));
dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), clientFactory, selectionStrategy));
}

private DataFlowPropertiesProvider getPropertiesProvider() {
return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -52,11 +53,13 @@ public class DataPlaneSignalingFlowController implements DataFlowController {
private final DataPlaneSelectorService selectorClient;
private final DataPlaneClientFactory clientFactory;

private final DataFlowPropertiesProvider propertiesProvider;
private final String selectionStrategy;

public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataPlaneClientFactory clientFactory, String selectionStrategy) {
public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, String selectionStrategy) {
this.callbackUrl = callbackUrl;
this.selectorClient = selectorClient;
this.propertiesProvider = propertiesProvider;
this.clientFactory = clientFactory;
this.selectionStrategy = selectionStrategy;
}
Expand All @@ -73,6 +76,11 @@ public boolean canHandle(TransferProcess transferProcess) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail());
}

var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy);
if (propertiesResult.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, propertiesResult.getFailureDetail());
}

var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination(), selectionStrategy, transferProcess.getTransferType());
var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
.id(UUID.randomUUID().toString())
Expand All @@ -84,6 +92,7 @@ public boolean canHandle(TransferProcess transferProcess) {
.assetId(transferProcess.getAssetId())
.flowType(flowType.getContent())
.callbackAddress(callbackUrl != null ? callbackUrl.get() : null)
.properties(propertiesResult.getContent())
.build();

var dataPlaneInstanceId = dataPlaneInstance != null ? dataPlaneInstance.getId() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
Expand All @@ -36,6 +37,7 @@

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -53,8 +55,11 @@ public class DataPlaneSignalingFlowControllerTest {
private final DataPlaneClient dataPlaneClient = mock();
private final DataPlaneClientFactory dataPlaneClientFactory = mock();
private final DataPlaneSelectorService selectorService = mock();

private final DataFlowPropertiesProvider propertiesProvider = mock();
private final DataPlaneSignalingFlowController flowController =
new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory, "random");
new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, "random");


@Test
void canHandle() {
Expand All @@ -80,6 +85,8 @@ void initiateFlow_transferSuccess(String transferType) {
.contentDataAddress(testDataAddress())
.build();

var customProperties = Map.of("foo", "bar");
when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(customProperties));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class)));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any(), any(), eq(transferType))).thenReturn(dataPlaneInstance);
Expand All @@ -98,7 +105,7 @@ void initiateFlow_transferSuccess(String transferType) {
assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId());
assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId());
assertThat(transferType).contains(captured.getFlowType().toString());
assertThat(captured.getProperties()).isEmpty();
assertThat(captured.getProperties()).containsAllEntriesOf(customProperties);
assertThat(captured.getCallbackAddress()).isNotNull();
}

Expand All @@ -112,6 +119,7 @@ void initiateFlow_transferSuccess_withReturnedDataAddress() {

var response = mock(DataFlowResponseMessage.class);
when(response.getDataAddress()).thenReturn(DataAddress.Builder.newInstance().type("type").build());
when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(response));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any(), any(), eq(HTTP_DATA_PULL))).thenReturn(dataPlaneInstance);
Expand All @@ -134,6 +142,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() {
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class)));
when(selectorService.select(any(), any())).thenReturn(null);
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
Expand Down Expand Up @@ -171,6 +180,21 @@ void initiateFlow_invalidTransferType(String transferType) {
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains("Failed to extract flow type from transferType %s".formatted(transferType)));
}

@Test
void initiateFlow_returnFailedResult_whenPropertiesResolveFails() {
var errorMsg = "error";
var transferProcess = transferProcessBuilder()
.contentDataAddress(testDataAddress())
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg));
var result = flowController.start(transferProcess, Policy.Builder.newInstance().build());

assertThat(result.failed()).isTrue();
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg));
}

@Test
void initiateFlow_returnFailedResultIfTransferFails() {
var errorMsg = "error";
Expand All @@ -179,6 +203,7 @@ void initiateFlow_returnFailedResultIfTransferFails() {
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance);
Expand All @@ -192,65 +217,6 @@ void initiateFlow_returnFailedResultIfTransferFails() {
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg));
}

@Nested
class Suspend {

@Test
void shouldCallTerminate() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
verify(mockedDataPlane).getId();
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process");
}
}

@Test
void terminate_shouldCallTerminate() {
var transferProcess = transferProcessBuilder()
Expand Down Expand Up @@ -325,7 +291,6 @@ private DataPlaneInstance.Builder dataPlaneInstanceBuilder() {
return DataPlaneInstance.Builder.newInstance().url("http://any");
}


private DataPlaneInstance createDataPlaneInstance() {
return dataPlaneInstanceBuilder().build();
}
Expand All @@ -350,4 +315,63 @@ private TransferProcess.Builder transferProcessBuilder() {
.counterPartyAddress("test.connector.address")
.dataDestination(DataAddress.Builder.newInstance().type("test").build());
}

@Nested
class Suspend {

@Test
void shouldCallTerminate() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
verify(mockedDataPlane).getId();
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.spi.flow;

import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;

import java.util.Map;

/**
* Extension point allows additional properties to be included in a {@link DataFlowStartMessage}
*/
@FunctionalInterface
@ExtensionPoint
public interface DataFlowPropertiesProvider {

StatusResult<Map<String, String>> propertiesFor(TransferProcess transferProcess, Policy policy);

}

0 comments on commit 8725718

Please sign in to comment.