Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use the range to limit offers, not definitions #2018

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.eclipse.dataspaceconnector.spi.agent.ParticipantAgent;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractDefinitionService;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.policy.engine.PolicyEngine;
Expand Down Expand Up @@ -55,10 +54,8 @@ public ContractDefinitionServiceImpl(Monitor monitor, ContractDefinitionStore co

@NotNull
@Override
public Stream<ContractDefinition> definitionsFor(ParticipantAgent agent, Range range) {
return definitionStore.findAll(QuerySpec.Builder.newInstance()
.range(range)
.build())
public Stream<ContractDefinition> definitionsFor(ParticipantAgent agent) {
return definitionStore.findAll(QuerySpec.max())
.filter(definition -> evaluateAccessPolicy(definition, agent));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Microsoft Corporation - Refactoring
* Fraunhofer Institute for Software and Systems Engineering - extended method implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements
* ZF Friedrichshafen AG - enable asset filtering
*
*/

package org.eclipse.dataspaceconnector.contract.offer;
Expand All @@ -32,7 +32,8 @@
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -58,16 +59,44 @@ public ContractOfferServiceImpl(ParticipantAgentService agentService, ContractDe
@NotNull
public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query) {
var agent = agentService.createFor(query.getClaimToken());
var numSeenAssets = new AtomicLong(0);
var range = query.getRange();
var limit = range.getTo() - range.getFrom();
var skip = new AtomicInteger(range.getFrom());

return definitionService.definitionsFor(agent, query.getRange())
return definitionService.definitionsFor(agent)
.takeWhile(d -> numSeenAssets.get() < range.getTo())
.flatMap(definition -> {
var assetFilterQuery = QuerySpec.Builder.newInstance()
.filter(concat(definition.getSelectorExpression().getCriteria().stream(), query.getAssetsCriteria().stream()).collect(Collectors.toList())).build();
var assets = assetIndex.queryAssets(assetFilterQuery);
return Optional.of(definition.getContractPolicyId())
.map(policyStore::findById)
.map(policy -> assets.map(asset -> createContractOffer(definition, policy.getPolicy(), asset)))
.orElseGet(Stream::empty);
var criteria = definition.getSelectorExpression().getCriteria();

var querySpecBuilder = QuerySpec.Builder.newInstance()
.filter(concat(criteria.stream(), query.getAssetsCriteria().stream()).collect(Collectors.toList()));

var querySpec = querySpecBuilder.build();
var numAssets = assetIndex.countAssets(querySpec);

if (skip.get() > 0) {
querySpecBuilder.offset(skip.get());
}
if (numAssets + numSeenAssets.get() > limit) {
querySpecBuilder.limit(limit);
}

if (skip.get() < numAssets) {
var byId = policyStore.findById(definition.getContractPolicyId());
if (byId == null) { //policy not found
return Stream.empty();
}
var assets = assetIndex.queryAssets(querySpecBuilder.build());
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return assets.map(a -> createContractOffer(definition, byId.getPolicy(), a));

} else {
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return Stream.empty();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void definitionsFor_verifySatisfiesPolicies() {
when(policyEngine.evaluate(CATALOGING_SCOPE, def.getPolicy(), agent)).thenReturn(Result.success(def.getPolicy()));
when(definitionStore.findAll(any())).thenReturn(Stream.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));

var definitions = definitionService.definitionsFor(agent, DEFAULT_RANGE);
var definitions = definitionService.definitionsFor(agent);

assertThat(definitions).hasSize(1);
verify(policyEngine, atLeastOnce()).evaluate(CATALOGING_SCOPE, def.getPolicy(), agent);
Expand All @@ -82,7 +82,7 @@ void definitionsFor_verifyDoesNotSatisfyAccessPolicy() {
when(policyEngine.evaluate(any(), any(), any())).thenReturn(Result.failure("invalid"));
when(definitionStore.findAll(any())).thenReturn(Stream.of(contractDefinition));

var result = definitionService.definitionsFor(agent, DEFAULT_RANGE);
var result = definitionService.definitionsFor(agent);

assertThat(result).isEmpty();
verify(policyEngine, atLeastOnce()).evaluate(CATALOGING_SCOPE, definition.getPolicy(), agent);
Expand All @@ -97,7 +97,7 @@ void definitionsFor_verifyPoliciesNotFound() {
when(policyEngine.evaluate(CATALOGING_SCOPE, policy, agent)).thenReturn(Result.success(policy));
when(definitionStore.findAll(QuerySpec.max())).thenReturn(Stream.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));

var definitions = definitionService.definitionsFor(agent, DEFAULT_RANGE);
var definitions = definitionService.definitionsFor(agent);

assertThat(definitions).hasSize(0);
verify(policyEngine, never()).evaluate(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (c) 2021 Daimler TSS GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Daimler TSS GmbH - Initial API and Implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements
*
*/

package org.eclipse.dataspaceconnector.contract.offer;

import org.eclipse.dataspaceconnector.core.controlplane.defaults.assetindex.InMemoryAssetIndex;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.agent.ParticipantAgent;
import org.eclipse.dataspaceconnector.spi.agent.ParticipantAgentService;
import org.eclipse.dataspaceconnector.spi.asset.AssetIndex;
import org.eclipse.dataspaceconnector.spi.asset.AssetSelectorExpression;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractDefinitionService;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferQuery;
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferService;
import org.eclipse.dataspaceconnector.spi.iam.ClaimToken;
import org.eclipse.dataspaceconnector.spi.message.Range;
import org.eclipse.dataspaceconnector.spi.policy.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.Criterion;
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.AssetEntry;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* This could be seen as se second part of the {@code ContractOfferServiceImplTest}, using the in-mem asset index
*/
class ContractOfferServiceImplIntegrationTest {

private final ContractDefinitionService contractDefinitionService = mock(ContractDefinitionService.class);
private final ParticipantAgentService agentService = mock(ParticipantAgentService.class);
private final PolicyDefinitionStore policyStore = mock(PolicyDefinitionStore.class);
private AssetIndex assetIndex;
private ContractOfferService contractOfferService;

@BeforeEach
void setUp() {
assetIndex = new InMemoryAssetIndex();
contractOfferService = new ContractOfferServiceImpl(agentService, contractDefinitionService, assetIndex, policyStore);
}

@Test
void shouldLimitResult_withHeterogenousChunks() {
var assets1 = range(10, 24).mapToObj(i -> createAsset("asset" + i).build()).collect(Collectors.toList());
var assets2 = range(24, 113).mapToObj(i -> createAsset("asset" + i).build()).collect(Collectors.toList());
var assets3 = range(113, 178).mapToObj(i -> createAsset("asset" + i).build()).collect(Collectors.toList());

store(assets1);
store(assets2);
store(assets3);

var def1 = getContractDefBuilder("def1").selectorExpression(selectorFrom(assets1)).build();
var def2 = getContractDefBuilder("def2").selectorExpression(selectorFrom(assets2)).build();
var def3 = getContractDefBuilder("def3").selectorExpression(selectorFrom(assets3)).build();

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> Stream.of(def1, def2, def3));

when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());


var from = 20;
var to = 50;
var query = ContractOfferQuery.builder().range(new Range(from, to)).claimToken(ClaimToken.Builder.newInstance().build()).build();

assertThat(contractOfferService.queryContractOffers(query)).hasSize(to - from);
verify(agentService).createFor(isA(ClaimToken.class));
verify(contractDefinitionService, times(1)).definitionsFor(isA(ParticipantAgent.class));
verify(policyStore).findById("contract");
}

@Test
void shouldLimitResult_insufficientAssets() {
var assets1 = range(0, 12).mapToObj(i -> createAsset("asset" + i).build()).collect(Collectors.toList());
var assets2 = range(12, 18).mapToObj(i -> createAsset("asset" + i).build()).collect(Collectors.toList());

store(assets1);
store(assets2);

var def1 = getContractDefBuilder("def1").selectorExpression(selectorFrom(assets1)).build();
var def2 = getContractDefBuilder("def2").selectorExpression(selectorFrom(assets2)).build();

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> Stream.of(def1, def2));
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var from = 14;
var to = 50;
var query = ContractOfferQuery.builder().range(new Range(from, to)).claimToken(ClaimToken.Builder.newInstance().build()).build();

// 4 definitions, 10 assets each = 40 offers total -> offset 20 ==> result = 20
assertThat(contractOfferService.queryContractOffers(query)).hasSize(4);
verify(agentService).createFor(isA(ClaimToken.class));
verify(contractDefinitionService).definitionsFor(isA(ParticipantAgent.class));
verify(policyStore, atLeastOnce()).findById("contract");
}

@Test
void shouldLimitResult_pageOffsetLargerThanNumAssets() {
var contractDefinition = range(0, 2).mapToObj(i -> getContractDefBuilder(String.valueOf(i))
.build());

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> contractDefinition);

when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var from = 25;
var to = 50;
var query = ContractOfferQuery.builder().range(new Range(from, to)).claimToken(ClaimToken.Builder.newInstance().build()).build();

// 2 definitions, 10 assets each = 20 offers total -> offset of 25 is outside
assertThat(contractOfferService.queryContractOffers(query)).isEmpty();
verify(agentService).createFor(isA(ClaimToken.class));
verify(contractDefinitionService).definitionsFor(isA(ParticipantAgent.class));
verify(policyStore, never()).findById("contract");
}

private void store(Collection<Asset> assets) {
assets.stream().map(a -> new AssetEntry(a, DataAddress.Builder.newInstance().type("test-type").build()))
.forEach(assetIndex::accept);
}

private AssetSelectorExpression selectorFrom(Collection<Asset> assets1) {
var builder = AssetSelectorExpression.Builder.newInstance();
var ids = assets1.stream().map(a -> a.getId()).collect(Collectors.toList());
return builder.criteria(List.of(new Criterion(Asset.PROPERTY_ID, "in", ids))).build();
}

private ContractDefinition.Builder getContractDefBuilder(String id) {
return ContractDefinition.Builder.newInstance()
.id(id)
.accessPolicyId("access")
.contractPolicyId("contract")
.selectorExpression(AssetSelectorExpression.SELECT_ALL);
}

private Asset.Builder createAsset(String id) {
return Asset.Builder.newInstance().id(id).name("test asset " + id);
}

}
Loading