From b52fb356d48850453956d2f6c4e52ba8fe2da8ab Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Sep 2016 15:18:27 +0200 Subject: [PATCH] [FLINK-4580] [rpc] Report rpc invocation exceptions to the caller --- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 53 +++++++++++-------- .../runtime/rpc/akka/AkkaRpcActorTest.java | 34 ++++++++++++ 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 2373be9414ed2..92964470f9199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,11 +85,11 @@ public void onReceive(final Object message) { unstashAll(); getContext().become(new Procedure() { @Override - public void apply(Object message) throws Exception { - if (message.equals(Processing.STOP)) { + public void apply(Object msg) throws Exception { + if (msg.equals(Processing.STOP)) { getContext().unbecome(); } else { - handleMessage(message); + handleMessage(msg); } } }); @@ -128,21 +129,36 @@ private void handleMessage(Object message) { * @param rpcInvocation Rpc invocation message */ private void handleRpcInvocation(RpcInvocation rpcInvocation) { + Method rpcMethod = null; + try { String methodName = rpcInvocation.getMethodName(); Class[] parameterTypes = rpcInvocation.getParameterTypes(); - Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); + rpcMethod = lookupRpcMethod(methodName, parameterTypes); + } catch(ClassNotFoundException e) { + LOG.error("Could not load method arguments.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } catch (IOException e) { + LOG.error("Could not deserialize rpc invocation message.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } catch (final NoSuchMethodException e) { + LOG.error("Could not find rpc method for rpc invocation.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } - if (rpcMethod.getReturnType().equals(Void.TYPE)) { - // No return value to send back - try { + if (rpcMethod != null) { + try { + if (rpcMethod.getReturnType().equals(Void.TYPE)) { + // No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); - } catch (Throwable e) { - LOG.error("Error while executing remote procedure call {}.", rpcMethod, e); - } - } else { - try { + } else { Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); if (result instanceof Future) { @@ -152,17 +168,12 @@ private void handleRpcInvocation(RpcInvocation rpcInvocation) { // tell the sender the result of the computation getSender().tell(new Status.Success(result), getSelf()); } - } catch (Throwable e) { - // tell the sender about the failure - getSender().tell(new Status.Failure(e), getSelf()); } + } catch (Throwable e) { + LOG.error("Error while executing remote procedure call {}.", rpcMethod, e); + // tell the sender about the failure + getSender().tell(new Status.Failure(e), getSelf()); } - } catch(ClassNotFoundException e) { - LOG.error("Could not load method arguments.", e); - } catch (IOException e) { - LOG.error("Could not deserialize rpc invocation message.", e); - } catch (final NoSuchMethodException e) { - LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index a6ceb9104a1ef..fcba350d29b3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -118,10 +118,44 @@ public void testMessageStashing() throws Exception { rpcEndpoint.shutDown(); } + /** + * Tests that we receive a RpcConnectionException when calling a rpc method (with return type) + * on a wrong rpc endpoint. + * + * @throws Exception + */ + @Test + public void testWrongGatewayEndpointConnection() throws Exception { + DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + + rpcEndpoint.start(); + + Future futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class); + + WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration()); + + // since it is a tell operation we won't receive a RpcConnectionException, it's only logged + gateway.tell("foobar"); + + Future result = gateway.barfoo(); + + try { + Await.result(result, timeout.duration()); + fail("We expected a RpcConnectionException."); + } catch (RpcConnectionException rpcConnectionException) { + // we expect this exception here + } + } + private interface DummyRpcGateway extends RpcGateway { Future foobar(); } + private interface WrongRpcGateway extends RpcGateway { + Future barfoo(); + void tell(String message); + } + private static class DummyRpcEndpoint extends RpcEndpoint { private volatile int _foobar = 42;