Skip to content

Commit

Permalink
core: dispatch contractdefinition events (#1538)
Browse files Browse the repository at this point in the history
* core: dispatch contractdefinition events

* PR remarks
  • Loading branch information
ndr-brt committed Jul 4, 2022
1 parent 6d531cb commit 630f3d1
Show file tree
Hide file tree
Showing 30 changed files with 404 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ in the detailed section referring to by linking pull requests or issues.
#### Added

* Event Framework for Asset entity (#1453)
* Event Framework for ContractDefinition entity (#1436)
* Event Framework for PolicyDefinition entity (#1437)
* SQL Translation layer (#1357, #1459)
* Permit API verbose error response (#1479)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.eclipse.dataspaceconnector.api.datamanagement.asset;

import org.eclipse.dataspaceconnector.api.datamanagement.asset.service.AssetObservableImpl;
import org.eclipse.dataspaceconnector.api.datamanagement.asset.service.AssetService;
import org.eclipse.dataspaceconnector.api.datamanagement.asset.service.AssetServiceImpl;
import org.eclipse.dataspaceconnector.api.datamanagement.asset.service.EventAssetListener;
Expand All @@ -30,6 +29,7 @@
import org.eclipse.dataspaceconnector.spi.asset.AssetIndex;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.store.ContractNegotiationStore;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.observe.asset.AssetObservableImpl;
import org.eclipse.dataspaceconnector.spi.system.Inject;
import org.eclipse.dataspaceconnector.spi.system.Provides;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.dataspaceconnector.dataloading.AssetLoader;
import org.eclipse.dataspaceconnector.spi.asset.AssetIndex;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.store.ContractNegotiationStore;
import org.eclipse.dataspaceconnector.spi.observe.asset.AssetObservable;
import org.eclipse.dataspaceconnector.spi.query.Criterion;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.event.asset.AssetCreated;
import org.eclipse.dataspaceconnector.spi.event.asset.AssetDeleted;
import org.eclipse.dataspaceconnector.spi.observe.asset.AssetListener;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;

import java.time.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.asset.AssetIndex;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.store.ContractNegotiationStore;
import org.eclipse.dataspaceconnector.spi.observe.asset.AssetObservable;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.transaction.NoopTransactionContext;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
*
*/

val rsApi: String by project
val restAssured: String by project
val awaitility: String by project
val jerseyVersion: String by project
val restAssured: String by project
val rsApi: String by project

plugins {
`java-library`
Expand All @@ -35,6 +36,7 @@ dependencies {

testImplementation(project(":extensions:junit"))
testImplementation("io.rest-assured:rest-assured:${restAssured}")
testImplementation("org.awaitility:awaitility:${awaitility}")
}

publishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@
package org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition;

import org.eclipse.dataspaceconnector.api.datamanagement.configuration.DataManagementApiConfiguration;
import org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.service.ContractDefinitionService;
import org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.service.ContractDefinitionServiceImpl;
import org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.service.EventContractDefinitionListener;
import org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.transform.ContractDefinitionDtoToContractDefinitionTransformer;
import org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.transform.ContractDefinitionToContractDefinitionDtoTransformer;
import org.eclipse.dataspaceconnector.api.transformer.DtoTransformerRegistry;
import org.eclipse.dataspaceconnector.dataloading.ContractDefinitionLoader;
import org.eclipse.dataspaceconnector.spi.WebService;
import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionObservableImpl;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.system.Inject;
import org.eclipse.dataspaceconnector.spi.system.Provides;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;

import java.time.Clock;

@Provides(ContractDefinitionService.class)
public class ContractDefinitionApiExtension implements ServiceExtension {
@Inject
WebService webService;
Expand All @@ -47,6 +55,12 @@ public class ContractDefinitionApiExtension implements ServiceExtension {
@Inject
TransactionContext transactionContext;

@Inject
Clock clock;

@Inject
EventRouter eventRouter;

@Override
public String name() {
return "Data Management API: Contract Definition";
Expand All @@ -59,7 +73,12 @@ public void initialize(ServiceExtensionContext context) {

var monitor = context.getMonitor();

var service = new ContractDefinitionServiceImpl(contractDefinitionStore, contractDefinitionLoader, transactionContext);
var contractDefinitionObservable = new ContractDefinitionObservableImpl();
contractDefinitionObservable.registerListener(new EventContractDefinitionListener(clock, eventRouter));

var service = new ContractDefinitionServiceImpl(contractDefinitionStore, contractDefinitionLoader, transactionContext, contractDefinitionObservable);
context.registerService(ContractDefinitionService.class, service);

webService.registerResource(config.getContextAlias(), new ContractDefinitionApiController(monitor, service, transformerRegistry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.dataspaceconnector.api.result.ServiceResult;
import org.eclipse.dataspaceconnector.dataloading.ContractDefinitionLoader;
import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionObservable;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;
Expand All @@ -30,11 +31,13 @@ public class ContractDefinitionServiceImpl implements ContractDefinitionService
private final ContractDefinitionStore store;
private final ContractDefinitionLoader loader;
private final TransactionContext transactionContext;
private final ContractDefinitionObservable observable;

public ContractDefinitionServiceImpl(ContractDefinitionStore store, ContractDefinitionLoader loader, TransactionContext transactionContext) {
public ContractDefinitionServiceImpl(ContractDefinitionStore store, ContractDefinitionLoader loader, TransactionContext transactionContext, ContractDefinitionObservable observable) {
this.store = store;
this.loader = loader;
this.transactionContext = transactionContext;
this.observable = observable;
}

@Override
Expand All @@ -52,6 +55,7 @@ public ServiceResult<ContractDefinition> create(ContractDefinition contractDefin
return transactionContext.execute(() -> {
if (findById(contractDefinition.getId()) == null) {
loader.accept(contractDefinition);
observable.invokeForEach(l -> l.created(contractDefinition));
return ServiceResult.success(contractDefinition);
} else {
return ServiceResult.conflict(format("ContractDefinition %s cannot be created because it already exist", contractDefinition.getId()));
Expand All @@ -68,6 +72,7 @@ public ServiceResult<ContractDefinition> delete(String contractDefinitionId) {
if (deleted == null) {
return ServiceResult.notFound(format("ContractDefinition %s does not exist", contractDefinitionId));
} else {
observable.invokeForEach(l -> l.deleted(deleted));
return ServiceResult.success(deleted);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.service;

import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionListener;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.event.contractdefinition.ContractDefinitionCreated;
import org.eclipse.dataspaceconnector.spi.event.contractdefinition.ContractDefinitionDeleted;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;

import java.time.Clock;

/**
* Listener responsible for creating and publishing events regarding ContractDefinition state changes
*/
public class EventContractDefinitionListener implements ContractDefinitionListener {
private final Clock clock;
private final EventRouter eventRouter;

public EventContractDefinitionListener(Clock clock, EventRouter eventRouter) {
this.clock = clock;
this.eventRouter = eventRouter;
}

@Override
public void created(ContractDefinition contractDefinition) {
var event = ContractDefinitionCreated.Builder.newInstance()
.contractDefinitionId(contractDefinition.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void deleted(ContractDefinition contractDefinition) {
var event = ContractDefinitionDeleted.Builder.newInstance()
.contractDefinitionId(contractDefinition.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.api.datamanagement.contractdefinition.service;

import org.eclipse.dataspaceconnector.junit.extensions.EdcExtension;
import org.eclipse.dataspaceconnector.spi.asset.AssetSelectorExpression;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.event.EventSubscriber;
import org.eclipse.dataspaceconnector.spi.event.contractdefinition.ContractDefinitionCreated;
import org.eclipse.dataspaceconnector.spi.event.contractdefinition.ContractDefinitionDeleted;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(EdcExtension.class)
public class ContractDefinitionEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);

@Test
void shouldDispatchEventOnContractDefinitionCreationAndDeletion(ContractDefinitionService service, EventRouter eventRouter) {
eventRouter.register(eventSubscriber);
var contractDefinition = ContractDefinition.Builder.newInstance()
.id(UUID.randomUUID().toString())
.contractPolicyId(UUID.randomUUID().toString())
.accessPolicyId(UUID.randomUUID().toString())
.selectorExpression(AssetSelectorExpression.SELECT_ALL)
.build();

service.create(contractDefinition);

await().untilAsserted(() -> verify(eventSubscriber).on(isA(ContractDefinitionCreated.class)));

service.delete(contractDefinition.getId());

await().untilAsserted(() -> verify(eventSubscriber).on(isA(ContractDefinitionDeleted.class)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import org.eclipse.dataspaceconnector.dataloading.ContractDefinitionLoader;
import org.eclipse.dataspaceconnector.spi.asset.AssetSelectorExpression;
import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionListener;
import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionObservable;
import org.eclipse.dataspaceconnector.spi.contract.definition.observe.ContractDefinitionObservableImpl;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.transaction.NoopTransactionContext;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.UUID;
Expand All @@ -30,6 +34,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.dataspaceconnector.api.result.ServiceFailure.Reason.CONFLICT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.isA;
Expand All @@ -43,7 +48,15 @@ class ContractDefinitionServiceImplTest {
private final ContractDefinitionStore store = mock(ContractDefinitionStore.class);
private final ContractDefinitionLoader loader = mock(ContractDefinitionLoader.class);
private final TransactionContext transactionContext = new NoopTransactionContext();
private final ContractDefinitionServiceImpl service = new ContractDefinitionServiceImpl(store, loader, transactionContext);
private final ContractDefinitionObservable observable = new ContractDefinitionObservableImpl();
private final ContractDefinitionListener listener = mock(ContractDefinitionListener.class);

private final ContractDefinitionServiceImpl service = new ContractDefinitionServiceImpl(store, loader, transactionContext, observable);

@BeforeEach
void setUp() {
observable.registerListener(listener);
}

@Test
void findById_filtersById() {
Expand Down Expand Up @@ -85,6 +98,7 @@ void create_shouldCreateDefinitionIfItDoesNotAlreadyExist() {
assertThat(inserted.succeeded()).isTrue();
assertThat(inserted.getContent()).matches(hasId(definition.getId()));
verify(loader).accept(argThat(it -> definition.getId().equals(it.getId())));
verify(listener).created(any());
}

@Test
Expand All @@ -97,6 +111,7 @@ void create_shouldNotCreateDefinitionIfItAlreadyExists() {
assertThat(inserted.failed()).isTrue();
assertThat(inserted.reason()).isEqualTo(CONFLICT);
verifyNoInteractions(loader);
verifyNoInteractions(listener);
}

@Test
Expand All @@ -108,6 +123,7 @@ void delete_shouldDeleteDefinitionIfItsNotReferencedByAnyAgreement() {

assertThat(deleted.succeeded()).isTrue();
assertThat(deleted.getContent()).matches(hasId(definition.getId()));
verify(listener).deleted(any());
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

import org.eclipse.dataspaceconnector.api.datamanagement.configuration.DataManagementApiConfiguration;
import org.eclipse.dataspaceconnector.api.datamanagement.policy.service.EventPolicyDefinitionListener;
import org.eclipse.dataspaceconnector.api.datamanagement.policy.service.PolicyDefinitionObservableImpl;
import org.eclipse.dataspaceconnector.api.datamanagement.policy.service.PolicyDefinitionService;
import org.eclipse.dataspaceconnector.api.datamanagement.policy.service.PolicyDefinitionServiceImpl;
import org.eclipse.dataspaceconnector.api.transformer.DtoTransformerRegistry;
import org.eclipse.dataspaceconnector.spi.WebService;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.observe.policydefinition.PolicyDefinitionObservableImpl;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.system.Inject;
import org.eclipse.dataspaceconnector.spi.system.Provides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.event.policydefinition.PolicyDefinitionCreated;
import org.eclipse.dataspaceconnector.spi.event.policydefinition.PolicyDefinitionDeleted;
import org.eclipse.dataspaceconnector.spi.observe.policydefinition.PolicyDefinitionListener;

import java.time.Clock;

Expand All @@ -36,7 +37,7 @@ public EventPolicyDefinitionListener(Clock clock, EventRouter eventRouter) {
@Override
public void created(PolicyDefinition policyDefinition) {
var event = PolicyDefinitionCreated.Builder.newInstance()
.id(policyDefinition.getUid())
.policyDefinitionId(policyDefinition.getUid())
.at(clock.millis())
.build();

Expand All @@ -46,7 +47,7 @@ public void created(PolicyDefinition policyDefinition) {
@Override
public void deleted(PolicyDefinition policyDefinition) {
var event = PolicyDefinitionDeleted.Builder.newInstance()
.id(policyDefinition.getUid())
.policyDefinitionId(policyDefinition.getUid())
.at(clock.millis())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.dataspaceconnector.api.result.ServiceResult;
import org.eclipse.dataspaceconnector.policy.model.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.observe.policydefinition.PolicyDefinitionObservable;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.transaction.TransactionContext;
Expand Down
Loading

0 comments on commit 630f3d1

Please sign in to comment.