From be89c60d2e4858cf6b92cdd26fc937732f904bf9 Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 22 Mar 2018 09:58:08 +0800 Subject: [PATCH 1/9] [FLINK-8931] TASK_KILLING is not covered by match in TaskMonitor#whenUnhandled This closes #5744. --- .../scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala index 7840fd479bd24..76a2a90e1fc07 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala @@ -163,6 +163,7 @@ class TaskMonitor( LOG.warn(s"Mesos task ${goal.taskID.getValue} failed unexpectedly.") context.parent ! TaskTerminated(goal.taskID, msg.status()) stop() + case TASK_KILLING => stay() } case Event(msg: StatusUpdate, StateData(goal: Released)) => @@ -175,6 +176,7 @@ class TaskMonitor( LOG.info(s"Mesos task ${goal.taskID.getValue} exited as planned.") context.parent ! TaskTerminated(goal.taskID, msg.status()) stop() + case TASK_KILLING => stay() } } From e0c3fb81921480445ffc88b07300c0a1cfb1f49c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 15:09:46 +0100 Subject: [PATCH 2/9] [hotfix] Improve Flip-6 component logging --- .../flink/runtime/jobmaster/JobMaster.java | 4 ++-- .../runtime/jobmaster/slotpool/SlotPool.java | 5 +++-- .../resourcemanager/ResourceManager.java | 17 ++++++++--------- .../slotmanager/ResourceActions.java | 3 ++- .../slotmanager/SlotManager.java | 8 ++++---- .../runtime/taskexecutor/TaskExecutor.java | 2 ++ .../slotmanager/SlotManagerTest.java | 8 ++++---- 7 files changed, 25 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 6878032f0e17f..bc18fab7e39ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -689,6 +689,8 @@ public CompletableFuture scheduleOrUpdateConsumers( @Override public CompletableFuture disconnectTaskManager(final ResourceID resourceID, final Exception cause) { + log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage()); + taskManagerHeartbeatManager.unmonitorTarget(resourceID); CompletableFuture releaseFuture = slotPoolGateway.releaseTaskManager(resourceID); @@ -1516,8 +1518,6 @@ private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) { @Override public void notifyHeartbeatTimeout(ResourceID resourceID) { - log.info("Heartbeat of TaskManager with id {} timed out.", resourceID); - jobMasterGateway.disconnectTaskManager( resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 42264b5321989..3e8b788239547 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -322,7 +322,6 @@ private CompletableFuture internalAllocateSlot( SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { - final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); if (slotSharingGroupId != null) { @@ -955,7 +954,7 @@ public CompletableFuture offerSlot( } final AllocatedSlot allocatedSlot = new AllocatedSlot( - slotOffer.getAllocationId(), + allocationID, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), @@ -971,6 +970,8 @@ public CompletableFuture offerSlot( // we could not complete the pending slot future --> try to fulfill another pending request allocatedSlots.remove(pendingRequest.getSlotRequestId()); tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } else { + log.debug("Fulfilled slot request {} with allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID); } } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 0ae4ab6af3a08..cae9c6cdb383c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -682,7 +682,7 @@ private RegistrationResponse registerTaskExecutorInternal( WorkerRegistration oldRegistration = taskExecutors.remove(taskExecutorResourceId); if (oldRegistration != null) { // TODO :: suggest old taskExecutor to stop itself - log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId); + log.info("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId); // remove old task manager registration from slot manager slotManager.unregisterTaskManager(oldRegistration.getInstanceID()); @@ -779,14 +779,14 @@ protected void closeTaskManagerConnection(final ResourceID resourceID, final Exc WorkerRegistration workerRegistration = taskExecutors.remove(resourceID); if (workerRegistration != null) { - log.info("Task manager {} failed because {}.", resourceID, cause.getMessage()); + log.info("Closing TaskExecutor connection {} because: {}", resourceID, cause.getMessage()); // TODO :: suggest failed task executor to stop itself slotManager.unregisterTaskManager(workerRegistration.getInstanceID()); workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause); } else { - log.debug("Could not find a registered task manager with the process id {}.", resourceID); + log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID); } } @@ -816,7 +816,7 @@ protected void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) } } - protected void releaseResource(InstanceID instanceId) { + protected void releaseResource(InstanceID instanceId, Exception cause) { WorkerType worker = null; // TODO: Improve performance by having an index on the instanceId @@ -829,10 +829,9 @@ protected void releaseResource(InstanceID instanceId) { if (worker != null) { if (stopWorker(worker)) { - closeTaskManagerConnection(worker.getResourceID(), - new FlinkException("Worker was stopped.")); + closeTaskManagerConnection(worker.getResourceID(), cause); } else { - log.debug("Worker {} was not stopped.", worker.getResourceID()); + log.debug("Worker {} could not be stopped.", worker.getResourceID()); } } else { // unregister in order to clean up potential left over state @@ -990,10 +989,10 @@ protected abstract void shutDownApplication( private class ResourceActionsImpl implements ResourceActions { @Override - public void releaseResource(InstanceID instanceId) { + public void releaseResource(InstanceID instanceId, Exception cause) { validateRunsInMainThread(); - ResourceManager.this.releaseResource(instanceId); + ResourceManager.this.releaseResource(instanceId, cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java index 753e5e2a09218..84e7c4e785d48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java @@ -33,8 +33,9 @@ public interface ResourceActions { * Releases the resource with the given instance id. * * @param instanceId identifying which resource to release + * @param cause why the resource is released */ - void releaseResource(InstanceID instanceId); + void releaseResource(InstanceID instanceId, Exception cause); /** * Requests to allocate a resource with the given {@link ResourceProfile}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 120e1aa1a5ac3..6cdd997c38d03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -879,8 +880,6 @@ private void checkTaskManagerTimeouts() { // first retrieve the timed out TaskManagers for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) { - LOG.debug("Evaluating TaskManager {} for idleness.", taskManagerRegistration.getInstanceId()); - if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) { // we collect the instance ids first in order to avoid concurrent modifications by the // ResourceActions.releaseResource call @@ -890,7 +889,8 @@ private void checkTaskManagerTimeouts() { // second we trigger the release resource callback which can decide upon the resource release for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) { - resourceActions.releaseResource(timedOutTaskManagerId); + LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); + resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout.")); } } } @@ -976,7 +976,7 @@ public void unregisterTaskManagersAndReleaseResources() { internalUnregisterTaskManager(taskManagerRegistration); - resourceActions.releaseResource(taskManagerRegistration.getInstanceId()); + resourceActions.releaseResource(taskManagerRegistration.getInstanceId(), new FlinkException("Triggering of SlotManager#unregisterTaskManagersAndReleaseResources.")); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f25601e534b0c..3523992e2d405 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1262,6 +1262,8 @@ private void unregisterTaskAndNotifyFinalState( private void freeSlotInternal(AllocationID allocationId, Throwable cause) { checkNotNull(allocationId); + log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage()); + try { final JobID jobId = taskSlotTable.getOwningJob(allocationId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 4907756e910d3..90ed1648ad181 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -685,7 +685,7 @@ public void run() { }); verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) - .releaseResource(eq(taskManagerConnection.getInstanceID())); + .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); } } @@ -1027,13 +1027,13 @@ public void testTimeoutForUnusedTaskManager() throws Exception { assertTrue(idleFuture2.get()); - verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); + verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); } } /** * Tests that a task manager timeout does not remove the slots from the SlotManager. - * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID)} + * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)} * callback. The receiver of the callback can then decide what to do with the TaskManager. * * FLINK-7793 @@ -1064,7 +1064,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { assertEquals(1, slotManager.getNumberRegisteredSlots()); // wait for the timeout call to happen - verify(resourceActions, timeout(taskManagerTimeout.toMilliseconds() * 20L).atLeast(1)).releaseResource(eq(taskExecutorConnection.getInstanceID())); + verify(resourceActions, timeout(taskManagerTimeout.toMilliseconds() * 20L).atLeast(1)).releaseResource(eq(taskExecutorConnection.getInstanceID()), any(Exception.class)); assertEquals(1, slotManager.getNumberRegisteredSlots()); From 1813e41b10f6704f41ed6f13906da5c30ca806d9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 16:48:52 +0100 Subject: [PATCH 3/9] [FLINK-9047] Fix slot recycling in case of failed release In case that a slot cannot be released it will only recycled/reused if the owning TaskExecutor is still registered at the SlotPool. If this is not the case then we drop the slot from the SlotPool. This closes #5739. --- .../jobmaster/slotpool/SingleLogicalSlot.java | 10 +- .../runtime/jobmaster/slotpool/SlotPool.java | 12 +- .../utils/SimpleAckingTaskManagerGateway.java | 18 +-- .../jobmaster/slotpool/SlotPoolTest.java | 120 +++++++++++++++++- 4 files changed, 139 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java index 9bd559bc166ef..0736b5684ab5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java @@ -19,13 +19,13 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; @@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Function; /** * Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}. @@ -127,8 +126,7 @@ public CompletableFuture releaseSlot(@Nullable Throwable cause) { // Wait until the payload has been terminated. Only then, we return the slot to its rightful owner return payload.getTerminalStateFuture() - .handle((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this)) - .thenApply(Function.identity()); + .whenComplete((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 3e8b788239547..6040b41833695 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -1041,6 +1041,7 @@ else if (availableSlots.tryRemove(allocationID)) { */ @Override public CompletableFuture registerTaskManager(final ResourceID resourceID) { + log.debug("Register new TaskExecutor {}.", resourceID); registeredTaskManagers.add(resourceID); return CompletableFuture.completedFuture(Acknowledge.get()); @@ -1119,8 +1120,15 @@ private void checkIdleSlot() { freeSlotFuture.whenCompleteAsync( (Acknowledge ignored, Throwable throwable) -> { if (throwable != null) { - log.info("Releasing idle slot {} failed.", allocationID, throwable); - tryFulfillSlotRequestOrMakeAvailable(expiredSlot); + if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) { + log.debug("Releasing slot {} of registered TaskExecutor {} failed. " + + "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(), + throwable); + tryFulfillSlotRequestOrMakeAvailable(expiredSlot); + } else { + log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " + + "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId()); + } } }, getMainThreadExecutor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 628f0041371e5..5c62a7370967c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -38,6 +37,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.Consumer; /** @@ -52,7 +52,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { private Optional> optCancelConsumer; - private volatile Consumer> freeSlotConsumer; + private volatile BiFunction> freeSlotFunction; public SimpleAckingTaskManagerGateway() { optSubmitConsumer = Optional.empty(); @@ -67,8 +67,8 @@ public void setCancelConsumer(Consumer predicate) { optCancelConsumer = Optional.of(predicate); } - public void setFreeSlotConsumer(Consumer> consumer) { - freeSlotConsumer = consumer; + public void setFreeSlotFunction(BiFunction> freeSlotFunction) { + this.freeSlotFunction = freeSlotFunction; } @Override @@ -150,12 +150,12 @@ public CompletableFuture requestTaskManagerStdout(Time timeout @Override public CompletableFuture freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { - final Consumer> currentFreeSlotConsumer = freeSlotConsumer; + final BiFunction> currentFreeSlotFunction = freeSlotFunction; - if (currentFreeSlotConsumer != null) { - currentFreeSlotConsumer.accept(Tuple2.of(allocationId, cause)); + if (currentFreeSlotFunction != null) { + return currentFreeSlotFunction.apply(allocationId, cause); + } else { + return CompletableFuture.completedFuture(Acknowledge.get()); } - - return CompletableFuture.completedFuture(Acknowledge.get()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index c3819747595b8..502b076e5714a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -58,6 +60,7 @@ import javax.annotation.Nullable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -67,6 +70,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.hamcrest.MatcherAssert.assertThat; @@ -559,9 +563,15 @@ public void testShutdownReleasesAllSlots() throws Exception { final ArrayBlockingQueue freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers); - taskManagerGateway.setFreeSlotConsumer(tuple -> { - while(!freedSlotQueue.offer(tuple.f0)) {} - }); + taskManagerGateway.setFreeSlotFunction( + (AllocationID allocationID, Throwable cause) -> { + try { + freedSlotQueue.put(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + }); final CompletableFuture> acceptedSlotOffersFuture = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers); @@ -598,7 +608,16 @@ public void testCheckIdleSlot() throws Exception { try { final BlockingQueue freedSlots = new ArrayBlockingQueue<>(1); - taskManagerGateway.setFreeSlotConsumer((tuple) -> freedSlots.offer(tuple.f0)); + taskManagerGateway.setFreeSlotFunction( + (AllocationID allocationId, Throwable cause) -> + { + try { + freedSlots.put(allocationId); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + }); final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); @@ -634,6 +653,99 @@ public void testCheckIdleSlot() throws Exception { } } + /** + * Tests that idle slots which cannot be released are only recycled if the owning {@link TaskExecutor} + * is still registered at the {@link SlotPool}. See FLINK-9047. + */ + @Test + public void testReleasingIdleSlotFailed() throws Exception { + final ManualClock clock = new ManualClock(); + final SlotPool slotPool = new SlotPool( + rpcService, + jobId, + clock, + TestingUtils.infiniteTime(), + timeout); + + try { + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + + final AllocationID expiredAllocationId = new AllocationID(); + final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN); + + final ArrayDeque> responseQueue = new ArrayDeque<>(2); + taskManagerGateway.setFreeSlotFunction((AllocationID allocationId, Throwable cause) -> { + if (responseQueue.isEmpty()) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + return responseQueue.pop(); + } + }); + + responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test failure"))); + + final CompletableFuture responseFuture = new CompletableFuture<>(); + responseQueue.add(responseFuture); + + assertThat( + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(), + Matchers.is(Acknowledge.get())); + + assertThat( + slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(), + Matchers.is(true)); + + clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + slotPool.triggerCheckIdleSlot(); + + CompletableFuture allocatedSlotFuture = slotPoolGateway.allocateSlot( + new SlotRequestId(), + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + true, + timeout); + + // wait until the slot has been fulfilled with the previously idling slot + final LogicalSlot logicalSlot = allocatedSlotFuture.get(); + assertThat(logicalSlot.getAllocationId(), Matchers.is(expiredAllocationId)); + + // return the slot + slotPool.getSlotOwner().returnAllocatedSlot(logicalSlot).get(); + + // advance the time so that the returned slot is now idling + clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + slotPool.triggerCheckIdleSlot(); + + // request a new slot after the idling slot has been released + allocatedSlotFuture = slotPoolGateway.allocateSlot( + new SlotRequestId(), + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + true, + timeout); + + // release the TaskExecutor before we get a response from the slot releasing + slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get(); + + // let the slot releasing fail --> since the owning TaskExecutor is no longer registered + // the slot should be discarded + responseFuture.completeExceptionally(new FlinkException("Second test exception")); + + try { + // since the slot must have been discarded, we cannot fulfill the slot request + allocatedSlotFuture.get(10L, TimeUnit.MILLISECONDS); + fail("Expected to fail with a timeout."); + } catch (TimeoutException ignored) { + // expected + } + + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + private static SlotPoolGateway setupSlotPool( SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception { From 2797c62a5c30af38d8e6478c6ad5546ec53a3234 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 18:44:25 +0100 Subject: [PATCH 4/9] [hotfix] Remove unused method from SlotPool --- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 6040b41833695..662e71a7136c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -69,7 +69,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -724,15 +723,6 @@ private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Thr } } - private void checkTimeoutSlotAllocation(SlotRequestId slotRequestID) { - PendingRequest request = pendingRequests.removeKeyA(slotRequestID); - if (request != null) { - failPendingRequest( - request, - new TimeoutException("Slot allocation request " + slotRequestID + " timed out")); - } - } - private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) { log.info("Cannot serve slot request, no ResourceManager connected. " + From 7889286b201a0accba9beb7439a7a4a004a80c9d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 21:08:13 +0100 Subject: [PATCH 5/9] [hotfix] Make RestServerEndpoint#uploadDir protected --- .../flink/runtime/dispatcher/DispatcherRestEndpoint.java | 4 ---- .../org/apache/flink/runtime/rest/RestServerEndpoint.java | 3 ++- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 45185528395d3..8072cf45a7100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -42,7 +42,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import java.io.IOException; -import java.nio.file.Path; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -52,8 +51,6 @@ */ public class DispatcherRestEndpoint extends WebMonitorEndpoint { - private final Path uploadDir; - private WebMonitorExtension webSubmissionExtension; public DispatcherRestEndpoint( @@ -80,7 +77,6 @@ public DispatcherRestEndpoint( leaderElectionService, fatalErrorHandler); - uploadDir = endpointConfiguration.getUploadDir(); webSubmissionExtension = WebMonitorExtension.empty(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index dfb01ca2657c9..80d9140d041b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -77,8 +77,9 @@ public abstract class RestServerEndpoint { private final String restBindAddress; private final int restBindPort; private final SSLEngine sslEngine; - private final Path uploadDir; private final int maxContentLength; + + protected final Path uploadDir; protected final Map responseHeaders; private final CompletableFuture terminationFuture; From 48b0a3876e35483dbfef603db772983012a8dd43 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 21:04:40 +0100 Subject: [PATCH 6/9] [FLINK-9027] [web] Clean up web UI resources by installing shut down hook The ClusterEntrypoint creates temp directory for the RestServerEndpoint. This directory contains the web ui files and if not differently configured the web upload directory. In case of a hard shut down, as it happens with bin/stop-cluster.sh the ClusterEntrypoint will clean up this directory by installing a shut down hook. All future directory cleanup tasks should go into this method ClusterEntrypoin#cleanupDirectories. This closes #5740. --- .../program/rest/RestClusterClientTest.java | 5 -- .../runtime/entrypoint/ClusterEntrypoint.java | 55 +++++++++++++++++-- .../runtime/minicluster/MiniCluster.java | 2 +- .../runtime/rest/RestServerEndpoint.java | 14 ++--- .../rest/RestServerEndpointConfiguration.java | 3 +- .../handler/RestHandlerConfiguration.java | 20 +++---- .../webmonitor/WebMonitorEndpoint.java | 11 ++-- .../rest/RestServerEndpointITCase.java | 2 +- 8 files changed, 74 insertions(+), 38 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e108a0b116eb3..e98ba436abf29 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -666,11 +666,6 @@ private class TestRestServerEndpoint extends RestServerEndpoint implements AutoC @Override protected void startInternal() throws Exception {} - - @Override - public void close() throws Exception { - shutDownAsync().get(); - } } @FunctionalInterface diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 63c8072a82620..8a4db0367bfe1 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -69,7 +69,9 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import akka.actor.ActorSystem; import org.slf4j.Logger; @@ -78,10 +80,14 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -159,9 +165,13 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { @GuardedBy("lock") private JobManagerMetricGroup jobManagerMetricGroup; + private final Thread shutDownHook; + protected ClusterEntrypoint(Configuration configuration) { - this.configuration = Preconditions.checkNotNull(configuration); + this.configuration = generateClusterConfiguration(configuration); this.terminationFuture = new CompletableFuture<>(); + + shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG); } public CompletableFuture getTerminationFuture() { @@ -479,7 +489,7 @@ protected CompletableFuture stopClusterComponents() { } if (webMonitorEndpoint != null) { - terminationFutures.add(webMonitorEndpoint.shutDownAsync()); + terminationFutures.add(webMonitorEndpoint.closeAsync()); } if (dispatcher != null) { @@ -523,6 +533,17 @@ public void onFatalError(Throwable exception) { // Internal methods // -------------------------------------------------- + private Configuration generateClusterConfiguration(Configuration configuration) { + final Configuration resultConfiguration = new Configuration(Preconditions.checkNotNull(configuration)); + + final String webTmpDir = configuration.getString(WebOptions.TMP_DIR); + final Path uniqueWebTmpDir = Paths.get(webTmpDir, "flink-web-" + UUID.randomUUID()); + + resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.toAbsolutePath().toString()); + + return resultConfiguration; + } + private CompletableFuture shutDownAsync(boolean cleanupHaData) { if (isShutDown.compareAndSet(false, true)) { LOG.info("Stopping {}.", getClass().getSimpleName()); @@ -535,11 +556,22 @@ private CompletableFuture shutDownAsync(boolean cleanupHaData) { serviceShutdownFuture.whenComplete( (Void ignored2, Throwable serviceThrowable) -> { + Throwable finalException = null; + if (serviceThrowable != null) { - terminationFuture.completeExceptionally( - ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable)); + finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable); } else if (componentThrowable != null) { - terminationFuture.completeExceptionally(componentThrowable); + finalException = componentThrowable; + } + + try { + cleanupDirectories(); + } catch (IOException e) { + finalException = ExceptionUtils.firstOrSuppressed(e, finalException); + } + + if (finalException != null) { + terminationFuture.completeExceptionally(finalException); } else { terminationFuture.complete(null); } @@ -576,6 +608,19 @@ private void shutDownAndTerminate( } } + /** + * Clean up of temporary directories created by the {@link ClusterEntrypoint}. + * + * @throws IOException if the temporary directories could not be cleaned up + */ + private void cleanupDirectories() throws IOException { + ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG); + + final String webTmpDir = configuration.getString(WebOptions.TMP_DIR); + + FileUtils.deleteDirectory(new File(webTmpDir)); + } + // -------------------------------------------------- // Abstract methods // -------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index bc75a547b55f5..0da6f333b6830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -758,7 +758,7 @@ private CompletableFuture shutDownDispatcher() { } if (dispatcherRestEndpoint != null) { - terminationFutures.add(dispatcherRestEndpoint.shutDownAsync()); + terminationFutures.add(dispatcherRestEndpoint.closeAsync()); dispatcherRestEndpoint = null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 80d9140d041b9..15fbbb24866ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.RouterHandler; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; @@ -67,7 +67,7 @@ /** * An abstract class for netty-based REST server endpoints. */ -public abstract class RestServerEndpoint { +public abstract class RestServerEndpoint implements AutoCloseableAsync { protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -256,7 +256,8 @@ public String getRestBaseUrl() { } } - public final CompletableFuture shutDownAsync() { + @Override + public CompletableFuture closeAsync() { synchronized (lock) { log.info("Shutting down rest endpoint."); @@ -370,12 +371,7 @@ protected CompletableFuture shutDownInternal() { }); }); - return FutureUtils.runAfterwards( - channelTerminationFuture, - () -> { - log.info("Cleaning upload directory {}", uploadDir); - FileUtils.cleanDirectory(uploadDir.toFile()); - }); + return channelTerminationFuture; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java index 1fac08e53edc9..8af76f5bfd5c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java @@ -36,7 +36,6 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.Map; -import java.util.UUID; import static java.util.Objects.requireNonNull; @@ -172,7 +171,7 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co final Path uploadDir = Paths.get( config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)), - "flink-web-upload-" + UUID.randomUUID()); + "flink-web-upload"); final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index f92946bd0f5f4..3f6516aadde72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.util.Preconditions; -import java.io.File; -import java.util.UUID; +import java.nio.file.Path; +import java.nio.file.Paths; /** * Configuration object containing values for the rest handler configuration. @@ -37,20 +37,20 @@ public class RestHandlerConfiguration { private final Time timeout; - private final File tmpDir; + private final Path webUiDir; public RestHandlerConfiguration( long refreshInterval, int maxCheckpointStatisticCacheEntries, Time timeout, - File tmpDir) { + Path webUiDir) { Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries; this.timeout = Preconditions.checkNotNull(timeout); - this.tmpDir = Preconditions.checkNotNull(tmpDir); + this.webUiDir = Preconditions.checkNotNull(webUiDir); } public long getRefreshInterval() { @@ -65,8 +65,8 @@ public Time getTimeout() { return timeout; } - public File getTmpDir() { - return tmpDir; + public Path getWebUiDir() { + return webUiDir; } public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { @@ -76,13 +76,13 @@ public static RestHandlerConfiguration fromConfiguration(Configuration configura final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); - final String rootDir = "flink-web-" + UUID.randomUUID(); - final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir); + final String rootDir = "flink-web-ui"; + final Path webUiDir = Paths.get(configuration.getString(WebOptions.TMP_DIR), rootDir); return new RestHandlerConfiguration( refreshInterval, maxCheckpointStatisticCacheEntries, timeout, - tmpDir); + webUiDir); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 50ad7eb1bceeb..d4aa94e19feaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -127,6 +127,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -498,7 +499,7 @@ protected List> initiali executor, metricFetcher); - final File tmpDir = restConfiguration.getTmpDir(); + final Path webUiDir = restConfiguration.getWebUiDir(); Optional> optWebContent; @@ -507,7 +508,7 @@ protected List> initiali leaderRetriever, restAddressFuture, timeout, - tmpDir); + webUiDir.toFile()); } catch (IOException e) { log.warn("Could not load web content handler.", e); optWebContent = Optional.empty(); @@ -635,15 +636,15 @@ protected CompletableFuture shutDownInternal() { final CompletableFuture shutdownFuture = super.shutDownInternal(); - final File tmpDir = restConfiguration.getTmpDir(); + final Path webUiDir = restConfiguration.getWebUiDir(); return FutureUtils.runAfterwardsAsync( shutdownFuture, () -> { Exception exception = null; try { - log.info("Removing cache directory {}", tmpDir); - FileUtils.deleteDirectory(tmpDir); + log.info("Removing cache directory {}", webUiDir); + FileUtils.deleteDirectory(webUiDir.toFile()); } catch (Exception e) { exception = e; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 784c14158a3b4..e510798069e9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -154,7 +154,7 @@ public void teardown() throws Exception { } if (serverEndpoint != null) { - serverEndpoint.shutDownAsync().get(); + serverEndpoint.close(); serverEndpoint = null; } } From 3139cedaab4774f7ae6c2730fb7688a3d419fdb0 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 21:48:19 +0100 Subject: [PATCH 7/9] [hotfix] Log final status and exit code under lock --- .../flink/runtime/entrypoint/ClusterEntrypoint.java | 10 +++++----- .../flink/runtime/entrypoint/JobClusterEntrypoint.java | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 8a4db0367bfe1..50d0db335f442 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -587,12 +587,12 @@ private void shutDownAndTerminate( ApplicationStatus applicationStatus, boolean cleanupHaData) { - LOG.info("Shut down and terminate {} with return code {} and application status {}.", - getClass().getSimpleName(), - returnCode, - applicationStatus); - if (isTerminating.compareAndSet(false, true)) { + LOG.info("Shut down and terminate {} with return code {} and application status {}.", + getClass().getSimpleName(), + returnCode, + applicationStatus); + shutDownAsync(cleanupHaData).whenComplete( (Void ignored, Throwable t) -> { if (t != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index df950a343be94..95d9c742bef4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -44,7 +44,6 @@ import javax.annotation.Nullable; -import java.io.IOException; import java.util.concurrent.Executor; /** @@ -83,7 +82,7 @@ protected MiniDispatcherRestEndpoint createRestEndpoint( @Override protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore( Configuration configuration, - ScheduledExecutor scheduledExecutor) throws IOException { + ScheduledExecutor scheduledExecutor) { return new MemoryArchivedExecutionGraphStore(); } From dd9513e9fc0ff47f10aa7728eb5f9ac5451ab101 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 22:14:58 +0100 Subject: [PATCH 8/9] [hotfix] Add FutureUtils#composeAfterwards --- .../flink/runtime/concurrent/FutureUtils.java | 34 +++++ .../runtime/concurrent/FutureUtilsTest.java | 119 ++++++++++++++++++ 2 files changed, 153 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index e0164a92f6a99..51740e3a1aba5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -411,6 +411,40 @@ public static CompletableFuture runAfterwardsAsync( return resultFuture; } + /** + * Run the given asynchronous action after the completion of the given future. The given future can be + * completed normally or exceptionally. In case of an exceptional completion, the + * asynchronous action's exception will be added to the initial exception. + * + * @param future to wait for its completion + * @param composedAction asynchronous action which is triggered after the future's completion + * @return Future which is completed after the asynchronous action has completed. This future can contain + * an exception if an error occurred in the given future or asynchronous action. + */ + public static CompletableFuture composeAfterwards( + CompletableFuture future, + Supplier> composedAction) { + final CompletableFuture resultFuture = new CompletableFuture<>(); + + future.whenComplete( + (Object outerIgnored, Throwable outerThrowable) -> { + final CompletableFuture composedActionFuture = composedAction.get(); + + composedActionFuture.whenComplete( + (Object innerIgnored, Throwable innerThrowable) -> { + if (innerThrowable != null) { + resultFuture.completeExceptionally(ExceptionUtils.firstOrSuppressed(innerThrowable, outerThrowable)); + } else if (outerThrowable != null) { + resultFuture.completeExceptionally(outerThrowable); + } else { + resultFuture.complete(null); + } + }); + }); + + return resultFuture; + } + // ------------------------------------------------------------------------ // composing futures // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 57f6bd01029ab..df2a0c748c343 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -327,6 +328,124 @@ public void testRunAfterwardsExceptional() throws Exception { } } + @Test + public void testComposeAfterwards() throws ExecutionException, InterruptedException { + final CompletableFuture inputFuture = new CompletableFuture<>(); + final OneShotLatch composeLatch = new OneShotLatch(); + + final CompletableFuture composeFuture = FutureUtils.composeAfterwards( + inputFuture, + () -> { + composeLatch.trigger(); + return CompletableFuture.completedFuture(null); + }); + + assertThat(composeLatch.isTriggered(), is(false)); + assertThat(composeFuture.isDone(), is(false)); + + inputFuture.complete(null); + + assertThat(composeLatch.isTriggered(), is(true)); + assertThat(composeFuture.isDone(), is(true)); + + // check that tthis future is not exceptionally completed + composeFuture.get(); + } + + @Test + public void testComposeAfterwardsFirstExceptional() throws InterruptedException { + final CompletableFuture inputFuture = new CompletableFuture<>(); + final OneShotLatch composeLatch = new OneShotLatch(); + final FlinkException testException = new FlinkException("Test exception"); + + final CompletableFuture composeFuture = FutureUtils.composeAfterwards( + inputFuture, + () -> { + composeLatch.trigger(); + return CompletableFuture.completedFuture(null); + }); + + assertThat(composeLatch.isTriggered(), is(false)); + assertThat(composeFuture.isDone(), is(false)); + + inputFuture.completeExceptionally(testException); + + assertThat(composeLatch.isTriggered(), is(true)); + assertThat(composeFuture.isDone(), is(true)); + + // check that this future is not exceptionally completed + try { + composeFuture.get(); + fail("Expected an exceptional completion"); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), is(testException)); + } + } + + @Test + public void testComposeAfterwardsSecondExceptional() throws InterruptedException { + final CompletableFuture inputFuture = new CompletableFuture<>(); + final OneShotLatch composeLatch = new OneShotLatch(); + final FlinkException testException = new FlinkException("Test exception"); + + final CompletableFuture composeFuture = FutureUtils.composeAfterwards( + inputFuture, + () -> { + composeLatch.trigger(); + return FutureUtils.completedExceptionally(testException); + }); + + assertThat(composeLatch.isTriggered(), is(false)); + assertThat(composeFuture.isDone(), is(false)); + + inputFuture.complete(null); + + assertThat(composeLatch.isTriggered(), is(true)); + assertThat(composeFuture.isDone(), is(true)); + + // check that this future is not exceptionally completed + try { + composeFuture.get(); + fail("Expected an exceptional completion"); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), is(testException)); + } + } + + @Test + public void testComposeAfterwardsBothExceptional() throws InterruptedException { + final CompletableFuture inputFuture = new CompletableFuture<>(); + final FlinkException testException1 = new FlinkException("Test exception1"); + final FlinkException testException2 = new FlinkException("Test exception2"); + final OneShotLatch composeLatch = new OneShotLatch(); + + final CompletableFuture composeFuture = FutureUtils.composeAfterwards( + inputFuture, + () -> { + composeLatch.trigger(); + return FutureUtils.completedExceptionally(testException2); + }); + + assertThat(composeLatch.isTriggered(), is(false)); + assertThat(composeFuture.isDone(), is(false)); + + inputFuture.completeExceptionally(testException1); + + assertThat(composeLatch.isTriggered(), is(true)); + assertThat(composeFuture.isDone(), is(true)); + + // check that this future is not exceptionally completed + try { + composeFuture.get(); + fail("Expected an exceptional completion"); + } catch (ExecutionException ee) { + final Throwable actual = ExceptionUtils.stripExecutionException(ee); + assertThat(actual, is(testException1)); + assertThat(actual.getSuppressed(), arrayWithSize(1)); + assertThat(actual.getSuppressed()[0], is(testException2)); + } + } + @Test public void testCompleteAll() throws Exception { final CompletableFuture inputFuture1 = new CompletableFuture<>(); From 00793d62382af3bd12b2d941516646f249fec7e5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Mar 2018 22:19:28 +0100 Subject: [PATCH 9/9] [FLINK-8900] [yarn] Properly unregister application from Yarn RM This closes #5741. --- .../MesosResourceManager.java | 4 +- .../MesosResourceManagerTest.java | 2 +- .../runtime/entrypoint/ClusterEntrypoint.java | 81 ++++++++++++------- .../resourcemanager/ResourceManager.java | 16 ++-- .../ResourceManagerGateway.java | 11 ++- .../StandaloneResourceManager.java | 2 +- .../TestingResourceManager.java | 2 +- .../utils/TestingResourceManagerGateway.java | 4 +- .../flink/yarn/YarnResourceManager.java | 6 +- 9 files changed, 77 insertions(+), 51 deletions(-) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 1f58b119f2768..4f4a6d1942c3a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -362,9 +362,9 @@ public CompletableFuture postStop() { } @Override - protected void shutDownApplication( + protected void internalDeregisterApplication( ApplicationStatus finalStatus, - @Nullable String optionalDiagnostics) throws ResourceManagerException { + @Nullable String diagnostics) throws ResourceManagerException { LOG.info("Shutting down and unregistering as a Mesos framework."); Exception exception = null; diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 412e18da65a30..5d9a6cffd7038 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -737,7 +737,7 @@ public void testStopWorker() throws Exception { public void testShutdownApplication() throws Exception { new Context() {{ startResourceManager(); - resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, ""); + resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, ""); // verify that the Mesos framework is shutdown verify(rmServices.schedulerDriver).stop(false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 50d0db335f442..b25729bd1e023 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -197,6 +197,7 @@ protected void startCluster() { shutDownAndTerminate( STARTUP_FAILURE_RETURN_CODE, ApplicationStatus.FAILED, + t.getMessage(), false); } } @@ -245,6 +246,7 @@ protected void runCluster(Configuration configuration) throws Exception { shutDownAndTerminate( SUCCESS_RETURN_CODE, ApplicationStatus.SUCCEEDED, + throwable != null ? throwable.getMessage() : null, true); }); } @@ -544,38 +546,34 @@ private Configuration generateClusterConfiguration(Configuration configuration) return resultConfiguration; } - private CompletableFuture shutDownAsync(boolean cleanupHaData) { + private CompletableFuture shutDownAsync( + boolean cleanupHaData, + ApplicationStatus applicationStatus, + @Nullable String diagnostics) { if (isShutDown.compareAndSet(false, true)) { LOG.info("Stopping {}.", getClass().getSimpleName()); - final CompletableFuture componentShutdownFuture = stopClusterComponents(); - - componentShutdownFuture.whenComplete( - (Void ignored1, Throwable componentThrowable) -> { - final CompletableFuture serviceShutdownFuture = stopClusterServices(cleanupHaData); - - serviceShutdownFuture.whenComplete( - (Void ignored2, Throwable serviceThrowable) -> { - Throwable finalException = null; - - if (serviceThrowable != null) { - finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable); - } else if (componentThrowable != null) { - finalException = componentThrowable; - } - - try { - cleanupDirectories(); - } catch (IOException e) { - finalException = ExceptionUtils.firstOrSuppressed(e, finalException); - } - - if (finalException != null) { - terminationFuture.completeExceptionally(finalException); - } else { - terminationFuture.complete(null); - } - }); + final CompletableFuture shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics); + + final CompletableFuture componentShutdownFuture = FutureUtils.composeAfterwards( + shutDownApplicationFuture, + this::stopClusterComponents); + + final CompletableFuture serviceShutdownFuture = FutureUtils.composeAfterwards( + componentShutdownFuture, + () -> stopClusterServices(cleanupHaData)); + + final CompletableFuture cleanupDirectoriesFuture = FutureUtils.runAfterwards( + serviceShutdownFuture, + this::cleanupDirectories); + + cleanupDirectoriesFuture.whenComplete( + (Void ignored2, Throwable serviceThrowable) -> { + if (serviceThrowable != null) { + terminationFuture.completeExceptionally(serviceThrowable); + } else { + terminationFuture.complete(null); + } }); } @@ -585,6 +583,7 @@ private CompletableFuture shutDownAsync(boolean cleanupHaData) { private void shutDownAndTerminate( int returnCode, ApplicationStatus applicationStatus, + @Nullable String diagnostics, boolean cleanupHaData) { if (isTerminating.compareAndSet(false, true)) { @@ -593,7 +592,10 @@ private void shutDownAndTerminate( returnCode, applicationStatus); - shutDownAsync(cleanupHaData).whenComplete( + shutDownAsync( + cleanupHaData, + applicationStatus, + diagnostics).whenComplete( (Void ignored, Throwable t) -> { if (t != null) { LOG.info("Could not properly shut down cluster entrypoint.", t); @@ -608,6 +610,25 @@ private void shutDownAndTerminate( } } + /** + * Deregister the Flink application from the resource management system by signalling + * the {@link ResourceManager}. + * + * @param applicationStatus to terminate the application with + * @param diagnostics additional information about the shut down, can be {@code null} + * @return Future which is completed once the shut down + */ + private CompletableFuture deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) { + synchronized (lock) { + if (resourceManager != null) { + final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null); + } else { + return CompletableFuture.completedFuture(null); + } + } + } + /** * Clean up of temporary directories created by the {@link ClusterEntrypoint}. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index cae9c6cdb383c..c75346900cdd8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -486,19 +486,21 @@ public void unRegisterInfoMessageListener(final String address) { * Cleanup application and shut down cluster. * * @param finalStatus of the Flink application - * @param optionalDiagnostics diagnostics message for the Flink application or {@code null} + * @param diagnostics diagnostics message for the Flink application or {@code null} */ @Override - public void shutDownCluster( + public CompletableFuture deregisterApplication( final ApplicationStatus finalStatus, - @Nullable final String optionalDiagnostics) { - log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics); + @Nullable final String diagnostics) { + log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics); try { - shutDownApplication(finalStatus, optionalDiagnostics); + internalDeregisterApplication(finalStatus, diagnostics); } catch (ResourceManagerException e) { log.warn("Could not properly shutdown the application.", e); } + + return CompletableFuture.completedFuture(Acknowledge.get()); } @Override @@ -946,7 +948,7 @@ public void handleError(final Exception exception) { protected abstract void initialize() throws ResourceManagerException; /** - * The framework specific code for shutting down the application. This should report the + * The framework specific code to deregister the application. This should report the * application's final status and shut down the resource manager cleanly. * *

This method also needs to make sure all pending containers that are not registered @@ -956,7 +958,7 @@ public void handleError(final Exception exception) { * @param optionalDiagnostics A diagnostics message or {@code null}. * @throws ResourceManagerException if the application could not be shut down. */ - protected abstract void shutDownApplication( + protected abstract void internalDeregisterApplication( ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 836bc0b0faf00..bd282d6cff0bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -133,11 +135,12 @@ void notifySlotAvailable( void unRegisterInfoMessageListener(String infoMessageListenerAddress); /** - * shutdown cluster - * @param finalStatus - * @param optionalDiagnostics + * Deregister Flink from the underlying resource management system. + * + * @param finalStatus final status with which to deregister the Flink application + * @param diagnostics additional information for the resource management system, can be {@code null} */ - void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics); + CompletableFuture deregisterApplication(final ApplicationStatus finalStatus, @Nullable final String diagnostics); /** * Gets the currently registered number of TaskManagers. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 7226d296f373d..d8e0e480a2a4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -72,7 +72,7 @@ protected void initialize() throws ResourceManagerException { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) { + protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) { } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 3db9be032e6ea..2bd976bd9bf32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -68,7 +68,7 @@ protected void initialize() throws ResourceManagerException { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException { + protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException { // noop } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 33c6c08d66704..9b4041414d62e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -214,8 +214,8 @@ public void unRegisterInfoMessageListener(String infoMessageListenerAddress) { } @Override - public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) { - + public CompletableFuture deregisterApplication(ApplicationStatus finalStatus, String diagnostics) { + return CompletableFuture.completedFuture(Acknowledge.get()); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 97db2ad8a37a1..bfe7d65262ac6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -266,16 +266,16 @@ public CompletableFuture postStop() { } @Override - protected void shutDownApplication( + protected void internalDeregisterApplication( ApplicationStatus finalStatus, - @Nullable String optionalDiagnostics) { + @Nullable String diagnostics) { // first, de-register from YARN FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus); try { - resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, ""); + resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, ""); } catch (Throwable t) { log.error("Could not unregister the application master.", t); }