From cc7cdd75d23f2d4d201c0458f4543be1bc7e09f0 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Sep 2016 17:26:21 +0200 Subject: [PATCH] [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction --- .../impl/FlinkCompletableFuture.java | 22 +++++- .../runtime/concurrent/impl/FlinkFuture.java | 4 + .../flink/runtime/jobmaster/JobMaster.java | 2 +- .../runtime/jobmaster/JobMasterGateway.java | 2 +- .../registration/RetryingRegistration.java | 65 ++++++++-------- .../resourcemanager/ResourceManager.java | 13 ++-- .../ResourceManagerGateway.java | 9 +-- .../slotmanager/SlotManager.java | 9 +-- ...xecutor.java => MainThreadExecutable.java} | 8 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 60 ++++++--------- .../apache/flink/runtime/rpc/RpcService.java | 17 ++--- .../rpc/akka/AkkaInvocationHandler.java | 42 +++++------ .../flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++++- .../runtime/rpc/akka/AkkaRpcService.java | 28 +++---- .../runtime/taskexecutor/TaskExecutor.java | 12 +-- .../taskexecutor/TaskExecutorGateway.java | 6 +- ...skExecutorToResourceManagerConnection.java | 34 ++++----- .../RetryingRegistrationTest.java | 75 ++++++++++--------- .../registration/TestRegistrationGateway.java | 6 +- .../ResourceManagerHATest.java | 4 +- .../slotmanager/SlotProtocolTest.java | 14 ++-- .../flink/runtime/rpc/AsyncCallsTest.java | 13 ++-- .../runtime/rpc/RpcCompletenessTest.java | 9 ++- .../flink/runtime/rpc/TestingGatewayBase.java | 18 ++--- .../flink/runtime/rpc/TestingRpcService.java | 20 ++--- .../runtime/rpc/TestingSerialRpcService.java | 54 +++++++------ .../runtime/rpc/akka/AkkaRpcActorTest.java | 19 ++--- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +- .../rpc/akka/MainThreadValidationTest.java | 7 +- .../rpc/akka/MessageSerializationTest.java | 19 ++--- .../taskexecutor/TaskExecutorTest.java | 9 +-- 31 files changed, 316 insertions(+), 309 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/rpc/{MainThreadExecutor.java => MainThreadExecutable.java} (91%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java index 5566880c1cca0..e648a71bee70b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.concurrent.impl; +import akka.dispatch.Futures; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.util.Preconditions; import scala.concurrent.Promise; +import scala.concurrent.Promise$; import java.util.concurrent.CancellationException; @@ -34,7 +36,17 @@ public class FlinkCompletableFuture extends FlinkFuture implements Complet private final Promise promise; public FlinkCompletableFuture() { - promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + promise = Futures.promise(); + scalaFuture = promise.future(); + } + + private FlinkCompletableFuture(T value) { + promise = Promise$.MODULE$.successful(value); + scalaFuture = promise.future(); + } + + private FlinkCompletableFuture(Throwable t) { + promise = Promise$.MODULE$.failed(t); scalaFuture = promise.future(); } @@ -68,4 +80,12 @@ public boolean completeExceptionally(Throwable t) { public boolean cancel(boolean mayInterruptIfRunning) { return completeExceptionally(new CancellationException("Future has been canceled.")); } + + public static FlinkCompletableFuture completed(T value) { + return new FlinkCompletableFuture<>(value); + } + + public static FlinkCompletableFuture completedExceptionally(Throwable t) { + return new FlinkCompletableFuture<>(t); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index 361cd3d1aa612..b28a1bd5414af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -59,6 +59,10 @@ public FlinkFuture(scala.concurrent.Future scalaFuture) { this.scalaFuture = Preconditions.checkNotNull(scalaFuture); } + public scala.concurrent.Future getScalaFuture() { + return scalaFuture; + } + //----------------------------------------------------------------------------------- // Future's methods //----------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0a6a7ef0938ec..1537396f347f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -36,7 +36,7 @@ /** * JobMaster implementation. The job master is responsible for the execution of a single - * {@link org.apache.flink.runtime.jobgraph.JobGraph}. + * {@link JobGraph}. *

* It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index a53e383942744..86bf17c13250d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import scala.concurrent.Future; /** * {@link JobMaster} rpc gateway interface diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index ea49e42d87826..32dd9781c8386 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -18,19 +18,17 @@ package org.apache.flink.runtime.registration; -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.impl.Promise.DefaultPromise; - import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -86,7 +84,7 @@ public abstract class RetryingRegistration> completionPromise; + private final CompletableFuture> completionFuture; private final long initialRegistrationTimeout; @@ -140,7 +138,7 @@ public RetryingRegistration( this.delayOnError = delayOnError; this.delayOnRefusedRegistration = delayOnRefusedRegistration; - this.completionPromise = new DefaultPromise<>(); + this.completionFuture = new FlinkCompletableFuture<>(); } // ------------------------------------------------------------------------ @@ -148,7 +146,7 @@ public RetryingRegistration( // ------------------------------------------------------------------------ public Future> getFuture() { - return completionPromise.future(); + return completionFuture; } /** @@ -184,28 +182,30 @@ public void startRegistration() { Future resourceManagerFuture = rpcService.connect(targetAddress, targetType); // upon success, start the registration attempts - resourceManagerFuture.onSuccess(new OnSuccess() { + resourceManagerFuture.thenAcceptAsync(new AcceptFunction() { @Override - public void onSuccess(Gateway result) { + public void accept(Gateway result) { log.info("Resolved {} address, beginning registration", targetName); register(result, 1, initialRegistrationTimeout); } - }, rpcService.getExecutionContext()); - + }, rpcService.getExecutor()); + // upon failure, retry, unless this is cancelled - resourceManagerFuture.onFailure(new OnFailure() { + resourceManagerFuture.exceptionallyAsync(new ApplyFunction() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { if (!isCanceled()) { log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure); startRegistration(); } + + return null; } - }, rpcService.getExecutionContext()); + }, rpcService.getExecutor()); } catch (Throwable t) { cancel(); - completionPromise.tryFailure(t); + completionFuture.completeExceptionally(t); } } @@ -225,15 +225,14 @@ private void register(final Gateway gateway, final int attempt, final long timeo Future registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); // if the registration was successful, let the TaskExecutor know - registrationFuture.onSuccess(new OnSuccess() { - + registrationFuture.thenAcceptAsync(new AcceptFunction() { @Override - public void onSuccess(RegistrationResponse result) throws Throwable { + public void accept(RegistrationResponse result) { if (!isCanceled()) { if (result instanceof RegistrationResponse.Success) { // registration successful! Success success = (Success) result; - completionPromise.success(new Tuple2<>(gateway, success)); + completionFuture.complete(Tuple2.of(gateway, success)); } else { // registration refused or unknown @@ -241,7 +240,7 @@ public void onSuccess(RegistrationResponse result) throws Throwable { RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result; log.info("Registration at {} was declined: {}", targetName, decline.getReason()); } else { - log.error("Received unknown response to registration attempt: " + result); + log.error("Received unknown response to registration attempt: {}", result); } log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration); @@ -249,12 +248,12 @@ public void onSuccess(RegistrationResponse result) throws Throwable { } } } - }, rpcService.getExecutionContext()); + }, rpcService.getExecutor()); // upon failure, retry - registrationFuture.onFailure(new OnFailure() { + registrationFuture.exceptionallyAsync(new ApplyFunction() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { if (!isCanceled()) { if (failure instanceof TimeoutException) { // we simply have not received a response in time. maybe the timeout was @@ -262,26 +261,28 @@ public void onFailure(Throwable failure) { // currently down. if (log.isDebugEnabled()) { log.debug("Registration at {} ({}) attempt {} timed out after {} ms", - targetName, targetAddress, attempt, timeoutMillis); + targetName, targetAddress, attempt, timeoutMillis); } - + long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout); register(gateway, attempt + 1, newTimeoutMillis); } else { // a serious failure occurred. we still should not give up, but keep trying - log.error("Registration at " + targetName + " failed due to an error", failure); + log.error("Registration at {} failed due to an error", targetName, failure); log.info("Pausing and re-attempting registration in {} ms", delayOnError); - + registerLater(gateway, 1, initialRegistrationTimeout, delayOnError); } } + + return null; } - }, rpcService.getExecutionContext()); + }, rpcService.getExecutor()); } catch (Throwable t) { cancel(); - completionPromise.tryFailure(t); + completionFuture.completeExceptionally(t); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index d9a71349383dc..537071085aedb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,12 +18,11 @@ package org.apache.flink.runtime.resourcemanager; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; - import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -38,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; import java.util.HashMap; import java.util.Map; @@ -126,10 +124,9 @@ public Future registerJobMaster(JobMasterRegistration jobM getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); final JobID jobID = jobMasterRegistration.getJobID(); - return jobMasterFuture.map(new Mapper() { + return jobMasterFuture.thenApplyAsync(new ApplyFunction() { @Override - public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { - + public RegistrationResponse apply(JobMasterGateway jobMasterGateway) { final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); if (existingGateway != null) { LOG.info("Replacing existing gateway {} for JobID {} with {}.", @@ -137,7 +134,7 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { } return new RegistrationResponse(true); } - }, getMainThreadExecutionContext()); + }, getMainThreadExecutor()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index c8e348875dbb7..5c8786cf0e995 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -18,14 +18,13 @@ package org.apache.flink.runtime.resourcemanager; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; /** @@ -42,7 +41,7 @@ public interface ResourceManagerGateway extends RpcGateway { */ Future registerJobMaster( JobMasterRegistration jobMasterRegistration, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); /** * Register a {@link JobMaster} at the resource manager. @@ -73,5 +72,5 @@ Future registerTaskE UUID resourceManagerLeaderId, String taskExecutorAddress, ResourceID resourceID, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 96fde7d1fe7c2..97176b25b32ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; @@ -33,14 +35,11 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,7 +78,7 @@ public abstract class SlotManager implements LeaderRetrievalListener { /** All allocations, we can lookup allocations either by SlotID or AllocationID */ private final AllocationMap allocationMap; - private final FiniteDuration timeout; + private final Time timeout; /** The current leader id set by the ResourceManager */ private UUID leaderID; @@ -90,7 +89,7 @@ public SlotManager() { this.freeSlots = new HashMap<>(16); this.allocationMap = new AllocationMap(); this.taskManagerGateways = new HashMap<>(); - this.timeout = new FiniteDuration(10, TimeUnit.SECONDS); + this.timeout = Time.seconds(10); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java similarity index 91% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java index 5e4fead8c5531..ec1c9842f5fc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc; -import akka.util.Timeout; -import scala.concurrent.Future; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; @@ -32,7 +32,7 @@ * implementation which allows to dispatch local procedures to the main thread of the underlying * RPC endpoint. */ -public interface MainThreadExecutor { +public interface MainThreadExecutable { /** * Execute the runnable in the main thread of the underlying RPC endpoint. @@ -51,7 +51,7 @@ public interface MainThreadExecutor { * @param Return value of the callable * @return Future of the callable result */ - Future callAsync(Callable callable, Timeout callTimeout); + Future callAsync(Callable callable, Time callTimeout); /** * Execute the runnable in the main thread of the underlying RPC endpoint, with diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index e9e2b2c40d3de..4e5e49a527708 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -18,16 +18,15 @@ package org.apache.flink.runtime.rpc; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; - import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,8 +48,8 @@ * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model * of Erlang or Akka. * - *

The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} - * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread. + *

The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)} + * and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread. * * @param The RPC gateway counterpart for the implementing RPC endpoint */ @@ -69,9 +68,9 @@ public abstract class RpcEndpoint { /** Self gateway which can be used to schedule asynchronous calls on yourself */ private final C self; - /** The main thread execution context to be used to execute future callbacks in the main thread + /** The main thread executor to be used to execute future callbacks in the main thread * of the executing rpc server. */ - private final ExecutionContext mainThreadExecutionContext; + private final Executor mainThreadExecutor; /** A reference to the endpoint's main thread, if the current method is called by the main thread */ final AtomicReference currentMainThread = new AtomicReference<>(null); @@ -89,7 +88,7 @@ protected RpcEndpoint(final RpcService rpcService) { this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); this.self = rpcService.startServer(this); - this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); + this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self); } /** @@ -120,7 +119,7 @@ public void start() { * Shuts down the underlying RPC endpoint via the RPC service. * After this method was called, the RPC endpoint will no longer be reachable, neither remotely, * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread - * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}). + * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}). * *

This method can be overridden to add RPC endpoint specific shut down code. * The overridden method should always call the parent shut down method. @@ -161,8 +160,8 @@ public String getAddress() { * * @return Main thread execution context */ - protected ExecutionContext getMainThreadExecutionContext() { - return mainThreadExecutionContext; + protected Executor getMainThreadExecutor() { + return mainThreadExecutor; } /** @@ -185,7 +184,7 @@ public RpcService getRpcService() { * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ protected void runAsync(Runnable runnable) { - ((MainThreadExecutor) self).runAsync(runnable); + ((MainThreadExecutable) self).runAsync(runnable); } /** @@ -196,7 +195,7 @@ protected void runAsync(Runnable runnable) { * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { - ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay)); + ((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay)); } /** @@ -209,8 +208,8 @@ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { * @param Return type of the callable * @return Future for the result of the callable. */ - protected Future callAsync(Callable callable, Timeout timeout) { - return ((MainThreadExecutor) self).callAsync(callable, timeout); + protected Future callAsync(Callable callable, Time timeout) { + return ((MainThreadExecutable) self).callAsync(callable, timeout); } // ------------------------------------------------------------------------ @@ -241,36 +240,19 @@ public void validateRunsInMainThread() { // ------------------------------------------------------------------------ /** - * Execution context which executes runnables in the main thread context. A reported failure - * will cause the underlying rpc server to shut down. + * Executor which executes runnables in the main thread context. */ - private class MainThreadExecutionContext implements ExecutionContext { + private class MainThreadExecutor implements Executor { - private final MainThreadExecutor gateway; + private final MainThreadExecutable gateway; - MainThreadExecutionContext(MainThreadExecutor gateway) { - this.gateway = gateway; + MainThreadExecutor(MainThreadExecutable gateway) { + this.gateway = Preconditions.checkNotNull(gateway); } @Override public void execute(Runnable runnable) { gateway.runAsync(runnable); } - - @Override - public void reportFailure(final Throwable t) { - gateway.runAsync(new Runnable() { - @Override - public void run() { - log.error("Encountered failure in the main thread execution context.", t); - shutDown(); - } - }); - } - - @Override - public ExecutionContext prepare() { - return this; - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 78c1cec33bc9a..a367ff2fdad9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -68,23 +68,22 @@ public interface RpcService { void stopService(); /** - * Gets the execution context, provided by this RPC service. This execution - * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)} - * methods of Futures. + * Gets the executor, provided by this RPC service. This executor can be used for example for + * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. * - *

IMPORTANT: This execution context does not isolate the method invocations against + *

IMPORTANT: This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the - * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @return The execution context provided by the RPC service */ - ExecutionContext getExecutionContext(); + Executor getExecutor(); /** * Execute the runnable in the execution context of this RPC Service, as returned by - * {@link #getExecutionContext()}, after a scheduled delay. + * {@link #getExecutor()}, after a scheduled delay. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index bfa04f6bedeb4..8f4deff8f144f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -20,9 +20,11 @@ import akka.actor.ActorRef; import akka.pattern.Patterns; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; @@ -34,9 +36,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.util.Preconditions; import org.apache.log4j.Logger; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.lang.annotation.Annotation; @@ -53,7 +52,7 @@ * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); private final String address; @@ -64,11 +63,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea private final boolean isLocal; // default timeout for asks - private final Timeout timeout; + private final Time timeout; private final long maximumFramesize; - AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) { + AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) { this.address = Preconditions.checkNotNull(address); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); @@ -82,7 +81,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl Object result; - if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || + if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) { result = method.invoke(this, args); @@ -90,7 +89,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); Tuple2[], Object[]> filteredArguments = filterArguments( parameterTypes, @@ -130,13 +129,14 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl result = null; } else if (returnType.equals(Future.class)) { // execute an asynchronous call - result = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout); + result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds())); } else { // execute a synchronous call - Future futureResult = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout); - FiniteDuration duration = timeout.duration(); + scala.concurrent.Future scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()); - result = Await.result(futureResult, duration); + Future futureResult = new FlinkFuture<>(scalaFuture); + + return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit()); } } @@ -167,12 +167,12 @@ public void scheduleRunAsync(Runnable runnable, long delay) { } @Override - public Future callAsync(Callable callable, Timeout callTimeout) { + public Future callAsync(Callable callable, Time callTimeout) { if(isLocal) { @SuppressWarnings("unchecked") - Future result = (Future) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout); + scala.concurrent.Future result = (scala.concurrent.Future) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds()); - return result; + return new FlinkFuture<>(result); } else { throw new RuntimeException("Trying to send a Callable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); @@ -204,17 +204,17 @@ public void stop() { * has been found * @return Timeout extracted from the array of arguments or the default timeout */ - private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) { + private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); for (int i = 0; i < parameterAnnotations.length; i++) { if (isRpcTimeout(parameterAnnotations[i])) { - if (args[i] instanceof FiniteDuration) { - return new Timeout((FiniteDuration) args[i]); + if (args[i] instanceof Time) { + return (Time) args[i]; } else { throw new RuntimeException("The rpc timeout parameter must be of type " + - FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported."); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 2373be9414ed2..59daa46e9f6f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -21,8 +21,11 @@ import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.UntypedActorWithStash; +import akka.dispatch.Futures; import akka.japi.Procedure; import akka.pattern.Patterns; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -35,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; @@ -146,8 +148,23 @@ private void handleRpcInvocation(RpcInvocation rpcInvocation) { Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); if (result instanceof Future) { + final Future future = (Future) result; + // pipe result to sender - Patterns.pipe((Future) result, getContext().dispatcher()).to(getSender()); + if (future instanceof FlinkFuture) { + // FlinkFutures are currently backed by Scala's futures + FlinkFuture flinkFuture = (FlinkFuture) future; + + Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender()); + } else { + // We have to unpack the Flink future and pack it into a Scala future + Patterns.pipe(Futures.future(new Callable() { + @Override + public Object call() throws Exception { + return future.get(); + } + }, getContext().dispatcher()), getContext().dispatcher()); + } } else { // tell the sender the result of the computation getSender().tell(new Status.Success(result), getSelf()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 060a1ef499cb0..36f11152f26fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -26,11 +26,13 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Mapper; -import akka.pattern.AskableActorSelection; -import akka.util.Timeout; +import akka.pattern.Patterns; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -39,8 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import javax.annotation.concurrent.ThreadSafe; @@ -48,6 +48,7 @@ import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; @@ -68,13 +69,13 @@ public class AkkaRpcService implements RpcService { private final Object lock = new Object(); private final ActorSystem actorSystem; - private final Timeout timeout; + private final Time timeout; private final Set actors = new HashSet<>(4); private final long maximumFramesize; private volatile boolean stopped; - public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) { + public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { this.actorSystem = checkNotNull(actorSystem, "actor system"); this.timeout = checkNotNull(timeout, "timeout"); @@ -95,10 +96,9 @@ public Future connect(final String address, final Clas address, clazz.getName()); final ActorSelection actorSel = actorSystem.actorSelection(address); - final AskableActorSelection asker = new AskableActorSelection(actorSel); - final Future identify = asker.ask(new Identify(42), timeout); - return identify.map(new Mapper(){ + final scala.concurrent.Future identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds()); + final scala.concurrent.Future resultFuture = identify.map(new Mapper(){ @Override public C checkedApply(Object obj) throws Exception { @@ -128,6 +128,8 @@ public C checkedApply(Object obj) throws Exception { } } }, actorSystem.dispatcher()); + + return new FlinkFuture<>(resultFuture); } @Override @@ -159,7 +161,7 @@ public > C startServer(S rpcEndpo classLoader, new Class[]{ rpcEndpoint.getSelfGatewayType(), - MainThreadExecutor.class, + MainThreadExecutable.class, StartStoppable.class, AkkaGateway.class}, akkaInvocationHandler); @@ -209,7 +211,7 @@ public void stopService() { } @Override - public ExecutionContext getExecutionContext() { + public Executor getExecutor() { return actorSystem.dispatcher(); } @@ -219,6 +221,6 @@ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { checkNotNull(unit, "unit"); checkArgument(delay >= 0, "delay must be zero or larger"); - actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext()); + actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index fadae5f4c04ae..d84a6a91c8d23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; -import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; @@ -35,6 +35,7 @@ import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; +import org.jboss.netty.channel.ChannelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +79,7 @@ import java.io.File; import java.io.IOException; +import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.UUID; @@ -198,7 +200,7 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe this, newLeaderAddress, newLeaderId, - getMainThreadExecutionContext()); + getMainThreadExecutor()); resourceManagerConnection.start(); } } @@ -302,9 +304,9 @@ private static void runTaskManager( LOG.debug("Using akka configuration\n " + akkaConfig); taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); } catch (Throwable t) { - if (t instanceof org.jboss.netty.channel.ChannelException) { + if (t instanceof ChannelException) { Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { + if (cause != null && t.getCause() instanceof BindException) { String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); throw new IOException("Unable to bind TaskManager actor system to address " + address + " - " + cause.getMessage(), t); @@ -314,7 +316,7 @@ private static void runTaskManager( } // start akka rpc service based on actor system - final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); + final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); // start high availability service to implement getResourceManagerLeaderRetriever method only diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 65323a8c62f12..09628021175e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.UUID; @@ -48,5 +48,5 @@ public interface TaskExecutorGateway extends RpcGateway { Future requestSlot( AllocationID allocationID, UUID resourceManagerLeaderID, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 28062b6e2496a..647359d54cfba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -18,11 +18,12 @@ package org.apache.flink.runtime.taskexecutor; -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -31,12 +32,8 @@ import org.slf4j.Logger; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -57,7 +54,7 @@ public class TaskExecutorToResourceManagerConnection { private final String resourceManagerAddress; /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */ - private final ExecutionContext executionContext; + private final Executor executor; private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; @@ -74,13 +71,13 @@ public TaskExecutorToResourceManagerConnection( TaskExecutor taskExecutor, String resourceManagerAddress, UUID resourceManagerLeaderId, - ExecutionContext executionContext) { + Executor executor) { this.log = checkNotNull(log); this.taskExecutor = checkNotNull(taskExecutor); this.resourceManagerAddress = checkNotNull(resourceManagerAddress); this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); - this.executionContext = checkNotNull(executionContext); + this.executor = checkNotNull(executor); } // ------------------------------------------------------------------------ @@ -100,21 +97,22 @@ public void start() { Future> future = pendingRegistration.getFuture(); - future.onSuccess(new OnSuccess>() { + future.thenAcceptAsync(new AcceptFunction>() { @Override - public void onSuccess(Tuple2 result) { + public void accept(Tuple2 result) { registrationId = result.f1.getRegistrationId(); registeredResourceManager = result.f0; } - }, executionContext); + }, executor); // this future should only ever fail if there is a bug, not if the registration is declined - future.onFailure(new OnFailure() { + future.exceptionallyAsync(new ApplyFunction() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { taskExecutor.onFatalErrorAsync(failure); + return null; } - }, executionContext); + }, executor); } public void close() { @@ -197,7 +195,7 @@ private static class ResourceManagerRegistration protected Future invokeRegistration( ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { - FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); + Time timeout = Time.milliseconds(timeoutMillis); return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 80fa19cc67bc7..e56a9ec5432e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.registration; -import akka.dispatch.Futures; - import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; @@ -29,18 +29,13 @@ import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -71,8 +66,8 @@ public void testSimpleSuccessfulRegistration() throws Exception { // multiple accesses return the same future assertEquals(future, registration.getFuture()); - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + Tuple2 success = + future.get(10L, TimeUnit.SECONDS); // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); @@ -83,7 +78,7 @@ public void testSimpleSuccessfulRegistration() throws Exception { rpc.stopService(); } } - + @Test public void testPropagateFailures() throws Exception { final String testExceptionMessage = "testExceptionMessage"; @@ -96,9 +91,15 @@ public void testPropagateFailures() throws Exception { registration.startRegistration(); Future future = registration.getFuture(); - assertTrue(future.failed().isCompleted()); + assertTrue(future.isDone()); - assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage()); + try { + future.get(); + + fail("We expected an ExecutionException."); + } catch (ExecutionException e) { + assertEquals(testExceptionMessage, e.getCause().getMessage()); + } } @Test @@ -113,16 +114,16 @@ public void testRetryConnectOnFailure() throws Exception { // RPC service that fails upon the first connection, but succeeds on the second RpcService rpc = mock(RpcService.class); when(rpc.connect(anyString(), any(Class.class))).thenReturn( - Futures.failed(new Exception("test connect failure")), // first connection attempt fails - Futures.successful(testGateway) // second connection attempt succeeds + FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails + FlinkCompletableFuture.completed(testGateway) // second connection attempt succeeds ); - when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor)); + when(rpc.getExecutor()).thenReturn(executor); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); registration.startRegistration(); Tuple2 success = - Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS)); + registration.getFuture().get(10L, TimeUnit.SECONDS); // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); @@ -151,23 +152,23 @@ public void testRetriesOnTimeouts() throws Exception { try { rpc.registerGateway(testEndpointAddress, testGateway); - + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - + long started = System.nanoTime(); registration.startRegistration(); - + Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - + future.get(10L, TimeUnit.SECONDS); + long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; - + // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - + // validate that some retry-delay / back-off behavior happened assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT); } @@ -199,10 +200,10 @@ public void testDecline() throws Exception { long started = System.nanoTime(); registration.startRegistration(); - + Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + future.get(10L, TimeUnit.SECONDS); long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; @@ -212,7 +213,7 @@ public void testDecline() throws Exception { assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", elapsedMillis >= + assertTrue("retries did not properly back off", elapsedMillis >= 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE); } finally { @@ -220,7 +221,7 @@ public void testDecline() throws Exception { rpc.stopService(); } } - + @Test @SuppressWarnings("unchecked") public void testRetryOnError() throws Exception { @@ -235,9 +236,9 @@ public void testRetryOnError() throws Exception { TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( - Futures.failed(new Exception("test exception")), - Futures.successful(new TestRegistrationSuccess(testId))); - + FlinkCompletableFuture.completedExceptionally(new Exception("test exception")), + FlinkCompletableFuture.completed(new TestRegistrationSuccess(testId))); + rpc.registerGateway(testEndpointAddress, testGateway); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); @@ -247,11 +248,11 @@ public void testRetryOnError() throws Exception { Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + future.get(10, TimeUnit.SECONDS); long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; - + assertEquals(testId, success.f1.getCorrelationId()); // validate that some retry-delay / back-off behavior happened @@ -271,10 +272,10 @@ public void testCancellation() throws Exception { TestingRpcService rpc = new TestingRpcService(); try { - Promise result = Futures.promise(); + FlinkCompletableFuture result = new FlinkCompletableFuture<>(); TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future()); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); rpc.registerGateway(testEndpointAddress, testGateway); @@ -283,7 +284,7 @@ public void testCancellation() throws Exception { // cancel and fail the current registration attempt registration.cancel(); - result.failure(new TimeoutException()); + result.completeExceptionally(new TimeoutException()); // there should not be a second registration attempt verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java index 431fbe8284cb0..2843aebf26e9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java @@ -20,11 +20,11 @@ import akka.dispatch.Futures; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.TestingGatewayBase; import org.apache.flink.util.Preconditions; -import scala.concurrent.Future; - import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -56,7 +56,7 @@ public Future registrationCall(UUID leaderId, long timeout } // return a completed future (for a proper value), or one that never completes and will time out (for null) - return response != null ? Futures.successful(response) : this.futureWithTimeout(timeout); + return response != null ? FlinkCompletableFuture.completed(response) : this.futureWithTimeout(timeout); } public BlockingQueue getInvocations() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 8183c0a857562..64a1191211dea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -68,7 +68,7 @@ public void testGrantAndRevokeLeadership() throws Exception { Assert.assertNull(resourceManager.getLeaderSessionID()); } - private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { + private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway { @Override public void runAsync(Runnable runnable) { runnable.run(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 85d28805d4d64..1f9e7e833286b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; @@ -40,10 +42,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import java.util.Collections; import java.util.UUID; @@ -99,7 +97,7 @@ public void testSlotsUnavailableRequest() throws Exception { Future registrationFuture = resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); try { - Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { Assert.fail("JobManager registration Future didn't become ready."); } @@ -141,7 +139,7 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.updateSlotStatus(slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class)); + verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); } /** @@ -171,7 +169,7 @@ public void testSlotAvailableRequest() throws Exception { Future registrationFuture = resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); try { - Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { Assert.fail("JobManager registration Future didn't become ready."); } @@ -207,7 +205,7 @@ public void testSlotAvailableRequest() throws Exception { // 4) a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class)); + verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 1791056424e55..7c6b0ee34f746 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -21,18 +21,16 @@ import akka.actor.ActorSystem; import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,7 +47,7 @@ public class AsyncCallsTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); private static AkkaRpcService akkaRpcService = - new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @AfterClass public static void shutdown() { @@ -104,8 +102,9 @@ public String call() throws Exception { } return "test"; } - }, new Timeout(30, TimeUnit.SECONDS)); - String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS)); + }, Time.seconds(30L)); + + String str = result.get(30, TimeUnit.SECONDS); assertEquals("test", str); // validate that no concurrent access happened diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index b431eb92be2cd..ee3f784198e76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -18,14 +18,14 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.ReflectionUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.reflections.Reflections; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.lang.annotation.Annotation; import java.lang.reflect.Method; @@ -43,6 +43,7 @@ public class RpcCompletenessTest extends TestLogger { private static final Class futureClass = Future.class; + private static final Class timeoutClass = Time.class; @Test @SuppressWarnings({"rawtypes", "unchecked"}) @@ -147,8 +148,8 @@ private void checkGatewayMethod(Method gatewayMethod) { for (int i = 0; i < parameterAnnotations.length; i++) { if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { assertTrue( - "The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".", - parameterTypes[i].equals(FiniteDuration.class)); + "The rpc timeout has to be of type " + timeoutClass.getName() + ".", + parameterTypes[i].equals(timeoutClass)); rpcTimeoutParameters++; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java index 8133a87e78b01..caf5e81f4099f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.Futures; -import scala.concurrent.Future; -import scala.concurrent.Promise; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -73,25 +73,25 @@ public String getAddress() { // ------------------------------------------------------------------------ public Future futureWithTimeout(long timeoutMillis) { - Promise promise = Futures.promise(); - executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS); - return promise.future(); + FlinkCompletableFuture future = new FlinkCompletableFuture<>(); + executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS); + return future; } // ------------------------------------------------------------------------ private static final class FutureTimeout implements Runnable { - private final Promise promise; + private final CompletableFuture promise; - private FutureTimeout(Promise promise) { + private FutureTimeout(CompletableFuture promise) { this.promise = promise; } @Override public void run() { try { - promise.failure(new TimeoutException()); + promise.completeExceptionally(new TimeoutException()); } catch (Throwable t) { System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage()); t.printStackTrace(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 2212680629290..f1640565519c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -18,18 +18,14 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.Futures; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -69,7 +65,7 @@ public TestingRpcService() { * Creates a new {@code TestingRpcService}, using the given configuration. */ public TestingRpcService(Configuration configuration) { - super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10)); this.registeredConnections = new ConcurrentHashMap<>(); } @@ -103,13 +99,13 @@ public Future connect(String address, Class clazz) if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return Futures.successful(typedGateway); + return FlinkCompletableFuture.completed(typedGateway); } else { - return Futures.failed( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FlinkCompletableFuture.completedExceptionally( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return Futures.failed(new Exception("No gateway registered under that name")); + return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 01776edc5a549..957453a9d3660 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -18,16 +18,13 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.Preconditions; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; @@ -37,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,8 +65,8 @@ public void scheduleRunnable(final Runnable runnable, final long delay, final Ti } @Override - public ExecutionContext getExecutionContext() { - return ExecutionContexts.fromExecutorService(executorService); + public Executor getExecutor() { + return executorService; } @Override @@ -94,7 +92,7 @@ public > C startServer(S rpcEndpo classLoader, new Class[]{ rpcEndpoint.getSelfGatewayType(), - MainThreadExecutor.class, + MainThreadExecutable.class, StartStoppable.class, RpcGateway.class }, @@ -114,13 +112,13 @@ public Future connect(String address, Class clazz) if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return Futures.successful(typedGateway); + return FlinkCompletableFuture.completed(typedGateway); } else { - return Futures.failed( + return FlinkCompletableFuture.completedExceptionally( new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return Futures.failed(new Exception("No gateway registered under that name")); + return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); } } @@ -141,20 +139,20 @@ public void clearGateways() { registeredConnections.clear(); } - private static final class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + private static final class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable { private final T rpcEndpoint; /** default timeout for asks */ - private final Timeout timeout; + private final Time timeout; private final String address; private TestingSerialInvocationHandler(String address, T rpcEndpoint) { - this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + this(address, rpcEndpoint, Time.seconds(10)); } - private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) { + private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) { this.rpcEndpoint = rpcEndpoint; this.timeout = timeout; this.address = address; @@ -163,7 +161,7 @@ private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout ti @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class declaringClass = method.getDeclaringClass(); - if (declaringClass.equals(MainThreadExecutor.class) || + if (declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) { return method.invoke(this, args); @@ -171,7 +169,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl final String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); final Tuple2[], Object[]> filteredArguments = filterArguments( parameterTypes, @@ -201,13 +199,13 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl private Object handleRpcInvocationSync(final String methodName, final Class[] parameterTypes, final Object[] args, - final Timeout futureTimeout) throws Exception { + final Time futureTimeout) throws Exception { final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); Object result = rpcMethod.invoke(rpcEndpoint, args); if (result instanceof Future) { Future future = (Future) result; - return Await.result(future, futureTimeout.duration()); + return future.get(futureTimeout.getSize(), futureTimeout.getUnit()); } else { return result; } @@ -219,11 +217,11 @@ public void runAsync(Runnable runnable) { } @Override - public Future callAsync(Callable callable, Timeout callTimeout) { + public Future callAsync(Callable callable, Time callTimeout) { try { - return Futures.successful(callable.call()); + return FlinkCompletableFuture.completed(callable.call()); } catch (Throwable e) { - return Futures.failed(e); + return FlinkCompletableFuture.completedExceptionally(e); } } @@ -281,18 +279,18 @@ private Method lookupRpcMethod(final String methodName, * has been found * @return Timeout extracted from the array of arguments or the default timeout */ - private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, - Timeout defaultTimeout) { + private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, + Time defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); for (int i = 0; i < parameterAnnotations.length; i++) { if (isRpcTimeout(parameterAnnotations[i])) { - if (args[i] instanceof FiniteDuration) { - return new Timeout((FiniteDuration) args[i]); + if (args[i] instanceof Time) { + return (Time) args[i]; } else { throw new RuntimeException("The rpc timeout parameter must be of type " + - FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index a6ceb9104a1ef..5624d12fc5255 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -30,13 +31,12 @@ import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class AkkaRpcActorTest extends TestLogger { @@ -47,7 +47,7 @@ public class AkkaRpcActorTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS); + private static Time timeout = Time.milliseconds(10000L); private static AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, timeout); @@ -69,7 +69,7 @@ public void testAddressResolution() throws Exception { Future futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class); - DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration()); + DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress()); } @@ -82,11 +82,12 @@ public void testFailingAddressResolution() throws Exception { Future futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class); try { - DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration()); + DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); fail("The rpc connection resolution should have failed."); - } catch (RpcConnectionException exception) { + } catch (ExecutionException exception) { // we're expecting a RpcConnectionException + assertTrue(exception.getCause() instanceof RpcConnectionException); } } @@ -111,7 +112,7 @@ public void testMessageStashing() throws Exception { // now process the rpc rpcEndpoint.start(); - Integer actualValue = Await.result(result, timeout.duration()); + Integer actualValue = result.get(timeout.getSize(), timeout.getUnit()); assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index f55069e44e5b1..4e9e51805b637 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.util.TestLogger; @@ -40,7 +40,7 @@ public class AkkaRpcServiceTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); private static AkkaRpcService akkaRpcService = - new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + new AkkaRpcService(actorSystem, Time.milliseconds(10000)); @AfterClass public static void shutdown() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 9ffafdaec46b6..9ec1f7edab8c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -18,8 +18,7 @@ package org.apache.flink.runtime.rpc.akka; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -30,8 +29,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertTrue; public class MainThreadValidationTest extends TestLogger { @@ -48,7 +45,7 @@ public void failIfNotInMainThread() { // actual test AkkaRpcService akkaRpcService = new AkkaRpcService( AkkaUtils.createDefaultActorSystem(), - new Timeout(10000, TimeUnit.MILLISECONDS)); + Time.milliseconds(10000)); try { TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 9d2ed99c080cd..0d5dc282d06e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -19,10 +19,11 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; -import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -32,13 +33,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -52,7 +49,7 @@ public class MessageSerializationTest extends TestLogger { private static AkkaRpcService akkaRpcService1; private static AkkaRpcService akkaRpcService2; - private static final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + private static final Time timeout = Time.seconds(10L); private static final int maxFrameSize = 32000; @BeforeClass @@ -63,8 +60,8 @@ public static void setup() { actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig); actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig); - akkaRpcService1 = new AkkaRpcService(actorSystem1, new Timeout(timeout)); - akkaRpcService2 = new AkkaRpcService(actorSystem2, new Timeout(timeout)); + akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout); + akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout); } @AfterClass @@ -113,7 +110,7 @@ public void testNonSerializableRemoteMessageTransfer() throws Exception { Future remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); remoteGateway.foobar(new Object()); @@ -134,7 +131,7 @@ public void testSerializableRemoteMessageTransfer() throws Exception { Future remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); int expected = 42; @@ -158,7 +155,7 @@ public void testMaximumFramesizeRemoteMessageTransfer() throws Exception { Future remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class); - TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout); + TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit()); int bufferSize = maxFrameSize + 1; byte[] buffer = new byte[bufferSize]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index a8d5bd76cf829..09aab183782e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.NonHaServices; @@ -29,8 +30,6 @@ import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; import static org.junit.Assert.*; @@ -56,7 +55,7 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { taskManager.start(); verify(rmGateway, timeout(5000)).registerTaskExecutor( - any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class)); } finally { rpc.stopService(); @@ -97,7 +96,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { testLeaderService.notifyListener(address1, leaderId1); verify(rmGateway1, timeout(5000)).registerTaskExecutor( - eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); // cancel the leader @@ -107,7 +106,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { testLeaderService.notifyListener(address2, leaderId2); verify(rmGateway2, timeout(5000)).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); } finally {