From 1971f26cc5d0afa4022b3daa93eacd9f9541b072 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Wed, 17 Aug 2016 22:09:54 +0800 Subject: [PATCH 1/5] 1. remove restriction of getAddress in RpcService 2. add support of Endpoint inherit and RpcGateway interface inherit 3. add some unittest for AkkaRpcService, remove JobMaster Register Test 4. add error handle method RpcEndpoint, which is a common function for RpcEndpoint --- .../apache/flink/runtime/rpc/RpcEndpoint.java | 31 +++++ .../apache/flink/runtime/rpc/RpcService.java | 8 +- .../flink/runtime/rpc/akka/AkkaGateway.java | 5 +- .../rpc/akka/AkkaInvocationHandler.java | 2 +- .../runtime/rpc/akka/AkkaRpcService.java | 22 ++-- .../runtime/rpc/RpcCompletenessTest.java | 23 +++- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 112 ++++++++++++------ 7 files changed, 148 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index a28bc1437c020..795302fc6bd3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -217,6 +217,36 @@ public Future callAsync(Callable callable, Timeout timeout) { return ((MainThreadExecutor) self).callAsync(callable, timeout); } + // ------------------------------------------------------------------------ + // Error handling + // ------------------------------------------------------------------------ + + /** + * Notifies the Endpoint that a fatal error has occurred and it cannot proceed. + * This method should be used when asynchronous threads want to notify the + * Endpoint of a fatal error. + * + * @param t The exception describing the fatal error + */ + public void onFatalErrorAsync(final Throwable t) { + runAsync(new Runnable() { + @Override + public void run() { + onFatalError(t); + } + }); + } + + /** + * Notifies the Endpoint that a fatal error has occurred and it cannot proceed. + * This method must only be called from within the TaskExecutor's main thread. + * + * @param t The exception describing the fatal error + */ + public void onFatalError(Throwable t) { + log.error("FATAL ERROR", t); + }; + // ------------------------------------------------------------------------ // Main Thread Validation // ------------------------------------------------------------------------ @@ -277,4 +307,5 @@ public ExecutionContext prepare() { return this; } } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index fabdb057514c3..eb96e12ff833a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -44,7 +44,7 @@ public interface RpcService { /** * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint. * - * @param rpcEndpoint Rpc protocl to dispath the rpcs to + * @param rpcEndpoint Rpc protocol to dispatch the rpcs to * @param Type of the rpc endpoint * @param Type of the self rpc gateway associated with the rpc server * @return Self gateway to dispatch remote procedure calls to oneself @@ -65,15 +65,15 @@ public interface RpcService { void stopService(); /** - * Get the fully qualified address of the underlying rpc server represented by the self gateway. + * Get the fully qualified address of the underlying rpc server represented by the gateway. * It must be possible to connect from a remote host to the rpc server via the returned fully * qualified address. * - * @param selfGateway Self gateway associated with the underlying rpc server + * @param gateway RpcGateway associated with the underlying rpc server * @param Type of the rpc gateway * @return Fully qualified address */ - String getAddress(C selfGateway); + String getAddress(C gateway); /** * Gets the execution context, provided by this RPC service. This execution diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java index f6125dc0a99da..335ffd749771b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java @@ -26,5 +26,8 @@ */ interface AkkaGateway extends RpcGateway { - ActorRef getRpcEndpoint(); + ActorRef getActorRef(); + + interface ServerGateway extends AkkaGateway {}; + interface ClientGateway extends AkkaGateway {}; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 524bf7480097d..4ffbf02ffbabf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -139,7 +139,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } @Override - public ActorRef getRpcEndpoint() { + public ActorRef getActorRef() { return rpcEndpoint; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index d987c2f0361c0..82d7fda48fef5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -112,7 +112,7 @@ public C apply(Object obj) { @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, - new Class[] {clazz}, + new Class[] {clazz, AkkaGateway.ClientGateway.class}, akkaInvocationHandler); return proxy; @@ -149,7 +149,7 @@ public > C startServer(S rpcEndpo rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, StartStoppable.class, - AkkaGateway.class}, + AkkaGateway.ServerGateway.class}, akkaInvocationHandler); return self; @@ -157,7 +157,7 @@ public > C startServer(S rpcEndpo @Override public void stopServer(RpcGateway selfGateway) { - if (selfGateway instanceof AkkaGateway) { + if (selfGateway instanceof AkkaGateway.ServerGateway) { AkkaGateway akkaClient = (AkkaGateway) selfGateway; boolean fromThisService; @@ -165,17 +165,21 @@ public void stopServer(RpcGateway selfGateway) { if (stopped) { return; } else { - fromThisService = actors.remove(akkaClient.getRpcEndpoint()); + fromThisService = actors.remove(akkaClient.getActorRef()); } } if (fromThisService) { - ActorRef selfActorRef = akkaClient.getRpcEndpoint(); + ActorRef selfActorRef = akkaClient.getActorRef(); LOG.info("Stopping RPC endpoint {}.", selfActorRef.path()); selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); } else { - LOG.debug("RPC endpoint {} already stopped or from different RPC service"); + LOG.debug("RPC endpoint {} already stopped or from different RPC service", + getAddress(selfGateway)); } + } else { + LOG.warn("RPC endpoint {} is not a self gateway of the endpoint", + getAddress(selfGateway)); } } @@ -197,11 +201,11 @@ public void stopService() { } @Override - public String getAddress(RpcGateway selfGateway) { + public String getAddress(RpcGateway gateway) { checkState(!stopped, "RpcService is stopped"); - if (selfGateway instanceof AkkaGateway) { - ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcEndpoint(); + if (gateway instanceof AkkaGateway) { + ActorRef actorRef = ((AkkaGateway) gateway).getActorRef(); return AkkaUtils.getAkkaURL(actorSystem, actorRef); } else { String className = AkkaGateway.class.getName(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 97cf0cb35524e..23e9fb2dc3f08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -65,8 +65,8 @@ public void testRpcCompleteness() { } private void checkCompleteness(Class rpcEndpoint, Class rpcGateway) { - Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); - Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); + Method[] gatewayMethods = getInterfaceMethods(rpcGateway).toArray(new Method[0]); + Method[] serverMethods = rpcEndpoint.getMethods(); Map> rpcMethods = new HashMap<>(); Set unmatchedRpcMethods = new HashSet<>(); @@ -337,4 +337,23 @@ private static Class resolvePrimitiveType(Class primitveType) { throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); } } + + /** + * Extract all methods defined by a interface + * @param interfaceClass the given interface class + * @return all methods defined by the given interface + */ + private List getInterfaceMethods(Class interfaceClass) { + if(!interfaceClass.isInterface()) { + fail(interfaceClass.getName() + "is not a interface"); + } + ArrayList allMethods = new ArrayList<>(); + for(Method method : interfaceClass.getDeclaredMethods()) { + allMethods.add(method); + } + for(Class superClass : interfaceClass.getInterfaces()) { + allMethods.addAll(getInterfaceMethods(superClass)); + } + return allMethods; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 7b4ab89a84c24..adca197235945 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -21,8 +21,15 @@ import akka.actor.ActorSystem; import akka.util.Timeout; +import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; @@ -31,13 +38,19 @@ import org.junit.AfterClass; import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -81,50 +94,73 @@ public void run() { assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); } - // ------------------------------------------------------------------------ - // specific component tests - should be moved to the test classes - // for those components - // ------------------------------------------------------------------------ - - /** - * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the - * {@link AkkaRpcService}. - */ - @Test - public void testJobMasterResourceManagerRegistration() throws Exception { - Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS); - ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); - AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout); - AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout); - ExecutorService executorService = new ForkJoinPool(); - - ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService); - JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService); - - resourceManager.start(); - jobMaster.start(); - - ResourceManagerGateway rm = resourceManager.getSelf(); + private interface TestRpcGateway extends RpcGateway { + Future inc(Integer input); + } - assertTrue(rm instanceof AkkaGateway); + private static class TestRpcEndpoint extends RpcEndpoint { - AkkaGateway akkaClient = (AkkaGateway) rm; + /** + * Initializes the RPC endpoint. + * + * @param rpcService The RPC server that dispatches calls to this RPC endpoint. + */ + protected TestRpcEndpoint(RpcService rpcService) { + super(rpcService); + } - - jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getRpcEndpoint())); + @RpcMethod + public Integer inc(Integer input) { + return input + 1; + } + } - // wait for successful registration - FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS); - Deadline deadline = timeout.fromNow(); + @Test + public void testAkkaRpcGateway() throws Exception { + ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, new Timeout(10, TimeUnit.SECONDS)); + TestRpcEndpoint endpoint = new TestRpcEndpoint(akkaRpcService); + String address = endpoint.getAddress(); + endpoint.start(); + + FiniteDuration akkaTimeout = new FiniteDuration(10, TimeUnit.SECONDS); + TestRpcGateway client1 = + Await.result(akkaRpcService.connect(address, TestRpcGateway.class), akkaTimeout); + TestRpcGateway client2 = + Await.result(akkaRpcService.connect(address, TestRpcGateway.class), akkaTimeout); + assertEquals(akkaRpcService.getAddress(client1), akkaRpcService.getAddress(client2)); + assertEquals(new Integer(11), Await.result(client1.inc(10), akkaTimeout)); + assertEquals(new Integer(21), Await.result(client2.inc(20), akkaTimeout)); + } - while (deadline.hasTimeLeft() && !jobMaster.isConnected()) { - Thread.sleep(100); + @Test + public void testStopServer() throws Exception { + ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, new Timeout(10, TimeUnit.SECONDS)); + TestRpcEndpoint endpoint = new TestRpcEndpoint(akkaRpcService); + String address = endpoint.getAddress(); + endpoint.start(); + + // ensure server is working + FiniteDuration akkaTimeout = new FiniteDuration(10, TimeUnit.SECONDS); + TestRpcGateway client1 = + Await.result(akkaRpcService.connect(address, TestRpcGateway.class), akkaTimeout); + assertEquals(new Integer(11), Await.result(client1.inc(10), akkaTimeout)); + + // try to stop a client, server still work + akkaRpcService.stopServer(client1); + assertEquals(new Integer(11), Await.result(client1.inc(10), akkaTimeout)); + + // stop the server, server stopped + akkaRpcService.stopServer(endpoint.getSelf()); + + boolean caught = false; + try { + Await.result(client1.inc(10), akkaTimeout); + } catch (Exception e) { + caught = true; } + assertTrue(caught); - assertFalse(deadline.isOverdue()); - - jobMaster.shutDown(); - resourceManager.shutDown(); } } From bd1f8b1877ca6e7df6e7ed73d70846d6e598c06b Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Wed, 17 Aug 2016 22:18:23 +0800 Subject: [PATCH 2/5] remove error process method since had been added in RpcEndpoint --- .../rpc/taskexecutor/TaskExecutor.java | 25 +------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java index 1a637bb05f4f9..55461638d74dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -118,30 +118,7 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe // ------------------------------------------------------------------------ // Error handling // ------------------------------------------------------------------------ - - /** - * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. - * This method should be used when asynchronous threads want to notify the - * TaskExecutor of a fatal error. - * - * @param t The exception describing the fatal error - */ - void onFatalErrorAsync(final Throwable t) { - runAsync(new Runnable() { - @Override - public void run() { - onFatalError(t); - } - }); - } - - /** - * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. - * This method must only be called from within the TaskExecutor's main thread. - * - * @param t The exception describing the fatal error - */ - void onFatalError(Throwable t) { + public void onFatalError(Throwable t) { // to be determined, probably delegate to a fatal error handler that // would either log (mini cluster) ot kill the process (yarn, mesos, ...) log.error("FATAL ERROR", t); From 7c61c84ff8458ae8c07933acc4b36edc867449c5 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 18 Aug 2016 00:03:24 +0800 Subject: [PATCH 3/5] revert mixed changes --- .../apache/flink/runtime/rpc/RpcEndpoint.java | 30 ------------------- .../rpc/taskexecutor/TaskExecutor.java | 25 +++++++++++++++- .../runtime/rpc/RpcCompletenessTest.java | 4 +-- 3 files changed, 26 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 795302fc6bd3d..78b18e4a84142 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -217,36 +217,6 @@ public Future callAsync(Callable callable, Timeout timeout) { return ((MainThreadExecutor) self).callAsync(callable, timeout); } - // ------------------------------------------------------------------------ - // Error handling - // ------------------------------------------------------------------------ - - /** - * Notifies the Endpoint that a fatal error has occurred and it cannot proceed. - * This method should be used when asynchronous threads want to notify the - * Endpoint of a fatal error. - * - * @param t The exception describing the fatal error - */ - public void onFatalErrorAsync(final Throwable t) { - runAsync(new Runnable() { - @Override - public void run() { - onFatalError(t); - } - }); - } - - /** - * Notifies the Endpoint that a fatal error has occurred and it cannot proceed. - * This method must only be called from within the TaskExecutor's main thread. - * - * @param t The exception describing the fatal error - */ - public void onFatalError(Throwable t) { - log.error("FATAL ERROR", t); - }; - // ------------------------------------------------------------------------ // Main Thread Validation // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java index 55461638d74dc..042a4e326b580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -118,7 +118,30 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe // ------------------------------------------------------------------------ // Error handling // ------------------------------------------------------------------------ - public void onFatalError(Throwable t) { + + /** + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method should be used when asynchronous threads want to notify the + * TaskExecutor of a fatal error. + * + * @param t The exception describing the fatal error + */ + void onFatalErrorAsync(final Throwable t) { + runAsync(new Runnable() { + @Override + public void run() { + onFatalError(t); + } + }); + } + + /** + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method must only be called from within the TaskExecutor's main thread. + * + * @param t The exception describing the fatal error + */ + void onFatalError(Throwable t) { // to be determined, probably delegate to a fatal error handler that // would either log (mini cluster) ot kill the process (yarn, mesos, ...) log.error("FATAL ERROR", t); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 23e9fb2dc3f08..6047b1f57342f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -65,8 +65,8 @@ public void testRpcCompleteness() { } private void checkCompleteness(Class rpcEndpoint, Class rpcGateway) { - Method[] gatewayMethods = getInterfaceMethods(rpcGateway).toArray(new Method[0]); - Method[] serverMethods = rpcEndpoint.getMethods(); + Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); + Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); Map> rpcMethods = new HashMap<>(); Set unmatchedRpcMethods = new HashSet<>(); From a0bcfcf69637ef8f4264af48fc26c2ea55e268b9 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 18 Aug 2016 00:26:27 +0800 Subject: [PATCH 4/5] add connected akka gateway to actors cache --- .../runtime/rpc/akka/AkkaRpcService.java | 7 +++- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 33 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 82d7fda48fef5..667ccbe9c3d50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -101,6 +101,7 @@ public Future connect(final String address, final Clas @Override public C apply(Object obj) { ActorRef actorRef = ((ActorIdentity) obj).getRef(); + actors.add(actorRef); InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize); @@ -206,7 +207,11 @@ public String getAddress(RpcGateway gateway) { if (gateway instanceof AkkaGateway) { ActorRef actorRef = ((AkkaGateway) gateway).getActorRef(); - return AkkaUtils.getAkkaURL(actorSystem, actorRef); + if(actors.contains(actorRef)) { + return AkkaUtils.getAkkaURL(actorSystem, actorRef); + } else { + throw new IllegalArgumentException("Cannot get address for created by other actor system."); + } } else { String className = AkkaGateway.class.getName(); throw new IllegalArgumentException("Cannot get address for non " + className + '.'); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index adca197235945..3104f90e028fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -133,6 +133,39 @@ public void testAkkaRpcGateway() throws Exception { assertEquals(new Integer(21), Await.result(client2.inc(20), akkaTimeout)); } + @Test + public void testMultiActorSystem() throws Exception { + ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); + ActorSystem actorSystem3 = AkkaUtils.createDefaultActorSystem(); + AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, new Timeout(10, TimeUnit.SECONDS)); + AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, new Timeout(10, TimeUnit.SECONDS)); + AkkaRpcService akkaRpcService3 = new AkkaRpcService(actorSystem3, new Timeout(10, TimeUnit.SECONDS)); + TestRpcEndpoint endpoint = new TestRpcEndpoint(akkaRpcService); + TestRpcEndpoint endpoint2 = new TestRpcEndpoint(akkaRpcService2); + endpoint.start(); + endpoint2.start(); + + FiniteDuration akkaTimeout = new FiniteDuration(10, TimeUnit.SECONDS); + TestRpcGateway client1 = + Await.result(akkaRpcService3.connect(endpoint.getAddress(), TestRpcGateway.class), akkaTimeout); + TestRpcGateway client2 = + Await.result(akkaRpcService3.connect(endpoint2.getAddress(), TestRpcGateway.class), akkaTimeout); + assertEquals(endpoint.getAddress(), akkaRpcService3.getAddress(client1)); + assertEquals(endpoint2.getAddress(), akkaRpcService3.getAddress(client2)); + assertEquals(new Integer(11), Await.result(client1.inc(10), akkaTimeout)); + assertEquals(new Integer(21), Await.result(client2.inc(20), akkaTimeout)); + + boolean caught = false; + try { + akkaRpcService.getAddress(client2); + } catch (IllegalArgumentException e) { + caught = true; + } + assertTrue(caught); + + } + @Test public void testStopServer() throws Exception { ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); From d33743a73b5ca7d61f7a5d66778ca833246584ad Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 18 Aug 2016 23:47:55 +0800 Subject: [PATCH 5/5] do some revert missed for the mixed changes --- .../apache/flink/runtime/rpc/RpcEndpoint.java | 1 - .../runtime/rpc/RpcCompletenessTest.java | 19 ------------------- 2 files changed, 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 78b18e4a84142..a28bc1437c020 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -277,5 +277,4 @@ public ExecutionContext prepare() { return this; } } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 6047b1f57342f..97cf0cb35524e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -337,23 +337,4 @@ private static Class resolvePrimitiveType(Class primitveType) { throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); } } - - /** - * Extract all methods defined by a interface - * @param interfaceClass the given interface class - * @return all methods defined by the given interface - */ - private List getInterfaceMethods(Class interfaceClass) { - if(!interfaceClass.isInterface()) { - fail(interfaceClass.getName() + "is not a interface"); - } - ArrayList allMethods = new ArrayList<>(); - for(Method method : interfaceClass.getDeclaredMethods()) { - allMethods.add(method); - } - for(Class superClass : interfaceClass.getInterfaces()) { - allMethods.addAll(getInterfaceMethods(superClass)); - } - return allMethods; - } }