Skip to content

Commit

Permalink
[FLINK-8078] Introduce LogicalSlot interface
Browse files Browse the repository at this point in the history
The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

This closes #5086.
  • Loading branch information
tillrohrmann committed Dec 13, 2017
1 parent 0d55164 commit bb9c64b
Show file tree
Hide file tree
Showing 28 changed files with 614 additions and 437 deletions.
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
Expand Down Expand Up @@ -85,10 +85,9 @@ public String toString() {
*/ */
public static InputChannelDeploymentDescriptor[] fromEdges( public static InputChannelDeploymentDescriptor[] fromEdges(
ExecutionEdge[] edges, ExecutionEdge[] edges,
SimpleSlot consumerSlot, ResourceID consumerResourceId,
boolean allowLazyDeployment) throws ExecutionGraphException { boolean allowLazyDeployment) throws ExecutionGraphException {


final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length]; final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];


// Each edge is connected to a different result partition // Each edge is connected to a different result partition
Expand All @@ -97,7 +96,7 @@ public static InputChannelDeploymentDescriptor[] fromEdges(
final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt(); final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();


final ExecutionState producerState = producer.getState(); final ExecutionState producerState = producer.getState();
final SimpleSlot producerSlot = producer.getAssignedResource(); final LogicalSlot producerSlot = producer.getAssignedResource();


final ResultPartitionLocation partitionLocation; final ResultPartitionLocation partitionLocation;


Expand All @@ -111,7 +110,7 @@ public static InputChannelDeploymentDescriptor[] fromEdges(
final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation(); final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();


if (partitionTaskManager.equals(consumerTaskManager)) { if (partitionTaskManager.equals(consumerResourceId)) {
// Consuming task is deployed to the same TaskManager as the partition => local // Consuming task is deployed to the same TaskManager as the partition => local
partitionLocation = ResultPartitionLocation.createLocal(); partitionLocation = ResultPartitionLocation.createLocal();
} }
Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -98,9 +98,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");


private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater( private static final AtomicReferenceFieldUpdater<Execution, LogicalSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Execution.class, Execution.class,
SimpleSlot.class, LogicalSlot.class,
"assignedResource"); "assignedResource");


private static final Logger LOG = ExecutionGraph.LOG; private static final Logger LOG = ExecutionGraph.LOG;
Expand Down Expand Up @@ -141,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution


private volatile ExecutionState state = CREATED; private volatile ExecutionState state = CREATED;


private volatile SimpleSlot assignedResource; private volatile LogicalSlot assignedResource;


private volatile Throwable failureCause; // once assigned, never changes private volatile Throwable failureCause; // once assigned, never changes


Expand Down Expand Up @@ -240,29 +240,29 @@ public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
return taskManagerLocationFuture; return taskManagerLocationFuture;
} }


public SimpleSlot getAssignedResource() { public LogicalSlot getAssignedResource() {
return assignedResource; return assignedResource;
} }


/** /**
* Tries to assign the given slot to the execution. The assignment works only if the * Tries to assign the given slot to the execution. The assignment works only if the
* Execution is in state SCHEDULED. Returns true, if the resource could be assigned. * Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
* *
* @param slot to assign to this execution * @param logicalSlot to assign to this execution
* @return true if the slot could be assigned to the execution, otherwise false * @return true if the slot could be assigned to the execution, otherwise false
*/ */
@VisibleForTesting @VisibleForTesting
boolean tryAssignResource(final SimpleSlot slot) { boolean tryAssignResource(final LogicalSlot logicalSlot) {
checkNotNull(slot); checkNotNull(logicalSlot);


// only allow to set the assigned resource in state SCHEDULED or CREATED // only allow to set the assigned resource in state SCHEDULED or CREATED
// note: we also accept resource assignment when being in state CREATED for testing purposes // note: we also accept resource assignment when being in state CREATED for testing purposes
if (state == SCHEDULED || state == CREATED) { if (state == SCHEDULED || state == CREATED) {
if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) { if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
// check for concurrent modification (e.g. cancelling call) // check for concurrent modification (e.g. cancelling call)
if (state == SCHEDULED || state == CREATED) { if (state == SCHEDULED || state == CREATED) {
checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
taskManagerLocationFuture.complete(slot.getTaskManagerLocation()); taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());


return true; return true;
} else { } else {
Expand All @@ -283,7 +283,7 @@ boolean tryAssignResource(final SimpleSlot slot) {
@Override @Override
public TaskManagerLocation getAssignedResourceLocation() { public TaskManagerLocation getAssignedResourceLocation() {
// returns non-null only when a location is already assigned // returns non-null only when a location is already assigned
final SimpleSlot currentAssignedResource = assignedResource; final LogicalSlot currentAssignedResource = assignedResource;
return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null; return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
} }


Expand Down Expand Up @@ -442,14 +442,14 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
queued, queued,
preferredLocations)) preferredLocations))
.thenApply( .thenApply(
(SimpleSlot slot) -> { (LogicalSlot logicalSlot) -> {
if (tryAssignResource(slot)) { if (tryAssignResource(logicalSlot)) {
return this; return this;
} else { } else {
// release the slot // release the slot
slot.releaseSlot(); logicalSlot.releaseSlot();


throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned ")); throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
} }
}); });
} }
Expand All @@ -465,7 +465,7 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
* @throws JobException if the execution cannot be deployed to the assigned resource * @throws JobException if the execution cannot be deployed to the assigned resource
*/ */
public void deploy() throws JobException { public void deploy() throws JobException {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource."); checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");


Expand Down Expand Up @@ -493,7 +493,7 @@ public void deploy() throws JobException {


try { try {
// good, we are allowed to deploy // good, we are allowed to deploy
if (!slot.setExecutedVertex(this)) { if (!slot.setExecution(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot); throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
} }


Expand Down Expand Up @@ -545,7 +545,7 @@ public void deploy() throws JobException {
* Sends stop RPC call. * Sends stop RPC call.
*/ */
public void stop() { public void stop() {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand Down Expand Up @@ -608,7 +608,7 @@ else if (current == CREATED || current == SCHEDULED) {
try { try {
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);


final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
slot.releaseSlot(); slot.releaseSlot();
Expand Down Expand Up @@ -691,7 +691,7 @@ else if (numConsumers == 0) {
// ---------------------------------------------------------------- // ----------------------------------------------------------------
else { else {
if (consumerState == RUNNING) { if (consumerState == RUNNING) {
final SimpleSlot consumerSlot = consumer.getAssignedResource(); final LogicalSlot consumerSlot = consumer.getAssignedResource();


if (consumerSlot == null) { if (consumerSlot == null) {
// The consumer has been reset concurrently // The consumer has been reset concurrently
Expand All @@ -702,7 +702,7 @@ else if (numConsumers == 0) {
.getCurrentAssignedResource().getTaskManagerLocation(); .getCurrentAssignedResource().getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();


final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID(); final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();


final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId); final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);


Expand Down Expand Up @@ -778,7 +778,7 @@ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
int maxStrackTraceDepth, int maxStrackTraceDepth,
Time timeout) { Time timeout) {


final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand All @@ -802,7 +802,7 @@ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
* @param timestamp of the completed checkpoint * @param timestamp of the completed checkpoint
*/ */
public void notifyCheckpointComplete(long checkpointId, long timestamp) { public void notifyCheckpointComplete(long checkpointId, long timestamp) {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand All @@ -822,7 +822,7 @@ public void notifyCheckpointComplete(long checkpointId, long timestamp) {
* @param checkpointOptions of the checkpoint to trigger * @param checkpointOptions of the checkpoint to trigger
*/ */
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) { public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand Down Expand Up @@ -880,7 +880,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met


updateAccumulatorsAndMetrics(userAccumulators, metrics); updateAccumulatorsAndMetrics(userAccumulators, metrics);


final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
slot.releaseSlot(); slot.releaseSlot();
Expand Down Expand Up @@ -938,7 +938,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {


if (transitionState(current, CANCELED)) { if (transitionState(current, CANCELED)) {
try { try {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
slot.releaseSlot(); slot.releaseSlot();
Expand Down Expand Up @@ -1035,7 +1035,7 @@ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumul
updateAccumulatorsAndMetrics(userAccumulators, metrics); updateAccumulatorsAndMetrics(userAccumulators, metrics);


try { try {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;
if (slot != null) { if (slot != null) {
slot.releaseSlot(); slot.releaseSlot();
} }
Expand Down Expand Up @@ -1119,7 +1119,7 @@ else if (currentState == CANCELING || currentState == FAILED) {
* The sending is tried up to NUM_CANCEL_CALL_TRIES times. * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
*/ */
private void sendCancelRpcCall() { private void sendCancelRpcCall() {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand All @@ -1140,7 +1140,7 @@ private void sendCancelRpcCall() {
} }


private void sendFailIntermediateResultPartitionsRpcCall() { private void sendFailIntermediateResultPartitionsRpcCall() {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand All @@ -1158,7 +1158,7 @@ private void sendFailIntermediateResultPartitionsRpcCall() {
private void sendUpdatePartitionInfoRpcCall( private void sendUpdatePartitionInfoRpcCall(
final Iterable<PartitionInfo> partitionInfos) { final Iterable<PartitionInfo> partitionInfos) {


final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


if (slot != null) { if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand Down Expand Up @@ -1318,7 +1318,7 @@ private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> userAcc


@Override @Override
public String toString() { public String toString() {
final SimpleSlot slot = assignedResource; final LogicalSlot slot = assignedResource;


return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(), return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
(slot == null ? "(unassigned)" : slot), state); (slot == null ? "(unassigned)" : slot), state);
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -272,7 +273,7 @@ public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFutur
return currentExecution.getTaskManagerLocationFuture(); return currentExecution.getTaskManagerLocationFuture();
} }


public SimpleSlot getCurrentAssignedResource() { public LogicalSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource(); return currentExecution.getAssignedResource();
} }


Expand Down Expand Up @@ -744,7 +745,7 @@ void notifyStateTransition(Execution execution, ExecutionState newState, Throwab
*/ */
TaskDeploymentDescriptor createDeploymentDescriptor( TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId, ExecutionAttemptID executionId,
SimpleSlot targetSlot, LogicalSlot targetSlot,
TaskStateSnapshot taskStateHandles, TaskStateSnapshot taskStateHandles,
int attemptNumber) throws ExecutionGraphException { int attemptNumber) throws ExecutionGraphException {


Expand Down Expand Up @@ -779,8 +780,10 @@ TaskDeploymentDescriptor createDeploymentDescriptor(




for (ExecutionEdge[] edges : inputEdges) { for (ExecutionEdge[] edges : inputEdges) {
InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
.fromEdges(edges, targetSlot, lazyScheduling); edges,
targetSlot.getTaskManagerLocation().getResourceID(),
lazyScheduling);


// If the produced partition has multiple consumers registered, we // If the produced partition has multiple consumers registered, we
// need to request the one matching our sub task index. // need to request the one matching our sub task index.
Expand Down Expand Up @@ -829,10 +832,10 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
serializedJobInformation, serializedJobInformation,
serializedTaskInformation, serializedTaskInformation,
executionId, executionId,
targetSlot.getAllocatedSlot().getSlotAllocationId(), targetSlot.getAllocationId(),
subTaskIndex, subTaskIndex,
attemptNumber, attemptNumber,
targetSlot.getRoot().getSlotNumber(), targetSlot.getPhysicalSlotNumber(),
taskStateHandles, taskStateHandles,
producedPartitions, producedPartitions,
consumedPartitions); consumedPartitions);
Expand Down

0 comments on commit bb9c64b

Please sign in to comment.