diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java index 70360443fd6dc..a3f98f13d79d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import java.util.concurrent.atomic.AtomicReference; @@ -44,10 +45,10 @@ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the * JobManager. All slots had a default unknown resource profile. */ -public class AllocatedSlot implements SlotContext { +public class AllocatedSlot { /** The ID under which the slot is allocated. Uniquely identifies the slot. */ - private final AllocationID slotAllocationId; + private final AllocationID allocationId; /** The location information of the TaskManager to which this slot belongs */ private final TaskManagerLocation taskManagerLocation; @@ -68,13 +69,13 @@ public class AllocatedSlot implements SlotContext { // ------------------------------------------------------------------------ public AllocatedSlot( - AllocationID slotAllocationId, + AllocationID allocationId, TaskManagerLocation location, int physicalSlotNumber, ResourceProfile resourceProfile, TaskManagerGateway taskManagerGateway, SlotOwner slotOwner) { - this.slotAllocationId = checkNotNull(slotAllocationId); + this.allocationId = checkNotNull(allocationId); this.taskManagerLocation = checkNotNull(location); this.physicalSlotNumber = physicalSlotNumber; this.resourceProfile = checkNotNull(resourceProfile); @@ -92,7 +93,7 @@ public AllocatedSlot( * @return The ID under which the slot is allocated */ public AllocationID getAllocationId() { - return slotAllocationId; + return allocationId; } /** @@ -182,12 +183,16 @@ public boolean releaseLogicalSlot() { /** * Allocates a logical {@link SimpleSlot}. * + * @param slotRequestId identifying the corresponding slot request + * @param locality specifying the locality of the allocated slot * @return an allocated logical simple slot * @throws SlotException if we could not allocate a simple slot */ - public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException { + public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException { + final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext( + slotRequestId); - final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber); + final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber); if (logicalSlotReference.compareAndSet(null, simpleSlot)) { simpleSlot.setLocality(locality); @@ -200,12 +205,16 @@ public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException { /** * Allocates a logical {@link SharedSlot}. * + * @param slotRequestId identifying the corresponding slot request * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong * @return an allocated logical shared slot * @throws SlotException if we could not allocate a shared slot */ - public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException { - final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment); + public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException { + + final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext( + slotRequestId); + final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment); if (logicalSlotReference.compareAndSet(null, sharedSlot)) { @@ -236,6 +245,43 @@ public final boolean equals(Object obj) { @Override public String toString() { - return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber; + return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber; + } + + /** + * Slot context for {@link AllocatedSlot}. + */ + private final class AllocatedSlotContext implements SlotContext { + + private final SlotRequestID slotRequestId; + + private AllocatedSlotContext(SlotRequestID slotRequestId) { + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + } + + @Override + public SlotRequestID getSlotRequestId() { + return slotRequestId; + } + + @Override + public AllocationID getAllocationId() { + return allocationId; + } + + @Override + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + @Override + public int getPhysicalSlotNumber() { + return physicalSlotNumber; + } + + @Override + public TaskManagerGateway getTaskManagerGateway() { + return taskManagerGateway; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 54c8971926bbf..44ee29d2049c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -276,12 +276,15 @@ public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssi *

The method will transition the slot to the "released" state. If the slot is already in state * "released", this method will do nothing.

* - * @param slot The slot to return. + * @param logicalSlot The slot to return. * @return Future which is completed with true, if the slot was returned, false if not. */ @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { - checkNotNull(slot); + public CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot) { + checkNotNull(logicalSlot); + checkArgument(logicalSlot instanceof Slot); + + final Slot slot = ((Slot) logicalSlot); checkArgument(!slot.isAlive(), "slot is still alive"); checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java index e6632657e9ed2..b3104ac03592c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java @@ -92,6 +92,14 @@ public interface LogicalSlot { */ AllocationID getAllocationId(); + /** + * Gets the slot request id uniquely identifying the request with which this + * slot has been allocated. + * + * @return Unique id identifying the slot request with which this slot was allocated + */ + SlotRequestID getSlotRequestId(); + /** * Payload for a logical slot. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java index 8637159c5b220..8c9fe1a4b29cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -213,6 +213,11 @@ public AllocationID getAllocationId() { return getSlotContext().getAllocationId(); } + @Override + public SlotRequestID getSlotRequestId() { + return getSlotContext().getSlotRequestId(); + } + /** * Gets the set of all slots allocated as sub-slots of this shared slot. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index d397c08adc122..e98832f896a62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -97,6 +97,7 @@ public SimpleSlot( parent != null ? parent.getSlotContext() : new SimpleSlotContext( + NO_SLOT_REQUEST_ID, NO_ALLOCATION_ID, location, slotNumber, @@ -274,6 +275,11 @@ public AllocationID getAllocationId() { return getSlotContext().getAllocationId(); } + @Override + public SlotRequestID getSlotRequestId() { + return getSlotContext().getSlotRequestId(); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 6262c9a9125d9..e82f0758321df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -61,7 +61,8 @@ public abstract class Slot { private static final int RELEASED = 2; // temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6) - protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0); + protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L); + protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L); // ------------------------------------------------------------------------ @@ -111,6 +112,7 @@ protected Slot( // create a simple slot context this.slotContext = new SimpleSlotContext( + NO_SLOT_REQUEST_ID, NO_ALLOCATION_ID, location, slotNumber, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 2ccea75b6e46c..a72f57be24459 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; import org.apache.flink.runtime.jobmanager.slots.SlotException; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; @@ -278,25 +277,31 @@ public CompletableFuture allocateSlot( } @Override - public void returnAllocatedSlot(SlotContext allocatedSlot) { - internalReturnAllocatedSlot(allocatedSlot.getAllocationId()); + public void returnAllocatedSlot(SlotRequestID slotRequestId) { + final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId); + + if (allocatedSlot != null) { + internalReturnAllocatedSlot(allocatedSlot); + } else { + log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId); + } } @Override - public CompletableFuture cancelSlotRequest(SlotRequestID requestId) { - final PendingRequest pendingRequest = removePendingRequest(requestId); + public CompletableFuture cancelSlotRequest(SlotRequestID slotRequestId) { + final PendingRequest pendingRequest = removePendingRequest(slotRequestId); if (pendingRequest != null) { - failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled.")); + failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled.")); } else { - final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId); + final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId); if (allocatedSlot != null) { - LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId); + LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId); // TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot allocatedSlot.triggerLogicalSlotRelease(); } else { - LOG.debug("There was no slot allocation with {} to be cancelled.", requestId); + LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId); } } @@ -316,7 +321,7 @@ CompletableFuture internalAllocateSlot( final SimpleSlot simpleSlot; try { - simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality()); + simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality()); } catch (SlotException e) { availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); @@ -335,9 +340,9 @@ CompletableFuture internalAllocateSlot( return allocatedSlotFuture.thenApply( (AllocatedSlot allocatedSlot) -> { try { - return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN); + return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN); } catch (SlotException e) { - returnAllocatedSlot(allocatedSlot); + internalReturnAllocatedSlot(allocatedSlot); throw new CompletionException("Could not allocate a logical simple slot.", e); } @@ -495,31 +500,25 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequ * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the * slot can be reused by other pending requests if the resource profile matches.n * - * @param allocationId identifying the slot which is returned + * @param allocatedSlot which shall be returned */ - private void internalReturnAllocatedSlot(AllocationID allocationId) { - final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId); - - if (allocatedSlot != null) { - if (allocatedSlot.releaseLogicalSlot()) { + private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) { + if (allocatedSlot.releaseLogicalSlot()) { - final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); + final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); - if (pendingRequest != null) { - LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", - pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); + if (pendingRequest != null) { + LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", + pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); - allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); - pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); - } else { - LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); - availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); - } + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); } else { - LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); + LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); } } else { - LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId); + LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); } } @@ -820,25 +819,48 @@ boolean contains(AllocationID slotAllocationId) { } /** - * Remove an allocation with slot. + * Removes the allocated slot specified by the provided slot allocation id. * - * @param slotId The ID of the slot to be removed + * @param allocationID identifying the allocated slot to remove + * @return The removed allocated slot or null. */ - AllocatedSlot remove(final AllocationID slotId) { - AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId); + @Nullable + AllocatedSlot remove(final AllocationID allocationID) { + AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID); + if (allocatedSlot != null) { - final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID(); - Set slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); + removeAllocatedSlot(allocatedSlot); + } - slotsForTM.remove(allocatedSlot); + return allocatedSlot; + } - if (slotsForTM.isEmpty()) { - allocatedSlotsByTaskManager.remove(taskManagerId); - } - return allocatedSlot; + /** + * Removes the allocated slot specified by the provided slot request id. + * + * @param slotRequestId identifying the allocated slot to remove + * @return The removed allocated slot or null. + */ + @Nullable + AllocatedSlot remove(final SlotRequestID slotRequestId) { + final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId); + + if (allocatedSlot != null) { + removeAllocatedSlot(allocatedSlot); } - else { - return null; + + return allocatedSlot; + } + + private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) { + Preconditions.checkNotNull(allocatedSlot); + final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID(); + Set slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); + + slotsForTM.remove(allocatedSlot); + + if (slotsForTM.isEmpty()) { + allocatedSlotsByTaskManager.remove(taskManagerId); } } @@ -1106,8 +1128,8 @@ private static class ProviderAndOwner implements SlotOwner, SlotProvider { } @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { - gateway.returnAllocatedSlot(slot.getSlotContext()); + public CompletableFuture returnAllocatedSlot(LogicalSlot slot) { + gateway.returnAllocatedSlot(slot.getSlotRequestId()); return CompletableFuture.completedFuture(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 71de054b31d95..103bc612920e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -31,7 +30,6 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -99,20 +97,14 @@ CompletableFuture allocateSlot( Iterable locationPreferences, @RpcTimeout Time timeout); - void returnAllocatedSlot(SlotContext slotInformation); + void returnAllocatedSlot(SlotRequestID slotRequestId); /** * Cancel a slot allocation request. * - * @param requestId identifying the slot allocation request + * @param slotRequestId identifying the slot allocation request * @return Future acknowledge if the slot allocation has been cancelled */ - CompletableFuture cancelSlotRequest(SlotRequestID requestId); + CompletableFuture cancelSlotRequest(SlotRequestID slotRequestId); - /** - * Request ID identifying different slot requests. - */ - final class SlotRequestID extends AbstractID { - private static final long serialVersionUID = -6072105912250154283L; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java new file mode 100644 index 0000000000000..8e199441cdaf3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.util.AbstractID; + +/** + * Request ID identifying different slot requests. + */ +public final class SlotRequestID extends AbstractID { + private static final long serialVersionUID = -6072105912250154283L; + + public SlotRequestID(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + + public SlotRequestID() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java index 5dccc1fb87ccd..a5b75d74fdc6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager.slots; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; @@ -27,6 +28,8 @@ */ public class SimpleSlotContext implements SlotContext { + private final SlotRequestID slotRequestId; + private final AllocationID allocationId; private final TaskManagerLocation taskManagerLocation; @@ -36,16 +39,23 @@ public class SimpleSlotContext implements SlotContext { private final TaskManagerGateway taskManagerGateway; public SimpleSlotContext( + SlotRequestID slotRequestId, AllocationID allocationId, TaskManagerLocation taskManagerLocation, int physicalSlotNumber, TaskManagerGateway taskManagerGateway) { + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); this.allocationId = Preconditions.checkNotNull(allocationId); this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.physicalSlotNumber = physicalSlotNumber; this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); } + @Override + public SlotRequestID getSlotRequestId() { + return slotRequestId; + } + @Override public AllocationID getAllocationId() { return allocationId; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java index d8a1aa41f2cde..1e0317a1f75ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java @@ -20,6 +20,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; /** @@ -30,9 +31,17 @@ public interface SlotContext { /** - * Gets the ID under which the slot is allocated, which uniquely identifies the slot. + * Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot. * - * @return The ID under which the slot is allocated + * @return The id under which the slot has been requested + */ + SlotRequestID getSlotRequestId(); + + /** + * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the + * physical slot. + * + * @return The id under whic teh slot has been allocated on the TaskManager */ AllocationID getAllocationId(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java index cb4488d783994..bc1ced46807f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager.slots; -import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.LogicalSlot; import java.util.concurrent.CompletableFuture; @@ -30,8 +30,8 @@ public interface SlotOwner { /** * Return the given slot to the slot owner. * - * @param slot to return + * @param logicalSlot to return * @return Future which is completed with true if the slot could be returned, otherwise with false */ - CompletableFuture returnAllocatedSlot(Slot slot); + CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 586f51b65b461..18e6cf1b7fa2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -284,7 +284,7 @@ public void testOneSlotFailureAbortsDeploy() throws Exception { final BlockingQueue returnedSlots = new ArrayBlockingQueue<>(parallelism); final TestingSlotOwner slotOwner = new TestingSlotOwner(); slotOwner.setReturnAllocatedSlotConsumer( - (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId())); + (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId())); final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; @@ -365,7 +365,7 @@ public void testTimeoutForSlotAllocation() throws Exception { final BlockingQueue returnedSlots = new ArrayBlockingQueue<>(2); final TestingSlotOwner slotOwner = new TestingSlotOwner(); slotOwner.setReturnAllocatedSlotConsumer( - (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId())); + (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId())); final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); final SimpleSlot[] slots = new SimpleSlot[parallelism]; @@ -448,6 +448,7 @@ private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotO ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); SimpleSlotContext slot = new SimpleSlotContext( + new SlotRequestID(), new AllocationID(), location, 0, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 06ffaa0c45816..c97329fa7f076 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -244,10 +245,11 @@ public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) { ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572); final SimpleSlotContext allocatedSlot = new SimpleSlotContext( - new AllocationID(), - location, - 0, - gateway); + new SlotRequestID(), + new AllocationID(), + location, + 0, + gateway); return new SimpleSlot( allocatedSlot, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 71d6f517a5f38..e3fd0df618ebe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; @@ -306,7 +305,7 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); - CompletableFuture returnedSlotFuture = slotOwner.getReturnedSlotFuture(); + CompletableFuture returnedSlotFuture = slotOwner.getReturnedSlotFuture(); CompletableFuture terminationFuture = executionVertex.cancel(); // run canceling in a separate thread to allow an interleaving between termination @@ -334,15 +333,15 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception */ private static final class SingleSlotTestingSlotOwner implements SlotOwner { - final CompletableFuture returnedSlot = new CompletableFuture<>(); + final CompletableFuture returnedSlot = new CompletableFuture<>(); - public CompletableFuture getReturnedSlotFuture() { + public CompletableFuture getReturnedSlotFuture() { return returnedSlot; } @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { - return CompletableFuture.completedFuture(returnedSlot.complete(slot)); + public CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot) { + return CompletableFuture.completedFuture(returnedSlot.complete(logicalSlot)); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 7f97d126054d9..63cebf3a5c265 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.TestLogger; @@ -371,8 +371,8 @@ public void testTddProducedPartitionsLazyScheduling() throws Exception { result.getPartitions()[0].addConsumerGroup(); result.getPartitions()[0].addConsumer(mockEdge, 0); - AllocatedSlot allocatedSlot = mock(AllocatedSlot.class); - when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID()); + SlotContext slotContext = mock(SlotContext.class); + when(slotContext.getAllocationId()).thenReturn(new AllocationID()); LogicalSlot slot = mock(LogicalSlot.class); when(slot.getAllocationId()).thenReturn(new AllocationID()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 98f72596a83e4..bffbb6a8e9d7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -31,13 +31,14 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -234,7 +235,11 @@ private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation loca // - exposing test methods in the ExecutionVertex leads to undesirable setters SlotContext slot = new SimpleSlotContext( - new AllocationID(), location, 0, mock(TaskManagerGateway.class)); + new SlotRequestID(), + new AllocationID(), + location, + 0, + mock(TaskManagerGateway.class)); SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index 9a19d243a22e2..82953d6ff62b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.SlotContext; @@ -33,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import java.net.InetAddress; import java.util.ArrayDeque; @@ -61,10 +63,11 @@ public SimpleSlotProvider(JobID jobId, int numSlots, TaskManagerGateway taskMana for (int i = 0; i < numSlots; i++) { SimpleSlotContext as = new SimpleSlotContext( - new AllocationID(), - new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i), - 0, - taskManagerGateway); + new SlotRequestID(), + new AllocationID(), + new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i), + 0, + taskManagerGateway); slots.add(as); } } @@ -94,7 +97,11 @@ public CompletableFuture allocateSlot( } @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { + public CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot) { + Preconditions.checkArgument(logicalSlot instanceof Slot); + + final Slot slot = ((Slot) logicalSlot); + synchronized (slots) { slots.add(slot.getSlotContext()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java index bc396c1fe92e5..223d43c42b8e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java @@ -41,7 +41,7 @@ public void testOperations() throws Exception { SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots(); final AllocationID allocation1 = new AllocationID(); - final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID(); + final SlotRequestID slotRequestID = new SlotRequestID(); final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final ResourceID resource1 = taskManagerLocation.getResourceID(); final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation); @@ -56,7 +56,7 @@ public void testOperations() throws Exception { assertEquals(1, allocatedSlots.size()); final AllocationID allocation2 = new AllocationID(); - final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID(); + final SlotRequestID slotRequestID2 = new SlotRequestID(); final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation); allocatedSlots.add(slotRequestID2, slot2); @@ -71,7 +71,7 @@ public void testOperations() throws Exception { assertEquals(2, allocatedSlots.size()); final AllocationID allocation3 = new AllocationID(); - final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID(); + final SlotRequestID slotRequestID3 = new SlotRequestID(); final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation(); final ResourceID resource2 = taskManagerLocation2.getResourceID(); final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 5d82f47c30225..60e1d342b13e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -111,7 +111,7 @@ public void testSlotAllocationNoResourceManager() throws Exception { pool.start(JobMasterId.generate(), "foobar"); CompletableFuture future = pool.allocateSlot( - new SlotPoolGateway.SlotRequestID(), + new SlotRequestID(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), DEFAULT_TESTING_PROFILE, Collections.emptyList(), @@ -144,7 +144,7 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception { pool.start(JobMasterId.generate(), "foobar"); SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); - SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); + SlotRequestID requestId = new SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), @@ -188,7 +188,7 @@ public void testCancelSlotAllocationWithResourceManager() throws Exception { ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); pool.connectToResourceManager(resourceManagerGateway); - SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); + SlotRequestID requestId = new SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), @@ -239,7 +239,7 @@ public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { pool.connectToResourceManager(resourceManagerGateway); - SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); + SlotRequestID requestId = new SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), @@ -295,7 +295,7 @@ public void testProviderAndOwner() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); - final CompletableFuture cancelFuture = new CompletableFuture<>(); + final CompletableFuture cancelFuture = new CompletableFuture<>(); pool.setCancelSlotAllocationConsumer( slotRequestID -> cancelFuture.complete(slotRequestID)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 9d90a122980ea..ec20f6b320878 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; @@ -103,7 +102,7 @@ public void testAllocateSimpleSlot() throws Exception { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); - SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); + SlotRequestID requestId = new SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); @@ -137,8 +136,8 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -187,7 +186,7 @@ public void testAllocateWithFreeSlot() throws Exception { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -208,7 +207,7 @@ public void testAllocateWithFreeSlot() throws Exception { // return this slot to pool slot1.releaseSlot(); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); // second allocation fulfilled by previous slot returning LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); @@ -233,7 +232,7 @@ public void testOfferSlot() throws Exception { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -284,8 +283,8 @@ public void testReleaseResource() throws Exception { final SlotPool slotPool = new SlotPool(rpcService, jobId) { @Override - public void returnAllocatedSlot(SlotContext allocatedSlot) { - super.returnAllocatedSlot(allocatedSlot); + public void returnAllocatedSlot(SlotRequestID slotRequestId) { + super.returnAllocatedSlot(slotRequestId); slotReturnFuture.complete(true); } @@ -295,14 +294,14 @@ public void returnAllocatedSlot(SlotContext allocatedSlot) { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); final SlotOffer slotOffer = new SlotOffer( slotRequest.getAllocationId(), @@ -357,7 +356,7 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { slotPoolGateway.connectToResourceManager(resourceManagerGateway); CompletableFuture slotFuture = slotPoolGateway.allocateSlot( - new SlotPoolGateway.SlotRequestID(), + new SlotRequestID(), scheduledUnit, ResourceProfile.UNKNOWN, Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java index 925933d28805e..20660170ca153 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java @@ -44,27 +44,32 @@ public class TestingLogicalSlot implements LogicalSlot { private final int slotNumber; private final CompletableFuture releaseFuture = new CompletableFuture<>(); - + private final AllocationID allocationId; + private final SlotRequestID slotRequestId; + public TestingLogicalSlot() { this( new LocalTaskManagerLocation(), new SimpleAckingTaskManagerGateway(), 0, - new AllocationID()); + new AllocationID(), + new SlotRequestID()); } public TestingLogicalSlot( TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, int slotNumber, - AllocationID allocationId) { + AllocationID allocationId, + SlotRequestID slotRequestId) { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); this.payloadReference = new AtomicReference<>(); this.slotNumber = slotNumber; this.allocationId = Preconditions.checkNotNull(allocationId); + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); } @Override @@ -109,4 +114,9 @@ public int getPhysicalSlotNumber() { public AllocationID getAllocationId() { return allocationId; } + + @Override + public SlotRequestID getSlotRequestId() { + return slotRequestId; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java index 689454265b4d1..6d17ad01a0e07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager.slots; -import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.LogicalSlot; import java.util.concurrent.CompletableFuture; @@ -27,7 +27,7 @@ */ public class DummySlotOwner implements SlotOwner { @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { + public CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot) { return CompletableFuture.completedFuture(false); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java index 7c124ef3c0585..e7f9485f68dad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager.slots; -import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.LogicalSlot; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -28,18 +28,18 @@ */ public class TestingSlotOwner implements SlotOwner { - private volatile Consumer returnAllocatedSlotConsumer; + private volatile Consumer returnAllocatedSlotConsumer; - public void setReturnAllocatedSlotConsumer(Consumer returnAllocatedSlotConsumer) { + public void setReturnAllocatedSlotConsumer(Consumer returnAllocatedSlotConsumer) { this.returnAllocatedSlotConsumer = returnAllocatedSlotConsumer; } @Override - public CompletableFuture returnAllocatedSlot(Slot slot) { - final Consumer currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer; + public CompletableFuture returnAllocatedSlot(LogicalSlot logicalSlot) { + final Consumer currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer; if (currentReturnAllocatedSlotConsumer != null) { - currentReturnAllocatedSlotConsumer.accept(slot); + currentReturnAllocatedSlotConsumer.accept(logicalSlot); } return CompletableFuture.completedFuture(true);