Skip to content

Commit

Permalink
3 state guard (#11)
Browse files Browse the repository at this point in the history
* Create `SequencedExecutor`.

* #3 implement StateGuard's first method 'mutate'

* #8 configure `error-prone` and apply its suggestions

* #1 add tests

* #1 nice catch, M. Error-Prone!

* #1 more strict Java compiling rules + consider warnings as errors

* #1 use @GuardedBy for better error-prone coverage.

* #3 complete StateGuard + tests
+ better tests for SequencedExecutor

* #3 clean up

* #8 avoid raw types (thanks error-prone)

* #3 add failure tests

* #3 add CODEOWNERS
  • Loading branch information
Adeynack committed Jan 18, 2018
1 parent cbc3e84 commit 717dbbf
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 34 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @adeynack
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "com.google.errorprone:error_prone_core:2.1.3"
testCompile "junit:junit:4.12"
testCompile "org.hamcrest:hamcrest-junit:2.0.0.0"
}

tasks.withType(JavaCompile) {
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/github/adeynack/executors/StateDecision.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.github.adeynack.executors;

public class StateDecision<S, T> {

private final S updatedState;
private final T returnedValue;

public StateDecision(S updatedState, T returnedValue) {
this.updatedState = updatedState;
this.returnedValue = returnedValue;
}

public S getUpdatedState() {
return updatedState;
}

public T getReturnedValue() {
return returnedValue;
}
}
91 changes: 91 additions & 0 deletions src/main/java/com/github/adeynack/executors/StateGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.github.adeynack.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;

/**
* @param <S> type of the state to protect.
*/
@SuppressWarnings("WeakerAccess") // Library class
public class StateGuard<S> {

private volatile S state;
private final SequencedExecutor sequencedExecutor;

public StateGuard(S initialState, Executor baseExecutor) {
this.state = initialState;
this.sequencedExecutor = new SequencedExecutor(baseExecutor);
}

/**
* Mutate the state, without any return value.
* <p>
* During the execution of `stateMutation`, the state context is secure. It is executed
* using an internal {@link SequencedExecutor}. If the state is mutable (which should be
* avoided as much as possible), it is safe to mutate it inside of `stateReader`. However,
* the ideal way to use this is to keep the state immutable and return the new immutable
* version of the state as the return value of `stateMutation`.
*
* @param stateMutation a function receiving the actual state and returning the
* new, mutated, state.
* @return a {@link CompletionStage<Void>} indicating when the change is completed.
*/
public CompletionStage<Void> change(Function<S, S> stateMutation) {
return CompletableFuture.runAsync(
() -> {
S stateBefore = state;
state = stateMutation.apply(stateBefore);
},
sequencedExecutor);
}

/**
* Read a value from the state.
* <p>
* THREADING WARNING: This is not executed in an exclusive way. The state received in
* `stateReader` is not protected. DO NOT MUTATE IT. This is directly executed on the
* caller's thread. The ideal way of using this is as a fast way to extract information
* from the state at a given time.
*
* @param stateReader a function receiving the actual state and returning a value from it.
* @param <T> the type of the value to be returned.
* @return the value returned by `stateReader`.
*/
public <T> T immediateRead(Function<S, T> stateReader) {
S actualState = state;
return stateReader.apply(actualState);
}

/**
* Mutate the state and return a value from it.
* <p>
* During the execution of `stateMutation`, the state context is secure. It is executed
* using an internal {@link SequencedExecutor}. If the state is mutable (which should be
* avoided as much as possible), it is safe to mutate it inside of `stateReader`. However,
* the ideal way to use this is to keep the state immutable and return the new immutable
* version of the state as part of the return value of `stateMutation`.
*
* @param stateMutation a function receiving the actual state and returning a {@link StateDecision}
* with both the new state and the value to return to the caller. If it
* returns `null`, then the state stays the same and `null` is returned
* to the caller.
* @return a {@link CompletionStage<T>} providing, when ready, the read value.
*/
public <T> CompletionStage<T> readAndChange(Function<S, StateDecision<S, T>> stateMutation) {
return CompletableFuture.supplyAsync(
() -> {
S actualState = state;
StateDecision<S, T> decision = stateMutation.apply(actualState);
if (decision == null) {
return null;
} else {
state = decision.getUpdatedState();
return decision.getReturnedValue();
}
},
sequencedExecutor);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.adeynack.executors;

import com.github.adeynack.executors.testTools.SequentialChecker;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -10,12 +12,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
Expand All @@ -25,50 +26,39 @@
public class SequencedExecutorTest {

private SequencedExecutor sequencedExecutor;
private AtomicInteger refusedLocks;
private Lock testLock;
private SequentialChecker sequentialChecker;
private AtomicInteger createdTaskCounter;
private CompletableFuture<Integer> startedTaskWait;

@Before
public void before() {
final Executor baseExecutor = Executors.newFixedThreadPool(4);
final Executor baseExecutor = Executors.newFixedThreadPool(
4,
new ThreadFactoryBuilder().setNameFormat("sequenced-executor-test-pool-%d").build());
sequencedExecutor = new SequencedExecutor(baseExecutor);
refusedLocks = new AtomicInteger();
testLock = new ReentrantLock();
sequentialChecker = new SequentialChecker();
createdTaskCounter = new AtomicInteger();
startedTaskWait = new CompletableFuture<>();
}

private Runnable createTask(final int taskId, final boolean rePost, final boolean fail) {
private Runnable createTask(final Integer taskId, final boolean rePost, final boolean fail) {
int nt = createdTaskCounter.incrementAndGet();
System.out.println(String.format("+ There are now %s tasks created.", nt));
return () -> {
try {
System.out.println(String.format("[%s] Waiting on testLock", taskId));
final boolean gotLock = testLock.tryLock();
try {
if (!gotLock) {
System.out.println(String.format("[%s] ERROR: Could not obtain lock", taskId));
refusedLocks.incrementAndGet();
} else {
System.out.println(String.format("[%s] Lock obtained.", taskId));
if (rePost) {
sequencedExecutor.execute(createTask(taskId + 100, false, false));
}
if (fail) {
throw new RuntimeException(String.format("Task %s fails", taskId));
}
Thread.sleep(100);
sequentialChecker.check(taskId.toString(), true, () -> {
if (rePost) {
sequencedExecutor.execute(createTask(taskId + 100, false, false));
}
} finally {
if (gotLock) {
System.out.println(String.format("[%s] Unlocking testLock", taskId));
testLock.unlock();
if (fail) {
throw new RuntimeException(String.format("Task %s fails", taskId));
}
}
} catch (InterruptedException e) {
System.err.println(String.format("[%s] %s", taskId, e));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} finally {
System.out.println(String.format("[%s] End of runnable.", taskId));
int t = createdTaskCounter.decrementAndGet();
Expand All @@ -80,20 +70,28 @@ private Runnable createTask(final int taskId, final boolean rePost, final boolea
};
}

private void assertAllTasksExecutedOnSequencedExecutor() {
sequentialChecker.getLog().forEach((entry) -> {
assertThat(entry.threadName, startsWith("sequenced-executor-test-pool-"));
});
}

@Test
public void twoTasksAreNotExecutedAtTheSameTime() throws Exception {
IntStream.range(0, 12)
.forEach(i -> sequencedExecutor.execute(createTask(i, false, false)));
startedTaskWait.get(1, TimeUnit.MINUTES);
assertEquals("No lock should be refused.", 0, refusedLocks.get());
sequentialChecker.assertMaxParallelTaskCount(1);
assertAllTasksExecutedOnSequencedExecutor();
}

@Test
public void twoTasksAreNotExecutedAtTheSameTimeWhenSubmittedFromAnExecutedTask() throws Exception {
IntStream.range(0, 12)
.forEach(i -> sequencedExecutor.execute(createTask(i, true, false)));
startedTaskWait.get(1, TimeUnit.MINUTES);
assertEquals("No lock should be refused.", 0, refusedLocks.get());
sequentialChecker.assertMaxParallelTaskCount(1);
assertAllTasksExecutedOnSequencedExecutor();
}

@Test
Expand All @@ -114,7 +112,7 @@ public void aFailingTaskDoesNotCrashTheExecutor() throws Exception {
assertEquals("Task 5 fails", cause.getMessage());
}

assertEquals("No lock should be refused.", 0, refusedLocks.get());
sequentialChecker.assertMaxParallelTaskCount(1);

// future at index 5 should have failed.
assertTrue(futures.get(5).isCompletedExceptionally());
Expand All @@ -130,6 +128,8 @@ public void aFailingTaskDoesNotCrashTheExecutor() throws Exception {
assertFalse(futures.get(9).isCompletedExceptionally());
assertFalse(futures.get(10).isCompletedExceptionally());
assertFalse(futures.get(11).isCompletedExceptionally());

assertAllTasksExecutedOnSequencedExecutor();
}

@Test
Expand All @@ -145,7 +145,7 @@ public void whenUsedInAFutureItReturnsTheValue() throws Exception {
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
.get(1, TimeUnit.MINUTES);

assertEquals("No lock should be refused.", 0, refusedLocks.get());
sequentialChecker.assertMaxParallelTaskCount(1);

assertEquals(0, futures.get(0).get().intValue());
assertEquals(1, futures.get(1).get().intValue());
Expand All @@ -159,6 +159,8 @@ public void whenUsedInAFutureItReturnsTheValue() throws Exception {
assertEquals(9, futures.get(9).get().intValue());
assertEquals(10, futures.get(10).get().intValue());
assertEquals(11, futures.get(11).get().intValue());

assertAllTasksExecutedOnSequencedExecutor();
}

}
Loading

0 comments on commit 717dbbf

Please sign in to comment.