From 952793e532cc20f33e97e7692e35ea508e715f1e Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 16 Dec 2015 14:09:22 +0100 Subject: [PATCH] [FLINK-3050] [runtime] Add UnrecoverableException to suppress job restarts --- .../execution/UnrecoverableException.java | 37 +++++++++++ .../executiongraph/ExecutionGraph.java | 11 +++- .../ExecutionGraphRestartTest.java | 63 ++++++++++++++++++- 3 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java new file mode 100644 index 0000000000000..5a6cd7e8d6bdc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution; + +/** + * Exception thrown on unrecoverable failures. + * + *

<p>This exception acts as a wrapper around the real cause and suppresses + * job restarts. The JobManager will not restart a job that + * fails with this exception. + */ +public class UnrecoverableException extends RuntimeException { + + private static final long serialVersionUID = 221873676920848349L; + + public UnrecoverableException(Throwable cause) { + super("Unrecoverable failure. This suppresses job restarts. Please check the " + + "stack trace for the root cause.", cause); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 1e3a4eb121db7..edf4df70f8570 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.UnrecoverableException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -910,7 +911,11 @@ else if (current == JobStatus.CANCELLING) { } } else if (current == JobStatus.FAILING) { - if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { + boolean isRecoverable = !(failureCause instanceof UnrecoverableException); + + if (isRecoverable && numberOfRetriesLeft > 0 && + transitionState(current, JobStatus.RESTARTING)) { + numberOfRetriesLeft--; if (delayBeforeRetrying > 0) { @@ -939,7 +944,9 @@ public Object call() throws Exception { } break; } - else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { + else if ((!isRecoverable || 
numberOfRetriesLeft <= 0) && + transitionState(current, JobStatus.FAILED, failureCause)) { + postRunCleanup(); break; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index a50aa2e5d5267..127ae339b5c08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.UnrecoverableException; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -37,17 +38,20 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class ExecutionGraphRestartTest { private final static int NUM_TASKS = 31; @Test - public void testNotRestartManually() throws Exception { + public void testNoManualRestart() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( new SimpleActorGateway(TestingUtils.directExecutionContext()), NUM_TASKS); @@ -83,6 +87,7 @@ public void testNotRestartManually() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); + // This should not restart the graph. 
eg.restart(); assertEquals(JobStatus.FAILED, eg.getState()); @@ -299,4 +304,60 @@ public void testCancelWhileFailing() throws Exception { assertEquals(JobStatus.CANCELED, executionGraph.getState()); } + + @Test + public void testNoRestartOnUnrecoverableException() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex sender = new JobVertex("Task"); + sender.setInvokableClass(Tasks.NoOpInvokable.class); + sender.setParallelism(NUM_TASKS); + + JobGraph jobGraph = new JobGraph("Pointwise job", sender); + + ExecutionGraph eg = spy(new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "Test job", + new Configuration(), + AkkaUtils.getDefaultTimeout())); + + eg.setNumberOfRetriesLeft(1); + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + + assertEquals(JobStatus.RUNNING, eg.getState()); + + // Fail with unrecoverable Exception + eg.getAllExecutionVertices().iterator().next().fail( + new UnrecoverableException(new Exception("Test Exception"))); + + assertEquals(JobStatus.FAILING, eg.getState()); + + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + vertex.getCurrentExecutionAttempt().cancelingComplete(); + } + + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); + + // Wait until the job has reached the FAILED state + Deadline deadline = timeout.fromNow(); + while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) { + Thread.sleep(100); + } + + assertEquals(JobStatus.FAILED, eg.getState()); + + // No restart + verify(eg, never()).restart(); + assertEquals(1, eg.getNumberOfRetriesLeft()); + } }