From 0878c126007b69bbf5775b6c25b77e5c486cb8e6 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Aug 2016 15:49:59 +0200 Subject: [PATCH] [FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected --- .../org/apache/flink/runtime/rpc/RpcEndpoint.java | 8 ++++---- .../TaskExecutorToResourceManagerConnection.java | 13 ++++++++----- .../runtime/rpc/{akka => }/AsyncCallsTest.java | 11 ++++------- 3 files changed, 16 insertions(+), 16 deletions(-) rename flink-runtime/src/test/java/org/apache/flink/runtime/rpc/{akka => }/AsyncCallsTest.java (94%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 7b3f8a189d5d8..e9e2b2c40d3de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -161,7 +161,7 @@ public String getAddress() { * * @return Main thread execution context */ - public ExecutionContext getMainThreadExecutionContext() { + protected ExecutionContext getMainThreadExecutionContext() { return mainThreadExecutionContext; } @@ -184,7 +184,7 @@ public RpcService getRpcService() { * * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ - public void runAsync(Runnable runnable) { + protected void runAsync(Runnable runnable) { ((MainThreadExecutor) self).runAsync(runnable); } @@ -195,7 +195,7 @@ public void runAsync(Runnable runnable) { * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ - public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { + protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay)); } @@ -209,7 +209,7 @@ public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { * @param Return type of the callable * @return Future for the result of the callable. */ - public Future callAsync(Callable callable, Timeout timeout) { + protected Future callAsync(Callable callable, Timeout timeout) { return ((MainThreadExecutor) self).callAsync(callable, timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java index f398b7d06d6dc..3b64963c39ba9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -57,7 +58,7 @@ public class TaskExecutorToResourceManagerConnection { private ResourceManagerRegistration pendingRegistration; - private ResourceManagerGateway registeredResourceManager; + private volatile ResourceManagerGateway registeredResourceManager; private InstanceID registrationId; @@ -93,22 +94,24 @@ public void start() { registration.startRegistration(); Future> future = registration.getFuture(); + + ExecutionContext executionContext = taskExecutor.getRpcService().getExecutionContext(); future.onSuccess(new OnSuccess>() { @Override public void onSuccess(Tuple2 result) { - registeredResourceManager = result.f0; registrationId = result.f1.getRegistrationId(); + registeredResourceManager = result.f0; } - }, taskExecutor.getMainThreadExecutionContext()); + }, executionContext); // this future should only ever fail if there is a bug, not if the registration is declined future.onFailure(new OnFailure() { @Override public void onFailure(Throwable failure) { - taskExecutor.onFatalError(failure); + taskExecutor.onFatalErrorAsync(failure); } - }, taskExecutor.getMainThreadExecutionContext()); + }, executionContext); } public void close() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java similarity index 94% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index d33987ccf6d69..1791056424e55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -16,18 +16,15 @@ * limitations under the License. */ -package org.apache.flink.runtime.rpc.akka; +package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; import akka.util.Timeout; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; @@ -51,7 +48,7 @@ public class AsyncCallsTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - private static AkkaRpcService akkaRpcService = + private static AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); @AfterClass @@ -173,7 +170,7 @@ public void run() { // test RPC endpoint // ------------------------------------------------------------------------ - interface TestGateway extends RpcGateway { + public interface TestGateway extends RpcGateway { void someCall();