Skip to content

Commit

Permalink
[FLINK-8088] Associate logical slots with the slot request id
Browse files Browse the repository at this point in the history
Before logical slots like the SimpleSlot and SharedSlot where associated to the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be re-used to fulfill also other slot requests (logical slots).
Therefore, we should bind the logical slots to the right id with the right lifecycle
which is the slot request id.

This closes #5089.
  • Loading branch information
tillrohrmann committed Dec 14, 2017
1 parent a569f38 commit bc1c375
Show file tree
Hide file tree
Showing 24 changed files with 289 additions and 129 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;


import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


Expand All @@ -44,10 +45,10 @@
* an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
* JobManager. All slots had a default unknown resource profile. * JobManager. All slots had a default unknown resource profile.
*/ */
public class AllocatedSlot implements SlotContext { public class AllocatedSlot {


/** The ID under which the slot is allocated. Uniquely identifies the slot. */ /** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID slotAllocationId; private final AllocationID allocationId;


/** The location information of the TaskManager to which this slot belongs */ /** The location information of the TaskManager to which this slot belongs */
private final TaskManagerLocation taskManagerLocation; private final TaskManagerLocation taskManagerLocation;
Expand All @@ -68,13 +69,13 @@ public class AllocatedSlot implements SlotContext {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


public AllocatedSlot( public AllocatedSlot(
AllocationID slotAllocationId, AllocationID allocationId,
TaskManagerLocation location, TaskManagerLocation location,
int physicalSlotNumber, int physicalSlotNumber,
ResourceProfile resourceProfile, ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway, TaskManagerGateway taskManagerGateway,
SlotOwner slotOwner) { SlotOwner slotOwner) {
this.slotAllocationId = checkNotNull(slotAllocationId); this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location); this.taskManagerLocation = checkNotNull(location);
this.physicalSlotNumber = physicalSlotNumber; this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile); this.resourceProfile = checkNotNull(resourceProfile);
Expand All @@ -92,7 +93,7 @@ public AllocatedSlot(
* @return The ID under which the slot is allocated * @return The ID under which the slot is allocated
*/ */
public AllocationID getAllocationId() { public AllocationID getAllocationId() {
return slotAllocationId; return allocationId;
} }


/** /**
Expand Down Expand Up @@ -182,12 +183,16 @@ public boolean releaseLogicalSlot() {
/** /**
* Allocates a logical {@link SimpleSlot}. * Allocates a logical {@link SimpleSlot}.
* *
* @param slotRequestId identifying the corresponding slot request
* @param locality specifying the locality of the allocated slot
* @return an allocated logical simple slot * @return an allocated logical simple slot
* @throws SlotException if we could not allocate a simple slot * @throws SlotException if we could not allocate a simple slot
*/ */
public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException { public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException {
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);


final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber); final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);


if (logicalSlotReference.compareAndSet(null, simpleSlot)) { if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
simpleSlot.setLocality(locality); simpleSlot.setLocality(locality);
Expand All @@ -200,12 +205,16 @@ public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
/** /**
* Allocates a logical {@link SharedSlot}. * Allocates a logical {@link SharedSlot}.
* *
* @param slotRequestId identifying the corresponding slot request
* @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
* @return an allocated logical shared slot * @return an allocated logical shared slot
* @throws SlotException if we could not allocate a shared slot * @throws SlotException if we could not allocate a shared slot
*/ */
public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException { public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);
final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);


if (logicalSlotReference.compareAndSet(null, sharedSlot)) { if (logicalSlotReference.compareAndSet(null, sharedSlot)) {


Expand Down Expand Up @@ -236,6 +245,43 @@ public final boolean equals(Object obj) {


@Override @Override
public String toString() { public String toString() {
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber; return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
}

/**
* Slot context for {@link AllocatedSlot}.
*/
private final class AllocatedSlotContext implements SlotContext {

private final SlotRequestID slotRequestId;

private AllocatedSlotContext(SlotRequestID slotRequestId) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
}

@Override
public SlotRequestID getSlotRequestId() {
return slotRequestId;
}

@Override
public AllocationID getAllocationId() {
return allocationId;
}

@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}

@Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}

@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
} }
} }
Expand Up @@ -276,12 +276,15 @@ public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssi
* <p>The method will transition the slot to the "released" state. If the slot is already in state * <p>The method will transition the slot to the "released" state. If the slot is already in state
* "released", this method will do nothing.</p> * "released", this method will do nothing.</p>
* *
* @param slot The slot to return. * @param logicalSlot The slot to return.
* @return Future which is completed with true, if the slot was returned, false if not. * @return Future which is completed with true, if the slot was returned, false if not.
*/ */
@Override @Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) { public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
checkNotNull(slot); checkNotNull(logicalSlot);
checkArgument(logicalSlot instanceof Slot);

final Slot slot = ((Slot) logicalSlot);
checkArgument(!slot.isAlive(), "slot is still alive"); checkArgument(!slot.isAlive(), "slot is still alive");
checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager."); checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");


Expand Down
Expand Up @@ -92,6 +92,14 @@ public interface LogicalSlot {
*/ */
AllocationID getAllocationId(); AllocationID getAllocationId();


/**
* Gets the slot request id uniquely identifying the request with which this
* slot has been allocated.
*
* @return Unique id identifying the slot request with which this slot was allocated
*/
SlotRequestID getSlotRequestId();

/** /**
* Payload for a logical slot. * Payload for a logical slot.
*/ */
Expand Down
Expand Up @@ -213,6 +213,11 @@ public AllocationID getAllocationId() {
return getSlotContext().getAllocationId(); return getSlotContext().getAllocationId();
} }


@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
}

/** /**
* Gets the set of all slots allocated as sub-slots of this shared slot. * Gets the set of all slots allocated as sub-slots of this shared slot.
* *
Expand Down
Expand Up @@ -97,6 +97,7 @@ public SimpleSlot(
parent != null ? parent != null ?
parent.getSlotContext() : parent.getSlotContext() :
new SimpleSlotContext( new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID, NO_ALLOCATION_ID,
location, location,
slotNumber, slotNumber,
Expand Down Expand Up @@ -274,6 +275,11 @@ public AllocationID getAllocationId() {
return getSlotContext().getAllocationId(); return getSlotContext().getAllocationId();
} }


@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Utilities // Utilities
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -61,7 +61,8 @@ public abstract class Slot {
private static final int RELEASED = 2; private static final int RELEASED = 2;


// temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6) // temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6)
protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0); protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L);


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


Expand Down Expand Up @@ -111,6 +112,7 @@ protected Slot(


// create a simple slot context // create a simple slot context
this.slotContext = new SimpleSlotContext( this.slotContext = new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID, NO_ALLOCATION_ID,
location, location,
slotNumber, slotNumber,
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.SlotException; import org.apache.flink.runtime.jobmanager.slots.SlotException;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
Expand Down Expand Up @@ -278,25 +277,31 @@ public CompletableFuture<LogicalSlot> allocateSlot(
} }


@Override @Override
public void returnAllocatedSlot(SlotContext allocatedSlot) { public void returnAllocatedSlot(SlotRequestID slotRequestId) {
internalReturnAllocatedSlot(allocatedSlot.getAllocationId()); final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);

if (allocatedSlot != null) {
internalReturnAllocatedSlot(allocatedSlot);
} else {
log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
}
} }


@Override @Override
public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) { public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
final PendingRequest pendingRequest = removePendingRequest(requestId); final PendingRequest pendingRequest = removePendingRequest(slotRequestId);


if (pendingRequest != null) { if (pendingRequest != null) {
failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled.")); failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled."));
} else { } else {
final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId); final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId);


if (allocatedSlot != null) { if (allocatedSlot != null) {
LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId); LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId);
// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot // TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
allocatedSlot.triggerLogicalSlotRelease(); allocatedSlot.triggerLogicalSlotRelease();
} else { } else {
LOG.debug("There was no slot allocation with {} to be cancelled.", requestId); LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId);
} }
} }


Expand All @@ -316,7 +321,7 @@ CompletableFuture<LogicalSlot> internalAllocateSlot(


final SimpleSlot simpleSlot; final SimpleSlot simpleSlot;
try { try {
simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality()); simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality());
} catch (SlotException e) { } catch (SlotException e) {
availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); availableSlots.add(allocatedSlot, clock.relativeTimeMillis());


Expand All @@ -335,9 +340,9 @@ CompletableFuture<LogicalSlot> internalAllocateSlot(
return allocatedSlotFuture.thenApply( return allocatedSlotFuture.thenApply(
(AllocatedSlot allocatedSlot) -> { (AllocatedSlot allocatedSlot) -> {
try { try {
return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN); return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
} catch (SlotException e) { } catch (SlotException e) {
returnAllocatedSlot(allocatedSlot); internalReturnAllocatedSlot(allocatedSlot);


throw new CompletionException("Could not allocate a logical simple slot.", e); throw new CompletionException("Could not allocate a logical simple slot.", e);
} }
Expand Down Expand Up @@ -495,31 +500,25 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequ
* Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
* slot can be reused by other pending requests if the resource profile matches.n * slot can be reused by other pending requests if the resource profile matches.n
* *
* @param allocationId identifying the slot which is returned * @param allocatedSlot which shall be returned
*/ */
private void internalReturnAllocatedSlot(AllocationID allocationId) { private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId); if (allocatedSlot.releaseLogicalSlot()) {

if (allocatedSlot != null) {
if (allocatedSlot.releaseLogicalSlot()) {


final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);


if (pendingRequest != null) { if (pendingRequest != null) {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());


allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
} else { } else {
LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
} }
} else { } else {
LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId); LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
} }
} }


Expand Down Expand Up @@ -820,25 +819,48 @@ boolean contains(AllocationID slotAllocationId) {
} }


/** /**
* Remove an allocation with slot. * Removes the allocated slot specified by the provided slot allocation id.
* *
* @param slotId The ID of the slot to be removed * @param allocationID identifying the allocated slot to remove
* @return The removed allocated slot or null.
*/ */
AllocatedSlot remove(final AllocationID slotId) { @Nullable
AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId); AllocatedSlot remove(final AllocationID allocationID) {
AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID);

if (allocatedSlot != null) { if (allocatedSlot != null) {
final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID(); removeAllocatedSlot(allocatedSlot);
Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); }


slotsForTM.remove(allocatedSlot); return allocatedSlot;
}


if (slotsForTM.isEmpty()) { /**
allocatedSlotsByTaskManager.remove(taskManagerId); * Removes the allocated slot specified by the provided slot request id.
} *
return allocatedSlot; * @param slotRequestId identifying the allocated slot to remove
* @return The removed allocated slot or null.
*/
@Nullable
AllocatedSlot remove(final SlotRequestID slotRequestId) {
final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId);

if (allocatedSlot != null) {
removeAllocatedSlot(allocatedSlot);
} }
else {
return null; return allocatedSlot;
}

private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
Preconditions.checkNotNull(allocatedSlot);
final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);

slotsForTM.remove(allocatedSlot);

if (slotsForTM.isEmpty()) {
allocatedSlotsByTaskManager.remove(taskManagerId);
} }
} }


Expand Down Expand Up @@ -1106,8 +1128,8 @@ private static class ProviderAndOwner implements SlotOwner, SlotProvider {
} }


@Override @Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) { public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
gateway.returnAllocatedSlot(slot.getSlotContext()); gateway.returnAllocatedSlot(slot.getSlotRequestId());
return CompletableFuture.completedFuture(true); return CompletableFuture.completedFuture(true);
} }


Expand Down

0 comments on commit bc1c375

Please sign in to comment.