Skip to content

Commit

Permalink
refactor(api): deprecate TransferRequest assetId (#4269)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Jun 14, 2024
1 parent 8c2ca13 commit 40c67e8
Show file tree
Hide file tree
Showing 20 changed files with 220 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,22 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
*
* @param mock the service mock
*/
public <T> void registerServiceMock(Class<T> type, T mock) {
public <T> RuntimeExtension registerServiceMock(Class<T> type, T mock) {
runtime.registerServiceMock(type, mock);
return this;
}

/**
* Registers a service extension with the runtime.
*/
public <T extends SystemExtension> void registerSystemExtension(Class<T> type, SystemExtension extension) {
public <T extends SystemExtension> RuntimeExtension registerSystemExtension(Class<T> type, SystemExtension extension) {
runtime.registerSystemExtension(type, extension);
return this;
}

public void setConfiguration(Map<String, String> configuration) {
public RuntimeExtension setConfiguration(Map<String, String> configuration) {
registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) () -> ConfigFactory.fromMap(configuration));
return this;
}

public <T> T getService(Class<T> clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ public ServiceResult<List<TransferProcess>> search(QuerySpec query) {
return ServiceResult.badRequest("Contract agreement with id %s not found".formatted(request.getContractId()));
}

if (!agreement.getAssetId().equals(request.getAssetId())) {
return ServiceResult.badRequest("Asset id %s in contract agreement does not match asset id in transfer request %s".formatted(agreement.getAssetId(), request.getAssetId()));
}

var flowType = flowTypeExtractor.extract(request.getTransferType()).getContent();

if (flowType == FlowType.PUSH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.agent.ParticipantAgent;
Expand All @@ -47,81 +48,56 @@
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.iam.VerificationContext;
import org.eclipse.edc.spi.message.RemoteMessageDispatcher;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry;
import org.eclipse.edc.validator.spi.ValidationResult;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.junit.matchers.EventEnvelopeMatcher.isEnvelopeOf;
import static org.eclipse.edc.util.io.Ports.getFreePort;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(EdcExtension.class)
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();

@NotNull
private static RemoteMessageDispatcher getTestDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
var ack = TransferProcessAck.Builder.newInstance().build();
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack)));
return testDispatcher;
}

@BeforeEach
void setUp(EdcExtension extension) {
var configuration = Map.of(
"edc.transfer.send.retry.limit", "0",
"edc.transfer.send.retry.base-delay.ms", "0",
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
);

extension.setConfiguration(configuration);
extension.registerServiceMock(TransferWaitStrategy.class, () -> 1);
extension.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()));
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy");
extension.registerServiceMock(PolicyArchive.class, mock());
extension.registerServiceMock(ContractNegotiationStore.class, mock());
extension.registerServiceMock(ParticipantAgentService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
var dataAddressValidatorRegistry = mock(DataAddressValidatorRegistry.class);
when(dataAddressValidatorRegistry.validateSource(any())).thenReturn(ValidationResult.success());
when(dataAddressValidatorRegistry.validateDestination(any())).thenReturn(ValidationResult.success());
extension.registerServiceMock(DataAddressValidatorRegistry.class, dataAddressValidatorRegistry);
}
@RegisterExtension
static final RuntimeExtension RUNTIME = new RuntimePerClassExtension()
.setConfiguration(Map.of(
"edc.transfer.send.retry.limit", "0",
"edc.transfer.send.retry.base-delay.ms", "0"
))
.registerServiceMock(TransferWaitStrategy.class, () -> 1)
.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()))
.registerServiceMock(IdentityService.class, mock())
.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy")
.registerServiceMock(PolicyArchive.class, mock())
.registerServiceMock(ContractNegotiationStore.class, mock())
.registerServiceMock(ParticipantAgentService.class, mock())
.registerServiceMock(DataPlaneClientFactory.class, mock());

@Test
void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service,
Expand All @@ -130,31 +106,35 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
RemoteMessageDispatcherRegistry dispatcherRegistry,
PolicyArchive policyArchive,
ContractNegotiationStore negotiationStore,
ParticipantAgentService agentService) {
ParticipantAgentService agentService,
IdentityService identityService) {

var token = ClaimToken.Builder.newInstance().build();
var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(UUID.randomUUID().toString()).build();

when(identityService.verifyJwtToken(eq(tokenRepresentation), isA(VerificationContext.class))).thenReturn(Result.success(token));
when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(token));

var transferRequest = createTransferRequest();
var agent = mock(ParticipantAgent.class);
var agreement = mock(ContractAgreement.class);
var providerId = "ProviderId";
var agreement = ContractAgreement.Builder.newInstance()
.assetId("assetId")
.providerId(providerId)
.consumerId("consumerId")
.policy(Policy.Builder.newInstance().build())
.build();

when(agreement.getAssetId()).thenReturn(transferRequest.getAssetId());
when(agreement.getProviderId()).thenReturn(providerId);
when(agreement.getPolicy()).thenReturn(Policy.Builder.newInstance().build());
when(agent.getIdentity()).thenReturn(providerId);

dispatcherRegistry.register(getTestDispatcher());
when(policyArchive.findPolicyForContract(matches(transferRequest.getContractId()))).thenReturn(Policy.Builder.newInstance().target(transferRequest.getAssetId()).build());
when(policyArchive.findPolicyForContract(matches(transferRequest.getContractId()))).thenReturn(Policy.Builder.newInstance().target("assetId").build());
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
when(agentService.createFor(token)).thenReturn(agent);
eventRouter.register(TransferProcessEvent.class, eventSubscriber);

var initiateResult = service.initiateTransfer(transferRequest);

assertThat(initiateResult).isSucceeded();
await().atMost(TIMEOUT).untilAsserted(() -> {
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessInitiated.class)));
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessProvisioned.class)));
Expand All @@ -169,15 +149,16 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
.dataAddress(dataAddress)
.build();

protocolService.notifyStarted(startMessage, tokenRepresentation);
var startedResult = protocolService.notifyStarted(startMessage, tokenRepresentation);

assertThat(startedResult).isSucceeded();
await().atMost(TIMEOUT).untilAsserted(() -> {
ArgumentCaptor<EventEnvelope<TransferProcessStarted>> captor = ArgumentCaptor.forClass(EventEnvelope.class);
verify(eventSubscriber, times(4)).on(captor.capture());
assertThat(captor.getValue()).isNotNull()
.extracting(EventEnvelope::getPayload)
.extracting(TransferProcessStarted::getDataAddress)
.usingRecursiveComparison().isEqualTo(dataAddress);
var captor = ArgumentCaptor.forClass(EventEnvelope.class);
verify(eventSubscriber, atLeast(3)).on(captor.capture());
var payload = captor.getAllValues().stream().map(EventEnvelope::getPayload).filter(TransferProcessStarted.class::isInstance).findFirst();
assertThat(payload).isPresent().get().isInstanceOfSatisfying(TransferProcessStarted.class, s -> {
assertThat(s.getDataAddress()).usingRecursiveComparison().isEqualTo(dataAddress);
});
});

var transferProcess = initiateResult.getContent();
Expand All @@ -195,23 +176,6 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
});
}

@Test
void shouldTerminateOnInvalidPolicy(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry, ContractNegotiationStore negotiationStore) {
dispatcherRegistry.register(getTestDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
var transferRequest = createTransferRequest();
var agreement = mock(ContractAgreement.class);
when(agreement.getAssetId()).thenReturn(transferRequest.getAssetId());
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);

service.initiateTransfer(transferRequest);

await().atMost(TIMEOUT).untilAsserted(() -> {
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessInitiated.class)));
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessTerminated.class)));
});
}

@Test
void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService service,
EventRouter eventRouter,
Expand All @@ -220,9 +184,13 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
ContractNegotiationStore negotiationStore) {

var transferRequest = createTransferRequest();
when(policyArchive.findPolicyForContract(matches("contractId"))).thenReturn(Policy.Builder.newInstance().target(transferRequest.getAssetId()).build());
var agreement = mock(ContractAgreement.class);
when(agreement.getAssetId()).thenReturn(transferRequest.getAssetId());
when(policyArchive.findPolicyForContract(matches("contractId"))).thenReturn(Policy.Builder.newInstance().target("assetId").build());
var agreement = ContractAgreement.Builder.newInstance()
.assetId("assetId")
.providerId("providerId")
.consumerId("consumerId")
.policy(Policy.Builder.newInstance().build())
.build();
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
dispatcherRegistry.register(getTestDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
Expand All @@ -241,23 +209,45 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
}

@Test
void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry, ContractNegotiationStore negotiationStore) {
dispatcherRegistry.register(getTestDispatcher());
void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry,
ContractNegotiationStore negotiationStore, PolicyArchive policyArchive) {
dispatcherRegistry.register(getFailingDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
var transferRequest = createTransferRequest();
var agreement = mock(ContractAgreement.class);
when(agreement.getAssetId()).thenReturn(transferRequest.getAssetId());
var agreement = ContractAgreement.Builder.newInstance()
.assetId("assetId")
.providerId("providerId")
.consumerId("consumerId")
.policy(Policy.Builder.newInstance().build())
.build();
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
when(policyArchive.findPolicyForContract(any())).thenReturn(Policy.Builder.newInstance().build());

service.initiateTransfer(transferRequest);

await().atMost(TIMEOUT).untilAsserted(() -> verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessTerminated.class))));
}

@NotNull
private RemoteMessageDispatcher getTestDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
var ack = TransferProcessAck.Builder.newInstance().build();
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack)));
return testDispatcher;
}

@NotNull
private RemoteMessageDispatcher getFailingDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
when(testDispatcher.dispatch(any(), any())).thenReturn(failedFuture(new EdcException("cannot send message")));
return testDispatcher;
}

private TransferRequest createTransferRequest() {
return TransferRequest.Builder.newInstance()
.id("dataRequestId")
.assetId("assetId")
.dataDestination(DataAddress.Builder.newInstance().type("any").build())
.protocol("test")
.counterPartyAddress("http://an/address")
Expand Down
Loading

0 comments on commit 40c67e8

Please sign in to comment.