From 7ddb674cb17c35f17aa073d3bfd6897d7fc13b9e Mon Sep 17 00:00:00 2001 From: gyao Date: Thu, 30 Nov 2017 15:44:23 +0100 Subject: [PATCH] [FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher Implement SubmittedJobGraphListener interface in Dispatcher Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable this, the dispatcher must implement the SubmittedJobGraphListener interface. Add simple unit tests for the new methods. Refactor DispatcherTest to remove redundancy. [FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe [FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService [FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices [FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest Check if jobManagerRunner exists before submitting job. Replace JobManagerRunner mock used in tests with real instance. Do not run job graph recovery in actor main thread when job graph is recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID). [FLINK-8176][flip6] Rename variables in DispatcherTest [FLINK-8176][flip6] Remove injectMocks in DispatcherTest [FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks Always attempt the job submission if onAddedJobGraph or onRemovedJobGraph are called. The checks in submitJob and removeJob are sufficient. This closes #5107. --- .../flink/runtime/dispatcher/Dispatcher.java | 46 +++- .../runtime/dispatcher/DispatcherTest.java | 250 ++++++++++++------ .../TestingHighAvailabilityServices.java | 4 +- .../TestingLeaderElectionService.java | 9 + .../InMemorySubmittedJobGraphStore.java | 26 +- 5 files changed, 235 insertions(+), 100 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 8a26f9560938d..ea3a6ad621a7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; @@ -63,6 +64,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -73,7 +75,8 @@ * the jobs and to recover them in case of a master failure. Furthermore, it knows * about the state of the Flink session cluster. */ -public abstract class Dispatcher extends FencedRpcEndpoint implements DispatcherGateway, LeaderContender { +public abstract class Dispatcher extends FencedRpcEndpoint implements + DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener { public static final String DISPATCHER_NAME = "dispatcher"; @@ -173,6 +176,7 @@ public void postStop() throws Exception { public void start() throws Exception { super.start(); + submittedJobGraphStore.start(this); leaderElectionService.start(this); } @@ -197,7 +201,8 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) new JobSubmissionException(jobId, "Could not retrieve the job status.", e)); } - if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) { + if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING && + !jobManagerRunners.containsKey(jobId)) { try { submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null)); } catch (Exception e) { @@ -248,7 +253,8 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) @Override public CompletableFuture> listJobs(Time timeout) { - return CompletableFuture.completedFuture(jobManagerRunners.keySet()); + return CompletableFuture.completedFuture( + Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet()))); } @Override @@ -399,7 +405,8 @@ private void clearState() throws Exception { /** * Recovers all jobs persisted via the submitted job graph store. */ - private void recoverJobs() { + @VisibleForTesting + void recoverJobs() { log.info("Recovering all persisted jobs."); getRpcService().execute( @@ -507,6 +514,37 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //------------------------------------------------------ + // SubmittedJobGraphListener + //------------------------------------------------------ + + @Override + public void onAddedJobGraph(final JobID jobId) { + getRpcService().execute(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not recover job graph for job {}.", jobId, e); + return; + } + runAsync(() -> { + submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT); + }); + }); + } + + @Override + public void onRemovedJobGraph(final JobID jobId) { + runAsync(() -> { + try { + removeJob(jobId, false); + } catch (final Exception e) { + log.error("Could not remove job {}.", jobId, e); + } + }); + } + //------------------------------------------------------ // Utility classes //------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index d5b63d4f615ce..8627c8ecf5c63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -20,57 +20,95 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.mockito.Mockito; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Test for the {@link Dispatcher} component. */ public class DispatcherTest extends TestLogger { + private static RpcService rpcService; + + private static final Time TIMEOUT = Time.seconds(10L); + + private static final JobID TEST_JOB_ID = new JobID(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public TestName name = new TestName(); - private static RpcService rpcService; - private static final Time timeout = Time.seconds(10L); + private JobGraph jobGraph; + + private TestingFatalErrorHandler fatalErrorHandler; + + private SubmittedJobGraphStore submittedJobGraphStore; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + private TestingLeaderElectionService jobMasterLeaderElectionService; + + private RunningJobsRegistry runningJobsRegistry; + + /** Instance under test. */ + private TestingDispatcher dispatcher; @BeforeClass public static void setup() { @@ -86,60 +124,77 @@ public static void teardown() { } } - /** - * Tests that we can submit a job to the Dispatcher which then spawns a - * new JobManagerRunner. - */ - @Test - public void testJobSubmission() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - - TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); - haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore()); + @Before + public void setUp() throws Exception { + final JobVertex testVertex = new JobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + jobGraph.setAllowQueuedScheduling(true); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); - JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class); + fatalErrorHandler = new TestingFatalErrorHandler(); + final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); + submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore()); - final JobGraph jobGraph = mock(JobGraph.class); - final JobID jobId = new JobID(); - when(jobGraph.getJobID()).thenReturn(jobId); + dispatcherLeaderElectionService = new TestingLeaderElectionService(); + jobMasterLeaderElectionService = new TestingLeaderElectionService(); - final TestingDispatcher dispatcher = new TestingDispatcher( + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); + haServices.setSubmittedJobGraphStore(submittedJobGraphStore); + haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + runningJobsRegistry = haServices.getRunningJobsRegistry(); + + final Configuration blobServerConfig = new Configuration(); + blobServerConfig.setString( + BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), new Configuration(), haServices, mock(ResourceManagerGateway.class), - mock(BlobServer.class), + new BlobServer(blobServerConfig, new VoidBlobStore()), heartbeatServices, - mock(MetricRegistryImpl.class), + new NoOpMetricRegistry(), fatalErrorHandler, - jobManagerRunner, - jobId); + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** + * Tests that we can submit a job to the Dispatcher which then spawns a + * new JobManagerRunner. + */ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + assertTrue( + "jobManagerRunner was not started", + dispatcherLeaderElectionService.isStarted()); } /** @@ -147,61 +202,63 @@ public void testJobSubmission() throws Exception { */ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - try { - dispatcher.start(); + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + + verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds(); + } + + /** + * Test callbacks from + * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}. + */ + @Test + public void testSubmittedJobGraphListener() throws Exception { + dispatcher.recoverJobsEnabled.set(false); - assertFalse(leaderSessionIdFuture.isDone()); + dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); - testingLeaderElectionService.isLeader(expectedLeaderSessionId); + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get(); - assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID); - verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + // pretend that other Dispatcher has removed job from submittedJobGraphStore + submittedJobGraphStore.removeJobGraph(TEST_JOB_ID); + dispatcher.onRemovedJobGraph(TEST_JOB_ID); + assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), empty()); + + // pretend that other Dispatcher has added a job to submittedJobGraphStore + runningJobsRegistry.clearJob(TEST_JOB_ID); + submittedJobGraphStore.putJobGraph(submittedJobGraph); + dispatcher.onAddedJobGraph(TEST_JOB_ID); + dispatcher.submitJobLatch.await(); + assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1)); } private static class TestingDispatcher extends Dispatcher { - private final JobManagerRunner jobManagerRunner; private final JobID expectedJobId; - protected TestingDispatcher( + private final CountDownLatch submitJobLatch = new CountDownLatch(2); + + /** + * Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered + * when {@link TestingDispatcher} is granted leadership. + * */ + private final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true); + + private TestingDispatcher( RpcService rpcService, String endpointId, Configuration configuration, @@ -211,7 +268,6 @@ protected TestingDispatcher( HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, - JobManagerRunner jobManagerRunner, JobID expectedJobId) throws Exception { super( rpcService, @@ -225,7 +281,6 @@ protected TestingDispatcher( fatalErrorHandler, null); - this.jobManagerRunner = jobManagerRunner; this.expectedJobId = expectedJobId; } @@ -243,7 +298,32 @@ protected JobManagerRunner createJobManagerRunner( FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); - return jobManagerRunner; + return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService, + highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry, + onCompleteActions, fatalErrorHandler, null); + } + + @Override + public CompletableFuture submitJob(final JobGraph jobGraph, final Time timeout) { + final CompletableFuture submitJobFuture = super.submitJob(jobGraph, timeout); + + try { + submitJobFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + submitJobLatch.countDown(); + return submitJobFuture; + } + + @Override + void recoverJobs() { + if (recoverJobsEnabled.get()) { + super.recoverJobs(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index dba7bef5fa264..db0b88edf7fed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -52,6 +52,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile SubmittedJobGraphStore submittedJobGraphStore; + private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry(); + // ------------------------------------------------------------------------ // Setters for mock / testing implementations // ------------------------------------------------------------------------ @@ -185,7 +187,7 @@ public SubmittedJobGraphStore getSubmittedJobGraphStore() { @Override public RunningJobsRegistry getRunningJobsRegistry() { - return new StandaloneRunningJobsRegistry(); + return runningJobsRegistry; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java index d951db5de39a8..4ecb9b611aa22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java @@ -86,4 +86,13 @@ public synchronized void reset() { public synchronized String getAddress() { return contender.getAddress(); } + + /** + * Returns true if {@link #start(LeaderContender)} was called, + * false otherwise. + */ + public synchronized boolean isStarted() { + return contender != null; + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java index bf85771c95c28..ee208cee6967f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java @@ -26,9 +26,13 @@ import javax.annotation.Nullable; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import static java.util.Objects.requireNonNull; + /** * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes. */ @@ -36,43 +40,45 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore { private final Map storedJobs = new HashMap<>(); - private volatile boolean started; + private boolean started; @Override - public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception { + public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception { started = true; } @Override - public void stop() throws Exception { + public synchronized void stop() throws Exception { started = false; } @Override - public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { verifyIsStarted(); - return storedJobs.getOrDefault(jobId, null); + return requireNonNull( + storedJobs.get(jobId), + "Job graph for job " + jobId + " does not exist"); } @Override - public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + public synchronized void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { verifyIsStarted(); storedJobs.put(jobGraph.getJobId(), jobGraph); } @Override - public void removeJobGraph(JobID jobId) throws Exception { + public synchronized void removeJobGraph(JobID jobId) throws Exception { verifyIsStarted(); storedJobs.remove(jobId); } @Override - public Collection getJobIds() throws Exception { + public synchronized Collection getJobIds() throws Exception { verifyIsStarted(); - return storedJobs.keySet(); + return Collections.unmodifiableSet(new HashSet<>(storedJobs.keySet())); } - public boolean contains(JobID jobId) { + public synchronized boolean contains(JobID jobId) { verifyIsStarted(); return storedJobs.containsKey(jobId); }