diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java new file mode 100644 index 00000000000000..4ec75c51a84c5b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.function.Consumer; + +/** + * Serializable {@link Optional}. + */ +public final class SerializableOptional implements Serializable { + private static final long serialVersionUID = -3312769593551775940L; + + private static final SerializableOptional EMPTY = new SerializableOptional<>(null); + + @Nullable + private final T value; + + private SerializableOptional(@Nullable T value) { + this.value = value; + } + + public T get() { + if (value == null) { + throw new NoSuchElementException("No value present"); + } + return value; + } + + public boolean isPresent() { + return value != null; + } + + public void ifPresent(Consumer consumer) { + if (value != null) { + consumer.accept(value); + } + } + + public static SerializableOptional of(@Nonnull T value) { + return new SerializableOptional<>(value); + } + + @SuppressWarnings("unchecked") + public static SerializableOptional empty() { + return (SerializableOptional) EMPTY; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 01cb2b6b0991f6..697bfa07c3e209 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -104,6 +104,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; @@ -834,13 +835,25 @@ public void failSlot( final Exception cause) { if (registeredTaskManagers.containsKey(taskManagerId)) { - slotPoolGateway.failAllocation(allocationId, cause); + internalFailAllocation(allocationId, cause); } else { log.warn("Cannot fail slot " + allocationId + " because the TaskManager " + taskManagerId + " is unknown."); } } + private void internalFailAllocation(AllocationID allocationId, Exception cause) { + final CompletableFuture> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(allocationId, cause); + + emptyTaskExecutorFuture.thenAcceptAsync( + resourceIdOptional -> resourceIdOptional.ifPresent(this::releaseEmptyTaskManager), + getMainThreadExecutor()); + } + + private CompletableFuture releaseEmptyTaskManager(ResourceID resourceId) { + return disconnectTaskManager(resourceId, new FlinkException(String.format("No more slots registered at JobMaster %s.", resourceId))); + } + @Override public CompletableFuture registerTaskManager( final String taskManagerRpcAddress, @@ -983,7 +996,7 @@ public CompletableFuture requestOperatorBackP @Override public void notifyAllocationFailure(AllocationID allocationID, Exception cause) { - slotPoolGateway.failAllocation(allocationID, cause); + internalFailAllocation(allocationID, cause); } //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 13f0462455c4ac..b53ee93e6438bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.clock.Clock; import org.apache.flink.runtime.util.clock.SystemClock; +import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.AbstractID; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -1001,32 +1002,50 @@ public CompletableFuture offerSlot( * and decided to take it back. * * @param allocationID Represents the allocation which should be failed - * @param cause The cause of the failure + * @param cause The cause of the failure + * @return Optional task executor if it has no more slots registered */ @Override - public void failAllocation(final AllocationID allocationID, final Exception cause) { + public CompletableFuture> failAllocation(final AllocationID allocationID, final Exception cause) { final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { // request was still pending failPendingRequest(pendingRequest, cause); - } - else if (availableSlots.tryRemove(allocationID)) { - log.debug("Failed available slot [{}].", allocationID, cause); + return CompletableFuture.completedFuture(SerializableOptional.empty()); } else { - AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID); - if (allocatedSlot != null) { - // release the slot. - // since it is not in 'allocatedSlots' any more, it will be dropped o return' - allocatedSlot.releasePayload(cause); - } - else { - log.trace("Outdated request to fail slot [{}].", allocationID, cause); - } + return tryFailingAllocatedSlot(allocationID, cause); } + // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase } + private CompletableFuture> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) { + AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID); + + if (allocatedSlot == null) { + allocatedSlot = allocatedSlots.remove(allocationID); + } + + if (allocatedSlot != null) { + log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage()); + + // notify TaskExecutor about the failure + allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout); + // release the slot. + // since it is not in 'allocatedSlots' any more, it will be dropped o return' + allocatedSlot.releasePayload(cause); + + final ResourceID taskManagerId = allocatedSlot.getTaskManagerId(); + + if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) { + return CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId)); + } + } + + return CompletableFuture.completedFuture(SerializableOptional.empty()); + } + // ------------------------------------------------------------------------ // Resource // ------------------------------------------------------------------------ @@ -1107,7 +1126,7 @@ private void checkIdleSlot() { for (AllocatedSlot expiredSlot : expiredSlots) { final AllocationID allocationID = expiredSlot.getAllocationId(); - if (availableSlots.tryRemove(allocationID)) { + if (availableSlots.tryRemove(allocationID) != null) { log.info("Releasing idle slot [{}].", allocationID); final CompletableFuture freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot( @@ -1502,7 +1521,7 @@ Set removeAllForTaskManager(final ResourceID taskManager) { } } - boolean tryRemove(AllocationID slotId) { + AllocatedSlot tryRemove(AllocationID slotId) { final SlotAndTimestamp sat = availableSlots.remove(slotId); if (sat != null) { final AllocatedSlot slot = sat.slot(); @@ -1522,15 +1541,15 @@ boolean tryRemove(AllocationID slotId) { availableSlotsByHost.remove(host); } - return true; + return slot; } else { - return false; + return null; } } private void remove(AllocationID slotId) throws IllegalStateException { - if (!tryRemove(slotId)) { + if (tryRemove(slotId) == null) { throw new IllegalStateException("slot not contained"); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java index 34d9c7ff601a9a..3e546ff36748a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.types.SerializableOptional; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -126,8 +127,9 @@ CompletableFuture> offerSlots( * * @param allocationID identifying the slot which is being failed * @param cause of the failure + * @return An optional task executor id if this task executor has no more slots registered */ - void failAllocation(AllocationID allocationID, Exception cause); + CompletableFuture> failAllocation(AllocationID allocationID, Exception cause); // ------------------------------------------------------------------------ // allocating and disposing slots diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 5c62a7370967c7..e53b48021ee28e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -34,9 +34,12 @@ import org.apache.flink.runtime.messages.StackTrace; import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import javax.annotation.Nonnull; + import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -54,6 +57,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { private volatile BiFunction> freeSlotFunction; + @Nonnull + private volatile BiConsumer disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {}; + public SimpleAckingTaskManagerGateway() { optSubmitConsumer = Optional.empty(); optCancelConsumer = Optional.empty(); @@ -71,13 +77,19 @@ public void setFreeSlotFunction(BiFunction disconnectFromJobManagerConsumer) { + this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer; + } + @Override public String getAddress() { return address; } @Override - public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {} + public void disconnectFromJobManager(InstanceID instanceId, Exception cause) { + disconnectFromJobManagerConsumer.accept(instanceId, cause); + } @Override public void stopCluster(ApplicationStatus applicationStatus, String message) {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 66ca769165a42c..4c4f72fd25eea1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -544,20 +544,6 @@ public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception { } } - private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException { - final JobVertex jobVertex = new JobVertex("Test vertex"); - jobVertex.setInvokableClass(NoOpInvokable.class); - - final ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); - - final JobGraph jobGraph = new JobGraph(jobVertex); - jobGraph.setAllowQueuedScheduling(true); - jobGraph.setExecutionConfig(executionConfig); - - return jobGraph; - } - /** * Tests that we can close an unestablished ResourceManager connection. */ @@ -917,6 +903,72 @@ public void testRequestPartitionState() throws Exception { } } + /** + * Tests that the TaskExecutor is released if all of its slots have been freed. + */ + @Test + public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception { + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + + final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices, + heartbeatServices); + + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + + testingResourceManagerGateway.setRequestSlotConsumer( + slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final CompletableFuture disconnectTaskExecutorFuture = new CompletableFuture<>(); + final CompletableFuture freedSlotFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction( + (allocationID, throwable) -> { + freedSlotFuture.complete(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) + .createTestingTaskExecutorGateway(); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); + + try { + jobMaster.start(jobMasterId, testingTimeout).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final AllocationID allocationId = allocationIdFuture.get(); + + jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + final CompletableFuture> acceptedSlotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout); + + final Collection slotOffers = acceptedSlotOffers.get(); + + // check that we accepted the offered slot + assertThat(slotOffers, hasSize(1)); + + // now fail the allocation and check that we close the connection to the TaskExecutor + jobMasterGateway.notifyAllocationFailure(allocationId, new FlinkException("Fail alloction test exception")); + + // we should free the slot and then disconnect from the TaskExecutor because we use no longer slots from it + assertThat(freedSlotFuture.get(), equalTo(allocationId)); + assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID())); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + private JobGraph producerConsumerJobGraph() { final JobVertex producer = new JobVertex("Producer"); producer.setInvokableClass(NoOpInvokable.class); @@ -1006,6 +1058,20 @@ private JobMaster createJobMaster( JobMasterTest.class.getClassLoader()); } + private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException { + final JobVertex jobVertex = new JobVertex("Test vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setExecutionConfig(executionConfig); + + return jobGraph; + } + /** * No op implementation of {@link OnCompletionActions}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index 9815cb289da885..3a9925c4f20786 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -44,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.clock.ManualClock; +import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -59,7 +61,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -69,6 +73,8 @@ import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -692,12 +698,7 @@ public void testReleasingIdleSlotFailed() throws Exception { slotPool.triggerCheckIdleSlot(); - CompletableFuture allocatedSlotFuture = slotPoolGateway.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noRequirements(), - true, - timeout); + CompletableFuture allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId()); // wait until the slot has been fulfilled with the previously idling slot final LogicalSlot logicalSlot = allocatedSlotFuture.get(); @@ -712,12 +713,7 @@ public void testReleasingIdleSlotFailed() throws Exception { slotPool.triggerCheckIdleSlot(); // request a new slot after the idling slot has been released - allocatedSlotFuture = slotPoolGateway.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noRequirements(), - true, - timeout); + allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId()); // release the TaskExecutor before we get a response from the slot releasing slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get(); @@ -739,6 +735,114 @@ public void testReleasingIdleSlotFailed() throws Exception { } } + /** + * Tests that failed slots are freed on the {@link TaskExecutor}. + */ + @Test + public void testFreeFailedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance()); + + try { + final int parallelism = 5; + final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(parallelism); + resourceManagerGateway.setRequestSlotConsumer( + slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); + + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + + final Map> slotRequestFutures = new HashMap<>(parallelism); + + for (int i = 0; i < parallelism; i++) { + final SlotRequestId slotRequestId = new SlotRequestId(); + slotRequestFutures.put(slotRequestId, allocateSlot(slotPoolGateway, slotRequestId)); + } + + final List slotOffers = new ArrayList<>(parallelism); + + for (int i = 0; i < parallelism; i++) { + slotOffers.add(new SlotOffer(allocationIds.take(), i, ResourceProfile.UNKNOWN)); + } + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); + slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers); + + // wait for the completion of both slot futures + FutureUtils.waitForAll(slotRequestFutures.values()).get(); + + final ArrayBlockingQueue freedSlots = new ArrayBlockingQueue<>(1); + taskManagerGateway.setFreeSlotFunction( + (allocationID, throwable) -> { + freedSlots.offer(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final FlinkException failException = new FlinkException("Test fail exception"); + // fail allocations one by one + for (int i = 0; i < parallelism - 1; i++) { + final SlotOffer slotOffer = slotOffers.get(i); + final CompletableFuture> emptyTaskExecutorFuture = slotPoolGateway.failAllocation( + slotOffer.getAllocationId(), + failException); + + assertThat(emptyTaskExecutorFuture.get().isPresent(), is(false)); + assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId()))); + } + + final SlotOffer slotOffer = slotOffers.get(parallelism - 1); + final CompletableFuture> emptyTaskExecutorFuture = slotPoolGateway.failAllocation( + slotOffer.getAllocationId(), + failException); + assertThat(emptyTaskExecutorFuture.get().get(), is(equalTo(taskManagerLocation.getResourceID()))); + assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId()))); + + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + + /** + * Tests that failing an allocation fails the pending slot request + */ + @Test + public void testFailingAllocationFailsPendingSlotRequests() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance()); + + try { + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + + final CompletableFuture slotFuture = allocateSlot(slotPoolGateway, new SlotRequestId()); + + final AllocationID allocationId = allocationIdFuture.get(); + + assertThat(slotFuture.isDone(), is(false)); + + final FlinkException cause = new FlinkException("Fail pending slot request failure."); + final CompletableFuture> responseFuture = slotPoolGateway.failAllocation(allocationId, cause); + + assertThat(responseFuture.get().isPresent(), is(false)); + + try { + slotFuture.get(); + fail("Expected a slot allocation failure."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause)); + } + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + + private CompletableFuture allocateSlot(SlotPoolGateway slotPoolGateway, SlotRequestId slotRequestId) { + return slotPoolGateway.allocateSlot( + slotRequestId, + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + true, + timeout); + } + private static SlotPoolGateway setupSlotPool( SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception { @@ -750,13 +854,4 @@ private static SlotPoolGateway setupSlotPool( return slotPool.getSelfGateway(SlotPoolGateway.class); } - - private AllocatedSlot createSlot(final AllocationID allocationId) { - return new AllocatedSlot( - allocationId, - taskManagerLocation, - 0, - ResourceProfile.UNKNOWN, - taskManagerGateway); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index a9e99495e34aa5..912de36c88193a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -59,13 +59,16 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { private final Function, CompletableFuture> requestSlotFunction; - TestingTaskExecutorGateway(String address, String hostname, Consumer heartbeatJobManagerConsumer, BiConsumer disconnectJobManagerConsumer, BiFunction> submitTaskConsumer, Function, CompletableFuture> requestSlotFunction) { + private final BiFunction> freeSlotFunction; + + TestingTaskExecutorGateway(String address, String hostname, Consumer heartbeatJobManagerConsumer, BiConsumer disconnectJobManagerConsumer, BiFunction> submitTaskConsumer, Function, CompletableFuture> requestSlotFunction, BiFunction> freeSlotFunction) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer); this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer); this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer); this.requestSlotFunction = Preconditions.checkNotNull(requestSlotFunction); + this.freeSlotFunction = Preconditions.checkNotNull(freeSlotFunction); } @Override @@ -141,7 +144,7 @@ public void disconnectResourceManager(Exception cause) { @Override public CompletableFuture freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { - return CompletableFuture.completedFuture(Acknowledge.get()); + return freeSlotFunction.apply(allocationId, cause); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java index 1c2f1328a5afe6..e59eefd0c8d7ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java @@ -43,6 +43,7 @@ public class TestingTaskExecutorGatewayBuilder { private static final BiConsumer NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {}; private static final BiFunction> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get()); private static final Function, CompletableFuture> NOOP_REQUEST_SLOT_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); + private static final BiFunction> NOOP_FREE_SLOT_FUNCTION = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get()); private String address = "foobar:1234"; private String hostname = "foobar"; @@ -50,6 +51,7 @@ public class TestingTaskExecutorGatewayBuilder { private BiConsumer disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER; private BiFunction> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER; private Function, CompletableFuture> requestSlotFunction = NOOP_REQUEST_SLOT_FUNCTION; + private BiFunction> freeSlotFunction = NOOP_FREE_SLOT_FUNCTION; public TestingTaskExecutorGatewayBuilder setAddress(String address) { this.address = address; @@ -81,7 +83,12 @@ public TestingTaskExecutorGatewayBuilder setRequestSlotFunction(Function> freeSlotFunction) { + this.freeSlotFunction = freeSlotFunction; + return this; + } + public TestingTaskExecutorGateway createTestingTaskExecutorGateway() { - return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction); + return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction, freeSlotFunction); } }