Skip to content

Commit

Permalink
Fixes AbstractInvocationFuture#thenApply (#17518)
Browse files Browse the repository at this point in the history
Fixes AbstractInvocationFuture#thenApply

When a user function is registered on a not-yet
complete future, the ApplyNode would not catch
an exception thrown from the user function. Now it
is caught and completes exceptionally the returned
CompletionStage.
  • Loading branch information
vbekiaris committed Sep 11, 2020
1 parent 098f293 commit cbd52fa
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,13 @@ public void execute(@Nonnull Executor executor, Object value) {
return;
}
try {
executor.execute(() -> future.complete(function.apply((V) value)));
executor.execute(() -> {
try {
future.complete(function.apply((V) value));
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
} catch (RejectedExecutionException e) {
future.completeExceptionally(wrapToInstanceNotActiveException(e));
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,31 @@ public void thenAcceptAsync_exceptional() {
chained.join();
}

@Test
public void thenAccept_onCompletedFuture_whenActionThrowsException() {
thenAccept_whenActionThrowsException(0L);
}

@Test
public void thenAccept_onIncompleteFuture_whenActionThrowsException() {
thenAccept_whenActionThrowsException(1000L);
}

private void thenAccept_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenAccept(value -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void thenApply_whenCompletedFuture() {
CompletionStage<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -258,6 +283,31 @@ public void thenApplyAsync_exceptional() {
chained.join();
}

@Test
public void thenApply_onCompletedFuture_whenActionThrowsException() {
thenApply_whenActionThrowsException(0L);
}

@Test
public void thenApply_onIncompleteFuture_whenActionThrowsException() {
thenApply_whenActionThrowsException(1000L);
}

public void thenApply_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenApply(value -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void thenRun_whenCompletedFuture() {
CompletionStage<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -342,7 +392,7 @@ public void thenRunAsync_exceptional() {
}

@Test
public void thenRun_whenExceptionThrownFromRunnable() {
public void thenRun_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
CompletableFuture<Void> chained = future.thenRun(() -> {
throw new IllegalStateException();
Expand All @@ -355,6 +405,31 @@ public void thenRun_whenExceptionThrownFromRunnable() {
chained.join();
}

@Test
public void thenRun_onCompletedFuture_whenActionThrowsException() {
thenRun_whenActionThrowsException(0L);
}

@Test
public void thenRun_onIncompleteFuture_whenActionThrowsException() {
thenRun_whenActionThrowsException(1000L);
}

private void thenRun_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenRun(() -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void whenComplete_whenCompletedFuture() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1044,6 +1119,29 @@ public void applyToEitherAsync_whenExecutionRejected() {
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
}

@Test
public void applyToEitherAsync_onIncompleteFuture_whenExecutionRejected() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(RejectedExecutionException.class));
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
}

@Test
public void applyToEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);
CompletableFuture<Long> nextStage = future.applyToEither(new CompletableFuture<Long>(),
(v) -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(nextStage.isDone()));
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
nextStage.join();
}

@Test
public void acceptEither() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand All @@ -1065,12 +1163,17 @@ public void acceptEitherAsync() {
}

@Test
public void applyToEitherAsync_onIncompleteFuture_whenExecutionRejected() {
public void acceptEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);
CompletableFuture<Void> nextStage = future.acceptEither(new CompletableFuture<>(),
(v) -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(nextStage.isDone()));
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(RejectedExecutionException.class));
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
nextStage.join();
}

@Test
Expand Down Expand Up @@ -1111,6 +1214,17 @@ public void runAfterBothAsync_onIncompleteFuture_whenExecutionRejected() {
future.runAfterBothAsync(newCompletedFuture(null), () -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void runAfterBoth_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.runAfterBoth(newCompletedFuture(null), () -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void runAfterEither() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1147,6 +1261,17 @@ public void runAfterEitherAsync_onIncompleteFuture_whenExecutionRejected() {
future.runAfterEitherAsync(newCompletedFuture(null), () -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void runAfterEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.runAfterEither(newCompletedFuture(null), () -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenAcceptBoth() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1183,6 +1308,18 @@ public void thenAcceptBothAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenAcceptBothAsync(newCompletedFuture(null), (v, u) -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void thenAcceptBoth_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenAcceptBoth(newCompletedFuture(null),
(v, u) -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenCombine() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1221,6 +1358,17 @@ public void thenCombineAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenCombineAsync(newCompletedFuture(null), (v, u) -> null, REJECTING_EXECUTOR).join();
}

@Test
public void thenCombine_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenCombine(newCompletedFuture(null), (t, u) -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenComposeAsync_whenExecutionRejected() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand All @@ -1239,6 +1387,17 @@ public void thenComposeAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenComposeAsync(v -> newCompletedFuture(null), REJECTING_EXECUTOR).join();
}

@Test
public void thenCompose_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenCompose(v -> {
throw new ExpectedRuntimeException();
}).join();
}

public static class RejectingExecutor implements Executor {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi.impl;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class DelegatingCompletableFuture_WrappingInternalCompletableFutureTest
extends InternalCompletableFutureTest {

private final InternalSerializationService serializationService
= new DefaultSerializationServiceBuilder().build();

@Override
protected InternalCompletableFuture<Object> newCompletableFuture(boolean exceptional, long completeAfterMillis) {
return new DelegatingCompletableFuture<>(serializationService,
super.newCompletableFuture(exceptional, completeAfterMillis));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi.impl;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture_CompletionStageTest;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class DelegatingCompletableFuture_WrappingInvocationFutureTest
extends InvocationFuture_CompletionStageTest {

private final InternalSerializationService serializationService
= new DefaultSerializationServiceBuilder().build();

@Override
protected InternalCompletableFuture<Object> newCompletableFuture(boolean exceptional, long completeAfterMillis) {
return new DelegatingCompletableFuture<>(serializationService,
super.newCompletableFuture(exceptional, completeAfterMillis));
}
}

0 comments on commit cbd52fa

Please sign in to comment.