From 18437969a28d8ba4341216cd9d201aceb04cfaf2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Sep 2016 18:16:27 +0200 Subject: [PATCH] [FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor --- .../apache/flink/runtime/rpc/RpcService.java | 31 +++++++++++++ .../runtime/rpc/akka/AkkaRpcService.java | 14 ++++++ .../flink/runtime/rpc/AsyncCallsTest.java | 1 - .../runtime/rpc/TestingSerialRpcService.java | 16 +++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 45 +++++++++++++++++++ 5 files changed, 106 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index a367ff2fdad9a..437e08b0a3e59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -89,4 +90,34 @@ public interface RpcService { * @param delay The delay after which the runnable will be executed */ void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); + + /** + * Execute the given runnable in the executor of the RPC service. This method can be used to run + * code outside of the main thread of a {@link RpcEndpoint}. + * + *

IMPORTANT: This executor does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @param runnable to execute + */ + void execute(Runnable runnable); + + /** + * Execute the given callable and return its result as a {@link Future}. This method can be used + * to run code outside of the main thread of a {@link RpcEndpoint}. + * + *

IMPORTANT: This executor does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @param callable to execute + * @param is the return value type + * @return Future containing the callable's future result + */ + Future execute(Callable callable); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 36f11152f26fd..cee19c4d00b97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -25,6 +25,7 @@ import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.Patterns; @@ -48,6 +49,7 @@ import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -223,4 +225,16 @@ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } + + @Override + public void execute(Runnable runnable) { + actorSystem.dispatcher().execute(runnable); + } + + @Override + public Future execute(Callable callable) { + scala.concurrent.Future scalaFuture = Futures.future(callable, actorSystem.dispatcher()); + + return new FlinkFuture<>(scalaFuture); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 7c6b0ee34f746..e8255d40dacee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; -import akka.util.Timeout; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 957453a9d3660..c58ea205c953a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -64,6 +64,22 @@ public void scheduleRunnable(final Runnable runnable, final long delay, final Ti } } + @Override + public void execute(Runnable runnable) { + runnable.run(); + } + + @Override + public Future execute(Callable callable) { + try { + T result = callable.call(); + + return FlinkCompletableFuture.completed(result); + } catch (Exception e) { + return FlinkCompletableFuture.completedExceptionally(e); + } + } + @Override public Executor getExecutor() { return executorService; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 4e9e51805b637..8321fc4a1ea77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -22,13 +22,18 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { @@ -70,4 +75,44 @@ public void run() { assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); } + + /** + * Tests that the {@link AkkaRpcService} can execute runnables + *s/ + @Test + public void testExecuteRunnable() throws Exception { + final OneShotLatch latch = new OneShotLatch(); + + akkaRpcService.execute(new Runnable() { + @Override + public void run() { + latch.trigger(); + } + }); + + latch.await(30L, TimeUnit.SECONDS); + } + + /** + * Tests that the {@link AkkaRpcService} can execute callables and returns their result as + * a {@link Future}. + */ + @Test + public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException { + final OneShotLatch latch = new OneShotLatch(); + final int expected = 42; + + Future result = akkaRpcService.execute(new Callable() { + @Override + public Integer call() throws Exception { + latch.trigger(); + return expected; + } + }); + + int actual = result.get(30L, TimeUnit.SECONDS); + + assertEquals(expected, actual); + assertTrue(latch.isTriggered()); + } }