Skip to content

Commit d3e0201

Browse files
kbuciKrishen Bhan
andauthored
fix(common): FutureUtils:allOf should always throw root cause exception (#18456)
FutureUtils.allOf() has a race condition that causes the original root-cause exception to be silently replaced by a CancellationException, making it impossible to diagnose failures in any code path that uses it — most notably MultipleSparkJobExecutionStrategy.performClustering(), which executes clustering groups in parallel using FutureUtils.allOf(). Fixing the same in this patch --------- Co-authored-by: Krishen Bhan <“bkrishen@uber.com”>
1 parent 12b3a06 commit d3e0201

File tree

2 files changed

+150
-7
lines changed

2 files changed

+150
-7
lines changed

hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.util.List;
2222
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionException;
24+
import java.util.concurrent.atomic.AtomicReference;
2325
import java.util.stream.Collectors;
2426

2527
/**
@@ -42,24 +44,37 @@ public class FutureUtils {
4244
*/
4345
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
4446
CompletableFuture<Void> union = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
47+
AtomicReference<Throwable> firstFailure = new AtomicReference<>();
4548

4649
futures.forEach(future -> {
4750
// NOTE: We add a callback to every future, to cancel all the other not yet completed futures,
4851
// which will be providing for an early termination semantic: whenever any of the futures
4952
// fail other futures will be cancelled and the exception will be returned as a result
5053
future.whenComplete((ignored, throwable) -> {
5154
if (throwable != null) {
55+
firstFailure.compareAndSet(null, throwable);
56+
// Note that {@link CompletableFuture#cancel} does not interrupt the other underlying tasks;
57+
// it only marks their futures as cancelled. The tasks will still run to completion.
5258
futures.forEach(f -> f.cancel(true));
53-
union.completeExceptionally(throwable);
5459
}
5560
});
5661
});
5762

58-
return union.thenApply(aVoid ->
59-
futures.stream()
60-
// NOTE: This join wouldn't block, since all the
61-
// futures are completed at this point.
62-
.map(CompletableFuture::join)
63-
.collect(Collectors.toList()));
63+
return union.handle((aVoid, throwable) -> {
64+
Throwable realCause = firstFailure.get();
65+
// Prefer the first real failure captured by the whenComplete callbacks over whatever
66+
// exception the {@link CompletableFuture#allOf} BiRelay propagated, since the BiRelay may
67+
// receive either the original exception or a CancellationException from cancel depending on
68+
// timing. In the unexpected case that realCause is null but throwable is not null, the
69+
// subsequent join call will throw it.
70+
if (realCause != null) {
71+
throw new CompletionException(realCause);
72+
}
73+
// NOTE: This join wouldn't block, since all the
74+
// futures are completed at this point (allOf guarantees this).
75+
return futures.stream()
76+
.map(CompletableFuture::join)
77+
.collect(Collectors.toList());
78+
});
6479
}
6580
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.common.util;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.io.IOException;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionException;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
import static org.junit.jupiter.api.Assertions.assertEquals;
34+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
35+
import static org.junit.jupiter.api.Assertions.assertThrows;
36+
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
38+
/**
39+
* Tests for {@link FutureUtils}.
40+
*/
41+
public class TestFutureUtils {
42+
43+
@Test
44+
public void testAllOfSuccess() {
45+
CompletableFuture<String> f1 = CompletableFuture.completedFuture("a");
46+
CompletableFuture<String> f2 = CompletableFuture.completedFuture("b");
47+
CompletableFuture<String> f3 = CompletableFuture.completedFuture("c");
48+
49+
List<String> results = FutureUtils.allOf(Arrays.asList(f1, f2, f3)).join();
50+
assertEquals(Arrays.asList("a", "b", "c"), results);
51+
}
52+
53+
@Test
54+
public void testAllOfSingleFailurePreservesOriginalExceptionAndCancelsOthers() {
55+
IOException originalCause = new IOException("disk failed");
56+
57+
CompletableFuture<String> f1 = new CompletableFuture<>();
58+
CompletableFuture<String> f2 = new CompletableFuture<>();
59+
CompletableFuture<String> f3 = new CompletableFuture<>();
60+
61+
CompletableFuture<List<String>> result = FutureUtils.allOf(Arrays.asList(f1, f2, f3));
62+
63+
f1.completeExceptionally(originalCause);
64+
65+
CompletionException thrown = assertThrows(CompletionException.class, result::join);
66+
assertEquals(originalCause, thrown.getCause(),
67+
"The original IOException should be preserved, not masked by CancellationException");
68+
assertTrue(f2.isCancelled(), "f2 should be cancelled after f1 fails");
69+
assertTrue(f3.isCancelled(), "f3 should be cancelled after f1 fails");
70+
}
71+
72+
/**
73+
* Ensure that the original exception is preserved even when the futures are completed in a concurrent manner.
74+
*/
75+
@Test
76+
public void testAllOfPreservesOriginalExceptionUnderConcurrency() throws Exception {
77+
IOException originalCause = new IOException("disk failed");
78+
ExecutorService executor = Executors.newFixedThreadPool(4);
79+
try {
80+
CountDownLatch allStarted = new CountDownLatch(3);
81+
82+
CompletableFuture<String> f1 = new CompletableFuture<>();
83+
CompletableFuture<String> f2 = new CompletableFuture<>();
84+
CompletableFuture<String> f3 = new CompletableFuture<>();
85+
86+
executor.submit(() -> {
87+
allStarted.countDown();
88+
try {
89+
allStarted.await(5, TimeUnit.SECONDS);
90+
} catch (InterruptedException e) {
91+
Thread.currentThread().interrupt();
92+
}
93+
f1.completeExceptionally(originalCause);
94+
return null;
95+
});
96+
executor.submit(() -> {
97+
allStarted.countDown();
98+
try {
99+
allStarted.await(5, TimeUnit.SECONDS);
100+
Thread.sleep(50);
101+
} catch (InterruptedException e) {
102+
Thread.currentThread().interrupt();
103+
}
104+
f2.complete("b");
105+
return null;
106+
});
107+
executor.submit(() -> {
108+
allStarted.countDown();
109+
try {
110+
allStarted.await(5, TimeUnit.SECONDS);
111+
Thread.sleep(50);
112+
} catch (InterruptedException e) {
113+
Thread.currentThread().interrupt();
114+
}
115+
f3.complete("c");
116+
return null;
117+
});
118+
119+
CompletableFuture<List<String>> result = FutureUtils.allOf(Arrays.asList(f1, f2, f3));
120+
121+
CompletionException thrown = assertThrows(CompletionException.class, result::join);
122+
assertInstanceOf(IOException.class, thrown.getCause(),
123+
"original IOException must be preserved, got: " + thrown.getCause());
124+
} finally {
125+
executor.shutdownNow();
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)