Skip to content

Commit

Permalink
feat: IDS catalog request filtering (#2015)
Browse files Browse the repository at this point in the history
* feat: IDS catalog request filtering

* code review changes

* code review changes #2

* feat: IDS catalog request filtering - fix checks

* code review changes

* further code review fixes

* further code review fixes

* further code review fixes

* add tests for RequestUtil
  • Loading branch information
maciejkizlich-zf committed Oct 4, 2022
1 parent e456b36 commit 4e8252e
Show file tree
Hide file tree
Showing 24 changed files with 242 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Microsoft Corporation - Refactoring
* Fraunhofer Institute for Software and Systems Engineering - extended method implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements
*
* ZF Friedrichshafen AG - enable asset filtering
*/

package org.eclipse.dataspaceconnector.contract.offer;
Expand All @@ -24,17 +24,20 @@
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractDefinitionService;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferQuery;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferService;
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractOffer;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Stream.concat;

/**
* Implementation of the {@link ContractOfferService}.
*/
Expand All @@ -53,12 +56,14 @@ public ContractOfferServiceImpl(ParticipantAgentService agentService, ContractDe

@Override
@NotNull
public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query, Range range) {
public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query) {
var agent = agentService.createFor(query.getClaimToken());

return definitionService.definitionsFor(agent, range)
return definitionService.definitionsFor(agent, query.getRange())
.flatMap(definition -> {
var assets = assetIndex.queryAssets(definition.getSelectorExpression());
var assetFilterQuery = QuerySpec.Builder.newInstance()
.filter(concat(definition.getSelectorExpression().getCriteria().stream(), query.getAssetsCriteria().stream()).collect(Collectors.toList())).build();
var assets = assetIndex.queryAssets(assetFilterQuery);
return Optional.of(definition.getContractPolicyId())
.map(policyStore::findById)
.map(policy -> assets.map(asset -> createContractOffer(definition, policy.getPolicy(), asset)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.policy.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.Criterion;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.stream.Stream.concat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -70,15 +75,14 @@ void shouldGetContractOffers() {
when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class), any())).thenReturn(Stream.of(contractDefinition));
var assetStream = Stream.of(Asset.Builder.newInstance().build(), Asset.Builder.newInstance().build());
when(assetIndex.queryAssets(isA(AssetSelectorExpression.class))).thenReturn(assetStream);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(assetStream);
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var query = ContractOfferQuery.builder().claimToken(ClaimToken.Builder.newInstance().build()).build();
var query = ContractOfferQuery.builder().range(DEFAULT_RANGE).claimToken(ClaimToken.Builder.newInstance().build()).build();

assertThat(contractOfferService.queryContractOffers(query, DEFAULT_RANGE)).hasSize(2);
assertThat(contractOfferService.queryContractOffers(query)).hasSize(2);
verify(agentService).createFor(isA(ClaimToken.class));
verify(contractDefinitionService).definitionsFor(isA(ParticipantAgent.class), eq(DEFAULT_RANGE));
verify(assetIndex).queryAssets(isA(AssetSelectorExpression.class));
verify(policyStore).findById("contract");
}

Expand All @@ -97,8 +101,40 @@ void shouldNotGetContractOfferIfPolicyIsNotFound() {

var query = ContractOfferQuery.builder().claimToken(ClaimToken.Builder.newInstance().build()).build();

var result = contractOfferService.queryContractOffers(query, DEFAULT_RANGE);
var result = contractOfferService.queryContractOffers(query);

assertThat(result).hasSize(0);
}

@Test
void shouldGetContractOffersWithAssetFilteringApplied() {
var contractDefinition = ContractDefinition.Builder.newInstance()
.id("1")
.accessPolicyId("access")
.contractPolicyId("contract")
.selectorExpression(AssetSelectorExpression.Builder.newInstance().whenEquals(Asset.PROPERTY_NAME, "assetName").build())
.build();

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class), any())).thenReturn(Stream.of(contractDefinition));
var assetStream = Stream.of(Asset.Builder.newInstance().build(), Asset.Builder.newInstance().build());
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(assetStream);
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var query = ContractOfferQuery.builder()
.range(DEFAULT_RANGE)
.claimToken(ClaimToken.Builder.newInstance().build())
.assetsCriteria(List.of(new Criterion(Asset.PROPERTY_ID, "=", "2")))
.build();

var expectedQuerySpec = QuerySpec.Builder.newInstance()
.filter(concat(contractDefinition.getSelectorExpression().getCriteria().stream(), query.getAssetsCriteria().stream())
.collect(Collectors.toList())).build();

assertThat(contractOfferService.queryContractOffers(query)).hasSize(2);
verify(agentService).createFor(isA(ClaimToken.class));
verify(contractDefinitionService).definitionsFor(isA(ParticipantAgent.class), eq(DEFAULT_RANGE));
verify(policyStore).findById("contract");
verify(assetIndex).queryAssets(eq(expectedQuerySpec));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.types.domain.catalog.Catalog;
import org.eclipse.dataspaceconnector.spi.types.domain.catalog.CatalogRequest;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractOffer;
Expand Down Expand Up @@ -53,7 +54,7 @@ public BatchedRequestFetcher(RemoteMessageDispatcherRegistry dispatcherRegistry,
@NotNull
public CompletableFuture<List<ContractOffer>> fetch(CatalogRequest catalogRequest, int from, int batchSize) {
var range = new Range(from, from + batchSize);
var rq = catalogRequest.toBuilder().range(range).build();
var rq = catalogRequest.toBuilder().querySpec(QuerySpec.Builder.newInstance().range(range).build()).build();

return dispatcherRegistry.send(Catalog.class, rq, () -> null)
.thenApply(Catalog::getContractOffers)
Expand All @@ -72,11 +73,4 @@ private List<ContractOffer> concat(List<ContractOffer> list1, List<ContractOffer
list1.addAll(list2);
return list1;
}


private Catalog getCatalog(CatalogRequest catalogRequest, int from, int to) {
var future = dispatcherRegistry.send(Catalog.class, catalogRequest.toBuilder().range(new Range(from, to)).build(), () -> null);
return future.join();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ void fetchAll() {

// verify the sequence of requests
assertThat(captor.getAllValues())
.extracting(CatalogRequest::getRange)
.extracting(l -> l.getQuerySpec().getRange())
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(new Range(0, 5), new Range(5, 10), new Range(10, 15), new Range(15, 20));

}

private CatalogRequest createRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*
* Contributors:
* Fraunhofer Institute for Software and Systems Engineering - initial API and implementation
*
* ZF Friedrichshafen AG - enable asset filtering
*/

package org.eclipse.dataspaceconnector.ids.api.multipart.dispatcher.sender.type;
Expand All @@ -30,7 +30,7 @@
import org.eclipse.dataspaceconnector.ids.core.util.CalendarUtil;
import org.eclipse.dataspaceconnector.ids.spi.domain.IdsConstants;
import org.eclipse.dataspaceconnector.spi.EdcException;
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.types.domain.catalog.Catalog;
import org.eclipse.dataspaceconnector.spi.types.domain.catalog.CatalogRequest;

Expand Down Expand Up @@ -79,16 +79,15 @@ public Message buildMessageHeader(CatalogRequest request, DynamicAttributeToken
._recipientConnector_(Collections.singletonList(URI.create(request.getConnectorId())))
.build();
//TODO: IDS REFACTORING: incorporate this into the protocol itself
message.setProperty(Range.FROM, request.getRange().getFrom());
message.setProperty(Range.TO, request.getRange().getTo());
message.setProperty(QuerySpec.QUERY_SPEC, request.getQuerySpec());
return message;
}

@Override
public String buildMessagePayload(CatalogRequest request) throws Exception {
return null;
}

/**
* Parses the response content and extracts the catalog from the received self description.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private void registerControllers(ServiceExtensionContext context) {

// create request handlers
var handlers = new LinkedList<Handler>();
handlers.add(new DescriptionRequestHandler(monitor, connectorId, transformerRegistry, assetIndex, dataCatalogService, contractOfferService, connectorService));
handlers.add(new DescriptionRequestHandler(monitor, connectorId, transformerRegistry, assetIndex, dataCatalogService, contractOfferService, connectorService, objectMapper));
handlers.add(new ArtifactRequestHandler(monitor, connectorId, objectMapper, contractNegotiationStore, contractValidationService, transferProcessManager, vault));
handlers.add(new EndpointDataReferenceHandler(monitor, connectorId, endpointDataReferenceReceiverRegistry, endpointDataReferenceTransformerRegistry, context.getTypeManager()));
handlers.add(new ContractRequestHandler(monitor, connectorId, objectMapper, providerNegotiationManager, transformerRegistry, assetIndex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.eclipse.dataspaceconnector.ids.api.multipart.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.fraunhofer.iais.eis.Artifact;
import de.fraunhofer.iais.eis.Connector;
import de.fraunhofer.iais.eis.DescriptionRequestMessage;
Expand All @@ -34,16 +35,16 @@
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferQuery;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferService;
import org.eclipse.dataspaceconnector.spi.iam.ClaimToken;
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.query.Criterion;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;
import org.jetbrains.annotations.NotNull;

import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.eclipse.dataspaceconnector.ids.api.multipart.util.RequestUtil.getInt;
import static org.eclipse.dataspaceconnector.ids.api.multipart.util.RequestUtil.getQuerySpec;
import static org.eclipse.dataspaceconnector.ids.api.multipart.util.ResponseUtil.badParameters;
import static org.eclipse.dataspaceconnector.ids.api.multipart.util.ResponseUtil.createMultipartResponse;
import static org.eclipse.dataspaceconnector.ids.api.multipart.util.ResponseUtil.descriptionResponse;
Expand All @@ -57,6 +58,7 @@ public class DescriptionRequestHandler implements Handler {
private final CatalogService catalogService;
private final ContractOfferService contractOfferService;
private final ConnectorService connectorService;
private final ObjectMapper objectMapper;

public DescriptionRequestHandler(
@NotNull Monitor monitor,
Expand All @@ -65,14 +67,16 @@ public DescriptionRequestHandler(
@NotNull AssetIndex assetIndex,
@NotNull CatalogService catalogService,
@NotNull ContractOfferService contractOfferService,
@NotNull ConnectorService connectorService) {
@NotNull ConnectorService connectorService,
@NotNull ObjectMapper objectMapper) {
this.monitor = monitor;
this.connectorId = connectorId;
this.transformerRegistry = transformerRegistry;
this.assetIndex = assetIndex;
this.catalogService = catalogService;
this.contractOfferService = contractOfferService;
this.connectorService = connectorService;
this.objectMapper = objectMapper;
}

@Override
Expand All @@ -88,19 +92,15 @@ public boolean canHandle(@NotNull MultipartRequest multipartRequest) {
// Get ID of requested element
var requestedElement = IdsId.from(message.getRequestedElement());

//TODO: IDS REFACTORING: this should be a named property of the message object
// extract paging information, default to 0 ... Integer.MAX_VALUE
var from = getInt(message, Range.FROM, 0);
var to = getInt(message, Range.TO, Integer.MAX_VALUE);
var range = new Range(from, to);
var querySpec = getQuerySpec(message, objectMapper);

// Retrieve and transform requested element
Result<? extends ModelClass> result;
if (requestedElement.failed() || requestedElement.getContent() == null ||
(requestedElement.getContent().getType() == IdsType.CONNECTOR)) {
result = getConnector(claimToken, range);
result = getConnector(claimToken, querySpec);
} else {
var retrievedObject = retrieveRequestedElement(requestedElement.getContent(), claimToken, range);
var retrievedObject = retrieveRequestedElement(requestedElement.getContent(), claimToken, querySpec);
if (retrievedObject == null) {
return createMultipartResponse(notFound(message, connectorId));
}
Expand All @@ -118,8 +118,8 @@ public boolean canHandle(@NotNull MultipartRequest multipartRequest) {
return createMultipartResponse(descriptionResponse(message, connectorId), result.getContent());
}

private Result<Connector> getConnector(ClaimToken claimToken, Range range) {
return transformerRegistry.transform(connectorService.getConnector(claimToken, range), Connector.class);
private Result<Connector> getConnector(ClaimToken claimToken, QuerySpec querySpec) {
return transformerRegistry.transform(connectorService.getConnector(claimToken, querySpec), Connector.class);
}

/**
Expand All @@ -128,17 +128,17 @@ private Result<Connector> getConnector(ClaimToken claimToken, Range range) {
*
* @param idsId the ID.
* @param claimToken the claim token of the requesting connector.
* @param range the range.
* @param querySpec the QuerySpec containing Range and/or filtering criteria.
* @return the requested element.
*/
private Object retrieveRequestedElement(IdsId idsId, ClaimToken claimToken, Range range) {
private Object retrieveRequestedElement(IdsId idsId, ClaimToken claimToken, QuerySpec querySpec) {
var type = idsId.getType();
switch (type) {
case ARTIFACT:
case REPRESENTATION:
return assetIndex.findById(idsId.getValue());
case CATALOG:
return catalogService.getDataCatalog(claimToken, range);
return catalogService.getDataCatalog(claimToken, querySpec);
case RESOURCE:
var assetId = idsId.getValue();
var asset = assetIndex.findById(assetId);
Expand All @@ -148,9 +148,9 @@ private Object retrieveRequestedElement(IdsId idsId, ClaimToken claimToken, Rang

var contractOfferQuery = ContractOfferQuery.Builder.newInstance()
.claimToken(claimToken)
.criterion(new Criterion(Asset.PROPERTY_ID, "=", assetId))
.assetsCriterion(new Criterion(Asset.PROPERTY_ID, "=", assetId))
.build();
var targetingContractOffers = contractOfferService.queryContractOffers(contractOfferQuery, range).collect(toList());
var targetingContractOffers = contractOfferService.queryContractOffers(contractOfferQuery).collect(toList());

return new OfferedAsset(asset, targetingContractOffers);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package org.eclipse.dataspaceconnector.ids.api.multipart.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.fraunhofer.iais.eis.DescriptionRequestMessage;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;

import static java.util.Optional.ofNullable;
Expand All @@ -25,14 +27,13 @@ public class RequestUtil {
* Extracts an arbitrary property from a {@link DescriptionRequestMessage}
*
* @param message The message
* @param propertyName the name of the property
* @param defaultValue If the message does not contain that property the default value is returned.
* @return either the property parsed into an Integer, or the default value
* @param objectMapper The objectMapper
* @return either the property parsed into specific type, or the default value
*/
public static int getInt(@NotNull DescriptionRequestMessage message, String propertyName, int defaultValue) {
public static QuerySpec getQuerySpec(@NotNull DescriptionRequestMessage message, ObjectMapper objectMapper) {
return ofNullable(message.getProperties())
.map(map -> map.get(propertyName))
.map(v -> Integer.parseInt(v.toString()))
.orElse(defaultValue);
.map(map -> map.get(QuerySpec.QUERY_SPEC))
.map(specEntry -> objectMapper.convertValue(specEntry, QuerySpec.class))
.orElse(QuerySpec.none());
}
}
}
Loading

0 comments on commit 4e8252e

Please sign in to comment.