Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-17881 OrderingFuture should notify dependents asynchronously #1198

Merged
merged 3 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
* A little analogue of {@link CompletableFuture} that has the following property: callbacks (like {@link #whenComplete(BiConsumer)}
* and {@link #thenComposeToCompletable(Function)}) are invoked in the same order in which they were registered.
*
* <p>For completion methods ({@link #complete(Object)} and {@link #completeExceptionally(Throwable)} it is guaranteed that,
* upon returning, the caller will see the completion values (for example, using {@link #getNow(Object)}, UNLESS the
* completion method is interrupted.
*
* <p>Callbacks are invoked asynchronously relative to completion. This means that completer may exit the completion method
* before the callbacks are invoked.
*
* @param <T> Type of payload.
* @see CompletableFuture
*/
Expand All @@ -58,9 +65,9 @@ public class OrderingFuture<T> {
private final AtomicBoolean completionStarted = new AtomicBoolean(false);

/**
* Used by {@link #get(long, TimeUnit)} to wait for completion.
* Used by {@link #get(long, TimeUnit)} to wait for the moment when completion values are available.
*/
private final CountDownLatch completionLatch = new CountDownLatch(1);
private final CountDownLatch completionValuesReadyLatch = new CountDownLatch(1);

/**
* Creates an incomplete future.
Expand Down Expand Up @@ -140,20 +147,56 @@ private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
if (!completionStarted.compareAndSet(false, true)) {
// Someone has already started the completion. We must leave as the following code can produce duplicate
// notifications of dependents if executed by more than one thread.

// But let's wait for completion first as it would be strange if someone calls completion and then manages
// to see that getNow() returns the fallback value.
waitForCompletionValuesVisibility();

return;
}

State<T> prevState;
ListNode<T> lastNotifiedNode = null;
assert state.phase == Phase.INCOMPLETE;

switchToNotifyingStage(result, ex);

assert state.phase == Phase.NOTIFYING;

completionValuesReadyLatch.countDown();

completeNotificationStage(result, ex);

assert state.phase == Phase.COMPLETED;
}

private void waitForCompletionValuesVisibility() {
try {
completionValuesReadyLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void switchToNotifyingStage(@Nullable T result, @Nullable Throwable ex) {
while (true) {
prevState = state;
State<T> prevState = state;

if (state.completed) {
return;
// We can only compete with threads adding dependents in this loop, so we don't need to check the phase, it's INCOMPLETE.

State<T> newState = prevState.switchToNotifying(result, ex);

if (replaceState(prevState, newState)) {
break;
}
}
}

State<T> newState = new State<>(true, result, ex, null);
private void completeNotificationStage(@Nullable T result, @Nullable Throwable ex) {
ListNode<T> lastNotifiedNode = null;

while (true) {
State<T> prevState = state;

State<T> newState = prevState.switchToCompleted();

// We produce side-effects inside the retry loop, but it's ok as the queue can only grow, the queue
// state we see is always a prefix of a queue changed by a competitor (we only compete with operations
Expand All @@ -166,8 +209,6 @@ private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
break;
}
}

completionLatch.countDown();
}

/**
Expand Down Expand Up @@ -223,7 +264,7 @@ public void whenComplete(BiConsumer<? super T, ? super Throwable> action) {
while (true) {
State<T> prevState = state;

if (prevState.completed) {
if (prevState.completionQueueProcessed()) {
acceptQuietly(action, prevState.result, prevState.exception);
return;
}
Expand Down Expand Up @@ -261,7 +302,7 @@ public <U> CompletableFuture<U> thenComposeToCompletable(Function<? super T, ? e
while (true) {
State<T> prevState = state;

if (prevState.completed) {
if (prevState.completionQueueProcessed()) {
if (prevState.exception != null) {
return CompletableFuture.failedFuture(wrapWithCompletionException(prevState.exception));
} else {
Expand Down Expand Up @@ -304,7 +345,7 @@ private static <T, U> CompletableFuture<U> applyMapper(Function<? super T, ? ext
public T getNow(T valueIfAbsent) {
State<T> currentState = state;

if (currentState.completed) {
if (currentState.completionValuesAvailable()) {
if (currentState.exception != null) {
throw wrapWithCompletionException(currentState.exception);
} else {
Expand All @@ -329,7 +370,7 @@ public T getNow(T valueIfAbsent) {
* @see CompletableFuture#get(long, TimeUnit)
*/
public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
boolean completedInTime = completionLatch.await(timeout, unit);
boolean completedInTime = completionValuesReadyLatch.await(timeout, unit);
if (!completedInTime) {
throw new TimeoutException();
}
Expand Down Expand Up @@ -432,27 +473,43 @@ public void onCompletion(T result, Throwable ex) {
}

private static class State<T> {
private static final State<?> EMPTY_STATE = new State<>(false, null, null, null);
private static final State<?> INCOMPLETE_STATE = new State<>(Phase.INCOMPLETE, null, null, null);

private final boolean completed;
private final Phase phase;
private final T result;
private final Throwable exception;
private final ListNode<T> dependentsQueueTail;

private State(boolean completed, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
this.completed = completed;
private State(Phase phase, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
this.phase = phase;
this.result = result;
this.exception = exception;
this.dependentsQueueTail = dependentsQueueTail;
}

@SuppressWarnings("unchecked")
private static <T> State<T> empty() {
return (State<T>) EMPTY_STATE;
return (State<T>) INCOMPLETE_STATE;
}

public boolean completionValuesAvailable() {
return phase != Phase.INCOMPLETE;
}

public boolean completionQueueProcessed() {
return phase == Phase.COMPLETED;
}

public State<T> switchToNotifying(T completionResult, Throwable completionCause) {
return new State<>(Phase.NOTIFYING, completionResult, completionCause, dependentsQueueTail);
}

public State<T> switchToCompleted() {
return new State<>(Phase.COMPLETED, result, exception, null);
}

public State<T> enqueueDependent(DependentAction<T> dependent) {
return new State<>(completed, result, exception, new ListNode<>(dependent, dependentsQueueTail));
return new State<>(phase, result, exception, new ListNode<>(dependent, dependentsQueueTail));
}
}

Expand Down Expand Up @@ -484,4 +541,22 @@ public void notifyHeadToTail(T result, Throwable exception, ListNode<T> lastNoti
}
}
}

private enum Phase {
/**
* Future is not complete, completion values are not known, callbacks are not invoked. Callbacks added in this phase
* are enqueued for a later invocation.
*/
INCOMPLETE,
/**
* Future is half-complete (completion values are available for the outside world), but callbacks are not yet invoked
* (but they are probably being invoked right now). Callbacks added in this phase are enqueued for a later invocation.
*/
NOTIFYING,
/**
* Future is fully completed: completion values are available, callbacks are invoked. In this phase, new callbacks
* are invoked immediately instead of being enqueued.
*/
COMPLETED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ private void executeInParallel(Runnable task1, Runnable task2) throws Interrupte
thread1.start();
thread2.start();

thread1.join();
thread2.join();
try {
thread1.join();
thread2.join();
} finally {
thread1.interrupt();
thread2.interrupt();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -264,6 +265,38 @@ void whenCompleteSwallowsExceptionThrownByActionOnAlreadyFailedFuture() {
}));
}

@Test
void whenCompleteSeesCompletionEffectsImmediatelyWithGetNow() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.whenComplete((res, ex) -> intHolder.set(future.getNow(999)));

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void whenCompleteSeesCompletionEffectsImmediatelyWithGetWithTimeout() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.whenComplete((res, ex) -> {
try {
intHolder.set(future.get(0, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
intHolder.set(999);
} catch (InterruptedException | ExecutionException e) {
fail("Unexpected exception", e);
}
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void composeToCompletablePropagatesResultFromAlreadyCompletedFuture() {
OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(3);
Expand Down Expand Up @@ -375,6 +408,44 @@ void composeToCompletableWrapsCancellationExceptionInCompletionException() {
assertThat(causeRef.get().getCause(), is(cancellationException));
}


@Test
void composeToCompletableSeesCompletionEffectsImmediatelyWithGetNow() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.thenComposeToCompletable(x -> {
intHolder.set(future.getNow(999));
return CompletableFuture.completedFuture(null);
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void composeToCompletableSeesCompletionEffectsImmediatelyWithGetWithTimeout() {
OrderingFuture<Integer> future = new OrderingFuture<>();
AtomicInteger intHolder = new AtomicInteger();

future.thenComposeToCompletable(x -> {
try {
intHolder.set(future.get(0, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
intHolder.set(999);
} catch (InterruptedException | ExecutionException e) {
fail("Unexpected exception", e);
}

return CompletableFuture.completedFuture(null);
});

future.complete(1);

assertThat(intHolder.get(), is(1));
}

@Test
void getNowReturnsCompletionValueFromCompletedFuture() {
OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
Expand Down