Skip to content

Commit

Permalink
[FLINK-8088] Associate logical slots with the slot request id
Browse files Browse the repository at this point in the history
Before logical slots like the SimpleSlot and SharedSlot where associated to the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be re-used to fulfill also other slot requests (logical slots).
Therefore, we should bind the logical slots to the right id with the right lifecycle
which is the slot request id.

This closes #5089.
  • Loading branch information
tillrohrmann committed Dec 1, 2017
1 parent 84d86be commit 6c632cc
Show file tree
Hide file tree
Showing 24 changed files with 289 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -44,10 +45,10 @@
* an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
* JobManager. All slots had a default unknown resource profile.
*/
public class AllocatedSlot implements SlotContext {
public class AllocatedSlot {

/** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID slotAllocationId;
private final AllocationID allocationId;

/** The location information of the TaskManager to which this slot belongs */
private final TaskManagerLocation taskManagerLocation;
Expand All @@ -68,13 +69,13 @@ public class AllocatedSlot implements SlotContext {
// ------------------------------------------------------------------------

public AllocatedSlot(
AllocationID slotAllocationId,
AllocationID allocationId,
TaskManagerLocation location,
int physicalSlotNumber,
ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway,
SlotOwner slotOwner) {
this.slotAllocationId = checkNotNull(slotAllocationId);
this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location);
this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
Expand All @@ -92,7 +93,7 @@ public AllocatedSlot(
* @return The ID under which the slot is allocated
*/
public AllocationID getAllocationId() {
return slotAllocationId;
return allocationId;
}

/**
Expand Down Expand Up @@ -182,12 +183,16 @@ public boolean releaseLogicalSlot() {
/**
* Allocates a logical {@link SimpleSlot}.
*
* @param slotRequestId identifying the corresponding slot request
* @param locality specifying the locality of the allocated slot
* @return an allocated logical simple slot
* @throws SlotException if we could not allocate a simple slot
*/
public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException {
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);

final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);

if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
simpleSlot.setLocality(locality);
Expand All @@ -200,12 +205,16 @@ public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
/**
* Allocates a logical {@link SharedSlot}.
*
* @param slotRequestId identifying the corresponding slot request
* @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
* @return an allocated logical shared slot
* @throws SlotException if we could not allocate a shared slot
*/
public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {

final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);
final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);

if (logicalSlotReference.compareAndSet(null, sharedSlot)) {

Expand Down Expand Up @@ -236,6 +245,43 @@ public final boolean equals(Object obj) {

@Override
public String toString() {
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
}

/**
* Slot context for {@link AllocatedSlot}.
*/
private final class AllocatedSlotContext implements SlotContext {

private final SlotRequestID slotRequestId;

private AllocatedSlotContext(SlotRequestID slotRequestId) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
}

@Override
public SlotRequestID getSlotRequestId() {
return slotRequestId;
}

@Override
public AllocationID getAllocationId() {
return allocationId;
}

@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}

@Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}

@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,15 @@ public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssi
* <p>The method will transition the slot to the "released" state. If the slot is already in state
* "released", this method will do nothing.</p>
*
* @param slot The slot to return.
* @param logicalSlot The slot to return.
* @return Future which is completed with true, if the slot was returned, false if not.
*/
@Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
checkNotNull(slot);
public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
checkNotNull(logicalSlot);
checkArgument(logicalSlot instanceof Slot);

final Slot slot = ((Slot) logicalSlot);
checkArgument(!slot.isAlive(), "slot is still alive");
checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public interface LogicalSlot {
*/
AllocationID getAllocationId();

/**
* Gets the slot request id uniquely identifying the request with which this
* slot has been allocated.
*
* @return Unique id identifying the slot request with which this slot was allocated
*/
SlotRequestID getSlotRequestId();

/**
* Payload for a logical slot.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public AllocationID getAllocationId() {
return getSlotContext().getAllocationId();
}

@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
}

/**
* Gets the set of all slots allocated as sub-slots of this shared slot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public SimpleSlot(
parent != null ?
parent.getSlotContext() :
new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID,
location,
slotNumber,
Expand Down Expand Up @@ -274,6 +275,11 @@ public AllocationID getAllocationId() {
return getSlotContext().getAllocationId();
}

@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public abstract class Slot {
private static final int RELEASED = 2;

// temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6)
protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0);
protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L);

// ------------------------------------------------------------------------

Expand Down Expand Up @@ -111,6 +112,7 @@ protected Slot(

// create a simple slot context
this.slotContext = new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID,
location,
slotNumber,
Expand Down

0 comments on commit 6c632cc

Please sign in to comment.