Skip to content

Commit

Permalink
fix(sql): do not use QuerySpec when SqlContractDefinitionStore#find…
Browse files Browse the repository at this point in the history
…All() (#1539)

* fix(sql): skip QuerySpec for SqlContractDefinitionStore#findAll()

* fixed ContractNegotiationIntegrationTest

* remove findAll()
  • Loading branch information
paullatzelsperger committed Jun 27, 2022
1 parent a25009f commit cfba051
Show file tree
Hide file tree
Showing 17 changed files with 88 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -45,6 +45,7 @@ in the detailed section referring to by linking pull requests or issues.
* Avoid endless loops in `ContractNegotiationManager` (#1487)
* Fix race condition in `ContractNegotiationIntegrationTest` (#1505)
* Fix for change in Cosmos DB behavior on missing sort fields (#1514)
* Effectively removed default LIMIT in SQL Contract Def Store (#1515)

## [milestone-4] - 2022-06-07

Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyEngine;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -55,7 +56,8 @@ public ContractDefinitionServiceImpl(Monitor monitor, ContractDefinitionStore co
@NotNull
@Override
public Stream<ContractDefinition> definitionsFor(ParticipantAgent agent) {
return definitionStore.findAll().stream()
//todo: once IDS supports pagination, replace this with the actual paging parameters
return definitionStore.findAll(QuerySpec.max())
.filter(definition -> evaluatePolicies(definition, agent));
}

Expand Down
Expand Up @@ -74,6 +74,7 @@ void testNegotiation_initialOfferAccepted() {
await().atMost(DEFAULT_TEST_TIMEOUT)
.pollInterval(DEFAULT_POLL_INTERVAL)
.untilAsserted(() -> {
assertThat(consumerNegotiationId).isNotNull();
var consumerNegotiation = consumerStore.find(consumerNegotiationId);
var providerNegotiation = providerStore.findForCorrelationId(consumerNegotiationId);

Expand Down Expand Up @@ -176,6 +177,7 @@ void testNegotiation_agreementDeclined() {
await().atMost(DEFAULT_TEST_TIMEOUT)
.pollInterval(DEFAULT_POLL_INTERVAL)
.untilAsserted(() -> {
assertThat(consumerNegotiationId).isNotNull();
var consumerNegotiation = consumerStore.find(consumerNegotiationId);
var providerNegotiation = providerStore.findForCorrelationId(consumerNegotiationId);

Expand Down
Expand Up @@ -22,13 +22,14 @@
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyEngine;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.dataspaceconnector.spi.asset.AssetSelectorExpression.SELECT_ALL;
Expand Down Expand Up @@ -61,13 +62,13 @@ void definitionsFor_verifySatisfiesPolicies() {
var def = PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build();
when(policyStore.findById(any())).thenReturn(def);
when(policyEngine.evaluate(NEGOTIATION_SCOPE, def.getPolicy(), agent)).thenReturn(Result.success(def.getPolicy()));
when(definitionStore.findAll()).thenReturn(List.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));
when(definitionStore.findAll(eq(QuerySpec.max()))).thenReturn(Stream.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));

var definitions = definitionService.definitionsFor(agent);

assertThat(definitions).hasSize(1);
verify(policyEngine, atLeastOnce()).evaluate(NEGOTIATION_SCOPE, def.getPolicy(), agent);
verify(definitionStore).findAll();
verify(definitionStore).findAll(eq(QuerySpec.max()));
}

@Test
Expand All @@ -77,13 +78,13 @@ void definitionsFor_verifyDoesNotSatisfyAccessPolicy() {
when(policyStore.findById(any())).thenReturn(definition);
var contractDefinition = ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build();
when(policyEngine.evaluate(any(), any(), any())).thenReturn(Result.failure("invalid"));
when(definitionStore.findAll()).thenReturn(List.of(contractDefinition));
when(definitionStore.findAll(eq(QuerySpec.max()))).thenReturn(Stream.of(contractDefinition));

var result = definitionService.definitionsFor(agent);

assertThat(result).isEmpty();
verify(policyEngine, atLeastOnce()).evaluate(NEGOTIATION_SCOPE, definition.getPolicy(), agent);
verify(definitionStore).findAll();
verify(definitionStore).findAll(eq(QuerySpec.max()));
}

@Test
Expand All @@ -96,13 +97,13 @@ void definitionsFor_verifyDoesNotSatisfyUsagePolicy() {
when(policyEngine.evaluate(eq(NEGOTIATION_SCOPE), any(), any()))
.thenReturn(Result.success(definition.getPolicy()))
.thenReturn(Result.failure("invalid"));
when(definitionStore.findAll()).thenReturn(List.of(contractDefinition));
when(definitionStore.findAll(eq(QuerySpec.max()))).thenReturn(Stream.of(contractDefinition));

var result = definitionService.definitionsFor(agent);

assertThat(result).isEmpty();
verify(policyEngine, atLeastOnce()).evaluate(NEGOTIATION_SCOPE, definition.getPolicy(), agent);
verify(definitionStore).findAll();
verify(definitionStore).findAll(QuerySpec.max());
}

@Test
Expand All @@ -111,7 +112,7 @@ void definitionsFor_verifyPoliciesNotFound() {
var policy = Policy.Builder.newInstance().build();
when(policyStore.findById(any())).thenReturn(null);
when(policyEngine.evaluate(NEGOTIATION_SCOPE, policy, agent)).thenReturn(Result.success(policy));
when(definitionStore.findAll()).thenReturn(List.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));
when(definitionStore.findAll(QuerySpec.max())).thenReturn(Stream.of(ContractDefinition.Builder.newInstance().id("1").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build()));

var definitions = definitionService.definitionsFor(agent);

Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
Expand All @@ -36,11 +35,6 @@ public class InMemoryContractDefinitionStore implements ContractDefinitionStore
private final Map<String, ContractDefinition> cache = new ConcurrentHashMap<>();
private final QueryResolver<ContractDefinition> queryResolver = new ReflectionBasedQueryResolver<>(ContractDefinition.class);

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return Collections.unmodifiableCollection(cache.values());
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
return queryResolver.query(cache.values().stream(), spec);
Expand Down
Expand Up @@ -37,14 +37,14 @@ void verifyStore() {
var definition2 = ContractDefinition.Builder.newInstance().id("2").accessPolicyId("access").contractPolicyId("contract").selectorExpression(SELECT_ALL).build();

store.save(definition1);
assertThat(store.findAll()).contains(definition1);
assertThat(store.findAll(QuerySpec.max())).contains(definition1);

store.save(List.of(definition2));
assertThat(store.findAll()).contains(definition1);
assertThat(store.findAll(QuerySpec.max())).contains(definition1);

var deletedDefinition = store.deleteById(definition1.getId());
assertThat(deletedDefinition).isEqualTo(definition1);
assertThat(store.findAll()).doesNotContain(definition1);
assertThat(store.findAll(QuerySpec.max())).doesNotContain(definition1);
}

@Test
Expand Down
Expand Up @@ -252,11 +252,6 @@ private static class FakeContractDefinitionStore implements ContractDefinitionSt
contractDefinitions.add(contractDefinition);
}

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return contractDefinitions;
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -265,11 +265,6 @@ private static class FakeContractDefinitionStore implements ContractDefinitionSt

private final List<ContractDefinition> contractDefinitions = new ArrayList<>();

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return contractDefinitions;
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.dataspaceconnector.junit.extensions.EdcExtension;
import org.eclipse.dataspaceconnector.spi.asset.AssetSelectorExpression;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractDefinition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -112,7 +113,7 @@ void postContractDefinition(ContractDefinitionStore store) {
.post("/contractdefinitions")
.then()
.statusCode(204);
assertThat(store.findAll()).isNotEmpty();
assertThat(store.findAll(QuerySpec.max())).isNotEmpty();
}

@Test
Expand All @@ -129,7 +130,7 @@ void postContractDefinition_invalidBody(ContractDefinitionStore store) {
.post("/contractdefinitions")
.then()
.statusCode(400);
assertThat(store.findAll()).isEmpty();
assertThat(store.findAll(QuerySpec.max())).isEmpty();
}

@Test
Expand All @@ -143,7 +144,7 @@ void postContractDefinition_alreadyExists(ContractDefinitionLoader loader, Contr
.post("/contractdefinitions")
.then()
.statusCode(409);
assertThat(store.findAll()).hasSize(1);
assertThat(store.findAll(QuerySpec.max())).hasSize(1);
}

@Test
Expand All @@ -155,7 +156,7 @@ void deleteContractDefinition(ContractDefinitionLoader loader, ContractDefinitio
.delete("/contractdefinitions/definitionId")
.then()
.statusCode(204);
assertThat(store.findAll()).isEmpty();
assertThat(store.findAll(QuerySpec.max())).isEmpty();
}

@Test
Expand Down
Expand Up @@ -62,11 +62,6 @@ public CosmosContractDefinitionStore(CosmosDbApi cosmosDbApi, TypeManager typeMa
queryResolver = new ReflectionBasedQueryResolver<>(ContractDefinition.class);
}

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return getCache().values();
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
return lockManager.readLock(() -> queryResolver.query(getCache().values().stream(), spec));
Expand Down
Expand Up @@ -104,7 +104,7 @@ void findAll() {
container.createItem(doc2);

store.reload();
assertThat(store.findAll()).hasSize(2).containsExactlyInAnyOrder(doc1.getWrappedInstance(), doc2.getWrappedInstance());
assertThat(store.findAll(QuerySpec.max())).hasSize(2).containsExactlyInAnyOrder(doc1.getWrappedInstance(), doc2.getWrappedInstance());
}

@Test
Expand All @@ -114,12 +114,12 @@ void findAll_noReload() {
container.createItem(doc1);
container.createItem(doc2);

assertThat(store.findAll()).hasSize(2);
assertThat(store.findAll(QuerySpec.max())).hasSize(2);
}

@Test
void findAll_emptyResult() {
assertThat(store.findAll()).isNotNull().isEmpty();
assertThat(store.findAll(QuerySpec.max())).isNotNull().isEmpty();
}

@Test
Expand Down Expand Up @@ -187,11 +187,11 @@ void saveAll() {
void save_delete_find_shouldNotExist() {
var def1 = generateDefinition();
store.save(def1);
assertThat(store.findAll()).containsOnly(def1);
assertThat(store.findAll(QuerySpec.max())).containsOnly(def1);

store.deleteById(def1.getId());

assertThat(store.findAll()).doesNotContain(def1);
assertThat(store.findAll(QuerySpec.max())).doesNotContain(def1);
}

@Test
Expand Down Expand Up @@ -332,7 +332,7 @@ void verify_readWriteFindAll() {
// add an object
var def = generateDefinition();
store.save(def);
assertThat(store.findAll()).containsExactly(def);
assertThat(store.findAll(QuerySpec.max())).containsExactly(def);

// modify the object
var modifiedDef = ContractDefinition.Builder.newInstance().id(def.getId())
Expand Down
Expand Up @@ -68,7 +68,7 @@ void findAll() {
when(cosmosDbApiMock.queryAllItems()).thenReturn(List.of(doc1, doc2));

store.reload();
var all = store.findAll();
var all = store.findAll(QuerySpec.max());

assertThat(all).hasSize(2).containsExactlyInAnyOrder(doc1.getWrappedInstance(), doc2.getWrappedInstance());
verify(cosmosDbApiMock).queryAllItems();
Expand All @@ -78,28 +78,28 @@ void findAll() {
void findAll_noReload() {
when(cosmosDbApiMock.queryAllItems()).thenReturn(Collections.emptyList());

var all = store.findAll();
var all = store.findAll(QuerySpec.max());
assertThat(all).isEmpty();
verify(cosmosDbApiMock).queryAllItems();
}

@Test
void findById() {
var doc = generateDocument(TEST_PART_KEY);
when(cosmosDbApiMock.queryAllItems()).thenReturn(List.of(doc));

var result = store.findById(doc.getId());

assertThat(result).isNotNull().isEqualTo(doc.getWrappedInstance());
verify(cosmosDbApiMock).queryAllItems();
}

@Test
void findById_invalidId() {
when(cosmosDbApiMock.queryAllItems()).thenReturn(Collections.emptyList());

var result = store.findById("invalid-id");

assertThat(result).isNull();
verify(cosmosDbApiMock).queryAllItems();
}
Expand Down Expand Up @@ -128,7 +128,7 @@ void save_verifyWriteThrough() {

store.save(definition); //should write through the cache

var all = store.findAll();
var all = store.findAll(QuerySpec.max());

assertThat(all).isNotEmpty().containsExactlyInAnyOrder((ContractDefinition) captor.getValue().getWrappedInstance());
verify(cosmosDbApiMock).queryAllItems();
Expand Down
Expand Up @@ -178,11 +178,6 @@ private static class FakeContractDefinitionStore implements ContractDefinitionSt

private final List<ContractDefinition> contractDefinitions = new ArrayList<>();

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return contractDefinitions;
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -36,10 +36,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;

import static java.lang.String.format;
import static org.eclipse.dataspaceconnector.sql.SqlQueryExecutor.executeQuery;

public class SqlContractDefinitionStore implements ContractDefinitionStore {
Expand Down Expand Up @@ -67,11 +67,6 @@ ContractDefinition mapResultSet(ResultSet resultSet) throws Exception {
.build();
}

@Override
public @NotNull Collection<ContractDefinition> findAll() {
return findAll(QuerySpec.none()).collect(Collectors.toList());
}

@Override
public @NotNull Stream<ContractDefinition> findAll(QuerySpec spec) {
return transactionContext.execute(() -> {
Expand Down Expand Up @@ -160,7 +155,7 @@ private void updateInternal(Connection connection, ContractDefinition definition
Objects.requireNonNull(definition);
transactionContext.execute(() -> {
if (!existsById(connection, definition.getId())) {
throw new EdcPersistenceException(String.format("Cannot update. Contract Definition with ID '%s' does not exist.", definition.getId()));
throw new EdcPersistenceException(format("Cannot update. Contract Definition with ID '%s' does not exist.", definition.getId()));
}

executeQuery(connection, statements.getUpdateTemplate(),
Expand Down Expand Up @@ -197,7 +192,7 @@ private <T> T single(List<T> list) {
}

private DataSource getDataSource() {
return Objects.requireNonNull(dataSourceRegistry.resolve(dataSourceName), String.format("DataSource %s could not be resolved", dataSourceName));
return Objects.requireNonNull(dataSourceRegistry.resolve(dataSourceName), format("DataSource %s could not be resolved", dataSourceName));
}

private Connection getConnection() throws SQLException {
Expand Down

0 comments on commit cfba051

Please sign in to comment.