From 3117989b87c5e9a002251353a47271fed0a84271 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Sat, 27 Aug 2016 14:14:28 +0800 Subject: [PATCH 1/6] leader election of resourcemanager --- .../HighAvailabilityServices.java | 7 ++ .../highavailability/NonHaServices.java | 5 + .../rpc/resourcemanager/ResourceManager.java | 116 ++++++++++++++++-- .../TestingHighAvailabilityServices.java | 19 ++- .../ResourceManagerHATest.java | 86 +++++++++++++ 5 files changed, 224 insertions(+), 9 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 73e4f1fe6d5c6..298147cf1dce1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -39,6 +39,13 @@ public interface HighAvailabilityServices { */ LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; + /** + * Gets the leader election service for the cluster's resource manager. + * @return + * @throws Exception + */ + LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; + /** * Gets the leader election service for the given job. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 3d2769bab1630..292a404118665 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -60,6 +60,11 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); } + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return new StandaloneLeaderElectionService(); + } + @Override public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index 6f34465740760..0ada4a6b87dfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -20,24 +20,26 @@ import akka.dispatch.Mapper; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import 
org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess; -import org.apache.flink.util.Preconditions; -import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; import scala.concurrent.Future; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * ResourceManager implementation. The resource manager is responsible for resource de-/allocation @@ -50,16 +52,40 @@ * */ public class ResourceManager extends RpcEndpoint { - private final ExecutionContext executionContext; private final Map jobMasterGateways; + private final HighAvailabilityServices highAvailabilityServices; + private LeaderElectionService leaderElectionService = null; + private UUID leaderSessionID = null; - public ResourceManager(RpcService rpcService, ExecutorService executorService) { + public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { super(rpcService); - this.executionContext = ExecutionContext$.MODULE$.fromExecutor( - Preconditions.checkNotNull(executorService)); + this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.jobMasterGateways = new HashMap<>(); } + @Override + public void start() { + // start a leader + try { + super.start(); + leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); + leaderElectionService.start(new ResourceManagerLeaderContender()); + } catch (Throwable e) { + log.error("A fatal error happened when starting the ResourceManager", e); + throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); + } + } + + /** + * Gets the leader session id of current resourceManager. 
+ * + * @return the leader session ID of this ResourceManager, or null while it does not hold leadership + */ + @VisibleForTesting + UUID getLeaderSessionID() { + return leaderSessionID; + } + /** * Register a {@link JobMaster} at the resource manager. * @@ -116,4 +142,78 @@ public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTa return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } + + /** + * Callback invoked when this ResourceManager is granted leadership; stores the session ID and confirms it with the leader election service. + * + * @param newLeaderSessionID leader session ID assigned with the granted leadership + */ + void handleGrantLeadership(final UUID newLeaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID); + leaderSessionID = newLeaderSessionID; + // confirm the new session ID with the election service (the call might block) + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + } + }); + } + + /** + * Callback invoked when this ResourceManager loses leadership; clears the registered JobMaster gateways and resets the leader session ID. 
+ */ + void handleRevokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + leaderSessionID = null; + } + }); + } + + /** + * Callback invoked when the leader election service reports an error; logs it and shuts the ResourceManager down. + * @param e error reported by the leader election service + */ + void onLeaderElectionError(final Throwable e) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("ResourceManager received an error from the LeaderElectionService.", e); + // terminate ResourceManager in case of an error + shutDown(); + } + }); + } + + private class ResourceManagerLeaderContender implements LeaderContender { + + @Override + public void grantLeadership(UUID leaderSessionID) { + handleGrantLeadership(leaderSessionID); + } + + @Override + public void revokeLeadership() { + handleRevokeLeadership(); + } + + @Override + public String getAddress() { + return ResourceManager.this.getAddress(); // outer endpoint's address; an unqualified call would recurse into this method + } + + /** + * Handles error occurring in the leader election service + * + * @param exception Exception being thrown in the leader election service + */ + @Override + public void handleError(Exception exception) { + onLeaderElectionError(exception); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 4d654a390d37f..3162f40391ea5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private 
volatile LeaderElectionService jobMasterLeaderElectionService; + private volatile LeaderElectionService resourceManagerLeaderElectionService; + //
------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -44,7 +46,11 @@ public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceMan public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { this.jobMasterLeaderElectionService = leaderElectionService; } - + + public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) { + this.resourceManagerLeaderElectionService = leaderElectionService; + } + // ------------------------------------------------------------------------ // HA Services Methods // ------------------------------------------------------------------------ @@ -69,4 +75,15 @@ public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) thro throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); } } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + LeaderElectionService service = resourceManagerLeaderElectionService; + + if (service != null) { + return service; + } else { + throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java new file mode 100644 index 0000000000000..b7f08995e26d4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import akka.util.Timeout; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * resourceManager HA test, including grant leadership and revoke leadership + */ +public class ResourceManagerHATest { + + private RpcService rpcService; + + @Before + public void setup() throws Exception { + rpcService = new TestingRpcService(); + } + + @After + public void teardown() throws Exception { + rpcService.stopService(); + } + + @Test + public void testGrantAndRevokeLeadership() throws Exception { + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); + + final ResourceManager resourceManager = new ResourceManager(rpcService, 
highAvailabilityServices); + resourceManager.start(); + // before grant leadership, resourceManager's leaderId is null + Assert.assertNull(resourceManager.getLeaderSessionID()); + final UUID leaderId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderId); + // after grant leadership, resourceManager's leaderId has value + Assert.assertEquals(getLatestLeaderId(resourceManager), leaderId); + // then revoke leadership, resourceManager's leaderId is null again + leaderElectionService.notLeader(); + Assert.assertNull(getLatestLeaderId(resourceManager)); + } + + + private UUID getLatestLeaderId(final ResourceManager resourceManager) throws Exception { + Timeout timeout = new Timeout(200, TimeUnit.MILLISECONDS); + Future actualLeaderIdFuture = resourceManager.callAsync(new Callable() { + @Override + public UUID call() throws Exception { + return resourceManager.getLeaderSessionID(); + } + }, timeout); + UUID actualValue = Await.result(actualLeaderIdFuture, timeout.duration()); + return actualValue; + } + +} From e4b50335a03badfb6bbd7b149fad3193c77e5300 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Tue, 30 Aug 2016 15:16:59 +0800 Subject: [PATCH 2/6] add serial rpc service --- .../runtime/rpc/TestingSerialRpcService.java | 369 ++++++++++++++++++ 1 file changed, 369 insertions(+) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java new file mode 100644 index 0000000000000..d36ccaa932408 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Preconditions; +import org.apache.log4j.Logger; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.BitSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class TestingSerialRpcService implements RpcService { + private final ScheduledExecutorService executorService; + private final ConcurrentHashMap registeredConnections; + + public TestingSerialRpcService() { + executorService = Executors.newSingleThreadScheduledExecutor(); + this.registeredConnections = new ConcurrentHashMap<>(); + } + + // ------------------------------------------------------------------------ + // connections + // 
------------------------------------------------------------------------ + + public void registerGateway(String address, RpcGateway gateway) { + checkNotNull(address); + checkNotNull(gateway); + + if (registeredConnections.putIfAbsent(address, gateway) != null) { + throw new IllegalStateException("a gateway is already registered under " + address); + } + } + + @Override + public Future connect(String address, Class clazz) { + RpcGateway gateway = registeredConnections.get(address); + + if (gateway != null) { + if (clazz.isAssignableFrom(gateway.getClass())) { + @SuppressWarnings("unchecked") + C typedGateway = (C) gateway; + return Futures.successful(typedGateway); + } else { + return Futures.failed( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); + } + } else { + return Futures.failed(new Exception("No gateway registered under that name")); + } + } + + @Override + public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { + executorService.schedule(runnable, delay, unit); + } + + @Override + public ExecutionContext getExecutionContext() { + return ExecutionContexts.fromExecutorService(executorService); + } + + @Override + public void stopService() { + executorService.shutdown(); + registeredConnections.clear(); + } + + @Override + public void stopServer(C selfGateway) { + + } + + @Override + public > C startServer(S rpcEndpoint) { + InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(rpcEndpoint, executorService); + + ClassLoader classLoader = getClass().getClassLoader(); + + @SuppressWarnings("unchecked") + C self = (C) Proxy.newProxyInstance( + classLoader, + new Class[]{ + rpcEndpoint.getSelfGatewayType(), + MainThreadExecutor.class, + StartStoppable.class, + RpcGateway.class}, + akkaInvocationHandler); + + return self; + } + + static class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + private static final Logger 
LOG = Logger.getLogger(TestingSerialInvocationHandler.class); + + private final T rpcEndpoint; + private final ScheduledExecutorService executorService; + private final Timeout timeout; + + public TestingSerialInvocationHandler(T rpcEndpoint, ScheduledExecutorService executorService) { + this(rpcEndpoint, executorService, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + } + + public TestingSerialInvocationHandler(T rpcEndpoint, ScheduledExecutorService executorService, Timeout timeout) { + this.rpcEndpoint = rpcEndpoint; + this.executorService = executorService; + this.timeout = timeout; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Class declaringClass = method.getDeclaringClass(); + if (declaringClass.equals(MainThreadExecutor.class) || + declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || + declaringClass.equals(RpcGateway.class)) { + return method.invoke(this, args); + } else { + final String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + + final Tuple2[], Object[]> filteredArguments = filterArguments( + parameterTypes, + parameterAnnotations, + args); + + Class returnType = method.getReturnType(); + + if (returnType.equals(Void.TYPE)) { + return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + } else if (returnType.equals(Future.class)) { + try { + Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + return Futures.successful(result); + } catch(Throwable e) { + return Futures.failed(e); + } + } else { + return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + } + } + } + + /** + * Handle rpc invocations by 
looking up the rpc method on the rpc endpoint and calling this + * method with the provided method arguments. If the method has a return value, it is returned + * to the sender of the call. + * + */ + private Object handleRpcInvocationSync(final String methodName, + final Class[] parameterTypes, + final Object[] args, + final Timeout futureTimeout) throws Exception { + final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); + ScheduledFuture scheduleFuture = executorService.schedule(new Callable() { + @Override + public Object call() { + try { + return rpcMethod.invoke(rpcEndpoint, args); + } catch(Throwable e) { + throw new RuntimeException(e); + } + } + }, 0, TimeUnit.MILLISECONDS); + + Class returnType = rpcMethod.getReturnType(); + if(returnType.equals(Void.TYPE)) { + scheduleFuture.get(futureTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); + return null; + } else if(returnType.equals(Future.class)){ + Future futureResult = (Future)scheduleFuture.get(); + return Await.result(futureResult, futureTimeout.duration()); + } else { + return scheduleFuture.get(futureTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); + } + } + + @Override + public void runAsync(Runnable runnable) { + executorService.execute(runnable); + } + + @Override + public Future callAsync(Callable callable, Timeout callTimeout) { + ScheduledFuture future = executorService.schedule(callable, 0, TimeUnit.MILLISECONDS); + try{ + V result = future.get(callTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); + return Futures.successful(result); + } catch(Throwable e) { + return Futures.failed(e); + } + } + + @Override + public void scheduleRunAsync(Runnable runnable, long delay) { + executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public String getAddress() { + return null; + } + + /** + * Look up the rpc method on the given {@link RpcEndpoint} instance. 
+ * + * @param methodName Name of the method + * @param parameterTypes Parameter types of the method + * @return Method of the rpc endpoint + * @throws NoSuchMethodException Thrown if the method with the given name and parameter types + * cannot be found at the rpc endpoint + */ + private Method lookupRpcMethod(final String methodName, final Class[] parameterTypes) throws NoSuchMethodException { + return rpcEndpoint.getClass().getMethod(methodName, parameterTypes); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + /** + * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method + * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default + * timeout is returned. + * + * @param parameterAnnotations Parameter annotations + * @param args Array of arguments + * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter + * has been found + * @return Timeout extracted from the array of arguments or the default timeout + */ + private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) { + if (args != null) { + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + for (int i = 0; i < parameterAnnotations.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + if (args[i] instanceof FiniteDuration) { + return new Timeout((FiniteDuration) args[i]); + } else { + throw new RuntimeException("The rpc timeout parameter must be of type " + + FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + " is not supported."); + } + } + } + } + + return defaultTimeout; + } + + /** + * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument + * list. 
+ * + * @param parameterTypes Array of parameter types + * @param parameterAnnotations Array of parameter annotations + * @param args Arary of arguments + * @return Tuple of filtered parameter types and arguments which no longer contain the + * {@link RpcTimeout} annotated parameter types and arguments + */ + private static Tuple2[], Object[]> filterArguments( + Class[] parameterTypes, + Annotation[][] parameterAnnotations, + Object[] args) { + + Class[] filteredParameterTypes; + Object[] filteredArgs; + + if (args == null) { + filteredParameterTypes = parameterTypes; + filteredArgs = null; + } else { + Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length); + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length); + int numberRpcParameters = parameterTypes.length; + + for (int i = 0; i < parameterTypes.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + isRpcTimeoutParameter.set(i); + numberRpcParameters--; + } + } + + if (numberRpcParameters == parameterTypes.length) { + filteredParameterTypes = parameterTypes; + filteredArgs = args; + } else { + filteredParameterTypes = new Class[numberRpcParameters]; + filteredArgs = new Object[numberRpcParameters]; + int counter = 0; + + for (int i = 0; i < parameterTypes.length; i++) { + if (!isRpcTimeoutParameter.get(i)) { + filteredParameterTypes[counter] = parameterTypes[i]; + filteredArgs[counter] = args[i]; + counter++; + } + } + } + } + + return Tuple2.of(filteredParameterTypes, filteredArgs); + } + + /** + * Checks whether any of the annotations is of type {@link RpcTimeout} + * + * @param annotations Array of annotations + * @return True if {@link RpcTimeout} was found; otherwise false + */ + private static boolean isRpcTimeout(Annotation[] annotations) { + for (Annotation annotation : annotations) { + if (annotation.annotationType().equals(RpcTimeout.class)) { + return true; + } + } + + return 
false; + } + + } +} From 5bdec107edb21f14323b8d77ff42e19d42ecee63 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Tue, 30 Aug 2016 19:44:04 +0800 Subject: [PATCH 3/6] add a special rpcService implementation which directly executes the asynchronous calls serially one by one, it is just for testcase --- .../runtime/rpc/TestingSerialRpcService.java | 151 +++++++----------- .../ResourceManagerHATest.java | 25 ++- 2 files changed, 68 insertions(+), 108 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index d36ccaa932408..e752b98abc70d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -22,8 +22,8 @@ import akka.dispatch.Futures; import akka.util.Timeout; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.Preconditions; -import org.apache.log4j.Logger; import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; @@ -34,60 +34,35 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.BitSet; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import static org.apache.flink.util.Preconditions.checkNotNull; -public class TestingSerialRpcService implements RpcService { - private final ScheduledExecutorService executorService; +/** + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread. 
+ */ +public class TestingSerialRpcService extends TestingRpcService { + + private final DirectExecutorService executorService; private final ConcurrentHashMap registeredConnections; public TestingSerialRpcService() { - executorService = Executors.newSingleThreadScheduledExecutor(); + executorService = new DirectExecutorService(); this.registeredConnections = new ConcurrentHashMap<>(); } - // ------------------------------------------------------------------------ - // connections - // ------------------------------------------------------------------------ - - public void registerGateway(String address, RpcGateway gateway) { - checkNotNull(address); - checkNotNull(gateway); - - if (registeredConnections.putIfAbsent(address, gateway) != null) { - throw new IllegalStateException("a gateway is already registered under " + address); - } - } - @Override - public Future connect(String address, Class clazz) { - RpcGateway gateway = registeredConnections.get(address); - - if (gateway != null) { - if (clazz.isAssignableFrom(gateway.getClass())) { - @SuppressWarnings("unchecked") - C typedGateway = (C) gateway; - return Futures.successful(typedGateway); - } else { - return Futures.failed( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); - } - } else { - return Futures.failed(new Exception("No gateway registered under that name")); + public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { + try { + unit.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); } } - @Override - public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { - executorService.schedule(runnable, delay, unit); - } - @Override public ExecutionContext getExecutionContext() { return ExecutionContexts.fromExecutorService(executorService); @@ -100,14 +75,15 @@ public void stopService() { } @Override - public void stopServer(C selfGateway) { + public void stopServer(RpcGateway 
selfGateway) { } @Override public > C startServer(S rpcEndpoint) { - InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(rpcEndpoint, executorService); + final String address = UUID.randomUUID().toString(); + InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint); ClassLoader classLoader = getClass().getClassLoader(); @SuppressWarnings("unchecked") @@ -123,21 +99,23 @@ public > C startServer(S rpcEndpo return self; } - static class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { - private static final Logger LOG = Logger.getLogger(TestingSerialInvocationHandler.class); + private static class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { private final T rpcEndpoint; - private final ScheduledExecutorService executorService; + + /** default timeout for asks */ private final Timeout timeout; - public TestingSerialInvocationHandler(T rpcEndpoint, ScheduledExecutorService executorService) { - this(rpcEndpoint, executorService, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + private final String address; + + private TestingSerialInvocationHandler(String address, T rpcEndpoint) { + this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); } - public TestingSerialInvocationHandler(T rpcEndpoint, ScheduledExecutorService executorService, Timeout timeout) { + private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) { this.rpcEndpoint = rpcEndpoint; - this.executorService = executorService; this.timeout = timeout; + this.address = address; } @Override @@ -160,13 +138,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl Class returnType = method.getReturnType(); - if (returnType.equals(Void.TYPE)) { - return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout); - } else if (returnType.equals(Future.class)) { + if (returnType.equals(Future.class)) { try { Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); return Futures.successful(result); - } catch(Throwable e) { + } catch (Throwable e) { return Futures.failed(e); } } else { @@ -179,55 +155,45 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this * method with the provided method arguments. If the method has a return value, it is returned * to the sender of the call. - * */ private Object handleRpcInvocationSync(final String methodName, final Class[] parameterTypes, final Object[] args, final Timeout futureTimeout) throws Exception { final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); - ScheduledFuture scheduleFuture = executorService.schedule(new Callable() { - @Override - public Object call() { - try { - return rpcMethod.invoke(rpcEndpoint, args); - } catch(Throwable e) { - throw new RuntimeException(e); - } - } - }, 0, TimeUnit.MILLISECONDS); - - Class returnType = rpcMethod.getReturnType(); - if(returnType.equals(Void.TYPE)) { - scheduleFuture.get(futureTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); - return null; - } else if(returnType.equals(Future.class)){ - Future futureResult = (Future)scheduleFuture.get(); - return Await.result(futureResult, futureTimeout.duration()); + Object result = rpcMethod.invoke(rpcEndpoint, args); + + if (result != null && result instanceof Future) { + Future future = (Future) result; + return Await.result(future, futureTimeout.duration()); } else { - return scheduleFuture.get(futureTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); + return result; } } @Override public void runAsync(Runnable runnable) { - executorService.execute(runnable); + runnable.run(); } @Override public Future callAsync(Callable callable, 
Timeout callTimeout) { - ScheduledFuture future = executorService.schedule(callable, 0, TimeUnit.MILLISECONDS); - try{ - V result = future.get(callTimeout.duration().toMillis(), TimeUnit.MILLISECONDS); - return Futures.successful(result); - } catch(Throwable e) { + try { + TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis()); + return Futures.successful(callable.call()); + } catch (Throwable e) { return Futures.failed(e); } } @Override - public void scheduleRunAsync(Runnable runnable, long delay) { - executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS); + public void scheduleRunAsync(final Runnable runnable, final long delay) { + try { + TimeUnit.MILLISECONDS.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } } @Override @@ -242,19 +208,20 @@ public void stop() { @Override public String getAddress() { - return null; + return address; } /** * Look up the rpc method on the given {@link RpcEndpoint} instance. * - * @param methodName Name of the method + * @param methodName Name of the method * @param parameterTypes Parameter types of the method * @return Method of the rpc endpoint * @throws NoSuchMethodException Thrown if the method with the given name and parameter types - * cannot be found at the rpc endpoint + * cannot be found at the rpc endpoint */ - private Method lookupRpcMethod(final String methodName, final Class[] parameterTypes) throws NoSuchMethodException { + private Method lookupRpcMethod(final String methodName, + final Class[] parameterTypes) throws NoSuchMethodException { return rpcEndpoint.getClass().getMethod(methodName, parameterTypes); } @@ -268,12 +235,13 @@ private Method lookupRpcMethod(final String methodName, final Class[] paramet * timeout is returned. 
* * @param parameterAnnotations Parameter annotations - * @param args Array of arguments - * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter - * has been found + * @param args Array of arguments + * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter + * has been found * @return Timeout extracted from the array of arguments or the default timeout */ - private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) { + private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, + Timeout defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); @@ -297,9 +265,9 @@ private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Ob * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument * list. * - * @param parameterTypes Array of parameter types + * @param parameterTypes Array of parameter types * @param parameterAnnotations Array of parameter annotations - * @param args Arary of arguments + * @param args Arary of arguments * @return Tuple of filtered parameter types and arguments which no longer contain the * {@link RpcTimeout} annotated parameter types and arguments */ @@ -345,7 +313,6 @@ private static Tuple2[], Object[]> filterArguments( } } } - return Tuple2.of(filteredParameterTypes, filteredArgs); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java index b7f08995e26d4..fed197f0ce8b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java @@ -18,20 +18,26 @@ package 
org.apache.flink.runtime.rpc.resourcemanager; +import akka.dispatch.ExecutionContexts; import akka.util.Timeout; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.util.DirectExecutorService; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -43,7 +49,7 @@ public class ResourceManagerHATest { @Before public void setup() throws Exception { - rpcService = new TestingRpcService(); + rpcService = new TestingSerialRpcService(); } @After @@ -64,23 +70,10 @@ public void testGrantAndRevokeLeadership() throws Exception { final UUID leaderId = UUID.randomUUID(); leaderElectionService.isLeader(leaderId); // after grant leadership, resourceManager's leaderId has value - Assert.assertEquals(getLatestLeaderId(resourceManager), leaderId); + Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID()); // then revoke leadership, resourceManager's leaderId is null again leaderElectionService.notLeader(); - Assert.assertNull(getLatestLeaderId(resourceManager)); - } - - - private UUID getLatestLeaderId(final ResourceManager resourceManager) throws Exception { - Timeout timeout = new Timeout(200, TimeUnit.MILLISECONDS); - Future actualLeaderIdFuture = resourceManager.callAsync(new Callable() { - @Override - public UUID call() throws Exception { - return resourceManager.getLeaderSessionID(); - } - }, timeout); - UUID 
actualValue = Await.result(actualLeaderIdFuture, timeout.duration()); - return actualValue; + Assert.assertNull(resourceManager.getLeaderSessionID()); } } From ce8600e6b326c34ff94e04718460997684857f2f Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Wed, 31 Aug 2016 09:43:48 +0800 Subject: [PATCH 4/6] Change ResourceManagerLeaderContender code and TestingSerialRpcService code --- .../rpc/resourcemanager/ResourceManager.java | 88 ++++++++----------- .../runtime/rpc/TestingSerialRpcService.java | 51 +++++++++-- .../ResourceManagerHATest.java | 11 --- 3 files changed, 78 insertions(+), 72 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index 0ada4a6b87dfd..ae8aaee801f79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -143,67 +143,44 @@ public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTa return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } - /** - * Callback method when current resourceManager is granted leadership - * - * @param newLeaderSessionID unique leadershipID - */ - void handleGrantLeadership(final UUID newLeaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID); - leaderSessionID = newLeaderSessionID; - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); - } - }); - } - - /** - * Callback method when current resourceManager lose leadership. 
- */ - void handleRevokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - leaderSessionID = null; - } - }); - } - - /** - * Callback method when an error happened to current resourceManager on leader election - * @param e - */ - void onLeaderElectionError(final Throwable e) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("ResourceManager received an error from the LeaderElectionService.", e); - // terminate ResourceManager in case of an error - shutDown(); - } - }); - } - private class ResourceManagerLeaderContender implements LeaderContender { + /** + * Callback method when current resourceManager is granted leadership + * + * @param leaderSessionID unique leadershipID + */ @Override - public void grantLeadership(UUID leaderSessionID) { - handleGrantLeadership(leaderSessionID); + public void grantLeadership(final UUID leaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); + ResourceManager.this.leaderSessionID = leaderSessionID; + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + } + }); } + /** + * Callback method when current resourceManager lose leadership. 
+ */ @Override public void revokeLeadership() { - handleRevokeLeadership(); + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + leaderSessionID = null; + } + }); } @Override public String getAddress() { - return getAddress(); + return ResourceManager.this.getAddress(); } /** @@ -212,8 +189,15 @@ public String getAddress() { * @param exception Exception being thrown in the leader election service */ @Override - public void handleError(Exception exception) { - onLeaderElectionError(exception); + public void handleError(final Exception exception) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // terminate ResourceManager in case of an error + shutDown(); + } + }); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index e752b98abc70d..7bdbb99da8aac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -39,11 +39,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread. 
*/ -public class TestingSerialRpcService extends TestingRpcService { +public class TestingSerialRpcService implements RpcService { private final DirectExecutorService executorService; private final ConcurrentHashMap registeredConnections; @@ -93,12 +95,44 @@ public > C startServer(S rpcEndpo rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, StartStoppable.class, - RpcGateway.class}, + RpcGateway.class + }, akkaInvocationHandler); return self; } + @Override + public Future connect(String address, Class clazz) { + RpcGateway gateway = registeredConnections.get(address); + + if (gateway != null) { + if (clazz.isAssignableFrom(gateway.getClass())) { + @SuppressWarnings("unchecked") + C typedGateway = (C) gateway; + return Futures.successful(typedGateway); + } else { + return Futures.failed( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); + } + } else { + return Futures.failed(new Exception("No gateway registered under that name")); + } + } + + // ------------------------------------------------------------------------ + // connections + // ------------------------------------------------------------------------ + + public void registerGateway(String address, RpcGateway gateway) { + checkNotNull(address); + checkNotNull(gateway); + + if (registeredConnections.putIfAbsent(address, gateway) != null) { + throw new IllegalStateException("a gateway is already registered under " + address); + } + } + private static class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { private final T rpcEndpoint; @@ -179,7 +213,6 @@ public void runAsync(Runnable runnable) { @Override public Future callAsync(Callable callable, Timeout callTimeout) { try { - TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis()); return Futures.successful(callable.call()); } catch (Throwable e) { return Futures.failed(e); @@ -197,18 +230,18 @@ public void scheduleRunAsync(final Runnable 
runnable, final long delay) { } @Override - public void start() { - + public String getAddress() { + return address; } @Override - public void stop() { - + public void start() { + // do nothing } @Override - public String getAddress() { - return address; + public void stop() { + // do nothing } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java index fed197f0ce8b8..28555f55dc38c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java @@ -18,27 +18,16 @@ package org.apache.flink.runtime.rpc.resourcemanager; -import akka.dispatch.ExecutionContexts; -import akka.util.Timeout; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.TestingSerialRpcService; -import org.apache.flink.runtime.util.DirectExecutorService; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * resourceManager HA test, including grant leadership and revoke leadership From fe1947a026a28761e2bd8ba42e3ff3c0f28fd7d2 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Wed, 31 Aug 2016 16:26:29 +0800 Subject: [PATCH 5/6] override shutdown logic to stop leadershipService --- 
.../runtime/rpc/resourcemanager/ResourceManager.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index ae8aaee801f79..f7147c977651b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -76,6 +76,17 @@ public void start() { } } + @Override + public void shutDown() { + try { + leaderElectionService.stop(); + super.shutDown(); + } catch(Throwable e) { + log.error("A fatal error happened when shutdown the ResourceManager", e); + throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); + } + } + /** * Gets the leader session id of current resourceManager. * From 51088e27356d4ad72a8ac1c36f47942a89831868 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Wed, 31 Aug 2016 20:02:48 +0800 Subject: [PATCH 6/6] use a mocked RpcService rather than TestingSerialRpcService for resourceManager HA test --- .../ResourceManagerHATest.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java index 28555f55dc38c..dfffeda291760 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java @@ -20,34 +20,35 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import 
org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; -import org.junit.After; +import org.apache.flink.runtime.rpc.StartStoppable; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.util.UUID; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * resourceManager HA test, including grant leadership and revoke leadership */ public class ResourceManagerHATest { - private RpcService rpcService; - - @Before - public void setup() throws Exception { - rpcService = new TestingSerialRpcService(); - } - - @After - public void teardown() throws Exception { - rpcService.stopService(); - } - @Test public void testGrantAndRevokeLeadership() throws Exception { + // mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call + TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class); + doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); + + RpcService rpcService = mock(RpcService.class); + when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway); + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); @@ -65,4 +66,11 @@ public void testGrantAndRevokeLeadership() throws Exception { Assert.assertNull(resourceManager.getLeaderSessionID()); } + private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, 
RpcGateway { + @Override + public void runAsync(Runnable runnable) { + runnable.run(); + } + } + }