From 4cde59141688dc12713cd0f41e21b27cd6fa3657 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 10 Aug 2016 18:42:26 +0200 Subject: [PATCH] [FLINK-4362] [rpc] Auto generate rpc gateways via Java proxies This PR introduces a generic AkkaRpcActor which receives rpc calls as a RpcInvocation message. The RpcInvocation message is generated by the AkkaInvocationHandler which gets them from automatically generated Java Proxies. Add documentation for proxy based akka rpc service Log unknown message type in AkkaRpcActor but do not fail actor Use ReflectionUtil to extract RpcGateway type from RpcEndpoint This closes #2357. --- .../org/apache/flink/util/ReflectionUtil.java | 10 +- .../flink/runtime/rpc/MainThreadExecutor.java | 4 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 22 +- .../apache/flink/runtime/rpc/RpcService.java | 2 +- .../flink/runtime/rpc/akka/AkkaGateway.java | 4 +- .../rpc/akka/AkkaInvocationHandler.java | 226 ++++++++++++++++++ .../flink/runtime/rpc/akka/AkkaRpcActor.java | 175 ++++++++++++++ .../runtime/rpc/akka/AkkaRpcService.java | 121 +++++----- .../flink/runtime/rpc/akka/BaseAkkaActor.java | 50 ---- .../runtime/rpc/akka/BaseAkkaGateway.java | 41 ---- .../akka/jobmaster/JobMasterAkkaActor.java | 58 ----- .../akka/jobmaster/JobMasterAkkaGateway.java | 57 ----- .../{CallableMessage.java => CallAsync.java} | 18 +- .../runtime/rpc/akka/messages/CancelTask.java | 36 --- .../rpc/akka/messages/ExecuteTask.java | 36 --- .../messages/RegisterAtResourceManager.java | 36 --- .../rpc/akka/messages/RegisterJobMaster.java | 36 --- .../rpc/akka/messages/RequestSlot.java | 37 --- .../rpc/akka/messages/RpcInvocation.java | 98 ++++++++ .../{RunnableMessage.java => RunAsync.java} | 17 +- .../messages/UpdateTaskExecutionState.java | 37 --- .../ResourceManagerAkkaActor.java | 65 ----- .../ResourceManagerAkkaGateway.java | 67 ------ .../taskexecutor/TaskExecutorAkkaActor.java | 77 ------ .../taskexecutor/TaskExecutorAkkaGateway.java | 59 ----- .../runtime/rpc/jobmaster/JobMaster.java | 4 +- .../rpc/resourcemanager/ResourceManager.java | 4 +- .../rpc/taskexecutor/TaskExecutor.java | 4 +- .../runtime/rpc/RpcCompletenessTest.java | 50 ++-- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +- .../rpc/taskexecutor/TaskExecutorTest.java | 2 +- 31 files changed, 645 insertions(+), 812 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/{CallableMessage.java => CallAsync.java} (68%) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/{RunnableMessage.java => RunAsync.java} (70%) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java index fe2d4c01f1d12..b851ebacdfd88 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java @@ -48,6 +48,14 @@ public static Class getTemplateType1(Class clazz) { return getTemplateType(clazz, 0); } + public static Class getTemplateType1(Type type) { + if (type instanceof ParameterizedType) { + return (Class) getTemplateTypes((ParameterizedType) type)[0]; + } else { + throw new IllegalArgumentException(); + } + } + public static Class getTemplateType2(Class clazz) { return getTemplateType(clazz, 1); } @@ -123,7 +131,7 @@ public static Class[] getTemplateTypes(ParameterizedType paramterizedType) { Class[] types = new Class[paramterizedType.getActualTypeArguments().length]; int i = 0; for (Type templateArgument : paramterizedType.getActualTypeArguments()) { - assert (templateArgument instanceof Class); + assert templateArgument instanceof Class; types[i++] = (Class) templateArgument; } return types; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java index 14b2997bee83c..882c1b751e5e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java @@ -47,9 +47,9 @@ public interface MainThreadExecutor { * future will throw a {@link TimeoutException}. * * @param callable Callable to be executed - * @param timeout Timeout for the future to complete + * @param callTimeout Timeout for the future to complete * @param Return value of the callable * @return Future of the callable result */ - Future callAsync(Callable callable, Timeout timeout); + Future callAsync(Callable callable, Timeout callTimeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 0d928a88d1e4a..aef08031093ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -20,6 +20,7 @@ import akka.util.Timeout; +import org.apache.flink.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,9 @@ public abstract class RpcEndpoint { /** RPC service to be used to start the RPC server and to obtain rpc gateways */ private final RpcService rpcService; + /** Class of the self gateway */ + private final Class selfGatewayType; + /** Self gateway which can be used to schedule asynchronous calls on yourself */ private final C self; @@ -70,15 +74,19 @@ public abstract class RpcEndpoint { * of the executing rpc server. */ private final MainThreadExecutionContext mainThreadExecutionContext; - /** * Initializes the RPC endpoint. * * @param rpcService The RPC server that dispatches calls to this RPC endpoint. */ - public RpcEndpoint(RpcService rpcService) { + protected RpcEndpoint(final RpcService rpcService) { this.rpcService = checkNotNull(rpcService, "rpcService"); + + // IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer + // requires that selfGatewayType has been initialized + this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); this.self = rpcService.startServer(this); + this.selfAddress = rpcService.getAddress(self); this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); } @@ -149,6 +157,7 @@ public RpcService getRpcService() { // Asynchronous executions // ------------------------------------------------------------------------ + /** * Execute the runnable in the main thread of the underlying RPC endpoint. * @@ -172,6 +181,15 @@ public Future callAsync(Callable callable, Timeout timeout) { return ((MainThreadExecutor) self).callAsync(callable, timeout); } + /** + * Returns the class of the self gateway type. + * + * @return Class of the self gateway type + */ + public final Class getSelfGatewayType() { + return selfGatewayType; + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 90ff7b671ce03..f93be8361c077 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -46,7 +46,7 @@ public interface RpcService { * @param Type of the self rpc gateway associated with the rpc server * @return Self gateway to dispatch remote procedure calls to oneself */ - C startServer(S rpcEndpoint); + > C startServer(S rpcEndpoint); /** * Stop the underlying rpc server of the provided self gateway. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java index a96a6008ba23a..a826e7dcf469e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java @@ -23,7 +23,7 @@ /** * Interface for Akka based rpc gateways */ -public interface AkkaGateway { +interface AkkaGateway { - ActorRef getActorRef(); + ActorRef getRpcServer(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java new file mode 100644 index 0000000000000..e8e383a171da6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.akka.messages.CallAsync; +import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.RunAsync; +import org.apache.flink.util.Preconditions; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.BitSet; +import java.util.concurrent.Callable; + +/** + * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the + * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is + * executed. + */ +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor { + private final ActorRef rpcServer; + + // default timeout for asks + private final Timeout timeout; + + AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) { + this.rpcServer = Preconditions.checkNotNull(rpcServer); + this.timeout = Preconditions.checkNotNull(timeout); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Class declaringClass = method.getDeclaringClass(); + + Object result; + + if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || declaringClass.equals(Object.class)) { + result = method.invoke(this, args); + } else { + String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + + Tuple2[], Object[]> filteredArguments = filterArguments( + parameterTypes, + parameterAnnotations, + args); + + RpcInvocation rpcInvocation = new RpcInvocation( + methodName, + filteredArguments.f0, + filteredArguments.f1); + + Class returnType = method.getReturnType(); + + if (returnType.equals(Void.TYPE)) { + rpcServer.tell(rpcInvocation, ActorRef.noSender()); + + result = null; + } else if (returnType.equals(Future.class)) { + // execute an asynchronous call + result = Patterns.ask(rpcServer, rpcInvocation, futureTimeout); + } else { + // execute a synchronous call + Future futureResult = Patterns.ask(rpcServer, rpcInvocation, futureTimeout); + FiniteDuration duration = timeout.duration(); + + result = Await.result(futureResult, duration); + } + } + + return result; + } + + @Override + public ActorRef getRpcServer() { + return rpcServer; + } + + @Override + public void runAsync(Runnable runnable) { + // Unfortunately I couldn't find a way to allow only local communication. Therefore, the + // runnable field is transient transient + rpcServer.tell(new RunAsync(runnable), ActorRef.noSender()); + } + + @Override + public Future callAsync(Callable callable, Timeout callTimeout) { + // Unfortunately I couldn't find a way to allow only local communication. Therefore, the + // callable field is declared transient + @SuppressWarnings("unchecked") + Future result = (Future) Patterns.ask(rpcServer, new CallAsync(callable), callTimeout); + + return result; + } + + /** + * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method + * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default + * timeout is returned. + * + * @param parameterAnnotations Parameter annotations + * @param args Array of arguments + * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter + * has been found + * @return Timeout extracted from the array of arguments or the default timeout + */ + private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) { + if (args != null) { + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + for (int i = 0; i < parameterAnnotations.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + if (args[i] instanceof FiniteDuration) { + return new Timeout((FiniteDuration) args[i]); + } else { + throw new RuntimeException("The rpc timeout parameter must be of type " + + FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + " is not supported."); + } + } + } + } + + return defaultTimeout; + } + + /** + * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument + * list. + * + * @param parameterTypes Array of parameter types + * @param parameterAnnotations Array of parameter annotations + * @param args Arary of arguments + * @return Tuple of filtered parameter types and arguments which no longer contain the + * {@link RpcTimeout} annotated parameter types and arguments + */ + private static Tuple2[], Object[]> filterArguments( + Class[] parameterTypes, + Annotation[][] parameterAnnotations, + Object[] args) { + + Class[] filteredParameterTypes; + Object[] filteredArgs; + + if (args == null) { + filteredParameterTypes = parameterTypes; + filteredArgs = null; + } else { + Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length); + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length); + int numberRpcParameters = parameterTypes.length; + + for (int i = 0; i < parameterTypes.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + isRpcTimeoutParameter.set(i); + numberRpcParameters--; + } + } + + if (numberRpcParameters == parameterTypes.length) { + filteredParameterTypes = parameterTypes; + filteredArgs = args; + } else { + filteredParameterTypes = new Class[numberRpcParameters]; + filteredArgs = new Object[numberRpcParameters]; + int counter = 0; + + for (int i = 0; i < parameterTypes.length; i++) { + if (!isRpcTimeoutParameter.get(i)) { + filteredParameterTypes[counter] = parameterTypes[i]; + filteredArgs[counter] = args[i]; + counter++; + } + } + } + } + + return Tuple2.of(filteredParameterTypes, filteredArgs); + } + + /** + * Checks whether any of the annotations is of type {@link RpcTimeout} + * + * @param annotations Array of annotations + * @return True if {@link RpcTimeout} was found; otherwise false + */ + private static boolean isRpcTimeout(Annotation[] annotations) { + for (Annotation annotation : annotations) { + if (annotation.annotationType().equals(RpcTimeout.class)) { + return true; + } + } + + return false; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java new file mode 100644 index 0000000000000..57da38a6f7bde --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.Status; +import akka.actor.UntypedActor; +import akka.pattern.Patterns; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.akka.messages.CallAsync; +import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.RunAsync; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.lang.reflect.Method; +import java.util.concurrent.Callable; + +/** + * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync} + * messages. + *

+ * The {@link RpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint} + * instance. + *

+ * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed + * in the context of the actor thread. + * + * @param Type of the {@link RpcGateway} associated with the {@link RpcEndpoint} + * @param Type of the {@link RpcEndpoint} + */ +class AkkaRpcActor> extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class); + + private final T rpcEndpoint; + + AkkaRpcActor(final T rpcEndpoint) { + this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint"); + } + + @Override + public void onReceive(final Object message) { + if (message instanceof RunAsync) { + handleRunAsync((RunAsync) message); + } else if (message instanceof CallAsync) { + handleCallAsync((CallAsync) message); + } else if (message instanceof RpcInvocation) { + handleRpcInvocation((RpcInvocation) message); + } else { + LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass()); + } + } + + /** + * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this + * method with the provided method arguments. If the method has a return value, it is returned + * to the sender of the call. + * + * @param rpcInvocation Rpc invocation message + */ + private void handleRpcInvocation(RpcInvocation rpcInvocation) { + Method rpcMethod = null; + + try { + rpcMethod = lookupRpcMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes()); + } catch (final NoSuchMethodException e) { + LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e); + } + + if (rpcMethod != null) { + if (rpcMethod.getReturnType().equals(Void.TYPE)) { + // No return value to send back + try { + rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + } catch (Throwable e) { + LOG.error("Error while executing remote procedure call {}.", rpcMethod, e); + } + } else { + try { + Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + + if (result instanceof Future) { + // pipe result to sender + Patterns.pipe((Future) result, getContext().dispatcher()).to(getSender()); + } else { + // tell the sender the result of the computation + getSender().tell(new Status.Success(result), getSelf()); + } + } catch (Throwable e) { + // tell the sender about the failure + getSender().tell(new Status.Failure(e), getSelf()); + } + } + } + } + + /** + * Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable} + * in the context of the actor thread. + * + * @param callAsync Call async message + */ + private void handleCallAsync(CallAsync callAsync) { + if (callAsync.getCallable() == null) { + final String result = "Received a " + callAsync.getClass().getName() + " message with an empty " + + "callable field. This indicates that this message has been serialized " + + "prior to sending the message. The " + callAsync.getClass().getName() + + " is only supported with local communication."; + + LOG.warn(result); + + getSender().tell(new Status.Failure(new Exception(result)), getSelf()); + } else { + try { + Object result = callAsync.getCallable().call(); + + getSender().tell(new Status.Success(result), getSelf()); + } catch (Throwable e) { + getSender().tell(new Status.Failure(e), getSelf()); + } + } + } + + /** + * Handle asynchronous {@link Runnable}. This method simply executes the given {@link Runnable} + * in the context of the actor thread. + * + * @param runAsync Run async message + */ + private void handleRunAsync(RunAsync runAsync) { + if (runAsync.getRunnable() == null) { + LOG.warn("Received a {} message with an empty runnable field. This indicates " + + "that this message has been serialized prior to sending the message. The " + + "{} is only supported with local communication.", + runAsync.getClass().getName(), + runAsync.getClass().getName()); + } else { + try { + runAsync.getRunnable().run(); + } catch (final Throwable e) { + LOG.error("Caught exception while executing runnable in main thread.", e); + } + } + } + + /** + * Look up the rpc method on the given {@link RpcEndpoint} instance. + * + * @param methodName Name of the method + * @param parameterTypes Parameter types of the method + * @return Method of the rpc endpoint + * @throws NoSuchMethodException + */ + private Method lookupRpcMethod(final String methodName, final Class[] parameterTypes) throws NoSuchMethodException { + return rpcEndpoint.getClass().getMethod(methodName, parameterTypes); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index d55bd135b273b..17983d01b67c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -29,88 +29,82 @@ import akka.pattern.AskableActorSelection; import akka.util.Timeout; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.rpc.jobmaster.JobMaster; -import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor; -import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway; -import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor; -import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway; -import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor; -import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway; -import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; -import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.util.Collection; import java.util.HashSet; -import java.util.Set; +/** + * Akka based {@link RpcService} implementation. The rpc service starts an Akka actor to receive + * rpcs from a {@link RpcGateway}. + */ public class AkkaRpcService implements RpcService { + private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class); + private final ActorSystem actorSystem; private final Timeout timeout; - private final Set actors = new HashSet<>(); + private final Collection actors = new HashSet<>(4); - public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) { - this.actorSystem = actorSystem; - this.timeout = timeout; + public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) { + this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system"); + this.timeout = Preconditions.checkNotNull(timeout, "timeout"); } @Override - public Future connect(String address, final Class clazz) { - ActorSelection actorSel = actorSystem.actorSelection(address); + public Future connect(final String address, final Class clazz) { + LOG.info("Try to connect to remote rpc server with address {}. Returning a {} gateway.", address, clazz.getName()); - AskableActorSelection asker = new AskableActorSelection(actorSel); + final ActorSelection actorSel = actorSystem.actorSelection(address); - Future identify = asker.ask(new Identify(42), timeout); + final AskableActorSelection asker = new AskableActorSelection(actorSel); + + final Future identify = asker.ask(new Identify(42), timeout); return identify.map(new Mapper(){ + @Override public C apply(Object obj) { ActorRef actorRef = ((ActorIdentity) obj).getRef(); - if (clazz == TaskExecutorGateway.class) { - return (C) new TaskExecutorAkkaGateway(actorRef, timeout); - } else if (clazz == ResourceManagerGateway.class) { - return (C) new ResourceManagerAkkaGateway(actorRef, timeout); - } else if (clazz == JobMasterGateway.class) { - return (C) new JobMasterAkkaGateway(actorRef, timeout); - } else { - throw new RuntimeException("Could not find remote endpoint " + clazz); - } + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout); + + @SuppressWarnings("unchecked") + C proxy = (C) Proxy.newProxyInstance( + ClassLoader.getSystemClassLoader(), + new Class[] {clazz}, + akkaInvocationHandler); + + return proxy; } }, actorSystem.dispatcher()); } @Override - public C startServer(S rpcEndpoint) { - ActorRef ref; - C self; - if (rpcEndpoint instanceof TaskExecutor) { - ref = actorSystem.actorOf( - Props.create(TaskExecutorAkkaActor.class, rpcEndpoint) - ); - - self = (C) new TaskExecutorAkkaGateway(ref, timeout); - } else if (rpcEndpoint instanceof ResourceManager) { - ref = actorSystem.actorOf( - Props.create(ResourceManagerAkkaActor.class, rpcEndpoint) - ); - - self = (C) new ResourceManagerAkkaGateway(ref, timeout); - } else if (rpcEndpoint instanceof JobMaster) { - ref = actorSystem.actorOf( - Props.create(JobMasterAkkaActor.class, rpcEndpoint) - ); - - self = (C) new JobMasterAkkaGateway(ref, timeout); - } else { - throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass()); - } + public > C startServer(S rpcEndpoint) { + Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint"); - actors.add(ref); + LOG.info("Start Akka rpc actor to handle rpcs for {}.", rpcEndpoint.getClass().getName()); + + Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint); + + ActorRef actorRef = actorSystem.actorOf(akkaRpcActorProps); + actors.add(actorRef); + + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout); + + @SuppressWarnings("unchecked") + C self = (C) Proxy.newProxyInstance( + ClassLoader.getSystemClassLoader(), + new Class[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class}, + akkaInvocationHandler); return self; } @@ -120,16 +114,19 @@ public void stopServer(C selfGateway) { if (selfGateway instanceof AkkaGateway) { AkkaGateway akkaClient = (AkkaGateway) selfGateway; - if (actors.contains(akkaClient.getActorRef())) { - akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); - } else { - // don't stop this actor since it was not started by this RPC service + if (actors.contains(akkaClient.getRpcServer())) { + ActorRef selfActorRef = akkaClient.getRpcServer(); + + LOG.info("Stop Akka rpc actor {}.", selfActorRef.path()); + + selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); } } } @Override public void stopService() { + LOG.info("Stop Akka rpc service."); actorSystem.shutdown(); actorSystem.awaitTermination(); } @@ -137,9 +134,11 @@ public void stopService() { @Override public String getAddress(C selfGateway) { if (selfGateway instanceof AkkaGateway) { - return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef()); + ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcServer(); + return AkkaUtils.getAkkaURL(actorSystem, actorRef); } else { - throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + "."); + String className = AkkaGateway.class.getName(); + throw new RuntimeException("Cannot get address for non " + className + '.'); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java deleted file mode 100644 index 3cb499cffca1f..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka; - -import akka.actor.Status; -import akka.actor.UntypedActor; -import org.apache.flink.runtime.rpc.akka.messages.CallableMessage; -import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BaseAkkaActor extends UntypedActor { - private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class); - - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof RunnableMessage) { - try { - ((RunnableMessage) message).getRunnable().run(); - } catch (Exception e) { - LOG.error("Encountered error while executing runnable.", e); - } - } else if (message instanceof CallableMessage) { - try { - Object result = ((CallableMessage) message).getCallable().call(); - sender().tell(new Status.Success(result), getSelf()); - } catch (Exception e) { - sender().tell(new Status.Failure(e), getSelf()); - } - } else { - throw new RuntimeException("Unknown message " + message); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java deleted file mode 100644 index 512790d11639a..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka; - -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.flink.runtime.rpc.MainThreadExecutor; -import org.apache.flink.runtime.rpc.akka.messages.CallableMessage; -import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage; -import scala.concurrent.Future; - -import java.util.concurrent.Callable; - -public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway { - @Override - public void runAsync(Runnable runnable) { - getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender()); - } - - @Override - public Future callAsync(Callable callable, Timeout timeout) { - return (Future) Patterns.ask(getActorRef(), new CallableMessage(callable), timeout); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java deleted file mode 100644 index 9e04ea91070aa..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.jobmaster; - -import akka.actor.ActorRef; -import akka.actor.Status; -import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; -import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager; -import org.apache.flink.runtime.rpc.jobmaster.JobMaster; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState; - -public class JobMasterAkkaActor extends BaseAkkaActor { - private final JobMaster jobMaster; - - public JobMasterAkkaActor(JobMaster jobMaster) { - this.jobMaster = jobMaster; - } - - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof UpdateTaskExecutionState) { - - final ActorRef sender = getSender(); - - UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message; - - try { - Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState()); - sender.tell(new Status.Success(result), getSelf()); - } catch (Exception e) { - sender.tell(new Status.Failure(e), getSelf()); - } - } else if (message instanceof RegisterAtResourceManager) { - RegisterAtResourceManager registerAtResourceManager = (RegisterAtResourceManager) message; - - jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress()); - } else { - super.onReceive(message); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java deleted file mode 100644 index e6bf061701faf..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.jobmaster; - -import akka.actor.ActorRef; -import akka.pattern.AskableActorRef; -import akka.util.Timeout; -import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; -import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager; -import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState; -import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import scala.concurrent.Future; -import scala.reflect.ClassTag$; - -public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway { - private final AskableActorRef actorRef; - private final Timeout timeout; - - public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) { - this.actorRef = new AskableActorRef(actorRef); - this.timeout = timeout; - } - - @Override - public Future updateTaskExecutionState(TaskExecutionState taskExecutionState) { - return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout) - .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); - } - - @Override - public void registerAtResourceManager(String address) { - actorRef.actorRef().tell(new RegisterAtResourceManager(address), actorRef.actorRef()); - } - - @Override - public ActorRef getActorRef() { - return actorRef.actorRef(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java similarity index 68% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java index f0e555f729ca4..79b7825e8a3ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java @@ -18,16 +18,24 @@ package org.apache.flink.runtime.rpc.akka.messages; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; import java.util.concurrent.Callable; -public class CallableMessage { - private final Callable callable; +/** + * Message for asynchronous callable invocations + */ +public final class CallAsync implements Serializable { + private static final long serialVersionUID = 2834204738928484060L; + + private transient Callable callable; - public CallableMessage(Callable callable) { - this.callable = callable; + public CallAsync(Callable callable) { + this.callable = Preconditions.checkNotNull(callable); } - public Callable getCallable() { + public Callable getCallable() { return callable; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java deleted file mode 100644 index 0b9e9dc9f6447..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; - -import java.io.Serializable; - -public class CancelTask implements Serializable { - private static final long serialVersionUID = -2998176874447950595L; - private final ExecutionAttemptID executionAttemptID; - - public CancelTask(ExecutionAttemptID executionAttemptID) { - this.executionAttemptID = executionAttemptID; - } - - public ExecutionAttemptID getExecutionAttemptID() { - return executionAttemptID; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java deleted file mode 100644 index a83d539082eb2..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; - -import java.io.Serializable; - -public class ExecuteTask implements Serializable { - private static final long serialVersionUID = -6769958430967048348L; - private final TaskDeploymentDescriptor taskDeploymentDescriptor; - - public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { - this.taskDeploymentDescriptor = taskDeploymentDescriptor; - } - - public TaskDeploymentDescriptor getTaskDeploymentDescriptor() { - return taskDeploymentDescriptor; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java deleted file mode 100644 index 3ade08250b9ea..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import java.io.Serializable; - -public class RegisterAtResourceManager implements Serializable { - - private static final long serialVersionUID = -4175905742620903602L; - - private final String address; - - public RegisterAtResourceManager(String address) { - this.address = address; - } - - public String getAddress() { - return address; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java deleted file mode 100644 index b35ea3895aa0e..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; - -import java.io.Serializable; - -public class RegisterJobMaster implements Serializable{ - private static final long serialVersionUID = -4616879574192641507L; - private final JobMasterRegistration jobMasterRegistration; - - public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) { - this.jobMasterRegistration = jobMasterRegistration; - } - - public JobMasterRegistration getJobMasterRegistration() { - return jobMasterRegistration; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java deleted file mode 100644 index 85ceeec6f0738..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; - -import java.io.Serializable; - -public class RequestSlot implements Serializable { - private static final long serialVersionUID = 7207463889348525866L; - - private final SlotRequest slotRequest; - - public RequestSlot(SlotRequest slotRequest) { - this.slotRequest = slotRequest; - } - - public SlotRequest getSlotRequest() { - return slotRequest; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java new file mode 100644 index 0000000000000..5d52ef1c0b298 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * Rpc invocation message containing the remote procedure name, its parameter types and the + * corresponding call arguments. + */ +public final class RpcInvocation implements Serializable { + private static final long serialVersionUID = -7058254033460536037L; + + private final String methodName; + private final Class[] parameterTypes; + private transient Object[] args; + + public RpcInvocation(String methodName, Class[] parameterTypes, Object[] args) { + this.methodName = Preconditions.checkNotNull(methodName); + this.parameterTypes = Preconditions.checkNotNull(parameterTypes); + this.args = args; + } + + public String getMethodName() { + return methodName; + } + + public Class[] getParameterTypes() { + return parameterTypes; + } + + public Object[] getArgs() { + return args; + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + + if (args != null) { + // write has args true + oos.writeBoolean(true); + + for (int i = 0; i < args.length; i++) { + try { + oos.writeObject(args[i]); + } catch (IOException e) { + Class argClass = args[i].getClass(); + + throw new IOException("Could not write " + i + "th argument of method " + + methodName + ". The argument type is " + argClass + ". " + + "Make sure that this type is serializable.", e); + } + } + } else { + // write has args false + oos.writeBoolean(false); + } + } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + + boolean hasArgs = ois.readBoolean(); + + if (hasArgs) { + int numberArguments = parameterTypes.length; + + args = new Object[numberArguments]; + + for (int i = 0; i < numberArguments; i++) { + args[i] = ois.readObject(); + } + } else { + args = null; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java similarity index 70% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java index 35567388208bd..fb958525f5079 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java @@ -18,11 +18,20 @@ package org.apache.flink.runtime.rpc.akka.messages; -public class RunnableMessage { - private final Runnable runnable; +import org.apache.flink.util.Preconditions; - public RunnableMessage(Runnable runnable) { - this.runnable = runnable; +import java.io.Serializable; + +/** + * Message for asynchronous runnable invocations + */ +public final class RunAsync implements Serializable { + private static final long serialVersionUID = -3080595100695371036L; + + private final transient Runnable runnable; + + public RunAsync(Runnable runnable) { + this.runnable = Preconditions.checkNotNull(runnable); } public Runnable getRunnable() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java deleted file mode 100644 index f89cd2f516041..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.messages; - -import org.apache.flink.runtime.taskmanager.TaskExecutionState; - -import java.io.Serializable; - -public class UpdateTaskExecutionState implements Serializable{ - private static final long serialVersionUID = -6662229114427331436L; - - private final TaskExecutionState taskExecutionState; - - public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) { - this.taskExecutionState = taskExecutionState; - } - - public TaskExecutionState getTaskExecutionState() { - return taskExecutionState; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java deleted file mode 100644 index 13101f9362ff6..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.resourcemanager; - -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.pattern.Patterns; -import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; -import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; -import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment; -import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster; -import org.apache.flink.runtime.rpc.akka.messages.RequestSlot; -import scala.concurrent.Future; - -public class ResourceManagerAkkaActor extends BaseAkkaActor { - private final ResourceManager resourceManager; - - public ResourceManagerAkkaActor(ResourceManager resourceManager) { - this.resourceManager = resourceManager; - } - - @Override - public void onReceive(Object message) throws Exception { - final ActorRef sender = getSender(); - - if (message instanceof RegisterJobMaster) { - RegisterJobMaster registerJobMaster = (RegisterJobMaster) message; - - try { - Future response = resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration()); - Patterns.pipe(response, getContext().dispatcher()).to(sender()); - } catch (Exception e) { - sender.tell(new Status.Failure(e), getSelf()); - } - } else if (message instanceof RequestSlot) { - RequestSlot requestSlot = (RequestSlot) message; - - try { - SlotAssignment response = resourceManager.requestSlot(requestSlot.getSlotRequest()); - sender.tell(new Status.Success(response), getSelf()); - } catch (Exception e) { - sender.tell(new Status.Failure(e), getSelf()); - } - } else { - super.onReceive(message); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java deleted file mode 100644 index 13047072ecad7..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.resourcemanager; - -import akka.actor.ActorRef; -import akka.pattern.AskableActorRef; -import akka.util.Timeout; -import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; -import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; -import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment; -import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; -import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster; -import org.apache.flink.runtime.rpc.akka.messages.RequestSlot; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway { - private final AskableActorRef actorRef; - private final Timeout timeout; - - public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) { - this.actorRef = new AskableActorRef(actorRef); - this.timeout = timeout; - } - - @Override - public Future registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) { - return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), new Timeout(timeout)) - .mapTo(ClassTag$.MODULE$.apply(RegistrationResponse.class)); - } - - @Override - public Future registerJobMaster(JobMasterRegistration jobMasterRegistration) { - return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), timeout) - .mapTo(ClassTag$.MODULE$.apply(RegistrationResponse.class)); - } - - @Override - public Future requestSlot(SlotRequest slotRequest) { - return actorRef.ask(new RequestSlot(slotRequest), timeout) - .mapTo(ClassTag$.MODULE$.apply(SlotAssignment.class)); - } - - @Override - public ActorRef getActorRef() { - return actorRef.actorRef(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java deleted file mode 100644 index ed522cc727378..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.taskexecutor; - -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.dispatch.OnComplete; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; -import org.apache.flink.runtime.rpc.akka.messages.CancelTask; -import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask; -import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; - -public class TaskExecutorAkkaActor extends BaseAkkaActor { - private final TaskExecutorGateway taskExecutor; - - public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) { - this.taskExecutor = taskExecutor; - } - - @Override - public void onReceive(Object message) throws Exception { - final ActorRef sender = getSender(); - - if (message instanceof ExecuteTask) { - ExecuteTask executeTask = (ExecuteTask) message; - - taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete( - new OnComplete() { - @Override - public void onComplete(Throwable failure, Acknowledge success) throws Throwable { - if (failure != null) { - sender.tell(new Status.Failure(failure), getSelf()); - } else { - sender.tell(new Status.Success(Acknowledge.get()), getSelf()); - } - } - }, - getContext().dispatcher() - ); - } else if (message instanceof CancelTask) { - CancelTask cancelTask = (CancelTask) message; - - taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete( - new OnComplete() { - @Override - public void onComplete(Throwable failure, Acknowledge success) throws Throwable { - if (failure != null) { - sender.tell(new Status.Failure(failure), getSelf()); - } else { - sender.tell(new Status.Success(Acknowledge.get()), getSelf()); - } - } - }, - getContext().dispatcher() - ); - } else { - super.onReceive(message); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java deleted file mode 100644 index 7f0a52284c6aa..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.akka.taskexecutor; - -import akka.actor.ActorRef; -import akka.pattern.AskableActorRef; -import akka.util.Timeout; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; -import org.apache.flink.runtime.rpc.akka.messages.CancelTask; -import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask; -import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; -import scala.concurrent.Future; -import scala.reflect.ClassTag$; - -public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway { - private final AskableActorRef actorRef; - private final Timeout timeout; - - public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) { - this.actorRef = new AskableActorRef(actorRef); - this.timeout = timeout; - } - - @Override - public Future executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { - return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), timeout) - .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); - } - - @Override - public Future cancelTask(ExecutionAttemptID executionAttemptId) { - return actorRef.ask(new CancelTask(executionAttemptId), timeout) - .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); - } - - @Override - public ActorRef getActorRef() { - return actorRef.actorRef(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java index b81b19c27321e..e53cd68f93fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.Preconditions; import scala.Tuple2; import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; @@ -76,7 +77,8 @@ public class JobMaster extends RpcEndpoint { public JobMaster(RpcService rpcService, ExecutorService executorService) { super(rpcService); - executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService); + executionContext = ExecutionContext$.MODULE$.fromExecutor( + Preconditions.checkNotNull(executorService)); scheduledExecutorService = new ScheduledThreadPoolExecutor(1); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index c7e8def051398..729ef0c5f41e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; +import org.apache.flink.util.Preconditions; import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; import scala.concurrent.Future; @@ -49,7 +50,8 @@ public class ResourceManager extends RpcEndpoint { public ResourceManager(RpcService rpcService, ExecutorService executorService) { super(rpcService); - this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService); + this.executionContext = ExecutionContext$.MODULE$.fromExecutor( + Preconditions.checkNotNull(executorService)); this.jobMasterGateways = new HashMap<>(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java index cdfc3bd63e1e5..3a7dd9f0ce558 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.Preconditions; import scala.concurrent.ExecutionContext; import java.util.HashSet; @@ -47,7 +48,8 @@ public class TaskExecutor extends RpcEndpoint { public TaskExecutor(RpcService rpcService, ExecutorService executorService) { super(rpcService); - this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService); + this.executionContext = ExecutionContexts$.MODULE$.fromExecutor( + Preconditions.checkNotNull(executorService)); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 0ded25e4cc7fc..e50533e2a1538 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -18,15 +18,15 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.util.ReflectionUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.reflections.Reflections; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -51,9 +51,8 @@ public void testRpcCompleteness() { for (Class rpcEndpoint :classes){ c = rpcEndpoint; - Type superClass = c.getGenericSuperclass(); - Class rpcGatewayType = extractTypeParameter(superClass, 0); + Class rpcGatewayType = ReflectionUtil.getTemplateType1(c); if (rpcGatewayType != null) { checkCompleteness(rpcEndpoint, (Class) rpcGatewayType); @@ -137,13 +136,16 @@ private void checkGatewayMethod(Method gatewayMethod) { } Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations(); + Class[] parameterTypes = gatewayMethod.getParameterTypes(); int rpcTimeoutParameters = 0; - for (Annotation[] parameterAnnotation : parameterAnnotations) { - for (Annotation annotation : parameterAnnotation) { - if (annotation.equals(RpcTimeout.class)) { - rpcTimeoutParameters++; - } + for (int i = 0; i < parameterAnnotations.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + assertTrue( + "The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".", + parameterTypes[i].equals(FiniteDuration.class)); + + rpcTimeoutParameters++; } } @@ -211,10 +213,10 @@ private boolean checkMethod(Method gatewayMethod, Method endpointMethod) { if (!futureClass.equals(RpcCompletenessTest.futureClass)) { return false; } else { - Class valueClass = extractTypeParameter(futureClass, 0); + Class valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType()); if (endpointMethod.getReturnType().equals(futureClass)) { - Class rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0); + Class rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType()); // check if we have the same future value types if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) { @@ -251,7 +253,7 @@ private String generateEndpointMethodSignature(Method method) { if (method.getReturnType().equals(Void.TYPE)) { builder.append("void").append(" "); } else if (method.getReturnType().equals(futureClass)) { - Class valueClass = extractTypeParameter(method.getGenericReturnType(), 0); + Class valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType()); builder .append(futureClass.getSimpleName()) @@ -291,30 +293,6 @@ private String generateEndpointMethodSignature(Method method) { return builder.toString(); } - private Class extractTypeParameter(Type genericType, int position) { - if (genericType instanceof ParameterizedType) { - ParameterizedType parameterizedType = (ParameterizedType) genericType; - - Type[] typeArguments = parameterizedType.getActualTypeArguments(); - - if (position < 0 || position >= typeArguments.length) { - throw new IndexOutOfBoundsException("The generic type " + - parameterizedType.getRawType() + " only has " + typeArguments.length + - " type arguments."); - } else { - Type typeArgument = typeArguments[position]; - - if (typeArgument instanceof Class) { - return (Class) typeArgument; - } else { - return null; - } - } - } else { - return null; - } - } - private boolean isRpcTimeout(Annotation[] annotations) { for (Annotation annotation : annotations) { if (annotation.annotationType().equals(RpcTimeout.class)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 642a380593906..a4e1d7f3d3606 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -61,10 +61,10 @@ public void testJobMasterResourceManagerRegistration() throws Exception { AkkaGateway akkaClient = (AkkaGateway) rm; - jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef())); + jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getRpcServer())); // wait for successful registration - FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS); + FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS); Deadline deadline = timeout.fromNow(); while (deadline.hasTimeLeft() && !jobMaster.isConnected()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java index c1435278ad735..33c9cb61ca4e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java @@ -48,7 +48,7 @@ public class TaskExecutorTest extends TestLogger { @Test public void testTaskExecution() throws Exception { RpcService testingRpcService = mock(RpcService.class); - DirectExecutorService directExecutorService = null; + DirectExecutorService directExecutorService = new DirectExecutorService(); TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(