From f2fb3bc606fe433488b5b33eb932bf2c9254246f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 9 Jan 2018 17:50:37 +0100 Subject: [PATCH] [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. --- .../runtime/rpc/akka/AkkaBasedEndpoint.java | 11 +-- .../rpc/akka/AkkaInvocationHandler.java | 30 +++---- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 24 +++--- .../runtime/rpc/akka/AkkaRpcService.java | 79 +++---------------- .../rpc/akka/FencedAkkaInvocationHandler.java | 3 +- .../runtime/rpc/akka/FencedAkkaRpcActor.java | 6 +- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 7 +- 7 files changed, 40 insertions(+), 120 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java index 499de1e1c86f3..74935076ad2bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java @@ -22,10 +22,8 @@ import akka.actor.ActorRef; -import java.util.concurrent.CompletableFuture; - /** - * Interface for Akka based rpc gateways + * Interface for Akka based rpc gateways. */ interface AkkaBasedEndpoint extends RpcGateway { @@ -35,11 +33,4 @@ interface AkkaBasedEndpoint extends RpcGateway { * @return the {@link ActorRef} of the underlying RPC actor */ ActorRef getActorRef(); - - /** - * Returns the internal termination future. - * - * @return Internal termination future - */ - CompletableFuture getInternalTerminationFuture(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 37f46e3ff3155..863b780958d77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -18,23 +18,24 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorRef; -import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.MainThreadExecutable; -import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.messages.CallAsync; import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation; -import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the @@ -85,18 +86,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc @Nullable private final CompletableFuture terminationFuture; - // null if gateway; otherwise non-null - @Nullable - private final CompletableFuture internalTerminationFuture; - AkkaInvocationHandler( String address, String hostname, ActorRef rpcEndpoint, Time timeout, long maximumFramesize, - @Nullable CompletableFuture terminationFuture, - @Nullable CompletableFuture internalTerminationFuture) { + @Nullable CompletableFuture terminationFuture) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); @@ -105,7 +101,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; this.terminationFuture = terminationFuture; - this.internalTerminationFuture = internalTerminationFuture; } @Override @@ -159,7 +154,7 @@ public void scheduleRunAsync(Runnable runnable, long delayMillis) { @Override public CompletableFuture callAsync(Callable callable, Time callTimeout) { - if(isLocal) { + if (isLocal) { @SuppressWarnings("unchecked") CompletableFuture resultFuture = (CompletableFuture) ask(new CallAsync(callable), callTimeout); @@ -208,7 +203,7 @@ private Object invokeRpc(Method method, Object[] args) throws Exception { tell(rpcInvocation); result = null; - } else if (Objects.equals(returnType,CompletableFuture.class)) { + } else if (Objects.equals(returnType, CompletableFuture.class)) { // execute an asynchronous call result = ask(rpcInvocation, futureTimeout); } else { @@ -298,7 +293,7 @@ private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Objec } /** - * Checks whether any of the annotations is of type {@link RpcTimeout} + * Checks whether any of the annotations is of type {@link RpcTimeout}. * * @param annotations Array of annotations * @return True if {@link RpcTimeout} was found; otherwise false @@ -349,9 +344,4 @@ public String getHostname() { public CompletableFuture getTerminationFuture() { return terminationFuture; } - - @Override - public CompletableFuture getInternalTerminationFuture() { - return internalTerminationFuture; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index f7488abad7afd..da7ce35497819 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -53,14 +53,14 @@ /** * Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync} * {@link Processing} messages. - *

- * The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint} + * + *

The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint} * instance. - *

- * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed + * + *

The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed * in the context of the actor thread. - *

- * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A + * + *

The {@link Processing} message controls the processing behaviour of the akka rpc actor. A * {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message * stops processing messages. All messages which arrive when the processing is stopped, will be * discarded. @@ -68,7 +68,7 @@ * @param Type of the {@link RpcEndpoint} */ class AkkaRpcActor extends UntypedActor { - + protected final Logger log = LoggerFactory.getLogger(getClass()); /** the endpoint to invoke the methods on. */ @@ -77,12 +77,12 @@ class AkkaRpcActor extends UntypedActor { /** the helper that tracks whether calls come from the main thread. */ private final MainThreadValidatorUtil mainThreadValidator; - private final CompletableFuture internalTerminationFuture; + private final CompletableFuture terminationFuture; - AkkaRpcActor(final T rpcEndpoint, final CompletableFuture internalTerminationFuture) { + AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); - this.internalTerminationFuture = checkNotNull(internalTerminationFuture); + this.terminationFuture = checkNotNull(terminationFuture); } @Override @@ -106,9 +106,9 @@ public void postStop() throws Exception { // Complete the termination future so that others know that we've stopped. if (shutdownThrowable != null) { - internalTerminationFuture.completeExceptionally(shutdownThrowable); + terminationFuture.completeExceptionally(shutdownThrowable); } else { - internalTerminationFuture.complete(null); + terminationFuture.complete(null); } } finally { mainThreadValidator.exitMainThread(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 68b5aaaf1853e..7ff08f7a901fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -54,10 +54,8 @@ import java.io.Serializable; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -65,7 +63,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import scala.Option; @@ -165,7 +162,6 @@ public CompletableFuture connect( actorRef, timeout, maximumFramesize, - null, null); }); } @@ -186,7 +182,6 @@ public > CompletableFuture timeout, maximumFramesize, null, - null, () -> fencingToken); }); } @@ -196,13 +191,12 @@ public RpcServer startServer(C rpcEndpoint) checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture terminationFuture = new CompletableFuture<>(); - CompletableFuture internalTerminationFuture = new CompletableFuture<>(); final Props akkaRpcActorProps; if (rpcEndpoint instanceof FencedRpcEndpoint) { - akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, internalTerminationFuture); + akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture); } else { - akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, internalTerminationFuture); + akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture); } ActorRef actorRef; @@ -240,7 +234,6 @@ public RpcServer startServer(C rpcEndpoint) timeout, maximumFramesize, terminationFuture, - internalTerminationFuture, ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken); implementedRpcGateways.add(FencedMainThreadExecutable.class); @@ -251,8 +244,7 @@ public RpcServer startServer(C rpcEndpoint) actorRef, timeout, maximumFramesize, - terminationFuture, - internalTerminationFuture); + terminationFuture); } // Rather than using the System ClassLoader directly, we derive the ClassLoader @@ -280,7 +272,6 @@ public RpcServer fenceRpcServer(RpcServer rpcServer, F timeout, maximumFramesize, null, - null, () -> fencingToken); // Rather than using the System ClassLoader directly, we derive the ClassLoader @@ -300,43 +291,19 @@ public RpcServer fenceRpcServer(RpcServer rpcServer, F @Override public void stopServer(RpcServer selfGateway) { if (selfGateway instanceof AkkaBasedEndpoint) { - AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway; + final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway; + final RpcEndpoint rpcEndpoint; - boolean fromThisService; synchronized (lock) { if (stopped) { return; } else { - fromThisService = actors.remove(akkaClient.getActorRef()) != null; + rpcEndpoint = actors.remove(akkaClient.getActorRef()); } } - if (fromThisService) { - ActorRef selfActorRef = akkaClient.getActorRef(); - LOG.info("Trigger shut down of RPC endpoint {}.", selfGateway.getAddress()); - - CompletableFuture akkaTerminationFuture = FutureUtils.toJava( - Patterns.gracefulStop( - selfActorRef, - FutureUtils.toFiniteDuration(timeout), - Kill.getInstance())); - - akkaTerminationFuture - .thenCombine( - akkaClient.getInternalTerminationFuture(), - (Boolean terminated, Void ignored) -> true) - .whenComplete( - (Boolean terminated, Throwable throwable) -> { - if (throwable != null) { - LOG.debug("Graceful RPC endpoint shutdown failed. Shutting endpoint down hard now.", throwable); - - actorSystem.stop(selfActorRef); - selfGateway.getTerminationFuture().completeExceptionally(throwable); - } else { - LOG.info("RPC endpoint {} has been shut down.", selfGateway.getAddress()); - selfGateway.getTerminationFuture().complete(null); - } - }); + if (rpcEndpoint != null) { + akkaClient.getActorRef().tell(Kill.getInstance(), ActorRef.noSender()); } else { LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress()); } @@ -347,8 +314,6 @@ public void stopServer(RpcServer selfGateway) { public void stopService() { LOG.info("Stopping Akka RPC service."); - final List actorsToTerminate; - synchronized (lock) { if (stopped) { return; @@ -356,35 +321,13 @@ public void stopService() { stopped = true; - actorSystem.shutdown(); - - actorsToTerminate = new ArrayList<>(actors.values()); - - actors.clear(); } + actorSystem.shutdown(); actorSystem.awaitTermination(); - // complete the termination futures of all actors - for (RpcEndpoint rpcEndpoint : actorsToTerminate) { - final CompletableFuture terminationFuture = rpcEndpoint.getTerminationFuture(); - - AkkaBasedEndpoint akkaBasedEndpoint = rpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class); - - CompletableFuture internalTerminationFuture = akkaBasedEndpoint.getInternalTerminationFuture(); - - internalTerminationFuture.whenComplete( - (Void ignored, Throwable throwable) -> { - if (throwable != null) { - terminationFuture.completeExceptionally(throwable); - } else { - terminationFuture.complete(true); - } - }); - - // make sure that if the internal termination futures haven't completed yet, then they time out - internalTerminationFuture.completeExceptionally( - new TimeoutException("The RpcEndpoint " + rpcEndpoint.getAddress() + " did not terminate in time.")); + synchronized (lock) { + actors.clear(); } LOG.info("Stopped Akka RPC service."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java index 03534aeceb522..3ca75e23cdfc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java @@ -61,9 +61,8 @@ public FencedAkkaInvocationHandler( Time timeout, long maximumFramesize, @Nullable CompletableFuture terminationFuture, - @Nullable CompletableFuture internalTerminationFuture, Supplier fencingTokenSupplier) { - super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, internalTerminationFuture); + super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture); this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java index fa83e4f4d936a..57280fdd8b09f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; -import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.messages.FencedMessage; import org.apache.flink.runtime.rpc.messages.UnfencedMessage; @@ -38,8 +38,8 @@ */ public class FencedAkkaRpcActor & RpcGateway> extends AkkaRpcActor { - public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture internalTerminationFuture) { - super(rpcEndpoint, internalTerminationFuture); + public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture terminationFuture) { + super(rpcEndpoint, terminationFuture); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index a5c41ef15abb4..c4259f402413f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -137,12 +137,9 @@ public void testGetPort() { /** * Tests that we can wait for the termination of the rpc service - * - * @throws ExecutionException - * @throws InterruptedException */ @Test(timeout = 60000) - public void testTerminationFuture() throws ExecutionException, InterruptedException { + public void testTerminationFuture() throws Exception { final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000)); @@ -150,7 +147,7 @@ public void testTerminationFuture() throws ExecutionException, InterruptedExcept assertFalse(terminationFuture.isDone()); - CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher()); + CompletableFuture.runAsync(rpcService::stopService, actorSystem.dispatcher()); terminationFuture.get(); }