Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18638][runtime] FutureUtils support timeout message #12932

Merged
merged 2 commits into from Jul 24, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -421,7 +421,21 @@ public RetryException(Throwable cause) {
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
return orTimeout(future, timeout, timeUnit, Executors.directExecutor());
return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), null);
}

/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param timeoutMsg timeout message for exception
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit, @Nullable String timeoutMsg) {
return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), timeoutMsg);
}

/**
Expand All @@ -439,10 +453,30 @@ public static <T> CompletableFuture<T> orTimeout(
long timeout,
TimeUnit timeUnit,
Executor timeoutFailExecutor) {
return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null);
}

/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached
* @param timeoutMsg timeout message for exception
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit timeUnit,
Executor timeoutFailExecutor,
@Nullable String timeoutMsg) {

if (!future.isDone()) {
final ScheduledFuture<?> timeoutFuture = Delayer.delay(
() -> timeoutFailExecutor.execute(new Timeout(future)), timeout, timeUnit);
() -> timeoutFailExecutor.execute(new Timeout(future, timeoutMsg)), timeout, timeUnit);

future.whenComplete((T value, Throwable throwable) -> {
if (!timeoutFuture.isDone()) {
Expand Down Expand Up @@ -1026,14 +1060,16 @@ public static <T> T getWithoutException(CompletableFuture<T> future) {
private static final class Timeout implements Runnable {

private final CompletableFuture<?> future;
private final String timeoutMsg;

private Timeout(CompletableFuture<?> future) {
private Timeout(CompletableFuture<?> future, @Nullable String timeoutMsg) {
this.future = checkNotNull(future);
this.timeoutMsg = timeoutMsg;
}

@Override
public void run() {
future.completeExceptionally(new TimeoutException());
future.completeExceptionally(new TimeoutException(timeoutMsg));
}
}

Expand Down