Skip to content

Commit

Permalink
[FLINK-7295] [rpc] Add postStop callback for proper shutdown of RpcEn…
Browse files Browse the repository at this point in the history
…dpoints

In order to execute a proper shutdown of RpcEndpoints it is necessary to have
a callback which is executed in the main thread context directly before stopping
processing of messages. This PR introduces the postStop method which acts as
this callback. All endpoint specific cleanup should be executed in this method.

The RpcEndpoint#shutDown method now only triggers the shut down of an RpcEndpoint.
In order to wait on the completion of the shut down, one has to wait on the
termination future which can be retrieved via RpcEndpoint#getTerminationFuture.

This PR also adapts the existing RpcEndpoints such that they execute their former
shutDown logic in the postStop method.

This closes #4420.
  • Loading branch information
tillrohrmann committed Jul 31, 2017
1 parent 49acd09 commit 80468b1
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 34 deletions.
Expand Up @@ -320,7 +320,7 @@ private void recoverWorkers() throws Exception {
} }


@Override @Override
public void shutDown() throws Exception { public void postStop() throws Exception {
Exception exception = null; Exception exception = null;
FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS); FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);


Expand Down Expand Up @@ -351,7 +351,7 @@ public void shutDown() throws Exception {
} }


try { try {
super.shutDown(); super.postStop();
} catch (Exception e) { } catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }
Expand Down
Expand Up @@ -101,7 +101,7 @@ protected Dispatcher(
//------------------------------------------------------ //------------------------------------------------------


@Override @Override
public void shutDown() throws Exception { public void postStop() throws Exception {
Exception exception = null; Exception exception = null;
// stop all currently running JobManagerRunners // stop all currently running JobManagerRunners
for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
Expand All @@ -117,7 +117,7 @@ public void shutDown() throws Exception {
} }


try { try {
super.shutDown(); super.postStop();
} catch (Exception e) { } catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }
Expand Down
Expand Up @@ -320,13 +320,14 @@ public void start(final UUID leaderSessionID) throws Exception {
* Suspend the job and shutdown all other services including rpc. * Suspend the job and shutdown all other services including rpc.
*/ */
@Override @Override
public void shutDown() throws Exception { public void postStop() throws Exception {
taskManagerHeartbeatManager.stop(); taskManagerHeartbeatManager.stop();
resourceManagerHeartbeatManager.stop(); resourceManagerHeartbeatManager.stop();


// make sure there is a graceful exit // make sure there is a graceful exit
getSelf().suspendExecution(new Exception("JobManager is shutting down.")); suspendExecution(new Exception("JobManager is shutting down."));
super.shutDown();
super.postStop();
} }


//---------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -204,7 +204,7 @@ public void start() throws Exception {
} }


@Override @Override
public void shutDown() throws Exception { public void postStop() throws Exception {
Exception exception = null; Exception exception = null;


taskManagerHeartbeatManager.stop(); taskManagerHeartbeatManager.stop();
Expand All @@ -229,14 +229,14 @@ public void shutDown() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }


clearState();

try { try {
super.shutDown(); super.postStop();
} catch (Exception e) { } catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }


clearState();

if (exception != null) { if (exception != null) {
ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down."); ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
} }
Expand Down
Expand Up @@ -126,7 +126,7 @@ public String getEndpointId() {
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Start & Shutdown // Start & shutdown & lifecycle callbacks
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


/** /**
Expand All @@ -143,17 +143,24 @@ public void start() throws Exception {
} }


/** /**
* Shuts down the underlying RPC endpoint via the RPC service. * User overridable callback.
* After this method was called, the RPC endpoint will no longer be reachable, neither remotely, *
* not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed
* any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}). * to be executed in the main thread context and can be used to clean up internal state.
* *
* <p>This method can be overridden to add RPC endpoint specific shut down code. * IMPORTANT: This method should never be called directly by the user.
* The overridden method should always call the parent shut down method. *
* @throws Exception if an error occurs. The exception is returned as result of the termination future.
*/
public void postStop() throws Exception {}

/**
* Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.
* *
* @throws Exception indicating that the something went wrong while shutting the RPC endpoint down * <p>In order to wait on the completion of the shut down, obtain the termination future
* via {@link #getTerminationFuture()}} and wait on its completion.
*/ */
public void shutDown() throws Exception { public final void shutDown() {
rpcService.stopServer(self); rpcService.stopServer(self);
} }


Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.runtime.rpc.akka.messages.RunAsync;


import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -82,10 +83,15 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp


private final CompletableFuture<Void> terminationFuture; private final CompletableFuture<Void> terminationFuture;


/** Throwable which might have been thrown by the postStop method */
private Throwable shutdownThrowable;

AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) { AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture); this.terminationFuture = checkNotNull(terminationFuture);

this.shutdownThrowable = null;
} }


@Override @Override
Expand All @@ -96,7 +102,12 @@ public void postStop() throws Exception {
// we would complete the future and let the actor system restart the actor with a completed // we would complete the future and let the actor system restart the actor with a completed
// future. // future.
// Complete the termination future so that others know that we've stopped. // Complete the termination future so that others know that we've stopped.
terminationFuture.complete(null);
if (shutdownThrowable != null) {
terminationFuture.completeExceptionally(shutdownThrowable);
} else {
terminationFuture.complete(null);
}
} }


@Override @Override
Expand Down Expand Up @@ -134,6 +145,8 @@ private void handleMessage(Object message) {
handleCallAsync((CallAsync) message); handleCallAsync((CallAsync) message);
} else if (message instanceof RpcInvocation) { } else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message); handleRpcInvocation((RpcInvocation) message);
} else if (message instanceof Shutdown) {
triggerShutdown();
} else { } else {
LOG.warn( LOG.warn(
"Received message of unknown type {} with value {}. Dropping this message!", "Received message of unknown type {} with value {}. Dropping this message!",
Expand Down Expand Up @@ -292,6 +305,17 @@ private void handleRunAsync(RunAsync runAsync) {
} }
} }


private void triggerShutdown() {
try {
rpcEndpoint.postStop();
} catch (Throwable throwable) {
shutdownThrowable = throwable;
}

// now stop the actor which will stop processing of any further messages
getContext().system().stop(getSelf());
}

/** /**
* Look up the rpc method on the given {@link RpcEndpoint} instance. * Look up the rpc method on the given {@link RpcEndpoint} instance.
* *
Expand Down
Expand Up @@ -25,7 +25,6 @@
import akka.actor.Address; import akka.actor.Address;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.actor.Identify; import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.dispatch.Mapper; import akka.dispatch.Mapper;
Expand All @@ -43,6 +42,7 @@
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -257,8 +257,8 @@ public void stopServer(RpcGateway selfGateway) {


if (fromThisService) { if (fromThisService) {
ActorRef selfActorRef = akkaClient.getRpcEndpoint(); ActorRef selfActorRef = akkaClient.getRpcEndpoint();
LOG.info("Stopping RPC endpoint {}.", selfActorRef.path()); LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path());
selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender());
} else { } else {
LOG.debug("RPC endpoint {} already stopped or from different RPC service"); LOG.debug("RPC endpoint {} already stopped or from different RPC service");
} }
Expand Down
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc.akka.messages;

import org.apache.flink.runtime.rpc.akka.AkkaRpcService;

/**
* Shut down message used to trigger the shut down of an AkkaRpcActor. This
* message is only intended for internal use by the {@link AkkaRpcService}.
*/
public final class Shutdown {

private static Shutdown instance = new Shutdown();

public static Shutdown getInstance() {
return instance;
}

private Shutdown() {}
}
Expand Up @@ -248,7 +248,7 @@ public void start() throws Exception {
* Called to shut down the TaskManager. The method closes all TaskManager services. * Called to shut down the TaskManager. The method closes all TaskManager services.
*/ */
@Override @Override
public void shutDown() throws Exception { public void postStop() throws Exception {
log.info("Stopping TaskManager {}.", getAddress()); log.info("Stopping TaskManager {}.", getAddress());


Exception exception = null; Exception exception = null;
Expand All @@ -272,7 +272,7 @@ public void shutDown() throws Exception {
fileCache.shutdown(); fileCache.shutdown();


try { try {
super.shutDown(); super.postStop();
} catch (Exception e) { } catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;


import org.hamcrest.core.Is; import org.hamcrest.core.Is;
Expand Down Expand Up @@ -232,6 +233,43 @@ public void testExceptionPropagationFuturePiping() throws Exception {
} }
} }


/**
* Tests that exception thrown in the postStop method are returned by the termination
* future.
*/
@Test
public void testPostStopExceptionPropagation() throws Exception {
FailingPostStopEndpoint rpcEndpoint = new FailingPostStopEndpoint(akkaRpcService, "FailingPostStopEndpoint");
rpcEndpoint.start();

rpcEndpoint.shutDown();

Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();

try {
terminationFuture.get();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof FailingPostStopEndpoint.PostStopException);
}
}

/**
* Checks that the postStop callback is executed within the main thread.
*/
@Test
public void testPostStopExecutedByMainThread() throws Exception {
SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
simpleRpcEndpoint.start();

simpleRpcEndpoint.shutDown();

Future<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();

// check that we executed the postStop method in the main thread, otherwise an exception
// would be thrown here.
terminationFuture.get();
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Test Actors and Interfaces // Test Actors and Interfaces
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down Expand Up @@ -305,4 +343,41 @@ public void run() {
return future; return future;
} }
} }

// ------------------------------------------------------------------------

private static class SimpleRpcEndpoint extends RpcEndpoint<RpcGateway> {

protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
super(rpcService, endpointId);
}

@Override
public void postStop() {
validateRunsInMainThread();
}
}

// ------------------------------------------------------------------------

private static class FailingPostStopEndpoint extends RpcEndpoint<RpcGateway> {

protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) {
super(rpcService, endpointId);
}

@Override
public void postStop() throws Exception {
throw new PostStopException("Test exception.");
}

private static class PostStopException extends FlinkException {

private static final long serialVersionUID = 6701096588415871592L;

public PostStopException(String message) {
super(message);
}
}
}
} }

0 comments on commit 80468b1

Please sign in to comment.