Skip to content

Commit

Permalink
Core: state machine entity base class (#1511)
Browse files Browse the repository at this point in the history
* Core: state machine entity base class

* Update CHANGELOG.md

* Update EntitySendRetryManagerTest.java

* Update StateMachine.java

* Renamed StateMachine to StatefulEntity

* Update ContractNegotiationIntegrationTest.java

* grammar fix
  • Loading branch information
algattik committed Jun 24, 2022
1 parent 5287082 commit 385be86
Show file tree
Hide file tree
Showing 18 changed files with 386 additions and 384 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ in the detailed section referring to by linking pull requests or issues.
* Provided default no-op `TransactionContext` (#1461)
* Refactored query capabilities for `Asset` (#1459)
* Refactored query capabilities for `ContractDefinition` (#1458)
* Refactored state machine and in-memory persistence (#1511)

#### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* On every iteration it runs all the set processors sequentially,
* applying a wait strategy in the case no entities are processed on the iteration.
*/
public class StateMachine {
public class StateMachineManager {

private final List<StateProcessor> processors = new ArrayList<>();
private final ScheduledExecutorService executor;
Expand All @@ -46,14 +46,14 @@ public class StateMachine {
private final String name;
private int shutdownTimeout = 10;

private StateMachine(String name, Monitor monitor, ExecutorInstrumentation instrumentation, WaitStrategy waitStrategy) {
private StateMachineManager(String name, Monitor monitor, ExecutorInstrumentation instrumentation, WaitStrategy waitStrategy) {
this.name = name;
this.monitor = monitor;
this.waitStrategy = waitStrategy;
executor = instrumentation.instrument(
Executors.newSingleThreadScheduledExecutor(r -> {
var thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("StateMachine-" + name);
thread.setName("StateMachineManager-" + name);
return thread;
}), name);
}
Expand All @@ -80,7 +80,7 @@ public CompletableFuture<Boolean> stop() {
try {
return executor.awaitTermination(shutdownTimeout, SECONDS);
} catch (InterruptedException e) {
monitor.severe(format("StateMachine [%s] await termination failed", name), e);
monitor.severe(format("StateMachineManager [%s] await termination failed", name), e);
return false;
}
});
Expand Down Expand Up @@ -124,20 +124,20 @@ private long performLogic() {
}
} catch (Error e) {
active.set(false);
monitor.severe(format("StateMachine [%s] unrecoverable error", name), e);
monitor.severe(format("StateMachineManager [%s] unrecoverable error", name), e);
} catch (Throwable e) {
monitor.severe(format("StateMachine [%s] error caught", name), e);
monitor.severe(format("StateMachineManager [%s] error caught", name), e);
return waitStrategy.retryInMillis();
}
return 0;
}

public static class Builder {

private final StateMachine loop;
private final StateMachineManager loop;

private Builder(String name, Monitor monitor, ExecutorInstrumentation instrumentation, WaitStrategy waitStrategy) {
loop = new StateMachine(name, monitor, instrumentation, waitStrategy);
loop = new StateMachineManager(name, monitor, instrumentation, waitStrategy);
}

public static Builder newInstance(String name, Monitor monitor, ExecutorInstrumentation instrumentation, WaitStrategy waitStrategy) {
Expand All @@ -154,7 +154,7 @@ public Builder shutdownTimeout(int seconds) {
return this;
}

public StateMachine build() {
public StateMachineManager build() {
return loop;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

/**
* Interface that declares an abstraction for a component that process some entities and return the number of the processed ones.
* Used by {@link StateMachine} to decide whether to apply wait strategy in loop iteration
* Used by {@link StateMachineManager} to decide whether to apply wait strategy in loop iteration
*
*/
@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

class StateMachineTest {
class StateMachineManagerTest {

private final WaitStrategy waitStrategy = mock(WaitStrategy.class);
private final Monitor monitor = mock(Monitor.class);
Expand All @@ -53,7 +53,7 @@ void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
Thread.sleep(100L);
return 1L;
});
var stateMachine = StateMachine.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.shutdownTimeout(1)
.build();
Expand All @@ -75,7 +75,7 @@ void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws Interrup
latch.countDown();
return 1L;
}).when(waitStrategy).success();
var stateMachine = StateMachine.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();

Expand All @@ -96,7 +96,7 @@ void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
latch.countDown();
return 0L;
}).when(waitStrategy).waitForMillis();
var stateMachine = StateMachine.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();

Expand All @@ -111,7 +111,7 @@ void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {
var processor = mock(StateProcessor.class);
when(processor.process()).thenThrow(new Error("unrecoverable"));
var stateMachine = StateMachine.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();

Expand All @@ -128,7 +128,7 @@ void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws Interrupte
latch.countDown();
return 1L;
});
var stateMachine = StateMachine.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void shouldDelay(long stateTimestamp, long currentTime, long retryDelay, boolean
.id("any")
.stateCount(stateCount)
.stateTimestamp(stateTimestamp)
.clock(clock)
.build();

when(delayStrategy.retryInMillis())
Expand All @@ -80,6 +81,7 @@ void retriesExhausted(int retriesLeft) {
.id("any")
.stateCount(stateCount)
.stateTimestamp(stateTimestamp)
.clock(clock)
.build();

var expected = retriesLeft < 0;
Expand All @@ -100,24 +102,33 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
}
}

private static class TestEntity extends StatefulEntity {
private static class TestEntity extends StatefulEntity<TestEntity> {
@Override
public TestEntity copy() {
return this;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder extends StatefulEntity.Builder<TestEntity, Builder> {

@Override
public Builder self() {
return this;
}

private Builder(TestEntity entity) {
super(entity);
}

@Override
protected TestEntity build() {
return super.build();
}

@JsonCreator
public static Builder newInstance() {
return new Builder(new TestEntity());
}

@Override
public void validate() {

}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.eclipse.dataspaceconnector.contract.negotiation;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.common.statemachine.StateMachine;
import org.eclipse.dataspaceconnector.common.statemachine.StateMachineManager;
import org.eclipse.dataspaceconnector.common.statemachine.StateProcessorImpl;
import org.eclipse.dataspaceconnector.contract.common.ContractId;
import org.eclipse.dataspaceconnector.policy.model.Policy;
Expand Down Expand Up @@ -60,13 +60,13 @@
*/
public class ConsumerContractNegotiationManagerImpl extends AbstractContractNegotiationManager implements ConsumerContractNegotiationManager {

private StateMachine stateMachine;
private StateMachineManager stateMachineManager;

private ConsumerContractNegotiationManagerImpl() {
}

public void start() {
stateMachine = StateMachine.Builder.newInstance("consumer-contract-negotiation", monitor, executorInstrumentation, waitStrategy)
stateMachineManager = StateMachineManager.Builder.newInstance("consumer-contract-negotiation", monitor, executorInstrumentation, waitStrategy)
.processor(processNegotiationsInState(INITIAL, this::processInitial))
.processor(processNegotiationsInState(REQUESTING, this::processRequesting))
.processor(processNegotiationsInState(CONSUMER_OFFERING, this::processConsumerOffering))
Expand All @@ -75,12 +75,12 @@ public void start() {
.processor(onCommands(this::processCommand))
.build();

stateMachine.start();
stateMachineManager.start();
}

public void stop() {
if (stateMachine != null) {
stateMachine.stop();
if (stateMachineManager != null) {
stateMachineManager.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.eclipse.dataspaceconnector.contract.negotiation;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.common.statemachine.StateMachine;
import org.eclipse.dataspaceconnector.common.statemachine.StateMachineManager;
import org.eclipse.dataspaceconnector.common.statemachine.StateProcessorImpl;
import org.eclipse.dataspaceconnector.contract.common.ContractId;
import org.eclipse.dataspaceconnector.policy.model.Policy;
Expand Down Expand Up @@ -54,7 +54,7 @@
*/
public class ProviderContractNegotiationManagerImpl extends AbstractContractNegotiationManager implements ProviderContractNegotiationManager {

private StateMachine stateMachine;
private StateMachineManager stateMachineManager;

private ProviderContractNegotiationManagerImpl() {
}
Expand All @@ -64,19 +64,19 @@ private ProviderContractNegotiationManagerImpl() {
//TODO validate previous offers against hash?

public void start() {
stateMachine = StateMachine.Builder.newInstance("provider-contract-negotiation", monitor, executorInstrumentation, waitStrategy)
stateMachineManager = StateMachineManager.Builder.newInstance("provider-contract-negotiation", monitor, executorInstrumentation, waitStrategy)
.processor(processNegotiationsInState(PROVIDER_OFFERING, this::processProviderOffering))
.processor(processNegotiationsInState(DECLINING, this::processDeclining))
.processor(processNegotiationsInState(CONFIRMING, this::processConfirming))
.processor(onCommands(this::processCommand))
.build();

stateMachine.start();
stateMachineManager.start();
}

public void stop() {
if (stateMachine != null) {
stateMachine.stop();
if (stateMachineManager != null) {
stateMachineManager.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ void testNegotiation_initialOfferDeclined() {
.pollInterval(DEFAULT_POLL_INTERVAL)
.untilAsserted(() -> {

assertThat(consumerNegotiationId).isNotNull();
var consumerNegotiation = consumerStore.find(consumerNegotiationId);
var providerNegotiation = providerStore.findForCorrelationId(consumerNegotiationId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.core.defaults;

import org.eclipse.dataspaceconnector.common.concurrency.LockManager;
import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
import org.eclipse.dataspaceconnector.spi.query.QueryResolver;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.query.ReflectionBasedQueryResolver;
import org.jetbrains.annotations.NotNull;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

/**
* An in-memory, threadsafe entity store for a {@link StatefulEntity}.
* This implementation is intended for testing purposes only.
*/
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> {
private final Map<String, Item<T>> entitiesById = new ConcurrentHashMap<>();
private final QueryResolver<T> queryResolver;
private final LockManager lockManager = new LockManager(new ReentrantReadWriteLock());

public InMemoryStatefulEntityStore(Class<T> clazz) {
queryResolver = new ReflectionBasedQueryResolver<>(clazz);
}

public T find(String id) {
var t = entitiesById.get(id);
if (t == null) {
return null;
}
return t.item.copy();
}

public void upsert(T entity) {
entitiesById.put(entity.getId(), new Item<>(entity.copy()));
}

public void delete(String id) {
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public @NotNull List<T> nextForState(int state, int max) {
return lockManager.writeLock(() -> {
var items = entitiesById.values().stream()
.filter(e -> e.item.getState() == state)
.filter(e -> !e.leased)
.sorted(Comparator.comparingLong(e -> e.item.getStateTimestamp())) //order by state timestamp, oldest first
.limit(max)
.collect(toList());
items.forEach(e -> e.leased = true);
return items.stream().map(e -> e.item.copy()).collect(toList());
});
}

public Stream<T> findAll() {
return entitiesById.values().stream().map(e -> e.item);
}

private static class Item<V> {
private final V item;
private boolean leased = false;

Item(V item) {
this.item = item;
}
}
}
Loading

0 comments on commit 385be86

Please sign in to comment.