From 2c40697e46c0e4e9b7c5b86a3ef94b31c3330cb6 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sat, 24 Feb 2018 15:39:31 +0100 Subject: [PATCH] [FLINK-8773] [flip6] Make JobManagerRunner shut down non blocking The Dispatcher no longer shuts down the JobManagerRunner in a blocking fashion. Instead it registers the termination futures and calls the shut down of the JobManagerSharedServices once all JobManagerRunners have terminated. This closes #5575. --- .../flink/runtime/dispatcher/Dispatcher.java | 111 +++++++++--------- .../runtime/jobmaster/JobManagerRunner.java | 13 +- .../dispatcher/MiniDispatcherTest.java | 1 + .../jobmaster/JobManagerRunnerTest.java | 8 +- 4 files changed, 64 insertions(+), 69 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e212752295296..b31d04db91b64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -65,13 +65,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * Base class for the Dispatcher component. The Dispatcher component is responsible @@ -109,6 +112,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme @Nullable protected final String restAddress; + private CompletableFuture orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null); + public Dispatcher( RpcService rpcService, String endpointId, @@ -158,38 +163,39 @@ public Dispatcher( @Override public CompletableFuture postStop() { log.info("Stopping dispatcher {}.", getAddress()); - Exception exception = null; - try { - clearState(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); - try { - jobManagerSharedServices.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + final CompletableFuture allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList( + jobManagerRunnersTerminationFuture, + orphanedJobManagerRunnersTerminationFuture)); - try { - submittedJobGraphStore.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return FutureUtils.runAfterwards( + allJobManagerRunnersTerminationFuture, + () -> { + Exception exception = null; + try { + jobManagerSharedServices.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } - try { - leaderElectionService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + try { + submittedJobGraphStore.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } - if (exception != null) { - return FutureUtils.completedExceptionally( - new FlinkException("Could not properly terminate the Dispatcher.", exception)); - } else { - return CompletableFuture.completedFuture(null); - } + try { + leaderElectionService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + }); } @Override @@ -491,7 +497,8 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception { JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId); if (jobManagerRunner != null) { - jobManagerRunner.shutdown(); + final CompletableFuture jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture); } if (cleanupHA) { @@ -502,28 +509,17 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception { } /** - * Clears the state of the dispatcher. + * Terminate all currently running {@link JobManagerRunner}. * - *

The state are all currently running jobs. + * @return Future which is completed once all {@link JobManagerRunner} have terminated */ - private void clearState() throws Exception { - Exception exception = null; - + private CompletableFuture terminateJobManagerRunners() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); - // stop all currently running JobManager since they run in the same process - for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { - try { - jobManagerRunner.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } + final List> terminationFutures = jobManagerRunners.values().stream() + .map(JobManagerRunner::closeAsync) + .collect(Collectors.toList()); - jobManagerRunners.clear(); - - if (exception != null) { - throw exception; - } + return FutureUtils.completeAll(terminationFutures); } /** @@ -600,6 +596,12 @@ protected void jobNotFinished(JobID jobId) { } } + private void registerOrphanedJobManagerTerminationFuture(CompletableFuture jobManagerRunnerTerminationFuture) { + orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList( + orphanedJobManagerRunnersTerminationFuture, + jobManagerRunnerTerminationFuture)); + } + //------------------------------------------------------ // Leader contender //------------------------------------------------------ @@ -619,11 +621,9 @@ public void grantLeadership(final UUID newLeaderSessionID) { // clear the state if we've been the leader before if (getFencingToken() != null) { - try { - clearState(); - } catch (Exception e) { - log.warn("Could not properly clear the Dispatcher state while granting leadership.", e); - } + final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); + jobManagerRunners.clear(); } setFencingToken(dispatcherId); @@ -644,11 +644,10 @@ public void revokeLeadership() { runAsyncWithoutFencing( () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); - try { - clearState(); - } catch (Exception e) { - log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); - } + + final CompletableFuture jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); + jobManagerRunners.clear(); // clear the fencing token indicating that we don't have the leadership right now setFencingToken(null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 42692434b2f65..80aa673c86a70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -59,7 +60,7 @@ * The runner for the job manager. It deals with job level leader election and make underlying job manager * properly reacted. */ -public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler { +public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler, AutoCloseableAsync { private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); @@ -98,9 +99,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F * Exceptions that occur while creating the JobManager or JobManagerRunner are directly * thrown and not reported to the given {@code FatalErrorHandler}. * - *

This JobManagerRunner assumes that it owns the given {@code JobManagerSharedServices}. - * It will shut them down on error and on calls to {@link #shutdown()}. - * * @throws Exception Thrown if the runner cannot be set up, because either one of the * required services could not be started, ot the Job could not be initialized. */ @@ -214,11 +212,8 @@ public void start() throws Exception { } } - public void shutdown() throws Exception { - shutdownInternally().get(); - } - - private CompletableFuture shutdownInternally() { + @Override + public CompletableFuture closeAsync() { synchronized (lock) { if (!shutdown) { shutdown = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 2b989398ea71f..c6eda2e816a41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -289,6 +289,7 @@ public JobManagerRunner createJobManagerRunner( final JobManagerRunner mock = mock(JobManagerRunner.class); when(mock.getResultFuture()).thenReturn(resultFuture); + when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); return mock; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index ba72293c9cfde..0c238cece7dc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -152,7 +152,7 @@ public void testJobCompletion() throws Exception { assertThat(resultFuture.get(), is(archivedExecutionGraph)); } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } } @@ -176,7 +176,7 @@ public void testJobFinishedByOther() throws Exception { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } } @@ -191,7 +191,7 @@ public void testShutDown() throws Exception { assertThat(resultFuture.isDone(), is(false)); - jobManagerRunner.shutdown(); + jobManagerRunner.closeAsync(); try { resultFuture.get(); @@ -200,7 +200,7 @@ public void testShutDown() throws Exception { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } }