diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a367ff2fdad9a..437e08b0a3e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -21,6 +21,7 @@
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -89,4 +90,34 @@ public interface RpcService {
* @param delay The delay after which the runnable will be executed
*/
void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
+
+ /**
+ * Execute the given runnable in the executor of the RPC service. This method can be used to run
+ * code outside of the main thread of a {@link RpcEndpoint}.
+ *
+ *
IMPORTANT: This executor does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @param runnable to execute
+ */
+ void execute(Runnable runnable);
+
+ /**
+ * Execute the given callable and return its result as a {@link Future}. This method can be used
+ * to run code outside of the main thread of a {@link RpcEndpoint}.
+ *
+ *
IMPORTANT: This executor does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @param callable to execute
+ * @param is the return value type
+ * @return Future containing the callable's future result
+ */
+ Future execute(Callable callable);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 36f11152f26fd..cee19c4d00b97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,6 +25,7 @@
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
@@ -48,6 +49,7 @@
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -223,4 +225,16 @@ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
}
+
+ @Override
+ public void execute(Runnable runnable) {
+ actorSystem.dispatcher().execute(runnable);
+ }
+
+ @Override
+ public Future execute(Callable callable) {
+ scala.concurrent.Future scalaFuture = Futures.future(callable, actorSystem.dispatcher());
+
+ return new FlinkFuture<>(scalaFuture);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 7c6b0ee34f746..e8255d40dacee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 957453a9d3660..c58ea205c953a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -64,6 +64,22 @@ public void scheduleRunnable(final Runnable runnable, final long delay, final Ti
}
}
+ @Override
+ public void execute(Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public Future execute(Callable callable) {
+ try {
+ T result = callable.call();
+
+ return FlinkCompletableFuture.completed(result);
+ } catch (Exception e) {
+ return FlinkCompletableFuture.completedExceptionally(e);
+ }
+ }
+
@Override
public Executor getExecutor() {
return executorService;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4e9e51805b637..8321fc4a1ea77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,13 +22,18 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
@@ -70,4 +75,44 @@ public void run() {
assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}
+
+ /**
+ * Tests that the {@link AkkaRpcService} can execute runnables
+ *s/
+ @Test
+ public void testExecuteRunnable() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
+
+ akkaRpcService.execute(new Runnable() {
+ @Override
+ public void run() {
+ latch.trigger();
+ }
+ });
+
+ latch.await(30L, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Tests that the {@link AkkaRpcService} can execute callables and returns their result as
+ * a {@link Future}.
+ */
+ @Test
+ public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
+ final OneShotLatch latch = new OneShotLatch();
+ final int expected = 42;
+
+ Future result = akkaRpcService.execute(new Callable() {
+ @Override
+ public Integer call() throws Exception {
+ latch.trigger();
+ return expected;
+ }
+ });
+
+ int actual = result.get(30L, TimeUnit.SECONDS);
+
+ assertEquals(expected, actual);
+ assertTrue(latch.isTriggered());
+ }
}