From 3847e304c93a30c18560719d8f169ca424d734e6 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Fri, 12 Feb 2016 22:09:29 +0100 Subject: [PATCH 1/2] [hotfix] Rename UnrecoverableException to SuppressRestartsException --- ...rableException.java => SuppressRestartsException.java} | 6 +++--- .../flink/runtime/executiongraph/ExecutionGraph.java | 8 ++++---- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 6 ++---- .../runtime/executiongraph/ExecutionGraphRestartTest.java | 6 +++--- .../apache/flink/test/checkpointing/SavepointITCase.java | 4 ++-- 5 files changed, 14 insertions(+), 16 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/execution/{UnrecoverableException.java => SuppressRestartsException.java} (87%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java similarity index 87% rename from flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java index 5a6cd7e8d6bdc..61a9064ccbbd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java @@ -19,17 +19,17 @@ package org.apache.flink.runtime.execution; /** - * Exception thrown on unrecoverable failures. + * Exception thrown in order to suppress job restarts. * *

This exception acts as a wrapper around the real cause and suppresses * job restarts. The JobManager will not restart a job, which * fails with this Exception. */ -public class UnrecoverableException extends RuntimeException { +public class SuppressRestartsException extends RuntimeException { private static final long serialVersionUID = 221873676920848349L; - public UnrecoverableException(Throwable cause) { + public SuppressRestartsException(Throwable cause) { super("Unrecoverable failure. This suppresses job restarts. Please check the " + "stack trace for the root cause.", cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7d83ae235a8d6..2c57011c5d863 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.UnrecoverableException; +import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -1037,14 +1037,14 @@ else if (current == JobStatus.CANCELLING) { } } else if (current == JobStatus.FAILING) { - boolean isRecoverable = !(failureCause instanceof UnrecoverableException); + boolean allowRestart = !(failureCause instanceof SuppressRestartsException); - if (isRecoverable && restartStrategy.canRestart() && + if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { restartStrategy.restart(this); break; - } else if ((!isRecoverable || !restartStrategy.canRestart()) && + } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1a162250220fa..f4e8486fe5b50 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -38,7 +38,7 @@ import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.client._ -import org.apache.flink.runtime.execution.UnrecoverableException +import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory} import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} @@ -1120,12 +1120,10 @@ class JobManager( executionGraph.restoreSavepoint(savepointPath) } catch { case e: Exception => - throw new UnrecoverableException(e) + throw new SuppressRestartsException(e) } } } - - submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) } jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 7cef66a5bb82b..b69ce141cb859 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.UnrecoverableException; +import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; @@ -335,7 +335,7 @@ public void testCancelWhileFailing() throws Exception { } @Test - public void testNoRestartOnUnrecoverableException() throws Exception { + public void testNoRestartOnSuppressException() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( new SimpleActorGateway(TestingUtils.directExecutionContext()), NUM_TASKS); @@ -367,7 +367,7 @@ public void testNoRestartOnUnrecoverableException() throws Exception { // Fail with unrecoverable Exception eg.getAllExecutionVertices().iterator().next().fail( - new UnrecoverableException(new Exception("Test Exception"))); + new SuppressRestartsException(new Exception("Test Exception"))); assertEquals(JobStatus.FAILING, eg.getState()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index b89f35a98b14a..c9d0e92b57024 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.checkpoint.SavepointStoreFactory; import org.apache.flink.runtime.checkpoint.StateForTask; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.UnrecoverableException; +import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -736,7 +736,7 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { flink.submitJobAndWait(jobGraph, false); } catch (Exception e) { - assertEquals(UnrecoverableException.class, e.getCause().getClass()); + assertEquals(SuppressRestartsException.class, e.getCause().getClass()); assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass()); } } From 07753af54db020ddef5540ab0ae3dcbb3af69c54 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 16 Feb 2016 17:49:13 +0100 Subject: [PATCH 2/2] [FLINK-3396] [runtime] Suppress job restart if adding to job graph store fails A failure to add the job graph to the submitted job graphs in ZooKeeper could lead to a job restart w/o the job graph ever being added to the submitted graphs store. Although the job submission was not ACK'd in this case before, it received job status messages. Now the job will not be ACK'd as before, but the job will be failed. --- .../flink/runtime/jobmanager/JobManager.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f4e8486fe5b50..0e2710ea51ec5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -905,7 +905,6 @@ class JobManager( * @param isRecovery Flag indicating whether this is a recovery or initial submission */ private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { - if (jobGraph == null) { jobInfo.client ! decorateMessage(JobResultFailure( new SerializedThrowable( @@ -1080,8 +1079,7 @@ class JobManager( executionGraph.registerExecutionListener(gateway) executionGraph.registerJobStatusListener(gateway) } - } - catch { + } catch { case t: Throwable => log.error(s"Failed to submit job $jobId ($jobName)", t) @@ -1108,8 +1106,7 @@ class JobManager( try { if (isRecovery) { executionGraph.restoreLatestCheckpointedState() - } - else { + } else { val snapshotSettings = jobGraph.getSnapshotSettings if (snapshotSettings != null) { val savepointPath = snapshotSettings.getSavepointPath() @@ -1124,6 +1121,15 @@ class JobManager( } } } + + try { + submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) + } catch { + case t: Throwable => + // Don't restart the execution if this fails. Otherwise, the + // job graph will skip ZooKeeper in case of HA. + new SuppressRestartsException(t) + } } jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) @@ -1150,8 +1156,7 @@ class JobManager( } catch { case t: Throwable => try { executionGraph.fail(t) - } - catch { + } catch { case tt: Throwable => log.error("Error while marking ExecutionGraph as failed.", tt) }