Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -89,4 +90,34 @@ public interface RpcService {
* @param delay The delay after which the runnable will be executed
*/
void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

/**
* Execute the given runnable in the executor of the RPC service. This method can be used to run
* code outside of the main thread of a {@link RpcEndpoint}.
*
* <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
* any concurrent invocations and is therefore not suitable to run completion methods of futures
* that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
* {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
* {@code RpcEndpoint}.
*
* @param runnable to execute
*/
void execute(Runnable runnable);

/**
* Execute the given callable and return its result as a {@link Future}. This method can be used
* to run code outside of the main thread of a {@link RpcEndpoint}.
*
* <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
* any concurrent invocations and is therefore not suitable to run completion methods of futures
* that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
* {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
* {@code RpcEndpoint}.
*
* @param callable to execute
* @param <T> is the return value type
* @return Future containing the callable's future result
*/
<T> Future<T> execute(Callable<T> callable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;

import akka.pattern.Patterns;
Expand All @@ -48,6 +49,7 @@
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -223,4 +225,16 @@ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {

actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
}

@Override
public void execute(Runnable runnable) {
actorSystem.dispatcher().execute(runnable);
}

@Override
public <T> Future<T> execute(Callable<T> callable) {
scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());

return new FlinkFuture<>(scalaFuture);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import akka.util.Timeout;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ public void scheduleRunnable(final Runnable runnable, final long delay, final Ti
}
}

@Override
public void execute(Runnable runnable) {
runnable.run();
}

@Override
public <T> Future<T> execute(Callable<T> callable) {
try {
T result = callable.call();

return FlinkCompletableFuture.completed(result);
} catch (Exception e) {
return FlinkCompletableFuture.completedExceptionally(e);
}
}

@Override
public Executor getExecutor() {
return executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class AkkaRpcServiceTest extends TestLogger {
Expand Down Expand Up @@ -70,4 +75,44 @@ public void run() {

assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}

/**
* Tests that the {@link AkkaRpcService} can execute runnables
*s/
@Test
public void testExecuteRunnable() throws Exception {
final OneShotLatch latch = new OneShotLatch();

akkaRpcService.execute(new Runnable() {
@Override
public void run() {
latch.trigger();
}
});

latch.await(30L, TimeUnit.SECONDS);
}

/**
* Tests that the {@link AkkaRpcService} can execute callables and returns their result as
* a {@link Future}.
*/
@Test
public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
final OneShotLatch latch = new OneShotLatch();
final int expected = 42;

Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
latch.trigger();
return expected;
}
});

int actual = result.get(30L, TimeUnit.SECONDS);

assertEquals(expected, actual);
assertTrue(latch.isTriggered());
}
}