Skip to content

Commit

Permalink
[FLINK-9324] Wait for slot release before completing release future i…
Browse files Browse the repository at this point in the history
…n SingleLogicalSlot

This commit delays the completion of the SingleLogicalSlot's release future
until the SlotOwner has acknowledged the release. That way the ExecutionGraph will only
recover after all of its slots have been returned to the SlotPool.

As a side effect, the changes in this commit should reduce the number of redundant release
calls sent to the SlotOwner which cluttered the debug logs.

This closes #5980.
  • Loading branch information
tillrohrmann committed May 10, 2018
1 parent 3c86b6b commit c7eb6ac
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 127 deletions.
Expand Up @@ -42,7 +42,7 @@
* an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
* JobManager. All slots had a default unknown resource profile.
*/
public class AllocatedSlot implements SlotContext {
class AllocatedSlot implements SlotContext {

This comment has been minimized.

Copy link
@tisonkun

tisonkun Aug 28, 2018

Member

@tillrohrmann What is the purpose of changing the access level here?

This comment has been minimized.

Copy link
@tillrohrmann

tillrohrmann Aug 29, 2018

Author Contributor

The AllocatedSlot should only be used by the SlotPool and not outside of this class. Therefore, restricting the access to package-private prevents this class from being used somewhere else.


/** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID allocationId;
Expand Down Expand Up @@ -171,21 +171,13 @@ public boolean tryAssignPayload(Payload payload) {
* then it is removed from the slot.
*
* @param cause of the release operation
* @return true if the payload could be released and was removed from the slot, otherwise false
*/
public boolean releasePayload(Throwable cause) {
public void releasePayload(Throwable cause) {
final Payload payload = payloadReference.get();

if (payload != null) {
if (payload.release(cause)) {
payloadReference.set(null);

return true;
} else {
return false;
}
} else {
return true;
payload.release(cause);
payloadReference.set(null);
}
}

Expand Down Expand Up @@ -222,12 +214,10 @@ public String toString() {
interface Payload {

/**
* Releases the payload. If the payload could be released, then it returns true,
* otherwise false.
* Releases the payload
*
* @param cause of the payload release
* @return true if the payload could be released, otherwise false
*/
boolean release(Throwable cause);
void release(Throwable cause);
}
}
Expand Up @@ -33,6 +33,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

/**
* Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}.
Expand All @@ -44,6 +45,11 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
Payload.class,
"payload");

private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
SingleLogicalSlot.class,
State.class,
"state");

private final SlotRequestId slotRequestId;

private final SlotContext slotContext;
Expand All @@ -58,6 +64,10 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
// owner of this slot to which it is returned upon release
private final SlotOwner slotOwner;

private final CompletableFuture<Void> releaseFuture;

private volatile State state;

// LogicalSlot.Payload of this slot
private volatile Payload payload;

Expand All @@ -72,8 +82,10 @@ public SingleLogicalSlot(
this.slotSharingGroupId = slotSharingGroupId;
this.locality = Preconditions.checkNotNull(locality);
this.slotOwner = Preconditions.checkNotNull(slotOwner);
this.releaseFuture = new CompletableFuture<>();

payload = null;
this.state = State.ALIVE;
this.payload = null;
}

@Override
Expand All @@ -93,20 +105,11 @@ public Locality getLocality() {

@Override
public boolean isAlive() {
final Payload currentPayload = payload;

if (currentPayload != null) {
return !currentPayload.getTerminalStateFuture().isDone();
} else {
// We are always alive if there is no payload assigned yet.
// If this slot is released and no payload is assigned, then the TERMINATED_PAYLOAD is assigned
return true;
}
return state == State.ALIVE;
}

@Override
public boolean tryAssignPayload(Payload payload) {
Preconditions.checkNotNull(payload);
return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
}

Expand All @@ -118,15 +121,12 @@ public Payload getPayload() {

@Override
public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
// set an already terminated payload if the payload of this slot is still empty
tryAssignPayload(TERMINATED_PAYLOAD);

// notify the payload that the slot will be released
payload.fail(cause);
if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
final CompletableFuture<?> payloadTerminalStateFuture = signalPayloadRelease(cause);
returnSlotToOwner(payloadTerminalStateFuture);
}

// Wait until the payload has been terminated. Only then, we return the slot to its rightful owner
return payload.getTerminalStateFuture()
.whenComplete((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this));
return releaseFuture;
}

@Override
Expand Down Expand Up @@ -159,10 +159,55 @@ public SlotSharingGroupId getSlotSharingGroupId() {
* the logical slot.
*
* @param cause of the payload release
* @return true if the logical slot's payload could be released, otherwise false
*/
@Override
public boolean release(Throwable cause) {
return releaseSlot(cause).isDone();
public void release(Throwable cause) {
if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
signalPayloadRelease(cause);
}
markReleased();
releaseFuture.complete(null);
}

private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
tryAssignPayload(TERMINATED_PAYLOAD);
payload.fail(cause);

return payload.getTerminalStateFuture();
}

private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
if (state == State.RELEASING) {
return slotOwner.returnAllocatedSlot(this);
} else {
return CompletableFuture.completedFuture(true);
}
}).thenCompose(Function.identity());

slotReturnFuture.whenComplete(
(Object ignored, Throwable throwable) -> {
markReleased();

if (throwable != null) {
releaseFuture.completeExceptionally(throwable);
} else {
releaseFuture.complete(null);
}
});
}

private void markReleased() {
state = State.RELEASED;
}

// -------------------------------------------------------------------------
// Internal classes
// -------------------------------------------------------------------------

enum State {
ALIVE,
RELEASING,
RELEASED
}
}
Expand Up @@ -16,10 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager.slots;
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;

import javax.annotation.Nonnull;

Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
Expand Down Expand Up @@ -764,10 +763,8 @@ public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @
final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);

if (allocatedSlot != null) {
// sanity check
if (allocatedSlot.releasePayload(cause)) {
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
}
allocatedSlot.releasePayload(cause);
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
} else {
log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId);
}
Expand Down
Expand Up @@ -44,6 +44,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* Manager which is responsible for slot sharing. Slot sharing allows to run different
Expand Down Expand Up @@ -281,9 +282,8 @@ public boolean contains(AbstractID groupId) {
* Release the task slot.
*
* @param cause for the release
* @return true if the slot could be released, otherwise false
*/
public abstract boolean release(Throwable cause);
public abstract void release(Throwable cause);
}

/**
Expand Down Expand Up @@ -433,60 +433,51 @@ public boolean contains(AbstractID groupId) {
}

@Override
public boolean release(Throwable cause) {
public void release(Throwable cause) {
releasingChildren = true;

// first release all children and remove them if they could be released immediately
children.values().removeIf(node -> {
boolean release = node.release(cause);
for (TaskSlot taskSlot : children.values()) {
taskSlot.release(cause);
allTaskSlots.remove(taskSlot.getSlotRequestId());
}

if (release) {
allTaskSlots.remove(node.getSlotRequestId());
}

return release;
});
children.clear();

releasingChildren = false;

if (children.isEmpty()) {
if (parent != null) {
// we remove ourselves from our parent if we no longer have children
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
// we are the root node --> remove the root node from the list of task slots
if (parent != null) {
// we remove ourselves from our parent if we no longer have children
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
// we are the root node --> remove the root node from the list of task slots

if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
synchronized (lock) {
// the root node should still be unresolved
unresolvedRootSlots.remove(getSlotRequestId());
}
} else {
// the root node should be resolved --> we can access the slot context
final SlotContext slotContext = slotContextFuture.getNow(null);
if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
synchronized (lock) {
// the root node should still be unresolved
unresolvedRootSlots.remove(getSlotRequestId());
}
} else {
// the root node should be resolved --> we can access the slot context
final SlotContext slotContext = slotContextFuture.getNow(null);

if (slotContext != null) {
synchronized (lock) {
final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
if (slotContext != null) {
synchronized (lock) {
final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());

if (multiTaskSlots != null) {
multiTaskSlots.remove(this);
if (multiTaskSlots != null) {
multiTaskSlots.remove(this);

if (multiTaskSlots.isEmpty()) {
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
if (multiTaskSlots.isEmpty()) {
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
}
}
}

// release the underlying allocated slot
allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
}

return true;
} else {
return false;
// release the underlying allocated slot
allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
}
}

Expand Down Expand Up @@ -518,7 +509,7 @@ public final class SingleTaskSlot extends TaskSlot {
private final MultiTaskSlot parent;

// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
private final CompletableFuture<LogicalSlot> logicalSlotFuture;
private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;

private SingleTaskSlot(
SlotRequestId slotRequestId,
Expand All @@ -530,7 +521,7 @@ private SingleTaskSlot(
this.parent = Preconditions.checkNotNull(parent);

Preconditions.checkNotNull(locality);
logicalSlotFuture = parent.getSlotContextFuture()
singleLogicalSlotFuture = parent.getSlotContextFuture()
.thenApply(
(SlotContext slotContext) ->
new SingleLogicalSlot(
Expand All @@ -542,29 +533,23 @@ private SingleTaskSlot(
}

CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
return logicalSlotFuture;
return singleLogicalSlotFuture.thenApply(Function.identity());
}

@Override
public boolean release(Throwable cause) {
logicalSlotFuture.completeExceptionally(cause);
public void release(Throwable cause) {
singleLogicalSlotFuture.completeExceptionally(cause);

boolean pendingLogicalSlotRelease = false;

if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) {
if (singleLogicalSlotFuture.isDone() && !singleLogicalSlotFuture.isCompletedExceptionally()) {
// we have a single task slot which we first have to release
final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null);
final SingleLogicalSlot singleLogicalSlot = singleLogicalSlotFuture.getNow(null);

if ((logicalSlot != null) && (logicalSlot.isAlive())) {
pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone();
}
}

if (!pendingLogicalSlotRelease) {
parent.releaseChild(getGroupId());
singleLogicalSlot.release(cause);
}

return !pendingLogicalSlotRelease;
parent.releaseChild(getGroupId());
}
}

Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.category.New;
Expand Down

0 comments on commit c7eb6ac

Please sign in to comment.