From 7ae30e3b727adb3f499286181d35d5a8d99b4e19 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 29 Nov 2016 17:31:08 +0100 Subject: [PATCH] [FLINK-5193] [jm] Harden job recovery in case of recovery failures When recovering multiple jobs a single recovery failure caused all jobs to be not recovered. This PR changes this behaviour to make the recovery of jobs independent so that a single failure won't stall the complete recovery. Furthermore, this PR improves the error reporting for failures originating in the ZooKeeperSubmittedJobGraphStore. Add test case Fix failing JobManagerHACheckpointRecoveryITCase --- .../StandaloneSubmittedJobGraphStore.java | 11 +- .../jobmanager/SubmittedJobGraphStore.java | 19 ++- .../ZooKeeperSubmittedJobGraphStore.java | 109 ++++++------ .../zookeeper/ZooKeeperStateHandleStore.java | 44 ++++- .../flink/runtime/jobmanager/JobManager.scala | 38 ++--- .../jobmanager/JobManagerHARecoveryTest.java | 161 +++++++++++++++++- .../StandaloneSubmittedJobGraphStoreTest.java | 11 +- ...ooKeeperSubmittedJobGraphsStoreITCase.java | 29 ++-- .../JobManagerHACheckpointRecoveryITCase.java | 4 +- 9 files changed, 307 insertions(+), 119 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java index 3041cde2a326d..d1ca1a3885301 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java @@ -20,10 +20,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; -import scala.Option; +import java.util.Collection; import java.util.Collections; -import java.util.List; /** * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#NONE}. 
@@ -54,12 +53,12 @@ public void removeJobGraph(JobID jobId) throws Exception { } @Override - public Option recoverJobGraph(JobID jobId) throws Exception { - return Option.empty(); + public Collection getJobIds() throws Exception { + return Collections.emptyList(); } @Override - public List recoverJobGraphs() throws Exception { - return Collections.emptyList(); + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + return null; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java index bd628cd433c90..55c2e7998b84b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -19,10 +19,8 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobGraph; -import scala.Option; -import java.util.List; +import java.util.Collection; /** * {@link SubmittedJobGraph} instances for recovery. @@ -39,17 +37,12 @@ public interface SubmittedJobGraphStore { */ void stop() throws Exception; - /** - * Returns a list of all submitted {@link JobGraph} instances. - */ - List recoverJobGraphs() throws Exception; - /** * Returns the {@link SubmittedJobGraph} with the given {@link JobID}. * *

An Exception is thrown, if no job graph with the given ID exists. */ - Option recoverJobGraph(JobID jobId) throws Exception; + SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception; /** * Adds the {@link SubmittedJobGraph} instance. @@ -63,6 +56,14 @@ public interface SubmittedJobGraphStore { */ void removeJobGraph(JobID jobId) throws Exception; + /** + * Returns the job ids of all job graphs submitted to the submitted job graph store. + * + * @return Collection of submitted job ids + * @throws Exception if the operation fails + */ + Collection getJobIds() throws Exception; + /** * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers). diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index c1dc656ab6668..aaafa762e81b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -24,18 +24,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import java.util.ArrayList; -import java.util.Collections; -import java.util.ConcurrentModificationException; +import java.util.Collection; import java.util.HashSet; import java.util.List; import
java.util.Set; @@ -156,47 +153,7 @@ public void stop() throws Exception { } @Override - public List recoverJobGraphs() throws Exception { - synchronized (cacheLock) { - verifyIsRunning(); - - LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath); - List, String>> submitted; - - while (true) { - try { - submitted = jobGraphsInZooKeeper.getAll(); - break; - } - catch (ConcurrentModificationException e) { - LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); - } - } - - LOG.info("Found {} job graphs.", submitted.size()); - - if (submitted.size() != 0) { - List jobGraphs = new ArrayList<>(submitted.size()); - - for (Tuple2, String> jobStateHandle : submitted) { - SubmittedJobGraph jobGraph = jobStateHandle.f0.retrieveState(); - addedJobGraphs.add(jobGraph.getJobId()); - - jobGraphs.add(jobGraph); - } - - LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs); - return jobGraphs; - } - else { - LOG.info("No job graph to recover."); - return Collections.emptyList(); - } - } - } - - @Override - public Option recoverJobGraph(JobID jobId) throws Exception { + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); String path = getPathForJob(jobId); @@ -205,17 +162,29 @@ public Option recoverJobGraph(JobID jobId) throws Exception { synchronized (cacheLock) { verifyIsRunning(); - try { - SubmittedJobGraph jobGraph = jobGraphsInZooKeeper.get(path).retrieveState(); - addedJobGraphs.add(jobGraph.getJobId()); + RetrievableStateHandle jobGraphRetrievableStateHandle; - LOG.info("Recovered {}.", jobGraph); - - return Option.apply(jobGraph); + try { + jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path); + } catch (KeeperException.NoNodeException ignored) { + return null; + } catch (Exception e) { + throw new Exception("Could not retrieve the submitted job graph state handle " + + "for " + path + " from the submitted job graph store.", e); } - catch
(KeeperException.NoNodeException ignored) { - return Option.empty(); + SubmittedJobGraph jobGraph; + + try { + jobGraph = jobGraphRetrievableStateHandle.retrieveState(); + } catch (Exception e) { + throw new Exception("Failed to retrieve the submitted job graph from state handle.", e); } + + addedJobGraphs.add(jobGraph.getJobId()); + + LOG.info("Recovered {}.", jobGraph); + + return jobGraph; } } @@ -283,6 +252,31 @@ public void removeJobGraph(JobID jobId) throws Exception { LOG.info("Removed job graph {} from ZooKeeper.", jobId); } + @Override + public Collection getJobIds() throws Exception { + Collection paths; + + LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath); + + try { + paths = jobGraphsInZooKeeper.getAllPaths(); + } catch (Exception e) { + throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e); + } + + List jobIds = new ArrayList<>(paths.size()); + + for (String path : paths) { + try { + jobIds.add(jobIdfromPath(path)); + } catch (Exception exception) { + LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception); + } + } + + return jobIds; + } + /** * Monitors ZooKeeper for changes. * @@ -405,4 +399,13 @@ public static String getPathForJob(JobID jobId) { return String.format("/%s", jobId); } + /** + * Returns the JobID from the given path in ZooKeeper. 
+ * + * @param path in ZooKeeper + * @return JobID associated with the given path + */ + public static JobID jobIdfromPath(final String path) { + return JobID.fromHexString(path); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 14d9f6f737f9e..dd32efb38d79b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -30,8 +30,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; @@ -225,8 +228,48 @@ public int exists(String pathInZooKeeper) throws Exception { public RetrievableStateHandle get(String pathInZooKeeper) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - byte[] data = client.getData().forPath(pathInZooKeeper); - return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader()); + byte[] data; + + try { + data = client.getData().forPath(pathInZooKeeper); + } catch (KeeperException.NoNodeException e) { + // Propagate unwrapped: callers catch NoNodeException to detect a missing node. + throw e; + } catch (Exception e) { + throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper + + " from ZooKeeper.", e); + } + + try { + return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new Exception("Failed to deserialize state handle from ZooKeeper data from " + + pathInZooKeeper + '.', e); + } + } + + /** + * Return a list of all valid paths for state handles. 
+ * + * @return List of valid state handle paths in ZooKeeper + * @throws Exception if a ZooKeeper operation fails + */ + public Collection getAllPaths() throws Exception { + final String path = "/"; + + while(true) { + Stat stat = client.checkExists().forPath(path); + + if (stat == null) { + return Collections.emptyList(); + } else { + try { + return client.getChildren().forPath(path); + } catch (KeeperException.NoNodeException ignored) { + // Concurrent deletion, retry + } + } + } } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 982efe84da3c4..1dfd3dba7b471 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -495,6 +495,8 @@ class JobManager( case RecoverSubmittedJob(submittedJobGraph) => if (!currentJobs.contains(submittedJobGraph.getJobId)) { + log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.") + submitJob( submittedJobGraph.getJobGraph(), submittedJobGraph.getJobInfo(), @@ -516,7 +518,7 @@ class JobManager( log.info(s"Attempting to recover job $jobId.") val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) - submittedJobGraphOption match { + Option(submittedJobGraphOption) match { case Some(submittedJobGraph) => if (!leaderElectionService.hasLeadership()) { // we've lost leadership. mission: abort. @@ -529,37 +531,31 @@ class JobManager( } } } catch { - case t: Throwable => log.error(s"Failed to recover job $jobId.", t) + case t: Throwable => log.warn(s"Failed to recover job $jobId.", t) } }(context.dispatcher) case RecoverAllJobs => future { - try { - // The ActorRef, which is part of the submitted job graph can only be - // de-serialized in the scope of an actor system. 
- akka.serialization.JavaSerializer.currentSystem.withValue( - context.system.asInstanceOf[ExtendedActorSystem]) { - - log.info(s"Attempting to recover all jobs.") + log.info("Attempting to recover all jobs.") - val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala + try { + val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala - if (!leaderElectionService.hasLeadership()) { - // we've lost leadership. mission: abort. - log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " + - s"jobs.") - } else { - log.info(s"Re-submitting ${jobGraphs.size} job graphs.") + if (jobIdsToRecover.isEmpty) { + log.info("There are no jobs to recover.") + } else { + log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " + + s"recovery.") - jobGraphs.foreach{ - submittedJobGraph => - self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) - } + jobIdsToRecover foreach { + jobId => self ! decorateMessage(RecoverJob(jobId)) } } } catch { - case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t) + case e: Exception => + log.warn("Failed to recover job ids from submitted job graph store. 
Aborting " + + "recovery.", e) } }(context.dispatcher) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 69aac31189dca..36412f5c20d43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -20,8 +20,13 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.japi.pf.FI; +import akka.japi.pf.ReceiveBuilder; +import akka.pattern.Patterns; +import akka.testkit.CallingThreadDispatcher; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; @@ -31,6 +36,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -39,8 +45,10 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import 
org.apache.flink.runtime.instance.InstanceManager; @@ -53,10 +61,12 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -77,25 +87,35 @@ import org.junit.rules.TemporaryFolder; import scala.Int; import scala.Option; +import scala.PartialFunction; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class JobManagerHARecoveryTest { @@ -295,6 +315,131 @@ public 
void testJobRecoveryWhenLosingLeadership() throws Exception { } } + /** + * Tests that a failing job recovery won't cause other job recoveries to fail. + */ + @Test + public void testFailingJobRecovery() throws Exception { + final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow(); + final Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + ActorRef jobManager = null; + JobID jobId1 = new JobID(); + JobID jobId2 = new JobID(); + + // set HA mode to zookeeper so that we try to recover jobs + flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + try { + final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class); + + SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class); + when(submittedJobGraph.getJobId()).thenReturn(jobId2); + + when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2)); + + // fail the first job recovery + when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception")); + // succeed the second job recovery + when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph); + + final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + + final Collection recoveredJobs = new ArrayList<>(2); + + Props jobManagerProps = Props.create( + TestingFailingHAJobManager.class, + flinkConfiguration, + Executors.directExecutor(), + Executors.directExecutor(), + mock(InstanceManager.class), + mock(Scheduler.class), + new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20), + ActorRef.noSender(), + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), + timeout, + myLeaderElectionService, + submittedJobGraphStore, + 
mock(CheckpointRecoveryFactory.class), + jobRecoveryTimeout, + Option.apply(null), + recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()); + + jobManager = system.actorOf(jobManagerProps, "jobmanager"); + + Future started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis()); + + Await.ready(started, deadline.timeLeft()); + + // make the job manager the leader --> this triggers the recovery of all jobs + myLeaderElectionService.isLeader(leaderSessionID); + + // check that we have successfully recovered the second job + assertThat(recoveredJobs, containsInAnyOrder(jobId2)); + } finally { + TestingUtils.stopActor(jobManager); + } + } + + static class TestingFailingHAJobManager extends JobManager { + + private final Collection recoveredJobs; + + public TestingFailingHAJobManager( + Configuration flinkConfiguration, + Executor futureExecutor, + Executor ioExecutor, + InstanceManager instanceManager, + Scheduler scheduler, + BlobLibraryCacheManager libraryCacheManager, + ActorRef archive, + RestartStrategyFactory restartStrategyFactory, + FiniteDuration timeout, + LeaderElectionService leaderElectionService, + SubmittedJobGraphStore submittedJobGraphs, + CheckpointRecoveryFactory checkpointRecoveryFactory, + FiniteDuration jobRecoveryTimeout, + Option metricsRegistry, + Collection recoveredJobs) { + super( + flinkConfiguration, + futureExecutor, + ioExecutor, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory, + jobRecoveryTimeout, + metricsRegistry); + + this.recoveredJobs = recoveredJobs; + } + + @Override + public PartialFunction handleMessage() { + return ReceiveBuilder.match( + JobManagerMessages.RecoverSubmittedJob.class, + new FI.UnitApply() { + @Override + public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception { + recoveredJobs.add(submitJob.submittedJobGraph().getJobId()); + } 
+ }).matchAny(new FI.UnitApply() { + @Override + public void apply(Object o) throws Exception { + TestingFailingHAJobManager.super.handleMessage().apply(o); + } + }).build(); + } + } + /** * A checkpoint store, which supports shutdown and suspend. You can use this to test HA * as long as the factory always returns the same store instance. @@ -391,16 +536,11 @@ public void stop() throws Exception { } @Override - public List recoverJobGraphs() throws Exception { - return new ArrayList<>(storedJobs.values()); - } - - @Override - public Option recoverJobGraph(JobID jobId) throws Exception { + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { if (storedJobs.containsKey(jobId)) { - return Option.apply(storedJobs.get(jobId)); + return storedJobs.get(jobId); } else { - return Option.apply(null); + return null; } } @@ -414,6 +554,11 @@ public void removeJobGraph(JobID jobId) throws Exception { storedJobs.remove(jobId); } + @Override + public Collection getJobIds() throws Exception { + return storedJobs.keySet(); + } + boolean contains(JobID jobId) { return storedJobs.containsKey(jobId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java index 8ebb7f8da480c..079a10eb66b53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java @@ -19,14 +19,13 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.jobgraph.JobGraph; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; public class StandaloneSubmittedJobGraphStoreTest { @@ -41,14 +40,14 @@ public void testNoOps() throws Exception { new JobGraph("testNoOps"), new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE)); - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); jobGraphs.putJobGraph(jobGraph); - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID()); - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); - assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty()); + assertNull(jobGraphs.recoverJobGraph(new JobID())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index d156f02daa287..9454d90e05c0b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -40,8 +40,8 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -101,32 +101,36 @@ public void testPutAndRemoveJobGraph() throws Exception { SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0); // Empty state - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); // Add initial jobGraphs.putJobGraph(jobGraph); // Verify initial job graph - List actual = jobGraphs.recoverJobGraphs(); - assertEquals(1, actual.size()); + 
Collection jobIds = jobGraphs.getJobIds(); + assertEquals(1, jobIds.size()); - verifyJobGraphs(jobGraph, actual.get(0)); + JobID jobId = jobIds.iterator().next(); + + verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Update (same ID) jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1); jobGraphs.putJobGraph(jobGraph); // Verify updated - actual = jobGraphs.recoverJobGraphs(); - assertEquals(1, actual.size()); + jobIds = jobGraphs.getJobIds(); + assertEquals(1, jobIds.size()); + + jobId = jobIds.iterator().next(); - verifyJobGraphs(jobGraph, actual.get(0)); + verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Remove jobGraphs.removeJobGraph(jobGraph.getJobId()); // Empty state - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); // Nothing should have been notified verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class)); @@ -162,11 +166,12 @@ public void testRecoverJobGraphs() throws Exception { jobGraphs.putJobGraph(jobGraph); } - List actual = jobGraphs.recoverJobGraphs(); + Collection actual = jobGraphs.getJobIds(); assertEquals(expected.size(), actual.size()); - for (SubmittedJobGraph jobGraph : actual) { + for (JobID jobId : actual) { + SubmittedJobGraph jobGraph = jobGraphs.recoverJobGraph(jobId); assertTrue(expected.containsKey(jobGraph.getJobId())); verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph); @@ -175,7 +180,7 @@ public void testRecoverJobGraphs() throws Exception { } // Empty state - assertEquals(0, jobGraphs.recoverJobGraphs().size()); + assertEquals(0, jobGraphs.getJobIds().size()); // Nothing should have been notified verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class)); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 49eaeb7db14b0..3f08b5a7f9023 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -370,7 +370,7 @@ public void testCheckpointRecoveryFailure() throws Exception { nonLeadingJobManagerProcess = jobManagerProcess[0]; } - // BLocking JobGraph + // Blocking JobGraph JobVertex blockingVertex = new JobVertex("Blocking vertex"); blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); JobGraph jobGraph = new JobGraph(blockingVertex); @@ -399,7 +399,7 @@ public void testCheckpointRecoveryFailure() throws Exception { String output = nonLeadingJobManagerProcess.getProcessOutput(); if (output != null) { - if (output.contains("Fatal error: Failed to recover jobs") && + if (output.contains("Failed to recover job") && output.contains("java.io.FileNotFoundException")) { success = true;