Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void slotAllocated(SimpleSlot slot) {
return true;
}
else {
// call race, already deployed
// call race, already deployed, or already done
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,8 @@ public class ExecutionGraph implements Serializable {
* that was not recoverable and triggered job failure */
private volatile Throwable failureCause;

/** The position of the vertex that is next expected to finish.
* This is an index into the "verticesInCreationOrder" collection.
* Once this value has reached the number of vertices, the job is done. */
private volatile int nextVertexToFinish;
/** The number of job vertices that have reached a terminal state */
private volatile int numFinishedJobVertices;


// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
Expand All @@ -200,7 +198,7 @@ public class ExecutionGraph implements Serializable {
private CheckpointCoordinator checkpointCoordinator;

// ------ Fields that are only relevant for archived execution graphs ------------
private ExecutionConfig executionConfig = null;
private ExecutionConfig executionConfig;

// --------------------------------------------------------------------------------------------
// Constructors
Expand Down Expand Up @@ -598,7 +596,7 @@ public void restart() {
for (int i = 0; i < stateTimestamps.length; i++) {
stateTimestamps[i] = 0;
}
nextVertexToFinish = 0;
numFinishedJobVertices = 0;
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

// if we have checkpointed state, reload it into the executions
Expand Down Expand Up @@ -653,7 +651,7 @@ public ExecutionConfig getExecutionConfig() {
*/
public void waitUntilFinished() throws InterruptedException {
synchronized (progressLock) {
while (nextVertexToFinish < verticesInCreationOrder.size()) {
while (!state.isTerminalState()) {
progressLock.wait();
}
}
Expand All @@ -680,76 +678,62 @@ private boolean transitionState(JobStatus current, JobStatus newState, Throwable

void jobVertexInFinalState(ExecutionJobVertex ev) {
synchronized (progressLock) {
int nextPos = nextVertexToFinish;
if (nextPos >= verticesInCreationOrder.size()) {
// already done, and we still get a report?
// this can happen when:
// - two job vertices finish almost simultaneously
// - The first one advances the position for the second as well (second is in final state)
// - the second (after it could grab the lock) tries to advance the position again
return;
if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
}

numFinishedJobVertices++;

// see if we are the next to finish and then progress until the next unfinished one
if (verticesInCreationOrder.get(nextPos) == ev) {
do {
nextPos++;
}
while (nextPos < verticesInCreationOrder.size() && verticesInCreationOrder.get(nextPos).isInFinalState());
if (numFinishedJobVertices == verticesInCreationOrder.size()) {

nextVertexToFinish = nextPos;

if (nextPos == verticesInCreationOrder.size()) {
// we are done, transition to the final state
JobStatus current;
while (true) {
current = this.state;

// we are done, transition to the final state
JobStatus current;
while (true) {
current = this.state;

if (current == JobStatus.RUNNING) {
if (transitionState(current, JobStatus.FINISHED)) {
postRunCleanup();
break;
}
if (current == JobStatus.RUNNING) {
if (transitionState(current, JobStatus.FINISHED)) {
postRunCleanup();
break;
}
else if (current == JobStatus.CANCELLING) {
if (transitionState(current, JobStatus.CANCELED)) {
postRunCleanup();
break;
}
}
else if (current == JobStatus.CANCELLING) {
if (transitionState(current, JobStatus.CANCELED)) {
postRunCleanup();
break;
}
else if (current == JobStatus.FAILING) {
if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
numberOfRetriesLeft--;
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Thread.sleep(delayBeforeRetrying);
}
catch(InterruptedException e){
// should only happen on shutdown
}
restart();
return null;
}
else if (current == JobStatus.FAILING) {
if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
numberOfRetriesLeft--;
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Thread.sleep(delayBeforeRetrying);
}
catch(InterruptedException e){
// should only happen on shutdown
}
}, AkkaUtils.globalExecutionContext());
break;
}
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
break;
}
restart();
return null;
}
}, AkkaUtils.globalExecutionContext());
break;
}
else {
fail(new Exception("ExecutionGraph went into final state from state " + current));
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
break;
}
}
// done transitioning the state

// also, notify waiters
progressLock.notifyAll();
else {
fail(new Exception("ExecutionGraph went into final state from state " + current));
}
}
// done transitioning the state

// also, notify waiters
progressLock.notifyAll();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[]
for (InputSplit split : splits) {
// check that split has exactly one local host
if(!(split instanceof LocatableInputSplit)) {
new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
"Strictly local assignment requires LocatableInputSplit");
}
LocatableInputSplit lis = (LocatableInputSplit) split;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;

import org.junit.Test;

import scala.concurrent.duration.FiniteDuration;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

/**
 * Stress test that tries to provoke a deadlock (or a lost terminal-state notification)
 * in the {@link ExecutionGraph} when two executions complete their cancellation
 * concurrently while the graph is in state {@link JobStatus#FAILING}.
 *
 * <p>Each attempt builds a fresh two-vertex graph, forces both executions into
 * {@link ExecutionState#CANCELING} via reflection, and then calls
 * {@code cancelingComplete()} on both from different threads. The attempt counts as
 * finished once the graph's {@code scheduleForExecution(...)} method is invoked
 * (presumably by the restart logic, since one retry is configured - the restart path
 * itself is not part of this file).
 *
 * <p>NOTE(review): {@link TestExecGraph#waitTillDone()} waits without a timeout, so an
 * actual deadlock shows up as a hanging test rather than as an assertion failure.
 */
public class TerminalStateDeadlockTest {

	/** Number of times the race between the two executions is replayed. */
	private static final int NUM_ATTEMPTS = 20000;

	/** Number of worker threads used to run the competing calls. */
	private static final int NUM_WORKER_THREADS = 4;

	// reflection handles to private fields of Execution
	private final Field stateField;
	private final Field resourceField;

	// reflection handles to private fields of ExecutionGraph
	private final Field execGraphStateField;
	private final Field execGraphSchedulerField;

	/** Dummy slot that is assigned to every execution under test. */
	private final SimpleSlot resource;


	/**
	 * Looks up the reflection fields and allocates the dummy slot shared by all attempts.
	 * Any failure during setup fails the test immediately.
	 */
	public TerminalStateDeadlockTest() {
		try {
			// the reflection fields to access the private fields
			this.stateField = Execution.class.getDeclaredField("state");
			this.stateField.setAccessible(true);

			this.resourceField = Execution.class.getDeclaredField("assignedResource");
			this.resourceField.setAccessible(true);

			this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
			this.execGraphStateField.setAccessible(true);

			this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler");
			this.execGraphSchedulerField.setAccessible(true);

			// the dummy resource
			InetAddress address = InetAddress.getByName("127.0.0.1");
			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);

			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
			Instance instance = new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, 4);

			this.resource = instance.allocateSimpleSlot(new JobID());
		}
		catch (Exception e) {
			e.printStackTrace();
			fail(e.getMessage());

			// fail() always throws; this statement only exists so that the compiler
			// accepts the final fields as definitely assigned. Keep the cause anyway.
			throw new RuntimeException(e);
		}
	}

	// ------------------------------------------------------------------------

	/**
	 * Replays the concurrent {@code cancelingComplete()} race {@link #NUM_ATTEMPTS} times
	 * and waits after each attempt until the graph signals that it was scheduled again.
	 */
	@Test
	public void testProvokeDeadlock() {
		// created outside the try block so that it can be shut down in 'finally'
		final ExecutorService executor = Executors.newFixedThreadPool(NUM_WORKER_THREADS);

		try {
			final JobID jobId = resource.getJobID();
			final JobVertexID vid1 = new JobVertexID();
			final JobVertexID vid2 = new JobVertexID();

			final List<AbstractJobVertex> vertices;
			{
				AbstractJobVertex v1 = new AbstractJobVertex("v1", vid1);
				AbstractJobVertex v2 = new AbstractJobVertex("v2", vid2);
				v1.setParallelism(1);
				v2.setParallelism(1);
				v1.setInvokableClass(DummyInvokable.class);
				v2.setInvokableClass(DummyInvokable.class);
				vertices = Arrays.asList(v1, v2);
			}

			final Scheduler scheduler = new Scheduler();

			// try a lot!
			for (int i = 0; i < NUM_ATTEMPTS; i++) {
				final TestExecGraph eg = new TestExecGraph(jobId);
				eg.attachJobGraph(vertices);
				eg.setDelayBeforeRetrying(0);
				eg.setNumberOfRetriesLeft(1);

				final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
				final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();

				initializeExecution(e1);
				initializeExecution(e2);

				// put the graph into the state from which the terminal transition is attempted
				execGraphStateField.set(eg, JobStatus.FAILING);
				execGraphSchedulerField.set(eg, scheduler);

				// let both executions report the completed cancellation concurrently
				executor.execute(cancelingCompleteCall(e1));
				executor.execute(cancelingCompleteCall(e2));

				eg.waitTillDone();
			}
		}
		catch (Exception e) {
			e.printStackTrace();
			fail(e.getMessage());
		}
		finally {
			// release the worker threads; already submitted calls are allowed to finish
			executor.shutdown();
		}
	}

	/**
	 * Forces the given execution into state CANCELING and assigns it the dummy slot.
	 *
	 * @param exec the execution to prepare
	 * @throws IllegalAccessException if a reflective field write is rejected
	 */
	private void initializeExecution(Execution exec) throws IllegalAccessException {
		// set state to canceling
		stateField.set(exec, ExecutionState.CANCELING);

		// assign a resource
		resourceField.set(exec, resource);
	}

	/**
	 * Wraps a call to {@code cancelingComplete()} on the given execution into a Runnable.
	 *
	 * @param execution the execution to notify
	 * @return a task that performs the call when run
	 */
	private static Runnable cancelingCompleteCall(final Execution execution) {
		return new Runnable() {
			@Override
			public void run() {
				execution.cancelingComplete();
			}
		};
	}


	/**
	 * Execution graph whose {@link #scheduleForExecution(Scheduler)} does not schedule
	 * anything, but only signals threads blocked in {@link #waitTillDone()}.
	 */
	static class TestExecGraph extends ExecutionGraph {

		private static final long serialVersionUID = -7606144898417942044L;

		private static final Configuration EMPTY_CONFIG = new Configuration();

		private static final FiniteDuration TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS);

		/** Set once scheduleForExecution(...) was called; guarded by the monitor of 'this'. */
		private volatile boolean done;

		TestExecGraph(JobID jobId) {
			super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
		}

		@Override
		public void scheduleForExecution(Scheduler scheduler) {
			// notify that we are done with the "restarting"
			synchronized (this) {
				done = true;
				this.notifyAll();
			}
		}

		/**
		 * Blocks until {@link #scheduleForExecution(Scheduler)} has been called.
		 *
		 * @throws RuntimeException if the waiting thread is interrupted; the interrupt
		 *                          flag is restored before the exception is thrown
		 */
		public void waitTillDone() {
			try {
				synchronized (this) {
					// loop guards against spurious wakeups
					while (!done) {
						this.wait();
					}
				}
			}
			catch (InterruptedException e) {
				// restore the flag so that code further up the stack can still see the interrupt
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
		}
	}
}
Loading