From afe1d171132bb3724e672a1c4ce74a3f7c185908 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 31 Jul 2017 15:07:18 +0200 Subject: [PATCH 1/4] [FLINK-7313] [futures] Add Flink future and Scala future to Java 8 CompletableFuture conversion Add DirectExecutionContext Add Scala Future to Java 8 CompletableFuture utility to FutureUtils Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils Add base class for Flink's unchecked future exceptions --- .../flink/runtime/concurrent/Executors.java | 37 ++++++++++++ .../concurrent/FlinkFutureException.java | 47 +++++++++++++++ .../flink/runtime/concurrent/FutureUtils.java | 58 +++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index e8a9be9584125..04cdce741a79b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import scala.concurrent.ExecutionContext; + /** * Collection of {@link Executor} implementations */ @@ -58,6 +60,41 @@ public void execute(@Nonnull Runnable command) { } } + /** + * Return a direct execution context. The direct execution context executes the runnable directly + * in the calling thread. + * + * @return Direct execution context. + */ + public static ExecutionContext directExecutionContext() { + return DirectExecutionContext.INSTANCE; + } + + /** + * Direct execution context. + */ + private static class DirectExecutionContext implements ExecutionContext { + + static final DirectExecutionContext INSTANCE = new DirectExecutionContext(); + + private DirectExecutionContext() {} + + @Override + public void execute(Runnable runnable) { + runnable.run(); + } + + @Override + public void reportFailure(Throwable cause) { + throw new IllegalStateException("Error in direct execution context.", cause); + } + + @Override + public ExecutionContext prepare() { + return this; + } + } + /** * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java new file mode 100644 index 0000000000000..c728904825313 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.CompletionStage; + +/** + * Base class for exceptions which are thrown in {@link CompletionStage}. + * + *

The exception has to extend {@link FlinkRuntimeException} because only + * unchecked exceptions can be thrown in a future's stage. Additionally we let + * it extend the Flink runtime exception because it designates the exception to + * come from a Flink stage. + */ +public class FlinkFutureException extends FlinkRuntimeException { + private static final long serialVersionUID = -8878194471694178210L; + + public FlinkFutureException(String message) { + super(message); + } + + public FlinkFutureException(Throwable cause) { + super(cause); + } + + public FlinkFutureException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index a27af5666fd25..9cdbe1f461680 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -21,6 +21,8 @@ import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.util.Preconditions; +import akka.dispatch.OnComplete; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -286,4 +288,60 @@ public int getNumFuturesCompleted() { return numCompleted.get(); } } + + // ------------------------------------------------------------------------ + // Converting futures + // ------------------------------------------------------------------------ + + /** + * Converts a Scala {@link scala.concurrent.Future} to a {@link java.util.concurrent.CompletableFuture}. + * + * @param scalaFuture to convert to a Java 8 CompletableFuture + * @param type of the future value + * @return Java 8 CompletableFuture + */ + public static java.util.concurrent.CompletableFuture toJava(scala.concurrent.Future scalaFuture) { + final java.util.concurrent.CompletableFuture result = new java.util.concurrent.CompletableFuture<>(); + + scalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, T success) throws Throwable { + if (failure != null) { + result.completeExceptionally(failure); + } else { + result.complete(success); + } + } + }, Executors.directExecutionContext()); + + return result; + } + + /** + * Converts a Flink {@link Future} into a {@link CompletableFuture}. + * + * @param flinkFuture to convert to a Java 8 CompletableFuture + * @param type of the future value + * @return Java 8 CompletableFuture + * + * @deprecated Will be removed once we completely remove Flink's futures + */ + @Deprecated + public static java.util.concurrent.CompletableFuture toJava(Future flinkFuture) { + final java.util.concurrent.CompletableFuture result = new java.util.concurrent.CompletableFuture<>(); + + flinkFuture.handle( + (t, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(t); + } + + return null; + } + ); + + return result; + } } From 8425e85f3ea0499fb59fc3c25c035a622e3b282d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 31 Jul 2017 18:47:22 +0200 Subject: [PATCH 2/4] [FLINK-7321] [futures] Replace Flink's futures with Java 8's CompletableFuture in HeartbeatManager --- .../runtime/heartbeat/HeartbeatListener.java | 5 ++-- .../heartbeat/HeartbeatManagerImpl.java | 24 ++++++---------- .../heartbeat/HeartbeatManagerSenderImpl.java | 23 ++++++--------- .../flink/runtime/jobmaster/JobMaster.java | 9 +++--- .../resourcemanager/ResourceManager.java | 9 +++--- .../runtime/taskexecutor/TaskExecutor.java | 21 ++++++-------- .../heartbeat/HeartbeatManagerTest.java | 28 ++++++++++--------- 7 files changed, 53 insertions(+), 66 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java index c6307aad94dd5..734eb4c853b41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java @@ -19,7 +19,8 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Future; + +import java.util.concurrent.CompletableFuture; /** * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used @@ -58,5 +59,5 @@ public interface HeartbeatListener { * * @return Future containing the next payload for heartbeats */ - Future retrievePayload(); + CompletableFuture retrievePayload(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index d97cfa0882e95..99f44f935ab8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -19,9 +19,6 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; @@ -30,6 +27,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -193,24 +191,18 @@ public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); } - Future futurePayload = heartbeatListener.retrievePayload(); + CompletableFuture futurePayload = heartbeatListener.retrievePayload(); if (futurePayload != null) { - Future sendHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction() { - @Override - public void accept(O retrievedPayload) { - heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload); - } - }, executor); - - sendHeartbeatFuture.exceptionally(new ApplyFunction() { - @Override - public Void apply(Throwable failure) { + CompletableFuture sendHeartbeatFuture = futurePayload.thenAcceptAsync( + retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload), + executor); + + sendHeartbeatFuture.exceptionally((Throwable failure) -> { log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure); return null; - } - }); + }); } else { heartbeatTarget.receiveHeartbeat(ownResourceID, null); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index 5b3a95748f82e..eb8234369b9d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -19,13 +19,11 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.slf4j.Logger; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -65,25 +63,20 @@ public void run() { if (!stopped) { log.debug("Trigger heartbeat request."); for (HeartbeatMonitor heartbeatMonitor : getHeartbeatTargets()) { - Future futurePayload = getHeartbeatListener().retrievePayload(); + CompletableFuture futurePayload = getHeartbeatListener().retrievePayload(); final HeartbeatTarget heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); if (futurePayload != null) { - Future requestHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction() { - @Override - public void accept(O payload) { - heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload); - } - }, getExecutor()); + CompletableFuture requestHeartbeatFuture = futurePayload.thenAcceptAsync( + payload -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload), + getExecutor()); - requestHeartbeatFuture.exceptionally(new ApplyFunction() { - @Override - public Void apply(Throwable failure) { + requestHeartbeatFuture.exceptionally( + (Throwable failure) -> { log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure); return null; - } - }); + }); } else { heartbeatTarget.requestHeartbeat(getOwnResourceID(), null); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 947a91486f4df..87afb7d54afdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -106,6 +106,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; @@ -1123,8 +1124,8 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1150,8 +1151,8 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(null); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 0dfbbcd2faad4..438ec65b462cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; @@ -999,8 +1000,8 @@ public void run() { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1032,8 +1033,8 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(null); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index cdec08edfe142..4c4b0a7e1861f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -99,7 +99,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -1328,8 +1328,8 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1355,14 +1355,11 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public Future retrievePayload() { - return callAsync( - new Callable() { - @Override - public SlotReport call() throws Exception { - return taskSlotTable.createSlotReport(getResourceID()); - } - }, taskManagerConfiguration.getTimeout()); + public CompletableFuture retrievePayload() { + return FutureUtils.toJava( + callAsync( + () -> taskSlotTable.createSlotReport(getResourceID()), + taskManagerConfiguration.getTimeout())); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 031f48cc577e0..3043713fd792e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -19,11 +19,8 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.TestLogger; @@ -31,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -72,7 +70,7 @@ public void testRegularHeartbeat() { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -113,7 +111,7 @@ public void testHeartbeatMonitorUpdate() { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -165,7 +163,7 @@ public void testHeartbeatTimeout() throws Exception { HeartbeatTarget heartbeatTarget = mock(HeartbeatTarget.class); - Future timeoutFuture = heartbeatListener.getTimeoutFuture(); + CompletableFuture timeoutFuture = heartbeatListener.getTimeoutFuture(); heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); @@ -199,11 +197,11 @@ public void testHeartbeatCluster() throws Exception { ResourceID resourceID2 = new ResourceID("barfoo"); HeartbeatListener heartbeatListener = mock(HeartbeatListener.class); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object)); TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2); - Future futureTimeout = heartbeatListener2.getTimeoutFuture(); + CompletableFuture futureTimeout = heartbeatListener2.getTimeoutFuture(); HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -266,7 +264,7 @@ public void testTargetUnmonitoring() throws InterruptedException, ExecutionExcep heartbeatManager.unmonitorTarget(targetID); - Future timeout = heartbeatListener.getTimeoutFuture(); + CompletableFuture timeout = heartbeatListener.getTimeoutFuture(); try { timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); @@ -278,7 +276,7 @@ public void testTargetUnmonitoring() throws InterruptedException, ExecutionExcep static class TestingHeartbeatListener implements HeartbeatListener { - private final CompletableFuture future = new FlinkCompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); private final Object payload; @@ -288,7 +286,11 @@ static class TestingHeartbeatListener implements HeartbeatListener getTimeoutFuture() { + public CompletableFuture getFuture() { + return future; + } + + CompletableFuture getTimeoutFuture() { return future; } @@ -307,8 +309,8 @@ public void reportPayload(ResourceID resourceID, Object payload) { } @Override - public Future retrievePayload() { - return FlinkCompletableFuture.completed(payload); + public CompletableFuture retrievePayload() { + return CompletableFuture.completedFuture(payload); } } } From 9de4ab0680b040d64be3ebb4144fece4a5ef8438 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 31 Jul 2017 19:35:14 +0200 Subject: [PATCH 3/4] [FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool --- .../flink/runtime/concurrent/FutureUtils.java | 22 ++++++ .../flink/runtime/instance/SlotPool.java | 70 +++++++++---------- 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 9cdbe1f461680..0a73f125481fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -344,4 +344,26 @@ public static java.util.concurrent.CompletableFuture toJava(Future fli return result; } + + /** + * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}. + * + * @param javaFuture to convert to a Flink future + * @param type of the future value + * @return Flink future + */ + public static Future toFlinkFuture(java.util.concurrent.CompletableFuture javaFuture) { + FlinkCompletableFuture result = new FlinkCompletableFuture<>(); + + javaFuture.whenComplete( + (value, throwable) -> { + if (throwable == null) { + result.complete(value); + } else { + result.completeExceptionally(throwable); + } + }); + + return result; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 8cf6a9bed85d9..c74d9a6149c57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -25,10 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -57,6 +55,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -246,7 +245,7 @@ public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManag // work on all slots waiting for this connection for (PendingRequest pending : waitingForResourceManager.values()) { - requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile()); + requestSlotFromResourceManager(pending.allocationID(), pending.getFuture(), pending.resourceProfile()); } // all sent off @@ -269,7 +268,7 @@ public Future allocateSlot( ResourceProfile resources, Iterable locationPreferences) { - return internalAllocateSlot(task, resources, locationPreferences); + return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences)); } @RpcMethod @@ -278,7 +277,7 @@ public void returnAllocatedSlot(Slot slot) { } - Future internalAllocateSlot( + CompletableFuture internalAllocateSlot( ScheduledUnit task, ResourceProfile resources, Iterable locationPreferences) { @@ -288,12 +287,12 @@ Future internalAllocateSlot( if (slotFromPool != null) { SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); allocatedSlots.add(slot); - return FlinkCompletableFuture.completed(slot); + return CompletableFuture.completedFuture(slot); } // the request will be completed by a future final AllocationID allocationID = new AllocationID(); - final FlinkCompletableFuture future = new FlinkCompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); // (2) need to request a slot @@ -310,34 +309,33 @@ Future internalAllocateSlot( private void requestSlotFromResourceManager( final AllocationID allocationID, - final FlinkCompletableFuture future, + final CompletableFuture future, final ResourceProfile resources) { LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID); pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources)); - Future rmResponse = resourceManagerGateway.requestSlot( + CompletableFuture rmResponse = FutureUtils.toJava( + resourceManagerGateway.requestSlot( jobManagerLeaderId, resourceManagerLeaderId, new SlotRequest(jobId, allocationID, resources, jobManagerAddress), - resourceManagerRequestsTimeout); + resourceManagerRequestsTimeout)); - Future slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction() { - @Override - public void accept(Acknowledge value) { + CompletableFuture slotRequestProcessingFuture = rmResponse.thenAcceptAsync( + (Acknowledge value) -> { slotRequestToResourceManagerSuccess(allocationID); - } - }, getMainThreadExecutor()); + }, + getMainThreadExecutor()); // on failure, fail the request future - slotRequestProcessingFuture.exceptionallyAsync(new ApplyFunction() { - - @Override - public Void apply(Throwable failure) { - slotRequestToResourceManagerFailed(allocationID, failure); - return null; - } - }, getMainThreadExecutor()); + slotRequestProcessingFuture.whenCompleteAsync( + (Void v, Throwable failure) -> { + if (failure != null) { + slotRequestToResourceManagerFailed(allocationID, failure); + } + }, + getMainThreadExecutor()); } private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) { @@ -354,7 +352,7 @@ public void run() { private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) { PendingRequest request = pendingRequests.remove(allocationID); if (request != null) { - request.future().completeExceptionally(new NoResourceAvailableException( + request.getFuture().completeExceptionally(new NoResourceAvailableException( "No pooled slot available and request to ResourceManager for new slot failed", failure)); } else { if (LOG.isDebugEnabled()) { @@ -365,15 +363,15 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw private void checkTimeoutSlotAllocation(AllocationID allocationID) { PendingRequest request = pendingRequests.remove(allocationID); - if (request != null && !request.future().isDone()) { - request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out")); + if (request != null && !request.getFuture().isDone()) { + request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out")); } } private void stashRequestWaitingForResourceManager( final AllocationID allocationID, final ResourceProfile resources, - final FlinkCompletableFuture future) { + final CompletableFuture future) { LOG.info("Cannot serve slot request, no ResourceManager connected. " + "Adding as pending request {}", allocationID); @@ -390,8 +388,8 @@ public void run() { private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { PendingRequest request = waitingForResourceManager.remove(allocationID); - if (request != null && !request.future().isDone()) { - request.future().completeExceptionally(new NoResourceAvailableException( + if (request != null && !request.getFuture().isDone()) { + request.getFuture().completeExceptionally(new NoResourceAvailableException( "No slot available and no connection to Resource Manager established.")); } } @@ -426,7 +424,7 @@ private void internalReturnAllocatedSlot(Slot slot) { SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN); allocatedSlots.add(newSlot); - pendingRequest.future().complete(newSlot); + pendingRequest.getFuture().complete(newSlot); } else { LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId()); @@ -513,7 +511,7 @@ public boolean offerSlot(final AllocatedSlot slot) { if (pendingRequest != null) { // we were waiting for this! SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN); - pendingRequest.future().complete(resultSlot); + pendingRequest.getFuture().complete(resultSlot); allocatedSlots.add(resultSlot); } else { @@ -552,7 +550,7 @@ public void failAllocation(final AllocationID allocationID, final Exception caus if (pendingRequest != null) { // request was still pending LOG.debug("Failed pending request [{}] with ", allocationID, cause); - pendingRequest.future().completeExceptionally(cause); + pendingRequest.getFuture().completeExceptionally(cause); } else if (availableSlots.tryRemove(allocationID)) { LOG.debug("Failed available slot [{}] with ", allocationID, cause); @@ -999,13 +997,13 @@ private static class PendingRequest { private final AllocationID allocationID; - private final FlinkCompletableFuture future; + private final CompletableFuture future; private final ResourceProfile resourceProfile; PendingRequest( AllocationID allocationID, - FlinkCompletableFuture future, + CompletableFuture future, ResourceProfile resourceProfile) { this.allocationID = allocationID; this.future = future; @@ -1016,7 +1014,7 @@ public AllocationID allocationID() { return allocationID; } - public FlinkCompletableFuture future() { + public CompletableFuture getFuture() { return future; } From 085488e3e7c394ea9dd77943ae2c293563027dc3 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 1 Aug 2017 10:46:40 +0200 Subject: [PATCH 4/4] [FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor --- .../runtime/taskexecutor/TaskExecutor.java | 100 ++++++++---------- 1 file changed, 46 insertions(+), 54 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 4c4b0a7e1861f..aa4d6d27ed06b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -27,9 +27,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -771,55 +768,51 @@ private void offerSlotsToJobManager(final JobID jobId) { reservedSlots.add(offer); } - Future> acceptedSlotsFuture = jobMasterGateway.offerSlots( - getResourceID(), - reservedSlots, - leaderId, - taskManagerConfiguration.getTimeout()); - - Future acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction>() { - @Override - public void accept(Iterable acceptedSlots) { - // check if the response is still valid - if (isJobManagerConnectionValid(jobId, leaderId)) { - // mark accepted slots active - for (SlotOffer acceptedSlot : acceptedSlots) { - reservedSlots.remove(acceptedSlot); - } - - final Exception e = new Exception("The slot was rejected by the JobManager."); + CompletableFuture> acceptedSlotsFuture = FutureUtils.toJava( + jobMasterGateway.offerSlots( + getResourceID(), + reservedSlots, + leaderId, + taskManagerConfiguration.getTimeout())); + + acceptedSlotsFuture.whenCompleteAsync( + (Iterable acceptedSlots, Throwable throwable) -> { + if (throwable != null) { + if (throwable instanceof TimeoutException) { + log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); + // We ran into a timeout. Try again. + offerSlotsToJobManager(jobId); + } else { + log.warn("Slot offering to JobManager failed. Freeing the slots " + + "and returning them to the ResourceManager.", throwable); - for (SlotOffer rejectedSlot: reservedSlots) { - freeSlot(rejectedSlot.getAllocationId(), e); + // We encountered an exception. Free the slots and return them to the RM. + for (SlotOffer reservedSlot: reservedSlots) { + freeSlot(reservedSlot.getAllocationId(), throwable); + } } } else { - // discard the response since there is a new leader for the job - log.debug("Discard offer slot response since there is a new leader " + - "for the job {}.", jobId); - } - } - }, getMainThreadExecutor()); + // check if the response is still valid + if (isJobManagerConnectionValid(jobId, leaderId)) { + // mark accepted slots active + for (SlotOffer acceptedSlot : acceptedSlots) { + reservedSlots.remove(acceptedSlot); + } - acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction() { - @Override - public Void apply(Throwable throwable) { - if (throwable instanceof TimeoutException) { - log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); - // We ran into a timeout. Try again. - offerSlotsToJobManager(jobId); - } else { - log.warn("Slot offering to JobManager failed. Freeing the slots " + - "and returning them to the ResourceManager.", throwable); + final Exception e = new Exception("The slot was rejected by the JobManager."); - // We encountered an exception. Free the slots and return them to the RM. - for (SlotOffer reservedSlot: reservedSlots) { - freeSlot(reservedSlot.getAllocationId(), throwable); + for (SlotOffer rejectedSlot : reservedSlots) { + freeSlot(rejectedSlot.getAllocationId(), e); + } + } else { + // discard the response since there is a new leader for the job + log.debug("Discard offer slot response since there is a new leader " + + "for the job {}.", jobId); } } + }, + getMainThreadExecutor()); - return null; - } - }); } else { log.debug("There are no unassigned slots for the job {}.", jobId); } @@ -992,17 +985,16 @@ private void updateTaskExecutionState( { final ExecutionAttemptID executionAttemptID = taskExecutionState.getID(); - Future futureAcknowledge = jobMasterGateway.updateTaskExecutionState( - jobMasterLeaderId, taskExecutionState); + CompletableFuture futureAcknowledge = FutureUtils.toJava( + jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState)); - futureAcknowledge.exceptionallyAsync(new ApplyFunction() { - @Override - public Void apply(Throwable value) { - failTask(executionAttemptID, value); - - return null; - } - }, getMainThreadExecutor()); + futureAcknowledge.whenCompleteAsync( + (ack, throwable) -> { + if (throwable != null) { + failTask(executionAttemptID, throwable); + } + }, + getMainThreadExecutor()); } private void unregisterTaskAndNotifyFinalState(