From 7ef6d4d7599549c5fb750a692b2fdc1d1ef65272 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sat, 26 Aug 2017 11:45:36 +0200 Subject: [PATCH] [FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner This commit waits for the completion of the shutdown of the ResourceManager before shutting down the ResourceManagerRuntimeServices. The JobManagerServices are now exclusively passed in to the JobManagerRunner which means that it is no longer responsible for shutting the JobManagerServices down. Additionally, it waits until the JobMaster has been shut down before closing the LeaderElectionService as well as the JobManagerMetricGroup. The JobManagerServices are now managed by the caller of the JobManagerRunner. This allows them to be reused across multiple JobManagerRunners. The RpcEndpoint#postStop method is now called by the UntypedActor#postStop method, which ensures that the RpcEndpoint's method is also called if only the underlying RpcService is shut down (without explicitly shutting down the RpcEndpoint). 
--- .../flink/runtime/dispatcher/Dispatcher.java | 45 +++++-- .../dispatcher/StandaloneDispatcher.java | 5 +- .../entrypoint/JobClusterEntrypoint.java | 19 ++- .../runtime/jobmaster/JobManagerRunner.java | 112 +++++------------- .../runtime/minicluster/MiniCluster.java | 2 + .../minicluster/MiniClusterJobDispatcher.java | 23 +++- .../ResourceManagerRunner.java | 49 ++++---- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 100 ++++++---------- .../runtime/rpc/akka/AkkaRpcService.java | 6 +- .../runtime/rpc/messages/ControlMessage.java | 26 ---- .../flink/runtime/rpc/messages/Shutdown.java | 36 ------ .../runtime/dispatcher/DispatcherTest.java | 3 +- 12 files changed, 176 insertions(+), 250 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 00cbb2fc16140..8977415a8f07d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; @@ -69,7 +70,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme private final RunningJobsRegistry runningJobsRegistry; private final HighAvailabilityServices highAvailabilityServices; - private final BlobServer 
blobServer; + private final JobManagerServices jobManagerServices; private final HeartbeatServices heartbeatServices; private final MetricRegistry metricRegistry; @@ -92,7 +93,9 @@ protected Dispatcher( this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); - this.blobServer = Preconditions.checkNotNull(blobServer); + this.jobManagerServices = JobManagerServices.fromConfiguration( + configuration, + Preconditions.checkNotNull(blobServer)); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); this.metricRegistry = Preconditions.checkNotNull(metricRegistry); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); @@ -111,10 +114,16 @@ protected Dispatcher( @Override public void postStop() throws Exception { - Exception exception = null; + Throwable exception = null; clearState(); + try { + jobManagerServices.shutdown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { submittedJobGraphStore.stop(); } catch (Exception e) { @@ -184,8 +193,8 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) configuration, getRpcService(), highAvailabilityServices, - blobServer, heartbeatServices, + jobManagerServices, metricRegistry, new DispatcherOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler); @@ -247,13 +256,23 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception { * *

The state are all currently running jobs. */ - private void clearState() { + private void clearState() throws Exception { + Exception exception = null; + // stop all currently running JobManager since they run in the same process for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { - jobManagerRunner.shutdown(); + try { + jobManagerRunner.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } } jobManagerRunners.clear(); + + if (exception != null) { + throw exception; + } } /** @@ -296,8 +315,8 @@ protected abstract JobManagerRunner createJobManagerRunner( Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception; @@ -321,7 +340,11 @@ public void grantLeadership(final UUID newLeaderSessionID) { // clear the state if we've been the leader before if (getFencingToken() != null) { - clearState(); + try { + clearState(); + } catch (Exception e) { + log.warn("Could not properly clear the Dispatcher state while granting leadership.", e); + } } setFencingToken(dispatcherId); @@ -342,7 +365,11 @@ public void revokeLeadership() { runAsyncWithoutFencing( () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); - clearState(); + try { + clearState(); + } catch (Exception e) { + log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); + } setFencingToken(DispatcherId.generate()); }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index dfd6a8aac9bf1..d6d82b1bd42b9 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -64,8 +65,8 @@ protected JobManagerRunner createJobManagerRunner( Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -76,8 +77,8 @@ protected JobManagerRunner createJobManagerRunner( configuration, rpcService, highAvailabilityServices, - blobServer, heartbeatServices, + jobManagerServices, metricRegistry, onCompleteActions, fatalErrorHandler); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index e70f6c8ca7943..124c6c62fcaba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.metrics.MetricRegistry; import 
org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -43,6 +44,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { private ResourceManager resourceManager; + private JobManagerServices jobManagerServices; + private JobManagerRunner jobManagerRunner; public JobClusterEntrypoint(Configuration configuration) { @@ -67,12 +70,14 @@ protected void startClusterComponents( metricRegistry, this); + jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer); + jobManagerRunner = createJobManagerRunner( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, - blobServer, + jobManagerServices, heartbeatServices, metricRegistry, this); @@ -89,7 +94,7 @@ protected JobManagerRunner createJobManagerRunner( ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobService, + JobManagerServices jobManagerServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -102,8 +107,8 @@ protected JobManagerRunner createJobManagerRunner( configuration, rpcService, highAvailabilityServices, - blobService, heartbeatServices, + jobManagerServices, metricRegistry, new TerminatingOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler); @@ -121,6 +126,14 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception { } } + if (jobManagerServices != null) { + try { + jobManagerServices.shutdown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + if (resourceManager != null) { try { resourceManager.shutDown(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 8766fabb7a21e..b5b4b826144c2 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -36,10 +36,10 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,53 +92,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // ------------------------------------------------------------------------ - public JobManagerRunner( - final ResourceID resourceId, - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices haServices, - final BlobServer blobService, - final HeartbeatServices heartbeatServices, - final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception { - this( - resourceId, - jobGraph, - configuration, - rpcService, - haServices, - 
blobService, - heartbeatServices, - new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), - toNotifyOnComplete, - errorHandler); - } - - public JobManagerRunner( - final ResourceID resourceId, - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices haServices, - final BlobServer blobService, - final HeartbeatServices heartbeatServices, - final MetricRegistry metricRegistry, - final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception { - this( - resourceId, - jobGraph, - configuration, - rpcService, - haServices, - heartbeatServices, - JobManagerServices.fromConfiguration(configuration, blobService), - metricRegistry, - toNotifyOnComplete, - errorHandler); - } - /** * *

Exceptions that occur while creating the JobManager or JobManagerRunner are directly @@ -217,12 +170,6 @@ public JobManagerRunner( } catch (Throwable t) { // clean up everything - try { - jobManagerServices.shutdown(); - } catch (Throwable tt) { - log.error("Error while shutting down JobManager services", tt); - } - if (jobManagerMetrics != null) { jobManagerMetrics.close(); } @@ -245,40 +192,37 @@ public void start() throws Exception { } } - public void shutdown() { - shutdownInternally(); + public void shutdown() throws Exception { + shutdownInternally().get(); } - private void shutdownInternally() { + private CompletableFuture shutdownInternally() { synchronized (lock) { shutdown = true; - if (leaderElectionService != null) { - try { - leaderElectionService.stop(); - } catch (Throwable t) { - log.error("Could not properly shutdown the leader election service", t); - } - } - - try { - jobManager.shutDown(); - } catch (Throwable t) { - log.error("Error shutting down JobManager", t); - } - - try { - jobManagerServices.shutdown(); - } catch (Throwable t) { - log.error("Error shutting down JobManager services", t); - } - - // make all registered metrics go away - try { - jobManagerMetricGroup.close(); - } catch (Throwable t) { - log.error("Error while unregistering metrics", t); - } + jobManager.shutDown(); + + return jobManager.getTerminationFuture() + .thenAccept( + ignored -> { + Throwable exception = null; + try { + leaderElectionService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + // make all registered metrics go away + try { + jobManagerMetricGroup.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception); + } + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 95f430c0ee245..2fe0587cd4bcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -353,6 +353,8 @@ private void shutdownInternally() throws Exception { if (tm != null) { try { tm.shutDown(); + // wait for the TaskManager to properly terminate + tm.getTerminationFuture().get(); } catch (Throwable t) { exception = firstOrSuppressed(t, exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 2bb94f2dec142..60d9a6692609a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -33,6 +33,8 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +158,7 @@ public MiniClusterJobDispatcher( * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be * terminally failed. 
*/ - public void shutdown() { + public void shutdown() throws Exception { synchronized (lock) { if (!shutdown) { shutdown = true; @@ -166,14 +168,31 @@ public void shutdown() { // in this shutdown code we copy the references to the stack first, // to avoid concurrent modification + Throwable exception = null; + JobManagerRunner[] runners = this.runners; if (runners != null) { this.runners = null; for (JobManagerRunner runner : runners) { - runner.shutdown(); + try { + runner.shutdown(); + } catch (Throwable e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } } } + + // shut down the JobManagerServices + try { + jobManagerServices.shutdown(); + } catch (Throwable throwable) { + exception = ExceptionUtils.firstOrSuppressed(throwable, exception); + } + + if (exception != null) { + throw new FlinkException("Could not properly terminate all JobManagerRunners.", exception); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index d0c411ceea6bb..ed6e18c60adff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -20,16 +20,18 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + /** * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services * and handles fatal errors by shutting the resource manager down. @@ -91,27 +93,23 @@ public void start() throws Exception { } public void shutDown() throws Exception { - shutDownInternally(); + // wait for the completion + shutDownInternally().get(); } - private void shutDownInternally() throws Exception { - Exception exception = null; + private CompletableFuture shutDownInternally() { synchronized (lock) { - try { - resourceManager.shutDown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { - resourceManagerRuntimeServices.shutDown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - ExceptionUtils.rethrow(exception, "Error while shutting down the resource manager runner."); - } + resourceManager.shutDown(); + + return resourceManager.getTerminationFuture() + .thenAccept( + ignored -> { + try { + resourceManagerRuntimeServices.shutDown(); + } catch (Exception e) { + throw new FlinkFutureException("Could not properly shut down the resource manager runtime services.", e); + } + }); } } @@ -123,10 +121,13 @@ private void shutDownInternally() throws Exception { public void onFatalError(Throwable exception) { LOG.error("Encountered fatal error.", exception); - try { - shutDownInternally(); - } catch (Exception e) { - LOG.error("Could not properly shut down the resource manager.", e); - } + CompletableFuture shutdownFuture = shutDownInternally(); + + shutdownFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + LOG.error("Could not properly shut down the resource manager runner.", throwable); + } + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 74c1509757a32..f6c2e8be6faf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -18,31 +18,26 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.actor.UntypedActor; -import akka.pattern.Patterns; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; +import org.apache.flink.runtime.rpc.akka.messages.Processing; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.messages.CallAsync; -import org.apache.flink.runtime.rpc.messages.ControlMessage; import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation; -import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RunAsync; - -import org.apache.flink.runtime.rpc.messages.Shutdown; -import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.ExceptionUtils; + +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.actor.UntypedActor; +import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; - import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -50,6 +45,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; 
+import scala.concurrent.impl.Promise; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -73,38 +71,47 @@ class AkkaRpcActor extends UntypedActor { protected final Logger log = LoggerFactory.getLogger(getClass()); - /** the endpoint to invoke the methods on */ + /** the endpoint to invoke the methods on. */ protected final T rpcEndpoint; - /** the helper that tracks whether calls come from the main thread */ + /** the helper that tracks whether calls come from the main thread. */ private final MainThreadValidatorUtil mainThreadValidator; private final CompletableFuture terminationFuture; - /** Throwable which might have been thrown by the postStop method */ - private Throwable shutdownThrowable; - AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.terminationFuture = checkNotNull(terminationFuture); - - this.shutdownThrowable = null; } @Override public void postStop() throws Exception { - super.postStop(); + mainThreadValidator.enterMainThread(); - // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise - // we would complete the future and let the actor system restart the actor with a completed - // future. - // Complete the termination future so that others know that we've stopped. + try { + Throwable shutdownThrowable = null; - if (shutdownThrowable != null) { - terminationFuture.completeExceptionally(shutdownThrowable); - } else { - terminationFuture.complete(null); + try { + rpcEndpoint.postStop(); + } catch (Throwable throwable) { + shutdownThrowable = throwable; + } + + super.postStop(); + + // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise + // we would complete the future and let the actor system restart the actor with a completed + // future. 
+ // Complete the termination future so that others know that we've stopped. + + if (shutdownThrowable != null) { + terminationFuture.completeExceptionally(shutdownThrowable); + } else { + terminationFuture.complete(null); + } + } finally { + mainThreadValidator.exitMainThread(); } } @@ -119,11 +126,7 @@ public void onReceive(final Object message) { mainThreadValidator.enterMainThread(); try { - if (msg instanceof ControlMessage) { - handleControlMessage(((ControlMessage) msg)); - } else { - handleMessage(msg); - } + handleMessage(msg); } finally { mainThreadValidator.exitMainThread(); } @@ -139,20 +142,6 @@ public void onReceive(final Object message) { } } - private void handleControlMessage(ControlMessage controlMessage) { - if (controlMessage instanceof Shutdown) { - triggerShutdown(); - } else { - log.warn( - "Received control message of unknown type {} with value {}. Dropping this control message!", - controlMessage.getClass().getName(), - controlMessage); - - sendErrorIfSender(new AkkaUnknownMessageException("Received unknown control message " + controlMessage + - " of type " + controlMessage.getClass().getSimpleName() + '.')); - } - } - protected void handleMessage(Object message) { if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); @@ -186,7 +175,7 @@ private void handleRpcInvocation(RpcInvocation rpcInvocation) { Class[] parameterTypes = rpcInvocation.getParameterTypes(); rpcMethod = lookupRpcMethod(methodName, parameterTypes); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { log.error("Could not load method arguments.", e); RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); @@ -294,7 +283,7 @@ private void handleRunAsync(RunAsync runAsync) { runAsync.getClass().getName()); } else { - final long timeToRun = runAsync.getTimeNanos(); + final long timeToRun = runAsync.getTimeNanos(); final long delayNanos; if (timeToRun == 0 || (delayNanos = timeToRun - 
System.nanoTime()) <= 0) { @@ -307,7 +296,7 @@ private void handleRunAsync(RunAsync runAsync) { } } else { - // schedule for later. send a new message after the delay, which will then be immediately executed + // schedule for later. send a new message after the delay, which will then be immediately executed FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); @@ -317,17 +306,6 @@ private void handleRunAsync(RunAsync runAsync) { } } - private void triggerShutdown() { - try { - rpcEndpoint.postStop(); - } catch (Throwable throwable) { - shutdownThrowable = throwable; - } - - // now stop the actor which will stop processing of any further messages - getContext().system().stop(getSelf()); - } - /** * Look up the rpc method on the given {@link RpcEndpoint} instance. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 536a7892659ec..07b334d8d604e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.rpc.messages.Shutdown; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -297,7 +296,7 @@ public void stopServer(RpcServer selfGateway) { if (fromThisService) { ActorRef selfActorRef = akkaClient.getRpcEndpoint(); LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path()); - selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender()); + actorSystem.stop(selfActorRef); } else { LOG.debug("RPC endpoint {} already stopped or from 
different RPC service"); } @@ -314,11 +313,14 @@ public void stopService() { } stopped = true; + actorSystem.shutdown(); actors.clear(); } actorSystem.awaitTermination(); + + LOG.info("Stopped Akka RPC service."); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java deleted file mode 100644 index c16bdd75a1029..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.messages; - -/** - * Base interface for control messages which are treated separately by the RPC server - * implementation. 
- */ -public interface ControlMessage { -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java deleted file mode 100644 index 50b076caf6210..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.messages; - -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; - -/** - * Shut down message used to trigger the shut down of an AkkaRpcActor. This - * message is only intended for internal use by the {@link AkkaRpcService}. 
- */ -public final class Shutdown implements ControlMessage { - - private static Shutdown instance = new Shutdown(); - - public static Shutdown getInstance() { - return instance; - } - - private Shutdown() {} -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 884668624f00b..da76115006027 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -216,8 +217,8 @@ protected JobManagerRunner createJobManagerRunner( Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception {