Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes AbstractInvocationFuture#thenApply #17518

Merged
merged 2 commits into from Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1439,7 +1439,13 @@ public void execute(@Nonnull Executor executor, Object value) {
return;
}
try {
executor.execute(() -> future.complete(function.apply((V) value)));
executor.execute(() -> {
try {
future.complete(function.apply((V) value));
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
} catch (RejectedExecutionException e) {
future.completeExceptionally(wrapToInstanceNotActiveException(e));
throw e;
Expand Down
Expand Up @@ -161,6 +161,31 @@ public void thenAcceptAsync_exceptional() {
chained.join();
}

@Test
public void thenAccept_onCompletedFuture_whenActionThrowsException() {
thenAccept_whenActionThrowsException(0L);
}

@Test
public void thenAccept_onIncompleteFuture_whenActionThrowsException() {
thenAccept_whenActionThrowsException(1000L);
}

private void thenAccept_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenAccept(value -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void thenApply_whenCompletedFuture() {
CompletionStage<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -258,6 +283,31 @@ public void thenApplyAsync_exceptional() {
chained.join();
}

@Test
public void thenApply_onCompletedFuture_whenActionThrowsException() {
thenApply_whenActionThrowsException(0L);
}

@Test
public void thenApply_onIncompleteFuture_whenActionThrowsException() {
thenApply_whenActionThrowsException(1000L);
}

public void thenApply_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenApply(value -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void thenRun_whenCompletedFuture() {
CompletionStage<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -342,7 +392,7 @@ public void thenRunAsync_exceptional() {
}

@Test
public void thenRun_whenExceptionThrownFromRunnable() {
public void thenRun_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
CompletableFuture<Void> chained = future.thenRun(() -> {
throw new IllegalStateException();
Expand All @@ -355,6 +405,31 @@ public void thenRun_whenExceptionThrownFromRunnable() {
chained.join();
}

@Test
public void thenRun_onCompletedFuture_whenActionThrowsException() {
thenRun_whenActionThrowsException(0L);
}

@Test
public void thenRun_onIncompleteFuture_whenActionThrowsException() {
thenRun_whenActionThrowsException(1000L);
}

private void thenRun_whenActionThrowsException(long completionDelay) {
CompletableFuture<Object> future = newCompletableFuture(false, completionDelay);
CompletableFuture<Void> chained = future.thenRun(() -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(future.isDone()));
assertTrueEventually(() -> assertTrue(chained.isDone()));
assertFalse(future.isCompletedExceptionally());
assertTrue(chained.isCompletedExceptionally());
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
chained.join();
}

@Test
public void whenComplete_whenCompletedFuture() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1044,6 +1119,29 @@ public void applyToEitherAsync_whenExecutionRejected() {
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
}

@Test
public void applyToEitherAsync_onIncompleteFuture_whenExecutionRejected() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(RejectedExecutionException.class));
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
}

@Test
public void applyToEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);
CompletableFuture<Long> nextStage = future.applyToEither(new CompletableFuture<Long>(),
(v) -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(nextStage.isDone()));
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
nextStage.join();
}

@Test
public void acceptEither() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand All @@ -1065,12 +1163,17 @@ public void acceptEitherAsync() {
}

@Test
public void applyToEitherAsync_onIncompleteFuture_whenExecutionRejected() {
public void acceptEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);
CompletableFuture<Void> nextStage = future.acceptEither(new CompletableFuture<>(),
(v) -> {
throw new ExpectedRuntimeException();
});

assertTrueEventually(() -> assertTrue(nextStage.isDone()));
expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(RejectedExecutionException.class));
future.applyToEitherAsync(newCompletedFuture(null), v -> null, REJECTING_EXECUTOR).join();
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
nextStage.join();
}

@Test
Expand Down Expand Up @@ -1111,6 +1214,17 @@ public void runAfterBothAsync_onIncompleteFuture_whenExecutionRejected() {
future.runAfterBothAsync(newCompletedFuture(null), () -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void runAfterBoth_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.runAfterBoth(newCompletedFuture(null), () -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void runAfterEither() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1147,6 +1261,17 @@ public void runAfterEitherAsync_onIncompleteFuture_whenExecutionRejected() {
future.runAfterEitherAsync(newCompletedFuture(null), () -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void runAfterEither_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.runAfterEither(newCompletedFuture(null), () -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenAcceptBoth() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1183,6 +1308,18 @@ public void thenAcceptBothAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenAcceptBothAsync(newCompletedFuture(null), (v, u) -> ignore(), REJECTING_EXECUTOR).join();
}

@Test
public void thenAcceptBoth_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenAcceptBoth(newCompletedFuture(null),
(v, u) -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenCombine() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand Down Expand Up @@ -1221,6 +1358,17 @@ public void thenCombineAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenCombineAsync(newCompletedFuture(null), (v, u) -> null, REJECTING_EXECUTOR).join();
}

@Test
public void thenCombine_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenCombine(newCompletedFuture(null), (t, u) -> {
throw new ExpectedRuntimeException();
}).join();
}

@Test
public void thenComposeAsync_whenExecutionRejected() {
CompletableFuture<Object> future = newCompletableFuture(false, 0L);
Expand All @@ -1239,6 +1387,17 @@ public void thenComposeAsync_onIncompleteFuture_whenExecutionRejected() {
future.thenComposeAsync(v -> newCompletedFuture(null), REJECTING_EXECUTOR).join();
}

@Test
public void thenCompose_whenActionThrowsException() {
CompletableFuture<Object> future = newCompletableFuture(false, 1000L);

expectedException.expect(CompletionException.class);
expectedException.expectCause(new RootCauseMatcher(ExpectedRuntimeException.class));
future.thenCompose(v -> {
throw new ExpectedRuntimeException();
}).join();
}

public static class RejectingExecutor implements Executor {

@Override
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi.impl;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class DelegatingCompletableFuture_WrappingInternalCompletableFutureTest
extends InternalCompletableFutureTest {

private final InternalSerializationService serializationService
= new DefaultSerializationServiceBuilder().build();

@Override
protected InternalCompletableFuture<Object> newCompletableFuture(boolean exceptional, long completeAfterMillis) {
return new DelegatingCompletableFuture<>(serializationService,
super.newCompletableFuture(exceptional, completeAfterMillis));
}
}
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi.impl;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture_CompletionStageTest;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class DelegatingCompletableFuture_WrappingInvocationFutureTest
extends InvocationFuture_CompletionStageTest {

private final InternalSerializationService serializationService
= new DefaultSerializationServiceBuilder().build();

@Override
protected InternalCompletableFuture<Object> newCompletableFuture(boolean exceptional, long completeAfterMillis) {
return new DelegatingCompletableFuture<>(serializationService,
super.newCompletableFuture(exceptional, completeAfterMillis));
}
}