Skip to content

Commit f586223

Browse files
committed
[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor
This closes #2531.
1 parent ae42bde commit f586223

File tree

5 files changed

+106
-1
lines changed

5 files changed

+106
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.runtime.concurrent.Future;
2222
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
2323

24+
import java.util.concurrent.Callable;
2425
import java.util.concurrent.Executor;
2526
import java.util.concurrent.TimeUnit;
2627

@@ -89,4 +90,34 @@ public interface RpcService {
8990
* @param delay The delay after which the runnable will be executed
9091
*/
9192
void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
93+
94+
/**
95+
* Execute the given runnable in the executor of the RPC service. This method can be used to run
96+
* code outside of the main thread of a {@link RpcEndpoint}.
97+
*
98+
* <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
99+
* any concurrent invocations and is therefore not suitable to run completion methods of futures
100+
* that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
101+
* {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
102+
* {@code RpcEndpoint}.
103+
*
104+
* @param runnable to execute
105+
*/
106+
void execute(Runnable runnable);
107+
108+
/**
109+
* Execute the given callable and return its result as a {@link Future}. This method can be used
110+
* to run code outside of the main thread of a {@link RpcEndpoint}.
111+
*
112+
* <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
113+
* any concurrent invocations and is therefore not suitable to run completion methods of futures
114+
* that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
115+
* {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
116+
* {@code RpcEndpoint}.
117+
*
118+
* @param callable to execute
119+
* @param <T> is the return value type
120+
* @return Future containing the callable's future result
121+
*/
122+
<T> Future<T> execute(Callable<T> callable);
92123
}

flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import akka.actor.Identify;
2626
import akka.actor.PoisonPill;
2727
import akka.actor.Props;
28+
import akka.dispatch.Futures;
2829
import akka.dispatch.Mapper;
2930

3031
import akka.pattern.Patterns;
@@ -48,6 +49,7 @@
4849
import java.lang.reflect.Proxy;
4950
import java.util.HashSet;
5051
import java.util.Set;
52+
import java.util.concurrent.Callable;
5153
import java.util.concurrent.Executor;
5254
import java.util.concurrent.TimeUnit;
5355

@@ -223,4 +225,16 @@ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
223225

224226
actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
225227
}
228+
229+
@Override
230+
public void execute(Runnable runnable) {
231+
actorSystem.dispatcher().execute(runnable);
232+
}
233+
234+
@Override
235+
public <T> Future<T> execute(Callable<T> callable) {
236+
scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
237+
238+
return new FlinkFuture<>(scalaFuture);
239+
}
226240
}

flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.runtime.rpc;
2020

2121
import akka.actor.ActorSystem;
22-
import akka.util.Timeout;
2322

2423
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.core.testutils.OneShotLatch;

flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,22 @@ public void scheduleRunnable(final Runnable runnable, final long delay, final Ti
6464
}
6565
}
6666

67+
@Override
68+
public void execute(Runnable runnable) {
69+
runnable.run();
70+
}
71+
72+
@Override
73+
public <T> Future<T> execute(Callable<T> callable) {
74+
try {
75+
T result = callable.call();
76+
77+
return FlinkCompletableFuture.completed(result);
78+
} catch (Exception e) {
79+
return FlinkCompletableFuture.completedExceptionally(e);
80+
}
81+
}
82+
6783
@Override
6884
public Executor getExecutor() {
6985
return executorService;

flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,18 @@
2222
import org.apache.flink.api.common.time.Time;
2323
import org.apache.flink.core.testutils.OneShotLatch;
2424
import org.apache.flink.runtime.akka.AkkaUtils;
25+
import org.apache.flink.runtime.concurrent.Future;
2526
import org.apache.flink.util.TestLogger;
2627

2728
import org.junit.AfterClass;
2829
import org.junit.Test;
2930

31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.ExecutionException;
3033
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.TimeoutException;
3135

36+
import static org.junit.Assert.assertEquals;
3237
import static org.junit.Assert.assertTrue;
3338

3439
public class AkkaRpcServiceTest extends TestLogger {
@@ -70,4 +75,44 @@ public void run() {
7075

7176
assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
7277
}
78+
79+
/**
80+
* Tests that the {@link AkkaRpcService} can execute runnables
81+
*/
82+
@Test
83+
public void testExecuteRunnable() throws Exception {
84+
final OneShotLatch latch = new OneShotLatch();
85+
86+
akkaRpcService.execute(new Runnable() {
87+
@Override
88+
public void run() {
89+
latch.trigger();
90+
}
91+
});
92+
93+
latch.await(30L, TimeUnit.SECONDS);
94+
}
95+
96+
/**
97+
* Tests that the {@link AkkaRpcService} can execute callables and returns their result as
98+
* a {@link Future}.
99+
*/
100+
@Test
101+
public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
102+
final OneShotLatch latch = new OneShotLatch();
103+
final int expected = 42;
104+
105+
Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
106+
@Override
107+
public Integer call() throws Exception {
108+
latch.trigger();
109+
return expected;
110+
}
111+
});
112+
113+
int actual = result.get(30L, TimeUnit.SECONDS);
114+
115+
assertEquals(expected, actual);
116+
assertTrue(latch.isTriggered());
117+
}
73118
}

0 commit comments

Comments
 (0)