diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 98e9b410e7115..260d5bf65f47c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -320,7 +320,7 @@ private void recoverWorkers() throws Exception {
}
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
Exception exception = null;
FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
@@ -351,7 +351,7 @@ public void shutDown() throws Exception {
}
try {
- super.shutDown();
+ super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e0ec049800d4c..2eb0e367340fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -101,7 +101,7 @@ protected Dispatcher(
//------------------------------------------------------
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
Exception exception = null;
// stop all currently running JobManagerRunners
for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
@@ -117,7 +117,7 @@ public void shutDown() throws Exception {
}
try {
- super.shutDown();
+ super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3a55f2eb2e260..947a91486f4df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -320,13 +320,14 @@ public void start(final UUID leaderSessionID) throws Exception {
* Suspend the job and shutdown all other services including rpc.
*/
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
taskManagerHeartbeatManager.stop();
resourceManagerHeartbeatManager.stop();
// make sure there is a graceful exit
- getSelf().suspendExecution(new Exception("JobManager is shutting down."));
- super.shutDown();
+ // Called directly rather than through getSelf(); presumably safe because the
+ // RpcEndpoint#postStop contract states that this callback runs in the main thread.
+ suspendExecution(new Exception("JobManager is shutting down."));
+
+ super.postStop();
}
//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ebba9cafc3e10..0dfbbcd2faad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -204,7 +204,7 @@ public void start() throws Exception {
}
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
Exception exception = null;
taskManagerHeartbeatManager.stop();
@@ -229,14 +229,14 @@ public void shutDown() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
+ clearState();
+
try {
- super.shutDown();
+ super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
- clearState();
-
if (exception != null) {
ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 311fa4905a28c..331f3a38e0e9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -126,7 +126,7 @@ public String getEndpointId() {
}
// ------------------------------------------------------------------------
- // Start & Shutdown
+ // Start & shutdown & lifecycle callbacks
// ------------------------------------------------------------------------
/**
@@ -143,17 +143,24 @@ public void start() throws Exception {
}
/**
- * Shuts down the underlying RPC endpoint via the RPC service.
- * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
- * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread
- * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}).
- *
- * <p>This method can be overridden to add RPC endpoint specific shut down code.
- * The overridden method should always call the parent shut down method.
+ * User overridable callback.
+ *
+ * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed
+ * to be executed in the main thread context and can be used to clean up internal state.
+ *
+ * IMPORTANT: This method should never be called directly by the user.
+ *
+ * @throws Exception if an error occurs. The exception is returned as result of the termination future.
+ */
+ public void postStop() throws Exception {}
+
+ /**
+ * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.
*
- * @throws Exception indicating that the something went wrong while shutting the RPC endpoint down
+ * <p>In order to wait on the completion of the shut down, obtain the termination future
+ * via {@link #getTerminationFuture()} and wait on its completion.
*/
- public void shutDown() throws Exception {
+ public final void shutDown() {
rpcService.stopServer(self);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 86cd83eabe17e..fe3fcc92a2df9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -37,6 +37,7 @@
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -82,10 +83,15 @@ class AkkaRpcActor> extends Untyp
private final CompletableFuture terminationFuture;
+ /** Throwable which might have been thrown by the postStop method */
+ private Throwable shutdownThrowable;
+
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture);
+
+ this.shutdownThrowable = null;
}
@Override
@@ -96,7 +102,12 @@ public void postStop() throws Exception {
// we would complete the future and let the actor system restart the actor with a completed
// future.
// Complete the termination future so that others know that we've stopped.
- terminationFuture.complete(null);
+
+ if (shutdownThrowable != null) {
+ terminationFuture.completeExceptionally(shutdownThrowable);
+ } else {
+ terminationFuture.complete(null);
+ }
}
@Override
@@ -134,6 +145,8 @@ private void handleMessage(Object message) {
handleCallAsync((CallAsync) message);
} else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message);
+ } else if (message instanceof Shutdown) {
+ triggerShutdown();
} else {
LOG.warn(
"Received message of unknown type {} with value {}. Dropping this message!",
@@ -292,6 +305,17 @@ private void handleRunAsync(RunAsync runAsync) {
}
}
+ /**
+ * Runs the endpoint's {@code postStop} callback and afterwards stops this actor.
+ *
+ * <p>A {@link Throwable} thrown by the callback is not propagated. It is stored in
+ * {@code shutdownThrowable}, which the actor's own {@code postStop} uses to complete the
+ * termination future exceptionally.
+ */
+ private void triggerShutdown() {
+ try {
+ rpcEndpoint.postStop();
+ } catch (Throwable throwable) {
+ // remember the failure so that it can be reported via the termination future
+ shutdownThrowable = throwable;
+ }
+
+ // now stop the actor which will stop processing of any further messages
+ getContext().system().stop(getSelf());
+ }
+
/**
* Look up the rpc method on the given {@link RpcEndpoint} instance.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index e17364f4e959f..2f02e8c34ce74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,7 +25,6 @@
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Identify;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
@@ -43,6 +42,7 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -257,8 +257,8 @@ public void stopServer(RpcGateway selfGateway) {
if (fromThisService) {
ActorRef selfActorRef = akkaClient.getRpcEndpoint();
- LOG.info("Stopping RPC endpoint {}.", selfActorRef.path());
- selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path());
+ selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender());
} else {
LOG.debug("RPC endpoint {} already stopped or from different RPC service");
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
new file mode 100644
index 0000000000000..c596d1248bf03
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+
+/**
+ * Shut down message used to trigger the shut down of an AkkaRpcActor. This
+ * message is only intended for internal use by the {@link AkkaRpcService}.
+ *
+ * <p>The message carries no state, so a single shared instance is handed out via
+ * {@link #getInstance()}.
+ */
+public final class Shutdown {
+
+ /** The one and only instance; final so that it can never be reassigned. */
+ private static final Shutdown INSTANCE = new Shutdown();
+
+ /**
+ * Returns the shared shut down message.
+ *
+ * @return the singleton {@code Shutdown} instance
+ */
+ public static Shutdown getInstance() {
+ return INSTANCE;
+ }
+
+ /** Private constructor; the only instance is the one created by this class. */
+ private Shutdown() {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8038228e33df9..cdec08edfe142 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -248,7 +248,7 @@ public void start() throws Exception {
* Called to shut down the TaskManager. The method closes all TaskManager services.
*/
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
log.info("Stopping TaskManager {}.", getAddress());
Exception exception = null;
@@ -272,7 +272,7 @@ public void shutDown() throws Exception {
fileCache.shutdown();
try {
- super.shutDown();
+ super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 3c40bc266e3ef..0b0626731bef1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -30,6 +30,7 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
@@ -232,6 +233,43 @@ public void testExceptionPropagationFuturePiping() throws Exception {
}
}
+ /**
+ * Tests that exceptions thrown in the postStop method are returned by the termination
+ * future.
+ */
+ @Test
+ public void testPostStopExceptionPropagation() throws Exception {
+ FailingPostStopEndpoint rpcEndpoint = new FailingPostStopEndpoint(akkaRpcService, "FailingPostStopEndpoint");
+ rpcEndpoint.start();
+
+ rpcEndpoint.shutDown();
+
+ Future terminationFuture = rpcEndpoint.getTerminationFuture();
+
+ try {
+ terminationFuture.get();
+ // without this, the test would silently pass if the postStop exception got lost
+ org.junit.Assert.fail("Expected the termination future to be completed exceptionally.");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof FailingPostStopEndpoint.PostStopException);
+ }
+ }
+
+ /**
+ * Verifies that the postStop callback is run by the endpoint's main thread.
+ */
+ @Test
+ public void testPostStopExecutedByMainThread() throws Exception {
+ final SimpleRpcEndpoint endpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
+
+ endpoint.start();
+ endpoint.shutDown();
+
+ // SimpleRpcEndpoint#postStop validates that it runs in the main thread. If that
+ // validation failed, the termination future would fail and get() would throw.
+ final Future terminationFuture = endpoint.getTerminationFuture();
+
+ terminationFuture.get();
+ }
+
// ------------------------------------------------------------------------
// Test Actors and Interfaces
// ------------------------------------------------------------------------
@@ -305,4 +343,41 @@ public void run() {
return future;
}
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test endpoint whose {@code postStop} callback calls {@code validateRunsInMainThread()}.
+ */
+ private static class SimpleRpcEndpoint extends RpcEndpoint {
+
+ protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
+ super(rpcService, endpointId);
+ }
+
+ /** Only validates the calling thread; presumably fails when not run in the main thread. */
+ @Override
+ public void postStop() {
+ validateRunsInMainThread();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test endpoint whose {@code postStop} callback always throws a {@link PostStopException}.
+ */
+ private static class FailingPostStopEndpoint extends RpcEndpoint {
+
+ protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) {
+ super(rpcService, endpointId);
+ }
+
+ /** Unconditionally fails so that tests can observe how the failure is reported. */
+ @Override
+ public void postStop() throws Exception {
+ throw new PostStopException("Test exception.");
+ }
+
+ /** Exception type thrown by {@link #postStop()}; lets tests identify the failure cause. */
+ private static class PostStopException extends FlinkException {
+
+ private static final long serialVersionUID = 6701096588415871592L;
+
+ public PostStopException(String message) {
+ super(message);
+ }
+ }
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6099d18cef9ac..8327b6a471604 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -171,9 +171,10 @@ protected void initialize() throws ResourceManagerException {
}
@Override
- public void shutDown() throws Exception {
+ public void postStop() throws Exception {
// shut down all components
Throwable firstException = null;
+
if (resourceManagerClient != null) {
try {
resourceManagerClient.stop();
@@ -181,21 +182,24 @@ public void shutDown() throws Exception {
firstException = t;
}
}
+
if (nodeManagerClient != null) {
try {
nodeManagerClient.stop();
} catch (Throwable t) {
- if (firstException == null) {
- firstException = t;
- } else {
- firstException.addSuppressed(t);
- }
+ firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
}
}
+
+ try {
+ super.postStop();
+ } catch (Throwable t) {
+ firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
+ }
+
if (firstException != null) {
ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager");
}
- super.shutDown();
}
@Override