Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/_includes/generated/akka_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>akka.ask.callstack</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint.</td>
</tr>
<tr>
<td><h5>akka.ask.timeout</h5></td>
<td style="word-wrap: break-word;">"10 s"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
@PublicEvolving
public class AkkaOptions {

/**
* Timeout for akka ask calls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forget to modify this :) ?

*/
public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK = ConfigOptions
.key("akka.ask.callstack")
.booleanType()
.defaultValue(true)
.withDescription("If true, call stack for asynchronous asks are captured. That way, when an ask fails " +
"(for example times out), you get a proper exception, describing to the original method call and " +
"call site. Note that in case of having millions of concurrent RPC calls, this may add to the " +
"memory footprint.");

/**
* Timeout for akka ask calls.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ protected boolean isRunning() {
/**
* Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
* to process remote procedure calls.
*
* @throws Exception indicating that something went wrong while starting the RPC endpoint
*/
public final void start() {
rpcServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -90,13 +91,16 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
@Nullable
private final CompletableFuture<Void> terminationFuture;

private final boolean captureAskCallStack;

AkkaInvocationHandler(
String address,
String hostname,
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture) {
String address,
String hostname,
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture,
boolean captureAskCallStack) {

this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
Expand All @@ -105,6 +109,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
this.terminationFuture = terminationFuture;
this.captureAskCallStack = captureAskCallStack;
}

@Override
Expand Down Expand Up @@ -208,20 +213,20 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {

result = null;
} else {
// Capture the call stack. It is significantly faster to do that via an exception than
// via Thread.getStackTrace(), because exceptions lazily initialize the stack trace, initially only
// capture a lightweight native pointer, and convert that into the stack trace lazily when needed.
final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;

// execute an asynchronous call
CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);

CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
if (o instanceof SerializedValue) {
try {
return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
+ methodName, e));
}
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);

final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
resultFuture.whenComplete((resultValue, failure) -> {
if (failure != null) {
completableFuture.completeExceptionally(resolveTimeoutException(failure, callStackCapture, method));
} else {
return o;
completableFuture.complete(deserializeValueIfNeeded(resultValue, method));
}
});

Expand Down Expand Up @@ -370,4 +375,33 @@ public String getHostname() {
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}

static Object deserializeValueIfNeeded(Object o, Method method) {
if (o instanceof SerializedValue) {
try {
return ((SerializedValue<?>) o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException(
"Could not deserialize the serialized payload of RPC method : " + method.getName(), e));
}
} else {
return o;
}
}

static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) {
if (callStackCapture == null || (!(exception instanceof akka.pattern.AskTimeoutException))) {
return exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null branch implies that the exception type is dependent on the configuration option, which sounds like an accident waiting to happen if someone wants to handle timeouts explicitly as they would have to know what there are 2 types of timeouts that can occur.

How problematic would it be to always create a TimeoutException, and only have different behaviors for how the stacktrace is set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be fine.
The main reason that I wanted this configurable is to have a way to turn call stack capture off, because during large RPC storms, this might add significantly to the memory consumption.
But always using a TimeoutException and referring to the RPC method name should be uncritical.

}

final TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out.");
newException.initCause(exception);

// remove the stack frames coming from the proxy interface invocation
final StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we filter the stack trace containing o.a.f.runtime.rpc.akka instead of using 3? Because others may change call stack of this function unintentionally in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to initially keep it like that. There is a test that guards this, so we should see it if someone breaks this by accident.

The check is slightly trickier than it looks, because the package name of the proxy class depends on visibility keyword of the interface, etc.

I would suggest to keep it simple for now and fix it when the test tells us that this simple solution is not enough. Then we should also know which cases cause it to break, what the involved classnames and package names are, etc.


return newException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public class AkkaRpcService implements RpcService {
private final String address;
private final int port;

private final boolean captureAskCallstacks;

private final ScheduledExecutor internalScheduledExecutor;

private final CompletableFuture<Void> terminationFuture;
Expand All @@ -122,6 +124,8 @@ public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfigu
port = -1;
}

captureAskCallstacks = configuration.captureAskCallStack();

internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

terminationFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -165,7 +169,8 @@ public <C extends RpcGateway> CompletableFuture<C> connect(
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null);
null,
captureAskCallstacks);
});
}

Expand All @@ -185,7 +190,8 @@ public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
() -> fencingToken);
() -> fencingToken,
captureAskCallstacks);
});
}

Expand Down Expand Up @@ -247,7 +253,8 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks);

implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
Expand All @@ -257,7 +264,8 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture);
terminationFuture,
captureAskCallstacks);
}

// Rather than using the System ClassLoader directly, we derive the ClassLoader
Expand Down Expand Up @@ -285,7 +293,8 @@ public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
() -> fencingToken);
() -> fencingToken,
captureAskCallstacks);

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.akka;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;

Expand All @@ -38,11 +39,19 @@ public class AkkaRpcServiceConfiguration {

private final long maximumFramesize;

public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
private final boolean captureAskCallStack;

public AkkaRpcServiceConfiguration(
@Nonnull Configuration configuration,
@Nonnull Time timeout,
long maximumFramesize,
boolean captureAskCallStack) {

checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
this.configuration = configuration;
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
this.captureAskCallStack = captureAskCallStack;
}

@Nonnull
Expand All @@ -59,12 +68,18 @@ public long getMaximumFramesize() {
return maximumFramesize;
}

public boolean captureAskCallStack() {
return captureAskCallStack;
}

public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);

final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);

return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);

return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize, captureAskCallStacks);
}

public static AkkaRpcServiceConfiguration defaultConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public FencedAkkaInvocationHandler(
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture,
Supplier<F> fencingTokenSupplier) {
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
Supplier<F> fencingTokenSupplier,
boolean captureAskCallStacks) {
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, captureAskCallStacks);

this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
/**
* Remote rpc invocation message which is used when the actor communication is remote and, thus, the
* message has to be serialized.
* <p>
* In order to fail fast and report an appropriate error message to the user, the method name, the
*
* <p>In order to fail fast and report an appropriate error message to the user, the method name, the
* parameter types and the arguments are eagerly serialized. In case the invocation call
* contains a non-serializable object, then an {@link IOException} is thrown.
*/
Expand Down Expand Up @@ -138,7 +138,7 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound
// -------------------------------------------------------------------

/**
* Wrapper class for the method invocation information
* Wrapper class for the method invocation information.
*/
private static final class MethodInvocation implements Serializable {
private static final long serialVersionUID = 9187962608946082519L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import akka.pattern.AskTimeoutException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -63,9 +62,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -249,7 +250,9 @@ public void testDelayedRegisterTaskExecutor() throws Exception {
firstFuture.get();
fail("Should have failed because connection to taskmanager is delayed beyond timeout");
} catch (Exception e) {
assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
final Throwable cause = ExceptionUtils.stripExecutionException(e);
assertThat(cause, instanceOf(TimeoutException.class));
assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor"));
}

startConnection.await();
Expand Down
Loading