diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 05be10dc52f5f..cf49904b26c56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -421,7 +421,21 @@ public RetryException(Throwable cause) { * @return The timeout enriched future */ public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { - return orTimeout(future, timeout, timeUnit, Executors.directExecutor()); + return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), null); + } + + /** + * Times the given future out after the timeout. + * + * @param future to time out + * @param timeout after which the given future is timed out + * @param timeUnit time unit of the timeout + * @param timeoutMsg timeout message for exception + * @param type of the given future + * @return The timeout enriched future + */ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit, @Nullable String timeoutMsg) { + return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), timeoutMsg); } /** @@ -439,10 +453,30 @@ public static CompletableFuture orTimeout( long timeout, TimeUnit timeUnit, Executor timeoutFailExecutor) { + return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null); + } + + /** + * Times the given future out after the timeout. + * + * @param future to time out + * @param timeout after which the given future is timed out + * @param timeUnit time unit of the timeout + * @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached + * @param timeoutMsg timeout message for exception + * @param type of the given future + * @return The timeout enriched future + */ + public static CompletableFuture orTimeout( + CompletableFuture future, + long timeout, + TimeUnit timeUnit, + Executor timeoutFailExecutor, + @Nullable String timeoutMsg) { if (!future.isDone()) { final ScheduledFuture timeoutFuture = Delayer.delay( - () -> timeoutFailExecutor.execute(new Timeout(future)), timeout, timeUnit); + () -> timeoutFailExecutor.execute(new Timeout(future, timeoutMsg)), timeout, timeUnit); future.whenComplete((T value, Throwable throwable) -> { if (!timeoutFuture.isDone()) { @@ -1026,14 +1060,16 @@ public static T getWithoutException(CompletableFuture future) { private static final class Timeout implements Runnable { private final CompletableFuture future; + private final String timeoutMsg; - private Timeout(CompletableFuture future) { + private Timeout(CompletableFuture future, @Nullable String timeoutMsg) { this.future = checkNotNull(future); + this.timeoutMsg = timeoutMsg; } @Override public void run() { - future.completeExceptionally(new TimeoutException()); + future.completeExceptionally(new TimeoutException(timeoutMsg)); } }