From 53848b9ce3fb00dd4c677ee91d7f9e316b5a03f5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 26 Sep 2016 18:01:47 +0200 Subject: [PATCH] [FLINK-4687] [rpc] Add getAddress to RpcService --- .../org/apache/flink/runtime/rpc/RpcService.java | 8 ++++++++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 16 ++++++++++++++++ .../flink/runtime/rpc/TestingRpcService.java | 5 +++-- .../runtime/rpc/TestingSerialRpcService.java | 6 ++++++ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 8 ++++---- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 5 +++++ 6 files changed, 42 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 437e08b0a3e59..96844ed53fb79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -32,6 +32,14 @@ */ public interface RpcService { + /** + * Return the address under which the rpc service can be reached. If the rpc service cannot be + * contacted remotely, then it will return an empty string. + * + * @return Address of the rpc service or empty string if local rpc service + */ + String getAddress(); + /** * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can * be used to communicate with the rpc server. If the connection failed, then the returned diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index cee19c4d00b97..6825557199d4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; @@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService { private final Set actors = new HashSet<>(4); private final long maximumFramesize; + private final String address; + private volatile boolean stopped; public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { @@ -87,6 +90,19 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { // only local communication maximumFramesize = Long.MAX_VALUE; } + + Address actorSystemAddress = AkkaUtils.getAddress(actorSystem); + + if (actorSystemAddress.host().isDefined()) { + address = actorSystemAddress.host().get(); + } else { + address = ""; + } + } + + @Override + public String getAddress() { + return address; } // this method does not mutate state and is thus thread-safe diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index f1640565519c2..47c9e24cdd855 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,14 +58,14 @@ public class TestingRpcService extends AkkaRpcService { /** * Creates a new {@code TestingRpcService}. */ - public TestingRpcService() { + public TestingRpcService() throws UnknownHostException { this(new Configuration()); } /** * Creates a new {@code TestingRpcService}, using the given configuration. */ - public TestingRpcService(Configuration configuration) { + public TestingRpcService(Configuration configuration) throws UnknownHostException { super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10)); this.registeredConnections = new ConcurrentHashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index c58ea205c953a..5b8e6e65cd061 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -30,6 +30,7 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.net.InetAddress; import java.util.BitSet; import java.util.UUID; import java.util.concurrent.Callable; @@ -120,6 +121,11 @@ public > C startServer(S rpcEndpo return self; } + @Override + public String getAddress() { + return ""; + } + @Override public Future connect(String address, Class clazz) { RpcGateway gateway = registeredConnections.get(address); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 1e8c9a64d3dac..5d76024c364ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -133,7 +133,7 @@ public void testWrongGatewayEndpointConnection() throws Exception { Future futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class); - WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration()); + WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit()); // since it is a tell operation we won't receive a RpcConnectionException, it's only logged gateway.tell("foobar"); @@ -141,10 +141,10 @@ public void testWrongGatewayEndpointConnection() throws Exception { Future result = gateway.barfoo(); try { - Await.result(result, timeout.duration()); + result.get(timeout.getSize(), timeout.getUnit()); fail("We expected a RpcConnectionException."); - } catch (RpcConnectionException rpcConnectionException) { - // we expect this exception here + } catch (ExecutionException executionException) { + assertTrue(executionException.getCause() instanceof RpcConnectionException); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 5550cb5701aa2..3388011b46fc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -115,4 +115,9 @@ public Integer call() throws Exception { assertEquals(expected, actual); assertTrue(latch.isTriggered()); } + + @Test + public void testGetAddress() { + assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress()); + } }