Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,61 @@ public static <T> CompletableFuture<T> retryWithDelay(
scheduledExecutor);
}

/**
 * Runs the given {@link Runnable} on the scheduled executor after the given delay.
 *
 * <p>The runnable is wrapped into a {@code Supplier<Void>} that runs it and yields {@code null};
 * the actual scheduling is delegated to the supplier-based {@code scheduleWithDelay} overload.
 *
 * @param operation action to run once the delay has passed
 * @param delay time to wait before the action is run
 * @param scheduledExecutor executor on which the action is scheduled
 * @return the future returned by the supplier-based overload for the wrapped action
 */
public static CompletableFuture<Void> scheduleWithDelay(
		final Runnable operation,
		final Time delay,
		final ScheduledExecutor scheduledExecutor) {
	// The explicit target type selects the Supplier overload instead of recursing into this one.
	return scheduleWithDelay(
		(Supplier<Void>) () -> {
			operation.run();
			return null;
		},
		delay,
		scheduledExecutor);
}

/**
 * Evaluates the given supplier on the scheduled executor after the given delay and exposes the
 * outcome as a {@link CompletableFuture}.
 *
 * <p>The returned future is completed with the supplier's value, or completed exceptionally
 * with any {@link Throwable} the supplier throws. When the returned future completes while the
 * scheduled task is not yet done (e.g. because the caller cancelled the future), the scheduled
 * task is cancelled without interrupting it.
 *
 * @param operation supplier to evaluate once the delay has passed
 * @param delay time to wait before the supplier is evaluated
 * @param scheduledExecutor executor on which the evaluation is scheduled
 * @param <T> type of the value produced by the supplier
 * @return future carrying the supplier's value or failure
 */
public static <T> CompletableFuture<T> scheduleWithDelay(
		final Supplier<T> operation,
		final Time delay,
		final ScheduledExecutor scheduledExecutor) {
	final CompletableFuture<T> delayedResult = new CompletableFuture<>();

	// Forwards the supplier's outcome (value or failure) into the returned future.
	final Runnable completeWithOperationOutcome = () -> {
		try {
			delayedResult.complete(operation.get());
		} catch (Throwable failure) {
			delayedResult.completeExceptionally(failure);
		}
	};

	final ScheduledFuture<?> pendingTask = scheduledExecutor.schedule(
		completeWithOperationOutcome,
		delay.getSize(),
		delay.getUnit());

	// If the result future finishes first, the still pending task is no longer needed.
	delayedResult.whenComplete((ignoredValue, ignoredFailure) -> {
		if (pendingTask.isDone()) {
			return;
		}
		pendingTask.cancel(false);
	});

	return delayedResult;
}

private static <T> void retryOperationWithDelay(
final CompletableFuture<T> resultFuture,
final Supplier<CompletableFuture<T>> operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ public void restart(long expectedGlobalVersion) {

scheduleForExecution();
}
// TODO remove the catch block if we align the semantics to not fail global within the restarter.
catch (Throwable t) {
LOG.warn("Failed to restart the job.", t);
failGlobal(t);
Expand Down Expand Up @@ -1433,8 +1434,13 @@ private boolean tryRestartOrFail(long globalModVersionForRestart) {
LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());

RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
restartStrategy.restart(restarter, getJobMasterMainThreadExecutor());

FutureUtils.assertNoException(
restartStrategy
.restart(restarter, getJobMasterMainThreadExecutor())
.exceptionally((throwable) -> {
failGlobal(throwable);
return null;
}));
return true;
}
else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -108,16 +109,18 @@ protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {

FutureUtils.assertNoException(
cancelTasks(verticesToRestart)
.thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
.thenComposeAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
.handle(failGlobalOnError()));
}

private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
return () -> restartStrategy.restart(
createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
executionGraph.getJobMasterMainThreadExecutor()
);
/**
 * Builds the continuation that hands the reset-and-reschedule work over to the restart strategy.
 *
 * <p>The restart strategy is looked up on the execution graph when the returned function is
 * applied, not when this method is called.
 *
 * @param globalModVersion forwarded to the reset-and-reschedule callback
 * @param vertexVersions forwarded to the reset-and-reschedule callback
 * @return function that ignores its argument and returns the future produced by the restart
 *         strategy's {@code restart} call
 */
private Function<Object, CompletableFuture<Void>> resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
	return (ignored) -> executionGraph
		.getRestartStrategy()
		.restart(
			createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
			executionGraph.getJobMasterMainThreadExecutor());
}

private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;

import scala.concurrent.duration.Duration;

import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

/**
* Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
* with a fixed time delay in between.
Expand Down Expand Up @@ -68,18 +70,12 @@ public boolean canRestart() {
}

@Override
public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
if (isRestartTimestampsQueueFull()) {
restartTimestampsDeque.remove();
}
restartTimestampsDeque.add(System.currentTimeMillis());

executor.schedule(new Runnable() {
@Override
public void run() {
restarter.triggerFullRecovery();
}
}, delayInterval.getSize(), delayInterval.getUnit());
return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, delayInterval, executor);
}

private boolean isRestartTimestampsQueueFull() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.flink.runtime.executiongraph.restart;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import scala.concurrent.duration.Duration;

Expand Down Expand Up @@ -60,9 +62,9 @@ public boolean canRestart() {
}

@Override
public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
currentRestartAttempt++;
executor.schedule(restarter::triggerFullRecovery, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, Time.milliseconds(delayBetweenRestartAttempts), executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

import java.util.concurrent.CompletableFuture;

/**
* Restart strategy which does not restart an {@link ExecutionGraph}.
*/
Expand All @@ -33,7 +35,7 @@ public boolean canRestart() {
}

@Override
public void restart(RestartCallback restarter, ScheduledExecutor executor) {
public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

import java.util.concurrent.CompletableFuture;

/**
* Strategy for {@link ExecutionGraph} restarts.
*/
Expand All @@ -42,6 +44,7 @@ public interface RestartStrategy {
*
* @param restarter The hook to restart the ExecutionGraph
* @param executor A scheduled executor to delay the restart
* @return A {@link CompletableFuture} that will be completed when the restarting process is done.
*/
void restart(RestartCallback restarter, ScheduledExecutor executor);
CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.flink.runtime.concurrent.ScheduledExecutor;

import java.util.concurrent.CompletableFuture;


/**
* A restart strategy that validates that it is not in use by throwing {@link IllegalStateException}
Expand All @@ -34,7 +36,7 @@ public boolean canRestart() {
}

@Override
public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
public CompletableFuture<Void> restart(final RestartCallback restarter, final ScheduledExecutor executor) {
throw new IllegalStateException("Unexpected restart() call");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,65 @@ public void testRetryWithDelayCancellation() {
assertTrue(scheduledFuture.isCancelled());
}

/**
 * Verifies that an operation scheduled with zero delay completes the returned future with the
 * operation's value once the executor's scheduled tasks have been triggered.
 */
@Test
public void testScheduleWithDelay() throws Exception {
	final int expectedResult = 42;
	final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();

	final CompletableFuture<Integer> delayedFuture = FutureUtils.scheduleWithDelay(
		() -> expectedResult,
		Time.milliseconds(0),
		manualExecutor);

	// Nothing runs until the manually triggered executor is told to run its scheduled tasks.
	manualExecutor.triggerScheduledTasks();

	assertEquals(expectedResult, delayedFuture.get().intValue());
}

/**
 * Verifies that cancelling the future returned by {@code scheduleWithDelay} also cancels the
 * task that was registered with the scheduled executor.
 */
@Test
public void testScheduleWithDelayCancellation() {
	final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();
	final Runnable doNothing = () -> {};

	final CompletableFuture<Void> delayedFuture = FutureUtils.scheduleWithDelay(
		doNothing,
		TestingUtils.infiniteTime(),
		manualExecutor);

	// Grab the first task the executor has recorded before cancelling the returned future.
	final ScheduledFuture<?> pendingTask = manualExecutor.getScheduledTasks().iterator().next();

	delayedFuture.cancel(false);

	assertTrue(delayedFuture.isCancelled());
	assertTrue(pendingTask.isCancelled());
}

/**
 * Verifies that a future obtained from {@code scheduleWithDelay} with
 * {@code TestingUtils.infiniteTime()} as delay is not done right after scheduling.
 */
@Test
public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
	final Runnable doNothing = () -> {};

	final CompletableFuture<Void> delayedFuture = FutureUtils.scheduleWithDelay(
		doNothing,
		TestingUtils.infiniteTime(),
		TestingUtils.defaultScheduledExecutor());

	assertFalse(delayedFuture.isDone());

	// Presumably cleans up the still pending task on the shared default executor.
	delayedFuture.cancel(false);
}

/**
* Tests that a future is timed out after the specified timeout.
*/
Expand Down
Loading