From 2298cfedbf880b3a6065a307224c5f3e9e326a0b Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 5 Jun 2015 22:39:29 +0200 Subject: [PATCH] [FLINK-2133] [jobmanager] Fix possible deadlock when vertices transition to final state. --- .../runtime/executiongraph/Execution.java | 2 +- .../executiongraph/ExecutionGraph.java | 116 +++++----- .../executiongraph/ExecutionJobVertex.java | 2 +- .../TerminalStateDeadlockTest.java | 209 ++++++++++++++++++ .../operators/testutils/DummyInvokable.java | 2 +- 5 files changed, 262 insertions(+), 69 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 7972d43567c60..76a58e87cd031 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -284,7 +284,7 @@ public void slotAllocated(SimpleSlot slot) { return true; } else { - // call race, already deployed + // call race, already deployed, or already done return false; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 73e827f71be06..705496dfabc77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -179,10 +179,8 @@ public class ExecutionGraph implements Serializable { * that was not recoverable and triggered job failure */ private volatile Throwable failureCause; - /** The position of the vertex that is next expected to finish. - * This is an index into the "verticesInCreationOrder" collection. 
- * Once this value has reached the number of vertices, the job is done. */ - private volatile int nextVertexToFinish; + /** The number of job vertices that have reached a terminal state */ + private volatile int numFinishedJobVertices; // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- @@ -200,7 +198,7 @@ public class ExecutionGraph implements Serializable { private CheckpointCoordinator checkpointCoordinator; // ------ Fields that are only relevant for archived execution graphs ------------ - private ExecutionConfig executionConfig = null; + private ExecutionConfig executionConfig; // -------------------------------------------------------------------------------------------- // Constructors @@ -598,7 +596,7 @@ public void restart() { for (int i = 0; i < stateTimestamps.length; i++) { stateTimestamps[i] = 0; } - nextVertexToFinish = 0; + numFinishedJobVertices = 0; transitionState(JobStatus.RESTARTING, JobStatus.CREATED); // if we have checkpointed state, reload it into the executions @@ -653,7 +651,7 @@ public ExecutionConfig getExecutionConfig() { */ public void waitUntilFinished() throws InterruptedException { synchronized (progressLock) { - while (nextVertexToFinish < verticesInCreationOrder.size()) { + while (!state.isTerminalState()) { progressLock.wait(); } } @@ -680,76 +678,62 @@ private boolean transitionState(JobStatus current, JobStatus newState, Throwable void jobVertexInFinalState(ExecutionJobVertex ev) { synchronized (progressLock) { - int nextPos = nextVertexToFinish; - if (nextPos >= verticesInCreationOrder.size()) { - // already done, and we still get a report? 
- // this can happen when: - // - two job vertices finish almost simultaneously - // - The first one advances the position for the second as well (second is in final state) - // - the second (after it could grab the lock) tries to advance the position again - return; + if (numFinishedJobVertices >= verticesInCreationOrder.size()) { + throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished."); } + + numFinishedJobVertices++; - // see if we are the next to finish and then progress until the next unfinished one - if (verticesInCreationOrder.get(nextPos) == ev) { - do { - nextPos++; - } - while (nextPos < verticesInCreationOrder.size() && verticesInCreationOrder.get(nextPos).isInFinalState()); + if (numFinishedJobVertices == verticesInCreationOrder.size()) { - nextVertexToFinish = nextPos; - - if (nextPos == verticesInCreationOrder.size()) { + // we are done, transition to the final state + JobStatus current; + while (true) { + current = this.state; - // we are done, transition to the final state - JobStatus current; - while (true) { - current = this.state; - - if (current == JobStatus.RUNNING) { - if (transitionState(current, JobStatus.FINISHED)) { - postRunCleanup(); - break; - } + if (current == JobStatus.RUNNING) { + if (transitionState(current, JobStatus.FINISHED)) { + postRunCleanup(); + break; } - else if (current == JobStatus.CANCELLING) { - if (transitionState(current, JobStatus.CANCELED)) { - postRunCleanup(); - break; - } + } + else if (current == JobStatus.CANCELLING) { + if (transitionState(current, JobStatus.CANCELED)) { + postRunCleanup(); + break; } - else if (current == JobStatus.FAILING) { - if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { - numberOfRetriesLeft--; - future(new Callable() { - @Override - public Object call() throws Exception { - try { - Thread.sleep(delayBeforeRetrying); - } - catch(InterruptedException e){ - // should only happen on shutdown - } - 
restart(); - return null; + } + else if (current == JobStatus.FAILING) { + if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { + numberOfRetriesLeft--; + future(new Callable() { + @Override + public Object call() throws Exception { + try { + Thread.sleep(delayBeforeRetrying); + } + catch(InterruptedException e){ + // should only happen on shutdown } - }, AkkaUtils.globalExecutionContext()); - break; - } - else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { - postRunCleanup(); - break; - } + restart(); + return null; + } + }, AkkaUtils.globalExecutionContext()); + break; } - else { - fail(new Exception("ExecutionGraph went into final state from state " + current)); + else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { + postRunCleanup(); + break; } } - // done transitioning the state - - // also, notify waiters - progressLock.notifyAll(); + else { + fail(new Exception("ExecutionGraph went into final state from state " + current)); + } } + // done transitioning the state + + // also, notify waiters + progressLock.notifyAll(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 59b3bb659970b..e53bc10479d22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -474,7 +474,7 @@ private List[] computeLocalInputSplitsPerTask(InputSplit[] for (InputSplit split : splits) { // check that split has exactly one local host if(!(split instanceof LocatableInputSplit)) { - new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " + + throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". 
" + "Strictly local assignment requires LocatableInputSplit"); } LocatableInputSplit lis = (LocatableInputSplit) split; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java new file mode 100644 index 0000000000000..8cba6cabf4c00 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+/** Stress test that completes the cancellation of two executions concurrently while their graph is FAILING, and waits for the graph to be rescheduled. */
+public class TerminalStateDeadlockTest {
+
+	private final Field stateField; // reflective handle to the private "state" field of Execution
+	private final Field resourceField; // reflective handle to the private "assignedResource" field of Execution
+	private final Field execGraphStateField; // reflective handle to the private "state" field of ExecutionGraph
+	private final Field execGraphSchedulerField; // reflective handle to the private "scheduler" field of ExecutionGraph
+
+	private final SimpleSlot resource; // dummy slot that initializeExecution assigns to every execution
+
+	/** Looks up the reflective field handles and allocates the shared dummy slot; any exception fails the test. */
+	public TerminalStateDeadlockTest() {
+		try {
+			// reflective handles used to write private fields of Execution and ExecutionGraph directly
+			this.stateField = Execution.class.getDeclaredField("state");
+			this.stateField.setAccessible(true);
+
+			this.resourceField = Execution.class.getDeclaredField("assignedResource");
+			this.resourceField.setAccessible(true);
+
+			this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
+			this.execGraphStateField.setAccessible(true);
+
+			this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler");
+			this.execGraphSchedulerField.setAccessible(true);
+
+			// the dummy resource: a single slot taken from an Instance that has no real actor behind it
+			InetAddress address = InetAddress.getByName("127.0.0.1");
+			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
+
+			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
+			Instance instance = new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, 4); // presumably 4 = number of slots — TODO confirm
+
+			this.resource = instance.allocateSimpleSlot(new JobID());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage()); // Assert.fail throws AssertionError, so control never reaches the throw below
+
+			// silence the compiler: without a throw here the final fields would count as possibly unassigned
+			throw new RuntimeException(); // NOTE(review): the cause 'e' is not attached to this exception
+		}
+	}
+
+
+
+	// ------------------------------------------------------------------------
+	// Each round builds a fresh two-vertex graph forced into FAILING, completes both cancellations on the pool, and waits until scheduleForExecution is called.
+	@Test
+	public void testProvokeDeadlock() {
+		try {
+			final JobID jobId = resource.getJobID(); // reuse the job id the dummy slot was allocated for
+			final JobVertexID vid1 = new JobVertexID();
+			final JobVertexID vid2 = new JobVertexID();
+
+
+			final Configuration jobConfig = new Configuration(); // NOTE(review): not referenced anywhere else in this test
+
+			final List vertices; // NOTE(review): raw List; the elements added below are AbstractJobVertex
+			{
+				AbstractJobVertex v1 = new AbstractJobVertex("v1", vid1);
+				AbstractJobVertex v2 = new AbstractJobVertex("v2", vid2);
+				v1.setParallelism(1);
+				v2.setParallelism(1);
+				v1.setInvokableClass(DummyInvokable.class);
+				v2.setInvokableClass(DummyInvokable.class);
+				vertices = Arrays.asList(v1, v2);
+			}
+
+			final Scheduler scheduler = new Scheduler(); // one instance, injected into every graph created below
+
+			final Executor executor = Executors.newFixedThreadPool(4); // NOTE(review): the pool is never shut down, so its worker threads are left running after the test returns
+
+			// try a lot: repeat the race many times to raise the chance of hitting a problematic interleaving
+			for (int i = 0; i < 20000; i++) {
+				final TestExecGraph eg = new TestExecGraph(jobId);
+				eg.attachJobGraph(vertices);
+				eg.setDelayBeforeRetrying(0);
+				eg.setNumberOfRetriesLeft(1); // one retry left
+
+				final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
+				final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
+
+				initializeExecution(e1);
+				initializeExecution(e2);
+
+				execGraphStateField.set(eg, JobStatus.FAILING); // force the graph state via reflection
+				execGraphSchedulerField.set(eg, scheduler);
+
+				Runnable r1 = new Runnable() {
+					@Override
+					public void run() {
+						e1.cancelingComplete(); // report the first vertex's execution as done canceling
+					}
+				};
+				Runnable r2 = new Runnable() {
+					@Override
+					public void run() {
+						e2.cancelingComplete(); // report the second vertex's execution as done canceling
+					}
+				};
+
+				executor.execute(r1); // both tasks are handed to the pool so they can run at the same time
+				executor.execute(r2);
+
+				eg.waitTillDone(); // NOTE(review): waits without a timeout, so a regression shows up as a hanging test, not a failure
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private void initializeExecution(Execution exec) throws IllegalAccessException {
+		// set state to canceling (written reflectively into the private field)
+		stateField.set(exec, ExecutionState.CANCELING);
+
+		// assign a resource (the shared dummy slot)
+		resourceField.set(exec, resource);
+	}
+
+	/** ExecutionGraph whose scheduleForExecution only records that it was called and wakes up waitTillDone(). */
+	static class TestExecGraph extends ExecutionGraph {
+
+		private static final long serialVersionUID = -7606144898417942044L;
+
+		private static final Configuration EMPTY_CONFIG = new Configuration();
+
+		private static final FiniteDuration TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		private volatile boolean done; // set to true once scheduleForExecution has been called
+
+		TestExecGraph(JobID jobId) {
+			super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
+		}
+
+		@Override
+		public void scheduleForExecution(Scheduler scheduler) {
+			// notify that we are done with the "restarting"; the scheduler argument is not used
+			synchronized (this) {
+				done = true;
+				this.notifyAll();
+			}
+		}
+
+		public void waitTillDone() { // blocks until scheduleForExecution has been called
+			try {
+				synchronized (this) {
+					while (!done) { // re-check the flag after every wakeup
+						this.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				throw new RuntimeException(e); // NOTE(review): the thread's interrupt status is not restored
+			}
+		}
+	}
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java index 5e99cf4c90e47..e4b905661a63f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.util.InstantiationUtil; /** * An invokable that does nothing. @@ -34,6 +33,7 @@ public void registerInputOutput() {} @Override public void invoke() {} + @Override public ClassLoader getUserCodeClassLoader() { return getClass().getClassLoader(); }