Skip to content

Commit

Permalink
IGNITE-17881 OrderingFuture should notify dependents asynchronously (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
rpuch authored and lowka committed Apr 19, 2023
1 parent 7badbb0 commit b8db160
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
* A little analogue of {@link CompletableFuture} that has the following property: callbacks (like {@link #whenComplete(BiConsumer)}
* and {@link #thenComposeToCompletable(Function)}) are invoked in the same order in which they were registered.
*
* <p>For completion methods ({@link #complete(Object)} and {@link #completeExceptionally(Throwable)} it is guaranteed that,
* upon returning, the caller will see the completion values (for example, using {@link #getNow(Object)}, UNLESS the
* completion method is interrupted.
*
* <p>Callbacks are invoked asynchronously relative to completion. This means that completer may exit the completion method
* before the callbacks are invoked.
*
* @param <T> Type of payload.
* @see CompletableFuture
*/
Expand All @@ -58,9 +65,9 @@ public class OrderingFuture<T> {
private final AtomicBoolean completionStarted = new AtomicBoolean(false);

/**
* Used by {@link #get(long, TimeUnit)} to wait for completion.
* Used by {@link #get(long, TimeUnit)} to wait for the moment when completion values are available.
*/
private final CountDownLatch completionLatch = new CountDownLatch(1);
private final CountDownLatch completionValuesReadyLatch = new CountDownLatch(1);

/**
* Creates an incomplete future.
Expand Down Expand Up @@ -140,20 +147,56 @@ private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
if (!completionStarted.compareAndSet(false, true)) {
// Someone has already started the completion. We must leave as the following code can produce duplicate
// notifications of dependents if executed by more than one thread.

// But let's wait for completion first as it would be strange if someone calls completion and then manages
// to see that getNow() returns the fallback value.
waitForCompletionValuesVisibility();

return;
}

State<T> prevState;
ListNode<T> lastNotifiedNode = null;
assert state.phase == Phase.INCOMPLETE;

switchToNotifyingStage(result, ex);

assert state.phase == Phase.NOTIFYING;

completionValuesReadyLatch.countDown();

completeNotificationStage(result, ex);

assert state.phase == Phase.COMPLETED;
}

private void waitForCompletionValuesVisibility() {
try {
completionValuesReadyLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void switchToNotifyingStage(@Nullable T result, @Nullable Throwable ex) {
while (true) {
prevState = state;
State<T> prevState = state;

if (state.completed) {
return;
// We can only compete with threads adding dependents in this loop, so we don't need to check the phase, it's INCOMPLETE.

State<T> newState = prevState.switchToNotifying(result, ex);

if (replaceState(prevState, newState)) {
break;
}
}
}

State<T> newState = new State<>(true, result, ex, null);
private void completeNotificationStage(@Nullable T result, @Nullable Throwable ex) {
ListNode<T> lastNotifiedNode = null;

while (true) {
State<T> prevState = state;

State<T> newState = prevState.switchToCompleted();

// We produce side-effects inside the retry loop, but it's ok as the queue can only grow, the queue
// state we see is always a prefix of a queue changed by a competitor (we only compete with operations
Expand All @@ -166,8 +209,6 @@ private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
break;
}
}

completionLatch.countDown();
}

/**
Expand Down Expand Up @@ -223,7 +264,7 @@ public void whenComplete(BiConsumer<? super T, ? super Throwable> action) {
while (true) {
State<T> prevState = state;

if (prevState.completed) {
if (prevState.completionQueueProcessed()) {
acceptQuietly(action, prevState.result, prevState.exception);
return;
}
Expand Down Expand Up @@ -261,7 +302,7 @@ public <U> CompletableFuture<U> thenComposeToCompletable(Function<? super T, ? e
while (true) {
State<T> prevState = state;

if (prevState.completed) {
if (prevState.completionQueueProcessed()) {
if (prevState.exception != null) {
return CompletableFuture.failedFuture(wrapWithCompletionException(prevState.exception));
} else {
Expand Down Expand Up @@ -304,7 +345,7 @@ private static <T, U> CompletableFuture<U> applyMapper(Function<? super T, ? ext
public T getNow(T valueIfAbsent) {
State<T> currentState = state;

if (currentState.completed) {
if (currentState.completionValuesAvailable()) {
if (currentState.exception != null) {
throw wrapWithCompletionException(currentState.exception);
} else {
Expand All @@ -329,7 +370,7 @@ public T getNow(T valueIfAbsent) {
* @see CompletableFuture#get(long, TimeUnit)
*/
public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
boolean completedInTime = completionLatch.await(timeout, unit);
boolean completedInTime = completionValuesReadyLatch.await(timeout, unit);
if (!completedInTime) {
throw new TimeoutException();
}
Expand Down Expand Up @@ -432,27 +473,43 @@ public void onCompletion(T result, Throwable ex) {
}

private static class State<T> {
private static final State<?> EMPTY_STATE = new State<>(false, null, null, null);
private static final State<?> INCOMPLETE_STATE = new State<>(Phase.INCOMPLETE, null, null, null);

private final boolean completed;
private final Phase phase;
private final T result;
private final Throwable exception;
private final ListNode<T> dependentsQueueTail;

private State(boolean completed, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
this.completed = completed;
private State(Phase phase, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
this.phase = phase;
this.result = result;
this.exception = exception;
this.dependentsQueueTail = dependentsQueueTail;
}

@SuppressWarnings("unchecked")
private static <T> State<T> empty() {
return (State<T>) EMPTY_STATE;
return (State<T>) INCOMPLETE_STATE;
}

public boolean completionValuesAvailable() {
return phase != Phase.INCOMPLETE;
}

public boolean completionQueueProcessed() {
return phase == Phase.COMPLETED;
}

public State<T> switchToNotifying(T completionResult, Throwable completionCause) {
return new State<>(Phase.NOTIFYING, completionResult, completionCause, dependentsQueueTail);
}

public State<T> switchToCompleted() {
return new State<>(Phase.COMPLETED, result, exception, null);
}

public State<T> enqueueDependent(DependentAction<T> dependent) {
return new State<>(completed, result, exception, new ListNode<>(dependent, dependentsQueueTail));
return new State<>(phase, result, exception, new ListNode<>(dependent, dependentsQueueTail));
}
}

Expand Down Expand Up @@ -484,4 +541,22 @@ public void notifyHeadToTail(T result, Throwable exception, ListNode<T> lastNoti
}
}
}

private enum Phase {
/**
* Future is not complete, completion values are not known, callbacks are not invoked. Callbacks added in this phase
* are enqueued for a later invocation.
*/
INCOMPLETE,
/**
* Future is half-complete (completion values are available for the outside world), but callbacks are not yet invoked
* (but they are probably being invoked right now). Callbacks added in this phase are enqueued for a later invocation.
*/
NOTIFYING,
/**
* Future is fully completed: completion values are available, callbacks are invoked. In this phase, new callbacks
* are invoked immediately instead of being enqueued.
*/
COMPLETED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ private void executeInParallel(Runnable task1, Runnable task2) throws Interrupte
thread1.start();
thread2.start();

thread1.join();
thread2.join();
try {
thread1.join();
thread2.join();
} finally {
thread1.interrupt();
thread2.interrupt();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -264,6 +265,38 @@ void whenCompleteSwallowsExceptionThrownByActionOnAlreadyFailedFuture() {
}));
}

@Test
void whenCompleteSeesCompletionEffectsImmediatelyWithGetNow() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.whenComplete((res, ex) -> intHolder.set(future.getNow(999)));

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void whenCompleteSeesCompletionEffectsImmediatelyWithGetWithTimeout() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.whenComplete((res, ex) -> {
try {
intHolder.set(future.get(0, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
intHolder.set(999);
} catch (InterruptedException | ExecutionException e) {
fail("Unexpected exception", e);
}
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void composeToCompletablePropagatesResultFromAlreadyCompletedFuture() {
OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(3);
Expand Down Expand Up @@ -375,6 +408,44 @@ void composeToCompletableWrapsCancellationExceptionInCompletionException() {
assertThat(causeRef.get().getCause(), is(cancellationException));
}


@Test
void composeToCompletableSeesCompletionEffectsImmediatelyWithGetNow() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.thenComposeToCompletable(x -> {
intHolder.set(future.getNow(999));
return CompletableFuture.completedFuture(null);
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void composeToCompletableSeesCompletionEffectsImmediatelyWithGetWithTimeout() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.thenComposeToCompletable(x -> {
try {
intHolder.set(future.get(0, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
intHolder.set(999);
} catch (InterruptedException | ExecutionException e) {
fail("Unexpected exception", e);
}

return CompletableFuture.completedFuture(null);
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void getNowReturnsCompletionValueFromCompletedFuture() {
OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
Expand Down

0 comments on commit b8db160

Please sign in to comment.