From 1c11436f85697333b900830d4cb7e74cc1059f48 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Fri, 26 Feb 2016 17:12:05 +0100 Subject: [PATCH] [FLINK-3443] [runtime] Prevent cancelled jobs from restarting --- .../executiongraph/ExecutionGraph.java | 3 +- .../ExecutionGraphRestartTest.java | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9a6eb8542f649..308781f97e7fc 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -809,7 +809,8 @@ public void stop() throws StoppingException { public void fail(Throwable t) { while (true) { JobStatus current = state; - if (current == JobStatus.FAILED || current == JobStatus.FAILING) { + if (current == JobStatus.FAILED || current == JobStatus.FAILING + || current == JobStatus.CANCELED || current == JobStatus.CANCELLING) { return; } else if (transitionState(current, JobStatus.FAILING, t)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 925b574e3e411..e9212bd853c61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import 
org.apache.flink.runtime.execution.ExecutionState; @@ -490,6 +491,54 @@ public void testFailingExecutionAfterRestart() throws Exception { assertEquals(JobStatus.FINISHED, eg.getState()); } + /** + * Tests that a graph which is being cancelled is not restarted by a subsequent call to + * {@link ExecutionGraph#fail(Throwable)}. Such a call can happen when a slot is + * released concurrently with cancellation. + */ + @Test + public void testNoRestartAfterCancel() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + 2); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex vertex = new JobVertex("Test Vertex"); + vertex.setInvokableClass(Tasks.NoOpInvokable.class); + vertex.setParallelism(1); + + JobGraph jobGraph = new JobGraph("Test Job", vertex); + jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart( + Integer.MAX_VALUE, Integer.MAX_VALUE)); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "test job", + new Configuration(), + AkkaUtils.getDefaultTimeout(), + new FixedDelayRestartStrategy(1, 1000)); + + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + assertEquals(JobStatus.RUNNING, eg.getState()); + + // Fail right after cancel (for example with concurrent slot release) + eg.cancel(); + eg.fail(new Exception("Test Exception")); + assertEquals(JobStatus.CANCELLING, eg.getState()); + + Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt(); + + execution.cancelingComplete(); + assertEquals(JobStatus.CANCELED, eg.getState()); + } + private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { 
eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));