From afe1d171132bb3724e672a1c4ce74a3f7c185908 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 31 Jul 2017 15:07:18 +0200 Subject: [PATCH 1/2] [FLINK-7313] [futures] Add Flink future and Scala future to Java 8 CompletableFuture conversion Add DirectExecutionContext Add Scala Future to Java 8 CompletableFuture utility to FutureUtils Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils Add base class for Flink's unchecked future exceptions --- .../flink/runtime/concurrent/Executors.java | 37 ++++++++++++ .../concurrent/FlinkFutureException.java | 47 +++++++++++++++ .../flink/runtime/concurrent/FutureUtils.java | 58 +++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index e8a9be9584125..04cdce741a79b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import scala.concurrent.ExecutionContext; + /** * Collection of {@link Executor} implementations */ @@ -58,6 +60,41 @@ public void execute(@Nonnull Runnable command) { } } + /** + * Return a direct execution context. The direct execution context executes the runnable directly + * in the calling thread. + * + * @return Direct execution context. + */ + public static ExecutionContext directExecutionContext() { + return DirectExecutionContext.INSTANCE; + } + + /** + * Direct execution context. + */ + private static class DirectExecutionContext implements ExecutionContext { + + static final DirectExecutionContext INSTANCE = new DirectExecutionContext(); + + private DirectExecutionContext() {} + + @Override + public void execute(Runnable runnable) { + runnable.run(); + } + + @Override + public void reportFailure(Throwable cause) { + throw new IllegalStateException("Error in direct execution context.", cause); + } + + @Override + public ExecutionContext prepare() { + return this; + } + } + /** * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java new file mode 100644 index 0000000000000..c728904825313 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.CompletionStage; + +/** + * Base class for exceptions which are thrown in {@link CompletionStage}. + * + *

The exception has to extend {@link FlinkRuntimeException} because only + * unchecked exceptions can be thrown in a future's stage. Additionally we let + * it extend the Flink runtime exception because it designates the exception to + * come from a Flink stage. + */ +public class FlinkFutureException extends FlinkRuntimeException { + private static final long serialVersionUID = -8878194471694178210L; + + public FlinkFutureException(String message) { + super(message); + } + + public FlinkFutureException(Throwable cause) { + super(cause); + } + + public FlinkFutureException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index a27af5666fd25..9cdbe1f461680 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -21,6 +21,8 @@ import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.util.Preconditions; +import akka.dispatch.OnComplete; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -286,4 +288,60 @@ public int getNumFuturesCompleted() { return numCompleted.get(); } } + + // ------------------------------------------------------------------------ + // Converting futures + // ------------------------------------------------------------------------ + + /** + * Converts a Scala {@link scala.concurrent.Future} to a {@link java.util.concurrent.CompletableFuture}. + * + * @param scalaFuture to convert to a Java 8 CompletableFuture + * @param type of the future value + * @return Java 8 CompletableFuture + */ + public static java.util.concurrent.CompletableFuture toJava(scala.concurrent.Future scalaFuture) { + final java.util.concurrent.CompletableFuture result = new java.util.concurrent.CompletableFuture<>(); + + scalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, T success) throws Throwable { + if (failure != null) { + result.completeExceptionally(failure); + } else { + result.complete(success); + } + } + }, Executors.directExecutionContext()); + + return result; + } + + /** + * Converts a Flink {@link Future} into a {@link CompletableFuture}. + * + * @param flinkFuture to convert to a Java 8 CompletableFuture + * @param type of the future value + * @return Java 8 CompletableFuture + * + * @deprecated Will be removed once we completely remove Flink's futures + */ + @Deprecated + public static java.util.concurrent.CompletableFuture toJava(Future flinkFuture) { + final java.util.concurrent.CompletableFuture result = new java.util.concurrent.CompletableFuture<>(); + + flinkFuture.handle( + (t, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(t); + } + + return null; + } + ); + + return result; + } } From 9c3cc4e37d2c67551ffb21ad6f7dcaa81e0d4d50 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 31 Jul 2017 18:06:20 +0200 Subject: [PATCH 2/2] [FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in MesosResourceManager --- .../MesosResourceManager.java | 44 ++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 260d5bf65f47c..736af59514d2e 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -45,10 +45,7 @@ import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; @@ -79,11 +76,11 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import scala.Option; @@ -324,24 +321,23 @@ public void postStop() throws Exception { Exception exception = null; FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS); - Future stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout); + CompletableFuture stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout); taskMonitor = null; - Future stopConnectionMonitorFuture = stopActor(connectionMonitor, stopTimeout); + CompletableFuture stopConnectionMonitorFuture = stopActor(connectionMonitor, stopTimeout); connectionMonitor = null; - Future stopLaunchCoordinatorFuture = stopActor(launchCoordinator, stopTimeout); + CompletableFuture stopLaunchCoordinatorFuture = stopActor(launchCoordinator, stopTimeout); launchCoordinator = null; - Future stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout); + CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout); reconciliationCoordinator = null; - Future stopFuture = FutureUtils.waitForAll( - Arrays.asList( - stopTaskMonitorFuture, - stopConnectionMonitorFuture, - stopLaunchCoordinatorFuture, - stopReconciliationCoordinatorFuture)); + CompletableFuture stopFuture = CompletableFuture.allOf( + stopTaskMonitorFuture, + stopConnectionMonitorFuture, + stopLaunchCoordinatorFuture, + stopReconciliationCoordinatorFuture); // wait for the future to complete or to time out try { @@ -606,20 +602,18 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) { * @param timeout for the graceful shut down * @return Future containing the result of the graceful shut down */ - private Future stopActor(final ActorRef actorRef, FiniteDuration timeout) { - return new FlinkFuture<>(Patterns.gracefulStop(actorRef, timeout)) + private CompletableFuture stopActor(final ActorRef actorRef, FiniteDuration timeout) { + return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout)) .exceptionally( - new ApplyFunction() { - @Override - public Boolean apply(Throwable throwable) { - // The actor did not stop gracefully in time, try to directly stop it - actorSystem.stop(actorRef); + (Throwable throwable) -> { + // The actor did not stop gracefully in time, try to directly stop it + actorSystem.stop(actorRef); - log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable); + log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable); - return true; - } - }); + return true; + } + ); } /**