From 80468b15c0e5976f2b45160f9ed833a237cb6fcd Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 28 Jul 2017 16:13:55 +0200 Subject: [PATCH] [FLINK-7295] [rpc] Add postStop callback for proper shutdown of RpcEndpoints In order to execute a proper shutdown of RpcEndpoints it is necessary to have a callback which is executed in the main thread context directly before stopping processing of messages. This PR introduces the postStop method which acts as this callback. All endpoint specific cleanup should be executed in this method. The RpcEndpoint#shutDown method now only triggers the shut down of an RpcEndpoint. In order to wait on the completion of the shut down, one has to wait on the termination future which can be retrieved via RpcEndpoint#getTerminationFuture. This PR also adapts the existing RpcEndpoints such that they execute their former shutDown logic in the postStop method. This closes #4420. --- .../MesosResourceManager.java | 4 +- .../flink/runtime/dispatcher/Dispatcher.java | 4 +- .../flink/runtime/jobmaster/JobMaster.java | 7 +- .../resourcemanager/ResourceManager.java | 8 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 27 ++++--- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 26 ++++++- .../runtime/rpc/akka/AkkaRpcService.java | 6 +- .../runtime/rpc/akka/messages/Shutdown.java | 36 +++++++++ .../runtime/taskexecutor/TaskExecutor.java | 4 +- .../runtime/rpc/akka/AkkaRpcActorTest.java | 75 +++++++++++++++++++ .../flink/yarn/YarnResourceManager.java | 18 +++-- 11 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 98e9b410e7115..260d5bf65f47c 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -320,7 +320,7 @@ private void recoverWorkers() throws Exception { } @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { Exception exception = null; FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS); @@ -351,7 +351,7 @@ public void shutDown() throws Exception { } try { - super.shutDown(); + super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e0ec049800d4c..2eb0e367340fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -101,7 +101,7 @@ protected Dispatcher( //------------------------------------------------------ @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { Exception exception = null; // stop all currently running JobManagerRunners for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { @@ -117,7 +117,7 @@ public void shutDown() throws Exception { } try { - super.shutDown(); + super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3a55f2eb2e260..947a91486f4df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -320,13 +320,14 @@ public void start(final UUID leaderSessionID) throws Exception { * Suspend the job and shutdown all other services including rpc. */ @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { taskManagerHeartbeatManager.stop(); resourceManagerHeartbeatManager.stop(); // make sure there is a graceful exit - getSelf().suspendExecution(new Exception("JobManager is shutting down.")); - super.shutDown(); + suspendExecution(new Exception("JobManager is shutting down.")); + + super.postStop(); } //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index ebba9cafc3e10..0dfbbcd2faad4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -204,7 +204,7 @@ public void start() throws Exception { } @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { Exception exception = null; taskManagerHeartbeatManager.stop(); @@ -229,14 +229,14 @@ public void shutDown() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); } + clearState(); + try { - super.shutDown(); + super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - clearState(); - if (exception != null) { ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 311fa4905a28c..331f3a38e0e9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -126,7 +126,7 @@ public String getEndpointId() { } // ------------------------------------------------------------------------ - // Start & Shutdown + // Start & shutdown & lifecycle callbacks // ------------------------------------------------------------------------ /** @@ -143,17 +143,24 @@ public void start() throws Exception { } /** - * Shuts down the underlying RPC endpoint via the RPC service. - * After this method was called, the RPC endpoint will no longer be reachable, neither remotely, - * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread - * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}). - * - *

This method can be overridden to add RPC endpoint specific shut down code. - * The overridden method should always call the parent shut down method. + * User overridable callback. + * + *

This method is called when the RpcEndpoint is being shut down. The method is guaranteed + * to be executed in the main thread context and can be used to clean up internal state. + * + * IMPORTANT: This method should never be called directly by the user. + * + * @throws Exception if an error occurs. The exception is returned as result of the termination future. + */ + public void postStop() throws Exception {} + + /** + * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously. * - * @throws Exception indicating that the something went wrong while shutting the RPC endpoint down + *

In order to wait on the completion of the shut down, obtain the termination future + * via {@link #getTerminationFuture()}} and wait on its completion. */ - public void shutDown() throws Exception { + public final void shutDown() { rpcService.stopServer(self); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 86cd83eabe17e..fe3fcc92a2df9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; +import org.apache.flink.runtime.rpc.akka.messages.Shutdown; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -82,10 +83,15 @@ class AkkaRpcActor> extends Untyp private final CompletableFuture terminationFuture; + /** Throwable which might have been thrown by the postStop method */ + private Throwable shutdownThrowable; + AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.terminationFuture = checkNotNull(terminationFuture); + + this.shutdownThrowable = null; } @Override @@ -96,7 +102,12 @@ public void postStop() throws Exception { // we would complete the future and let the actor system restart the actor with a completed // future. // Complete the termination future so that others know that we've stopped. - terminationFuture.complete(null); + + if (shutdownThrowable != null) { + terminationFuture.completeExceptionally(shutdownThrowable); + } else { + terminationFuture.complete(null); + } } @Override @@ -134,6 +145,8 @@ private void handleMessage(Object message) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); + } else if (message instanceof Shutdown) { + triggerShutdown(); } else { LOG.warn( "Received message of unknown type {} with value {}. Dropping this message!", @@ -292,6 +305,17 @@ private void handleRunAsync(RunAsync runAsync) { } } + private void triggerShutdown() { + try { + rpcEndpoint.postStop(); + } catch (Throwable throwable) { + shutdownThrowable = throwable; + } + + // now stop the actor which will stop processing of any further messages + getContext().system().stop(getSelf()); + } + /** * Look up the rpc method on the given {@link RpcEndpoint} instance. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index e17364f4e959f..2f02e8c34ce74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -25,7 +25,6 @@ import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.Identify; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Futures; import akka.dispatch.Mapper; @@ -43,6 +42,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.rpc.akka.messages.Shutdown; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -257,8 +257,8 @@ public void stopServer(RpcGateway selfGateway) { if (fromThisService) { ActorRef selfActorRef = akkaClient.getRpcEndpoint(); - LOG.info("Stopping RPC endpoint {}.", selfActorRef.path()); - selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path()); + selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender()); } else { LOG.debug("RPC endpoint {} already stopped or from different RPC service"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java new file mode 100644 index 0000000000000..c596d1248bf03 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; + +/** + * Shut down message used to trigger the shut down of an AkkaRpcActor. This + * message is only intended for internal use by the {@link AkkaRpcService}. + */ +public final class Shutdown { + + private static Shutdown instance = new Shutdown(); + + public static Shutdown getInstance() { + return instance; + } + + private Shutdown() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8038228e33df9..cdec08edfe142 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -248,7 +248,7 @@ public void start() throws Exception { * Called to shut down the TaskManager. The method closes all TaskManager services. */ @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { log.info("Stopping TaskManager {}.", getAddress()); Exception exception = null; @@ -272,7 +272,7 @@ public void shutDown() throws Exception { fileCache.shutdown(); try { - super.shutDown(); + super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 3c40bc266e3ef..0b0626731bef1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.hamcrest.core.Is; @@ -232,6 +233,43 @@ public void testExceptionPropagationFuturePiping() throws Exception { } } + /** + * Tests that exception thrown in the postStop method are returned by the termination + * future. + */ + @Test + public void testPostStopExceptionPropagation() throws Exception { + FailingPostStopEndpoint rpcEndpoint = new FailingPostStopEndpoint(akkaRpcService, "FailingPostStopEndpoint"); + rpcEndpoint.start(); + + rpcEndpoint.shutDown(); + + Future terminationFuture = rpcEndpoint.getTerminationFuture(); + + try { + terminationFuture.get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof FailingPostStopEndpoint.PostStopException); + } + } + + /** + * Checks that the postStop callback is executed within the main thread. + */ + @Test + public void testPostStopExecutedByMainThread() throws Exception { + SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint"); + simpleRpcEndpoint.start(); + + simpleRpcEndpoint.shutDown(); + + Future terminationFuture = simpleRpcEndpoint.getTerminationFuture(); + + // check that we executed the postStop method in the main thread, otherwise an exception + // would be thrown here. + terminationFuture.get(); + } + // ------------------------------------------------------------------------ // Test Actors and Interfaces // ------------------------------------------------------------------------ @@ -305,4 +343,41 @@ public void run() { return future; } } + + // ------------------------------------------------------------------------ + + private static class SimpleRpcEndpoint extends RpcEndpoint { + + protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) { + super(rpcService, endpointId); + } + + @Override + public void postStop() { + validateRunsInMainThread(); + } + } + + // ------------------------------------------------------------------------ + + private static class FailingPostStopEndpoint extends RpcEndpoint { + + protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) { + super(rpcService, endpointId); + } + + @Override + public void postStop() throws Exception { + throw new PostStopException("Test exception."); + } + + private static class PostStopException extends FlinkException { + + private static final long serialVersionUID = 6701096588415871592L; + + public PostStopException(String message) { + super(message); + } + } + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6099d18cef9ac..8327b6a471604 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -171,9 +171,10 @@ protected void initialize() throws ResourceManagerException { } @Override - public void shutDown() throws Exception { + public void postStop() throws Exception { // shut down all components Throwable firstException = null; + if (resourceManagerClient != null) { try { resourceManagerClient.stop(); @@ -181,21 +182,24 @@ public void shutDown() throws Exception { firstException = t; } } + if (nodeManagerClient != null) { try { nodeManagerClient.stop(); } catch (Throwable t) { - if (firstException == null) { - firstException = t; - } else { - firstException.addSuppressed(t); - } + firstException = ExceptionUtils.firstOrSuppressed(t, firstException); } } + + try { + super.postStop(); + } catch (Throwable t) { + firstException = ExceptionUtils.firstOrSuppressed(t, firstException); + } + if (firstException != null) { ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager"); } - super.shutDown(); } @Override