Skip to content

Commit

Permalink
[FLINK-1376] [runtime] Add proper shared slot release in case of a fa…
Browse files Browse the repository at this point in the history
…tal TaskManager failure.

Fixes concurrent modification exception of SharedSlot's subSlots field by synchronizing all state changing operations through the associated assignment group. Fixes deadlock where Instance.markDead first acquires InstanceLock and then by releasing the associated slots the assignment group lockcan block with a direct releaseSlot call on a SharedSlot which first acquires the assignment group lock and then the instance lock in order to return the slot to the instance.

Fixes colocation shared slot releasing. A colocation constraint is now realized as a SharedSlot in a SharedSlot where the colocated tasks allocate sub slots.

This cloes #317
  • Loading branch information
tillrohrmann authored and StephanEwen committed Feb 5, 2015
1 parent 9d181a8 commit db1b8b9
Show file tree
Hide file tree
Showing 38 changed files with 1,333 additions and 953 deletions.
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.RemoteAddress; import org.apache.flink.runtime.io.network.RemoteAddress;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;


Expand Down Expand Up @@ -114,7 +114,7 @@ public void read(DataInputView in) throws IOException {


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot consumerSlot) { public static PartitionInfo fromEdge(ExecutionEdge edge, SimpleSlot consumerSlot) {
IntermediateResultPartition partition = edge.getSource(); IntermediateResultPartition partition = edge.getSource();
IntermediateResultPartitionID partitionId = partition.getPartitionId(); IntermediateResultPartitionID partitionId = partition.getPartitionId();


Expand All @@ -125,7 +125,7 @@ public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot consumerS
RemoteAddress producerAddress = null; RemoteAddress producerAddress = null;
PartitionLocation producerLocation = PartitionLocation.UNKNOWN; PartitionLocation producerLocation = PartitionLocation.UNKNOWN;


AllocatedSlot producerSlot = producer.getAssignedResource(); SimpleSlot producerSlot = producer.getAssignedResource();
ExecutionState producerState = producer.getState(); ExecutionState producerState = producer.getState();


// The producer needs to be running, otherwise the consumer might request a partition, // The producer needs to be running, otherwise the consumer might request a partition,
Expand All @@ -145,7 +145,7 @@ public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot consumerS
return new PartitionInfo(partitionId, producerExecutionId, producerLocation, producerAddress); return new PartitionInfo(partitionId, producerExecutionId, producerLocation, producerAddress);
} }


public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, AllocatedSlot consumerSlot) { public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, SimpleSlot consumerSlot) {
// Every edge consumes a different result partition, which might be of // Every edge consumes a different result partition, which might be of
// local, remote, or unknown location. // local, remote, or unknown location.
PartitionInfo[] partitions = new PartitionInfo[edges.length]; PartitionInfo[] partitions = new PartitionInfo[edges.length];
Expand Down
Expand Up @@ -101,8 +101,8 @@ public class Execution implements Serializable {




private volatile ExecutionState state = CREATED; private volatile ExecutionState state = CREATED;

private volatile AllocatedSlot assignedResource; // once assigned, never changes private volatile SimpleSlot assignedResource; // once assigned, never changes


private volatile Throwable failureCause; // once assigned, never changes private volatile Throwable failureCause; // once assigned, never changes


Expand Down Expand Up @@ -141,7 +141,7 @@ public ExecutionState getState() {
return state; return state;
} }


public AllocatedSlot getAssignedResource() { public SimpleSlot getAssignedResource() {
return assignedResource; return assignedResource;
} }


Expand Down Expand Up @@ -185,7 +185,7 @@ public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws


// sanity check // sanity check
if (locationConstraint != null && sharingGroup == null) { if (locationConstraint != null && sharingGroup == null) {
throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed."); throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing not allowed.");
} }


if (transitionState(CREATED, SCHEDULED)) { if (transitionState(CREATED, SCHEDULED)) {
Expand All @@ -201,7 +201,7 @@ public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws


future.setFutureAction(new SlotAllocationFutureAction() { future.setFutureAction(new SlotAllocationFutureAction() {
@Override @Override
public void slotAllocated(AllocatedSlot slot) { public void slotAllocated(SimpleSlot slot) {
try { try {
deployToSlot(slot); deployToSlot(slot);
} }
Expand All @@ -216,7 +216,7 @@ public void slotAllocated(AllocatedSlot slot) {
}); });
} }
else { else {
AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule); SimpleSlot slot = scheduler.scheduleImmediately(toSchedule);
try { try {
deployToSlot(slot); deployToSlot(slot);
} }
Expand All @@ -237,7 +237,7 @@ public void slotAllocated(AllocatedSlot slot) {
} }
} }


public void deployToSlot(final AllocatedSlot slot) throws JobException { public void deployToSlot(final SimpleSlot slot) throws JobException {
// sanity checks // sanity checks
if (slot == null) { if (slot == null) {
throw new NullPointerException(); throw new NullPointerException();
Expand Down Expand Up @@ -406,7 +406,7 @@ boolean scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) throws Ex
} }
} }
else if (consumerState == RUNNING) { else if (consumerState == RUNNING) {
AllocatedSlot consumerSlot = consumerVertex.getCurrentAssignedResource(); SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId(); ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId();


PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot); PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot);
Expand Down Expand Up @@ -635,7 +635,7 @@ else if (currentState == CANCELING || currentState == FAILED) {
} }


private void sendCancelRpcCall() { private void sendCancelRpcCall() {
final AllocatedSlot slot = this.assignedResource; final SimpleSlot slot = this.assignedResource;
if (slot == null) { if (slot == null) {
return; return;
} }
Expand All @@ -662,7 +662,7 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
} }


private void sendFailIntermediateResultPartitionsRPCCall() { private void sendFailIntermediateResultPartitionsRPCCall() {
final AllocatedSlot slot = this.assignedResource; final SimpleSlot slot = this.assignedResource;
if (slot == null) { if (slot == null) {
return; return;
} }
Expand All @@ -680,7 +680,7 @@ private void sendFailIntermediateResultPartitionsRPCCall() {
} }
} }


private boolean sendUpdateTaskRpcCall(final AllocatedSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception { private boolean sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception {
final Instance instance = consumerSlot.getInstance(); final Instance instance = consumerSlot.getInstance();


final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask( final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask(
Expand Down
Expand Up @@ -18,14 +18,14 @@


package org.apache.flink.runtime.executiongraph; package org.apache.flink.runtime.executiongraph;


import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionInfo; import org.apache.flink.runtime.deployment.PartitionInfo;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -178,7 +178,7 @@ public Throwable getFailureCause() {
return currentExecution.getFailureCause(); return currentExecution.getFailureCause();
} }


public AllocatedSlot getCurrentAssignedResource() { public SimpleSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource(); return currentExecution.getAssignedResource();
} }


Expand Down Expand Up @@ -304,7 +304,7 @@ public Iterable<Instance> getPreferredLocations() {
ExecutionEdge[] sources = inputEdges[i]; ExecutionEdge[] sources = inputEdges[i];
if (sources != null) { if (sources != null) {
for (int k = 0; k < sources.length; k++) { for (int k = 0; k < sources.length; k++) {
AllocatedSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) { if (sourceSlot != null) {
locations.add(sourceSlot.getInstance()); locations.add(sourceSlot.getInstance());
if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
Expand Down Expand Up @@ -346,7 +346,7 @@ public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws
return this.currentExecution.scheduleForExecution(scheduler, queued); return this.currentExecution.scheduleForExecution(scheduler, queued);
} }


public void deployToSlot(AllocatedSlot slot) throws JobException { public void deployToSlot(SimpleSlot slot) throws JobException {
this.currentExecution.deployToSlot(slot); this.currentExecution.deployToSlot(slot);
} }


Expand Down Expand Up @@ -397,7 +397,7 @@ void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newSta
getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error); getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
} }


TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) { TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, SimpleSlot slot) {
// Produced intermediate results // Produced intermediate results
List<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length); List<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length);


Expand Down

This file was deleted.

0 comments on commit db1b8b9

Please sign in to comment.