diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html index c77a39366ac92..2baeb6ff4f3fd 100644 --- a/docs/_includes/generated/akka_configuration.html +++ b/docs/_includes/generated/akka_configuration.html @@ -8,6 +8,12 @@ + +
akka.ask.callstack
+ true + Boolean + If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint. +
akka.ask.timeout
"10 s" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index e71f18f5c9e4b..3864eefb4012a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -29,6 +29,18 @@ @PublicEvolving public class AkkaOptions { + /** + * Timeout for akka ask calls. + */ + public static final ConfigOption CAPTURE_ASK_CALLSTACK = ConfigOptions + .key("akka.ask.callstack") + .booleanType() + .defaultValue(true) + .withDescription("If true, call stack for asynchronous asks are captured. That way, when an ask fails " + + "(for example times out), you get a proper exception, describing to the original method call and " + + "call site. Note that in case of having millions of concurrent RPC calls, this may add to the " + + "memory footprint."); + /** * Timeout for akka ask calls. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index bc0da64cf63db..f67984d730d93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -166,8 +166,6 @@ protected boolean isRunning() { /** * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready * to process remote procedure calls. - * - * @throws Exception indicating that something went wrong while starting the RPC endpoint */ public final void start() { rpcServer.start(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 44bfb3b3f9a48..9b775eaf84882 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -47,6 +47,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -90,13 +91,16 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc @Nullable private final CompletableFuture terminationFuture; + private final boolean captureAskCallStack; + AkkaInvocationHandler( - String address, - String hostname, - ActorRef rpcEndpoint, - Time timeout, - long maximumFramesize, - @Nullable CompletableFuture terminationFuture) { + String address, + String hostname, + ActorRef rpcEndpoint, + Time timeout, + long maximumFramesize, + @Nullable CompletableFuture terminationFuture, + boolean captureAskCallStack) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); @@ -105,6 +109,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; this.terminationFuture = terminationFuture; + this.captureAskCallStack = captureAskCallStack; } @Override @@ -208,20 +213,20 @@ private Object invokeRpc(Method method, Object[] args) throws Exception { result = null; } else { + // Capture the call stack. It is significantly faster to do that via an exception than + // via Thread.getStackTrace(), because exceptions lazily initialize the stack trace, initially only + // capture a lightweight native pointer, and convert that into the stack trace lazily when needed. + final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null; + // execute an asynchronous call - CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout); - - CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> { - if (o instanceof SerializedValue) { - try { - return ((SerializedValue) o).deserializeValue(getClass().getClassLoader()); - } catch (IOException | ClassNotFoundException e) { - throw new CompletionException( - new RpcException("Could not deserialize the serialized payload of RPC method : " - + methodName, e)); - } + final CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout); + + final CompletableFuture completableFuture = new CompletableFuture<>(); + resultFuture.whenComplete((resultValue, failure) -> { + if (failure != null) { + completableFuture.completeExceptionally(resolveTimeoutException(failure, callStackCapture, method)); } else { - return o; + completableFuture.complete(deserializeValueIfNeeded(resultValue, method)); } }); @@ -370,4 +375,33 @@ public String getHostname() { public CompletableFuture getTerminationFuture() { return terminationFuture; } + + static Object deserializeValueIfNeeded(Object o, Method method) { + if (o instanceof SerializedValue) { + try { + return ((SerializedValue) o).deserializeValue(AkkaInvocationHandler.class.getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new CompletionException( + new RpcException( + "Could not deserialize the serialized payload of RPC method : " + method.getName(), e)); + } + } else { + return o; + } + } + + static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) { + if (callStackCapture == null || (!(exception instanceof akka.pattern.AskTimeoutException))) { + return exception; + } + + final TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out."); + newException.initCause(exception); + + // remove the stack frames coming from the proxy interface invocation + final StackTraceElement[] stackTrace = callStackCapture.getStackTrace(); + newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length)); + + return newException; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 344f96b595276..0c41f053e68ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -98,6 +98,8 @@ public class AkkaRpcService implements RpcService { private final String address; private final int port; + private final boolean captureAskCallstacks; + private final ScheduledExecutor internalScheduledExecutor; private final CompletableFuture terminationFuture; @@ -122,6 +124,8 @@ public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfigu port = -1; } + captureAskCallstacks = configuration.captureAskCallStack(); + internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); terminationFuture = new CompletableFuture<>(); @@ -165,7 +169,8 @@ public CompletableFuture connect( actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), - null); + null, + captureAskCallstacks); }); } @@ -185,7 +190,8 @@ public > CompletableFuture configuration.getTimeout(), configuration.getMaximumFramesize(), null, - () -> fencingToken); + () -> fencingToken, + captureAskCallstacks); }); } @@ -247,7 +253,8 @@ public RpcServer startServer(C rpcEndpoint) configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture, - ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken); + ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken, + captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { @@ -257,7 +264,8 @@ public RpcServer startServer(C rpcEndpoint) actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), - terminationFuture); + terminationFuture, + captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader @@ -285,7 +293,8 @@ public RpcServer fenceRpcServer(RpcServer rpcServer, F configuration.getTimeout(), configuration.getMaximumFramesize(), null, - () -> fencingToken); + () -> fencingToken, + captureAskCallstacks); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java index 0c478a9cbc5c2..91b5a07e5a78d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -38,11 +39,19 @@ public class AkkaRpcServiceConfiguration { private final long maximumFramesize; - public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) { + private final boolean captureAskCallStack; + + public AkkaRpcServiceConfiguration( + @Nonnull Configuration configuration, + @Nonnull Time timeout, + long maximumFramesize, + boolean captureAskCallStack) { + checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive."); this.configuration = configuration; this.timeout = timeout; this.maximumFramesize = maximumFramesize; + this.captureAskCallStack = captureAskCallStack; } @Nonnull @@ -59,12 +68,18 @@ public long getMaximumFramesize() { return maximumFramesize; } + public boolean captureAskCallStack() { + return captureAskCallStack; + } + public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) { final Time timeout = AkkaUtils.getTimeoutAsTime(configuration); final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration); - return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize); + final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK); + + return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize, captureAskCallStacks); } public static AkkaRpcServiceConfiguration defaultConfiguration() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java index 564b1efa7c72f..1a15fe28036ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java @@ -61,8 +61,9 @@ public FencedAkkaInvocationHandler( Time timeout, long maximumFramesize, @Nullable CompletableFuture terminationFuture, - Supplier fencingTokenSupplier) { - super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture); + Supplier fencingTokenSupplier, + boolean captureAskCallStacks) { + super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, captureAskCallStacks); this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java index 486816de8e80d..db7a2042c80b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java @@ -29,8 +29,8 @@ /** * Remote rpc invocation message which is used when the actor communication is remote and, thus, the * message has to be serialized. - *

- * In order to fail fast and report an appropriate error message to the user, the method name, the + * + *

In order to fail fast and report an appropriate error message to the user, the method name, the * parameter types and the arguments are eagerly serialized. In case the invocation call * contains a non-serializable object, then an {@link IOException} is thrown. */ @@ -138,7 +138,7 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound // ------------------------------------------------------------------- /** - * Wrapper class for the method invocation information + * Wrapper class for the method invocation information. */ private static final class MethodInvocation implements Serializable { private static final long serialVersionUID = 9187962608946082519L; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 553b07ce924ed..3a1632cac27e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -51,7 +51,6 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; -import akka.pattern.AskTimeoutException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -63,9 +62,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -249,7 +250,9 @@ public void testDelayedRegisterTaskExecutor() throws Exception { firstFuture.get(); fail("Should have failed because connection to taskmanager is delayed beyond timeout"); } catch (Exception e) { - assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class)); + final Throwable cause = ExceptionUtils.stripExecutionException(e); + assertThat(cause, instanceOf(TimeoutException.class)); + assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor")); } startConnection.await(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java new file mode 100644 index 0000000000000..445702608a33c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.util.IOUtils; + +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests that ask timeouts report the call stack of the calling function. + */ +public class TimeoutCallStackTest { + + private static ActorSystem actorSystem; + private static RpcService rpcService; + + private final List endpointsToStop = new ArrayList<>(); + + @BeforeClass + public static void setup() { + actorSystem = AkkaUtils.createDefaultActorSystem(); + rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration()); + } + + @AfterClass + public static void teardown() throws Exception { + + final CompletableFuture rpcTerminationFuture = rpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(10_000, TimeUnit.MILLISECONDS); + } + + @After + public void stopTestEndpoints() { + endpointsToStop.forEach(IOUtils::closeQuietly); + } + + @Test + public void testTimeoutException() throws Exception { + final TestingGateway gateway = createTestingGateway(); + + final CompletableFuture future = gateway.callThatTimesOut(Time.milliseconds(1)); + + Throwable failureCause = null; + try { + future.get(); + fail("test buggy: the call should never have completed"); + } catch (ExecutionException e) { + failureCause = e.getCause(); + } + + assertThat(failureCause, instanceOf(TimeoutException.class)); + assertThat(failureCause.getMessage(), containsString("callThatTimesOut")); + assertThat(failureCause.getStackTrace()[0].getMethodName(), equalTo("testTimeoutException")); + } + + // ------------------------------------------------------------------------ + // setup helpers + // ------------------------------------------------------------------------ + + private TestingGateway createTestingGateway() throws Exception { + final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name"); + endpointsToStop.add(endpoint); + endpoint.start(); + + return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get(); + } + + // ------------------------------------------------------------------------ + // testing mocks / stubs + // ------------------------------------------------------------------------ + + private interface TestingGateway extends RpcGateway { + + CompletableFuture callThatTimesOut(@RpcTimeout Time timeout); + } + + private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway { + + TestingRpcEndpoint(RpcService rpcService, String endpointId) { + super(rpcService, endpointId); + } + + @Override + public CompletableFuture callThatTimesOut(@RpcTimeout Time timeout) { + // return a future that never completes, so the call is guaranteed to time out + return new CompletableFuture<>(); + } + } +}