Skip to content

Commit

Permalink
[comments] Suppress resart of savepoint recovery failure
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Dec 16, 2015
1 parent f846991 commit 351eaba
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint._
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.execution.UnrecoverableException
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
Expand Down Expand Up @@ -1059,7 +1060,12 @@ class JobManager(

// Reset state back to savepoint
if (savepointPath != null) {
executionGraph.restoreSavepoint(savepointPath)
try {
executionGraph.restoreSavepoint(savepointPath)
} catch {
case e: Exception =>
throw new UnrecoverableException(e)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.checkpoint.SavepointStoreFactory;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.UnrecoverableException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
Expand Down Expand Up @@ -732,7 +733,13 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {

LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");

flink.submitJobAndWait(jobGraph, true);
try {
flink.submitJobAndWait(jobGraph, false);
}
catch (Exception e) {
assertEquals(UnrecoverableException.class, e.getCause().getClass());
assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass());
}
}
finally {
if (flink != null) {
Expand Down

0 comments on commit 351eaba

Please sign in to comment.