From da04254092b1d22848c663addb3bdbf7e9ebcab3 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 1 Feb 2018 14:37:15 +0100 Subject: [PATCH 1/3] [hotfix] Fix checkstyle violations in ExecutionGraph --- .../executiongraph/ExecutionGraph.java | 56 ++++++++++--------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9331b2e9b6b51..6fdcec92adf0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -128,7 +128,7 @@ * *

 * <h2>Global and local failover</h2>

- * The Execution Graph has two failover modes: global failover and local failover.
+ * <p>The Execution Graph has two failover modes: global failover and local failover.
 *
 * <p>
A global failover aborts the task executions for all vertices and restarts whole * data flow graph from the last completed checkpoint. Global failover is considered the @@ -151,7 +151,7 @@ public class ExecutionGraph implements AccessExecutionGraph { /** In place updater for the execution graph's current state. Avoids having to use an - * AtomicReference and thus makes the frequent read access a bit faster */ + * AtomicReference and thus makes the frequent read access a bit faster. */ private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); @@ -172,38 +172,38 @@ public class ExecutionGraph implements AccessExecutionGraph { /** Job specific information like the job id, job name, job configuration, etc. */ private final JobInformation jobInformation; - /** Serialized job information or a blob key pointing to the offloaded job information */ + /** Serialized job information or a blob key pointing to the offloaded job information. */ private final Either, PermanentBlobKey> jobInformationOrBlobKey; /** The executor which is used to execute futures. */ private final ScheduledExecutorService futureExecutor; - /** The executor which is used to execute blocking io operations */ + /** The executor which is used to execute blocking io operations. */ private final Executor ioExecutor; /** {@code true} if all source tasks are stoppable. */ private boolean isStoppable = true; - /** All job vertices that are part of this graph */ + /** All job vertices that are part of this graph. */ private final ConcurrentHashMap tasks; - /** All vertices, in the order in which they were created **/ + /** All vertices, in the order in which they were created. **/ private final List verticesInCreationOrder; - /** All intermediate results that are part of this graph */ + /** All intermediate results that are part of this graph. */ private final ConcurrentHashMap intermediateResults; - /** The currently executed tasks, for callbacks */ + /** The currently executed tasks, for callbacks. */ private final ConcurrentHashMap currentExecutions; /** Listeners that receive messages when the entire job switches it status - * (such as from RUNNING to FINISHED) */ + * (such as from RUNNING to FINISHED). */ private final List jobStatusListeners; - /** Listeners that receive messages whenever a single task execution changes its status */ + /** Listeners that receive messages whenever a single task execution changes its status. */ private final List executionListeners; - /** The implementation that decides how to recover the failures of tasks */ + /** The implementation that decides how to recover the failures of tasks. */ private final FailoverStrategy failoverStrategy; /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when @@ -212,28 +212,28 @@ public class ExecutionGraph implements AccessExecutionGraph { * at {@code stateTimestamps[RUNNING.ordinal()]}. */ private final long[] stateTimestamps; - /** The timeout for all messages that require a response/acknowledgement */ + /** The timeout for all messages that require a response/acknowledgement. */ private final Time rpcTimeout; /** The timeout for slot allocations. */ private final Time allocationTimeout; - /** Strategy to use for restarts */ + /** Strategy to use for restarts. 
*/ private final RestartStrategy restartStrategy; - /** The slot provider to use for allocating slots for tasks as they are needed */ + /** The slot provider to use for allocating slots for tasks as they are needed. */ private final SlotProvider slotProvider; - /** The classloader for the user code. Needed for calls into user code classes */ + /** The classloader for the user code. Needed for calls into user code classes. */ private final ClassLoader userClassLoader; /** Registered KvState instances reported by the TaskManagers. */ private final KvStateLocationRegistry kvStateLocationRegistry; - /** Blob writer used to offload RPC messages */ + /** Blob writer used to offload RPC messages. */ private final BlobWriter blobWriter; - /** The total number of vertices currently in the execution graph */ + /** The total number of vertices currently in the execution graph. */ private int numVerticesTotal; // ------ Configuration of the Execution ------- @@ -251,18 +251,18 @@ public class ExecutionGraph implements AccessExecutionGraph { private final AtomicInteger verticesFinished; - /** Current status of the job execution */ + /** Current status of the job execution. */ private volatile JobStatus state = JobStatus.CREATED; - /** A future that completes once the job has reached a terminal state */ + /** A future that completes once the job has reached a terminal state. */ private volatile CompletableFuture terminationFuture; /** On each global recovery, this version is incremented. The version breaks conflicts - * between concurrent restart attempts by local failover strategies */ + * between concurrent restart attempts by local failover strategies. */ private volatile long globalModVersion; /** The exception that caused the job to fail. This is set to the first root exception - * that was not recoverable and triggered job failure */ + * that was not recoverable and triggered job failure. */ private volatile Throwable failureCause; /** The extended failure cause information for the job. This exists in addition to 'failureCause', @@ -272,7 +272,7 @@ public class ExecutionGraph implements AccessExecutionGraph { // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- - /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ + /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */ private CheckpointCoordinator checkpointCoordinator; /** Checkpoint stats tracker separate from the coordinator in order to be @@ -567,7 +567,8 @@ private ExecutionVertex[] collectExecutionVertices(List jobV // -------------------------------------------------------------------------------------------- /** - * Returns a list of BLOB keys referring to the JAR files required to run this job + * Returns a list of BLOB keys referring to the JAR files required to run this job. + * * @return list of BLOB keys referring to the JAR files required to run this job */ public Collection getRequiredJarFiles() { @@ -575,7 +576,8 @@ public Collection getRequiredJarFiles() { } /** - * Returns a list of classpaths referring to the directories/JAR files required to run this job + * Returns a list of classpaths referring to the directories/JAR files required to run this job. + * * @return list of classpaths referring to the directories/JAR files required to run this job */ public Collection getRequiredClasspaths() { @@ -1036,10 +1038,10 @@ public void stop() throws StoppingException { /** * Suspends the current ExecutionGraph. 
 *
- * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
+ * <p>The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
 * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
 *
- * The SUSPENDED state is a local terminal state which stops the execution of the job but does
+ * <p>
The SUSPENDED state is a local terminal state which stops the execution of the job but does * not remove the job from the HA job store so that it can be recovered by another JobManager. * * @param suspensionCause Cause of the suspension @@ -1472,7 +1474,7 @@ private void onTerminalState(JobStatus status) { /** * Updates the state of one of the ExecutionVertex's Execution attempts. * If the new status if "FINISHED", this also updates the accumulators. - * + * * @param state The state update. * @return True, if the task update was properly applied, false, if the execution attempt was not found. */ From e06eff6a905fd6832b51d3334d7a9c7a60e0da20 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 1 Feb 2018 18:04:06 +0100 Subject: [PATCH 2/3] [FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended but its clean up has not been done yet. Only after all Executions have been canceled, the ExecutionGraph will enter the SUSPENDED state and complete the termination future accordingly. This closes #5445. --- .../executiongraph/ExecutionGraph.java | 47 ++++++++++----- .../flink/runtime/jobgraph/JobStatus.java | 3 + .../ZooKeeperSubmittedJobGraphStore.java | 2 +- .../handler/legacy/ExecutionGraphCache.java | 16 +++-- .../ArchivedExecutionGraphTest.java | 1 + .../ExecutionGraphSuspendTest.java | 58 ++++++++++++++++--- .../legacy/ExecutionGraphCacheTest.java | 33 ++++++++--- .../legacy/JobAccumulatorsHandlerTest.java | 3 +- .../handler/legacy/JobConfigHandlerTest.java | 3 +- .../handler/legacy/JobDetailsHandlerTest.java | 3 +- .../handler/legacy/JobPlanHandlerTest.java | 3 +- .../JobVertexAccumulatorsHandlerTest.java | 3 +- .../legacy/JobVertexDetailsHandlerTest.java | 3 +- .../utils/ArchivedJobGenerationUtils.java | 2 +- 14 files changed, 134 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 6fdcec92adf0b..3feabc897d74e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1038,10 +1038,10 @@ public void stop() throws StoppingException { /** * Suspends the current ExecutionGraph. * - *

- * <p>The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
+ * <p>The JobStatus will be directly set to SUSPENDING iff the current state is not a terminal
 * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
 *
- * <p>The SUSPENDED state is a local terminal state which stops the execution of the job but does
+ * <p>
The SUSPENDING state is a local terminal state which stops the execution of the job but does * not remove the job from the HA job store so that it can be recovered by another JobManager. * * @param suspensionCause Cause of the suspension @@ -1050,24 +1050,34 @@ public void suspend(Throwable suspensionCause) { while (true) { JobStatus currentState = state; - if (currentState.isTerminalState()) { + if (currentState.isTerminalState() || currentState == JobStatus.SUSPENDING) { // stay in a terminal state return; - } else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) { + } else if (transitionState(currentState, JobStatus.SUSPENDING, suspensionCause)) { initFailureCause(suspensionCause); // make sure no concurrent local actions interfere with the cancellation incrementGlobalModVersion(); + final ArrayList> executionJobVertexTerminationFutures = new ArrayList<>(verticesInCreationOrder.size()); + for (ExecutionJobVertex ejv: verticesInCreationOrder) { - ejv.cancel(); + executionJobVertexTerminationFutures.add(ejv.cancelWithFuture()); } - synchronized (progressLock) { - onTerminalState(JobStatus.SUSPENDED); + final ConjunctFuture jobVerticesTerminationFuture = FutureUtils.waitForAll(executionJobVertexTerminationFutures); - LOG.info("Job {} has been suspended.", getJobID()); - } + jobVerticesTerminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Flink could not properly clean up resource after suspension.", throwable); + } + + // the globalModVersion does not play a role because there is no way + // currently to leave the SUSPENDING state + allVerticesInTerminalState(-1L); + LOG.info("Job {} has been suspended.", getJobID()); + }); return; } @@ -1090,6 +1100,7 @@ public void failGlobal(Throwable t) { JobStatus current = state; // stay in these states if (current == JobStatus.FAILING || + current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED || current.isGloballyTerminalState()) { return; @@ -1155,7 +1166,7 @@ public void restart(long expectedGlobalVersion) { } else if (current == JobStatus.FAILED) { LOG.info("Failed job during restart. Aborting restart."); return; - } else if (current == JobStatus.SUSPENDED) { + } else if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) { LOG.info("Suspended job during restart. Aborting restart."); return; } else if (current != JobStatus.RESTARTING) { @@ -1240,7 +1251,13 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() { return null; } - @VisibleForTesting + /** + * Returns the termination future of this {@link ExecutionGraph}. The termination future + * is completed with the terminal {@link JobStatus} once the ExecutionGraph reaches this + * terminal state and all {@link Execution} have been terminated. + * + * @return Termination future of this {@link ExecutionGraph}. 
+ */ public CompletableFuture getTerminationFuture() { return terminationFuture; } @@ -1380,9 +1397,11 @@ else if (current == JobStatus.FAILING) { } // concurrent job status change, let's check again } - else if (current == JobStatus.SUSPENDED) { - // we've already cleaned up when entering the SUSPENDED state - break; + else if (current == JobStatus.SUSPENDING) { + if (transitionState(current, JobStatus.SUSPENDED)) { + onTerminalState(JobStatus.SUSPENDED); + break; + } } else if (current.isGloballyTerminalState()) { LOG.warn("Job has entered globally terminal state without waiting for all " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 4ef86bd0ae7a7..c04528eabaaa7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -47,6 +47,9 @@ public enum JobStatus { /** The job is currently undergoing a reset and total restart */ RESTARTING(TerminalState.NON_TERMINAL), + /** The job has been suspended and is currently waiting for the cleanup to complete */ + SUSPENDING(TerminalState.NON_TERMINAL), + /** * The job has been suspended which means that it has been stopped but not been removed from a * potential HA job store. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index f31c970a8dcc7..a60a40d842622 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -372,7 +372,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) break; case CONNECTION_SUSPENDED: { - LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " + + LOG.warn("ZooKeeper connection SUSPENDING. 
Changes to the submitted job " + "graphs are not monitored (temporarily)."); } break; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index 19186c45b026e..382e87e3a8c99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -91,19 +91,22 @@ public CompletableFuture getExecutionGraph(JobID jobId, Re if (oldEntry != null) { if (currentTime < oldEntry.getTTL()) { - if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) { + final CompletableFuture executionGraphFuture = oldEntry.getExecutionGraphFuture(); + if (executionGraphFuture.isDone() && !executionGraphFuture.isCompletedExceptionally()) { // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph try { - if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) { - return oldEntry.getExecutionGraphFuture(); + final AccessExecutionGraph executionGraph = executionGraphFuture.get(); + if (executionGraph.getState() != JobStatus.SUSPENDING && + executionGraph.getState() != JobStatus.SUSPENDED) { + return executionGraphFuture; } // send a new request to get the ExecutionGraph from the new leader } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e); } - } else if (!oldEntry.getExecutionGraphFuture().isDone()) { - return oldEntry.getExecutionGraphFuture(); + } else if (!executionGraphFuture.isDone()) { + return executionGraphFuture; } // otherwise it must be completed exceptionally } @@ -135,7 +138,8 @@ public CompletableFuture getExecutionGraph(JobID jobId, Re newEntry.getExecutionGraphFuture().complete(executionGraph); // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph - if (executionGraph.getState() == JobStatus.SUSPENDED) { + if (executionGraph.getState() == JobStatus.SUSPENDING || + executionGraph.getState() == JobStatus.SUSPENDED) { // remove the entry in case of suspension --> triggers new request when accessed next time cachedExecutionGraphs.remove(jobId, newEntry); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 8bc5170b38c54..f15dca1fcc9c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -172,6 +172,7 @@ private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, Acc assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING)); + assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDING), 
archivedGraph.getStatusTimestamp(JobStatus.SUSPENDING)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED)); assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index 52d4c818041f0..1b19c53965250 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -49,7 +49,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { /** - * Going into SUSPENDED out of CREATED should immediately cancel everything and + * Going into SUSPENDING out of CREATED should immediately cancel everything and * not send out RPC calls. */ @Test @@ -72,7 +72,7 @@ public void testSuspendedOutOfCreated() throws Exception { } /** - * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of DEPLOYING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfDeploying() throws Exception { @@ -88,15 +88,20 @@ public void testSuspendedOutOfDeploying() throws Exception { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of RUNNING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfRunning() throws Exception { @@ -114,15 +119,21 @@ public void testSuspendedOutOfRunning() throws Exception { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); + validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls + * Suspending from FAILING goes to SUSPENDING and sends no additional RPC calls */ @Test public void testSuspendedOutOfFailing() throws Exception { @@ -140,10 +151,14 @@ public void testSuspendedOutOfFailing() throws Exception { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -176,7 +191,7 @@ public void testSuspendedOutOfFailed() throws Exception { } /** - * Suspending from CANCELING goes to SUSPENDED and sends no additional RPC calls. + * Suspending from CANCELING goes to SUSPENDING and sends no additional RPC calls. 
*/ @Test public void testSuspendedOutOfCanceling() throws Exception { @@ -194,10 +209,14 @@ public void testSuspendedOutOfCanceling() throws Exception { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -280,6 +299,27 @@ private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManag } } + private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, TaskManagerGateway gateway) { + assertEquals(JobStatus.SUSPENDING, eg.getState()); + reset(gateway); + + eg.failGlobal(new Exception("fail")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.cancel(); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.suspend(new Exception("suspend again")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber()); + } + } + private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) { for (ExecutionVertex ev : eg.getAllExecutionVertices()) { assertEquals(expected, ev.getCurrentExecutionAttempt().getState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index 8bdaff5cd8db2..3afd9fe336bf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -248,7 +248,7 @@ public void testConcurrentAccess() throws Exception { /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in - * state {@link JobStatus#SUSPENDED}. + * state {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -259,9 +259,11 @@ public void testCacheInvalidationIfSuspended() throws Exception { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final ArchivedExecutionGraph suspendingExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDING).build(); final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build(); final ConcurrentLinkedQueue> requestJobAnswers = new ConcurrentLinkedQueue<>(); + requestJobAnswers.offer(CompletableFuture.completedFuture(suspendingExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph)); @@ -278,17 +280,21 @@ public void testCacheInvalidationIfSuspended() throws Exception { try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendingExecutionGraph, executionGraphFuture.get()); + + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); - CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); } } /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its - * state to {@link JobStatus#SUSPENDED}. + * state to {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -299,30 +305,39 @@ public void testCacheInvalidationIfSwitchToSuspended() throws Exception { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final SuspendableAccessExecutionGraph toBeSuspendingExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( expectedJobId, + CompletableFuture.completedFuture(toBeSuspendingExecutionGraph), CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendingExecutionGraph, executionGraphFuture.get()); + + toBeSuspendingExecutionGraph.setJobStatus(JobStatus.SUSPENDING); + + // retrieve the same job from the cache again --> this should return it and invalidate the cache entry + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); // retrieve the same job from the cache again --> this should return it and invalidate the cache entry - CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - CompletableFuture executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture3.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(3)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java index f8122eef1dab9..00829e6bf859b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -38,7 +39,7 @@ /** * Tests for the JobAccumulatorsHandler. 
*/ -public class JobAccumulatorsHandlerTest { +public class JobAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java index d173e0f090e81..2279cd832092d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -39,7 +40,7 @@ /** * Tests for the JobConfigHandler. */ -public class JobConfigHandlerTest { +public class JobConfigHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java index 2980a08cb51c6..dbfa8cc79e880 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -46,7 +47,7 @@ /** * Tests for the JobDetailsHandler. */ -public class JobDetailsHandlerTest { +public class JobDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java index 29e0819c61dea..9edaef1001a65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; @@ -34,7 +35,7 @@ /** * Tests for the JobPlanHandler. 
*/ -public class JobPlanHandlerTest { +public class JobPlanHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java index 97356f457788f..abb22e0eea2f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -40,7 +41,7 @@ /** * Tests for the JobVertexAccumulatorsHandler. */ -public class JobVertexAccumulatorsHandlerTest { +public class JobVertexAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java index 9cc294a719300..0c5217143ab7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -42,7 +43,7 @@ /** * Tests for the JobVertexDetailsHandler. 
*/ -public class JobVertexDetailsHandlerTest { +public class JobVertexDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java index ad3a95f9be718..92b0d8acee372 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java @@ -135,7 +135,7 @@ private static void generateArchivedJob() throws Exception { .setTasks(tasks) .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) .setState(JobStatus.FINISHED) - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) .build(); } From 7518bd5f0ba9d4319e2d3072fa6ec078c08e602a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 13 Feb 2018 16:14:41 +0100 Subject: [PATCH 3/3] [FLINK-8629] [flip6] Allow JobMaster to rescale jobs This commit adds the functionality to rescale a job or parts of it to the JobMaster. In order to rescale a job, the JobMaster does the following: 1. Take a savepoint 2. Create a rescaled ExecutionGraph from the JobGraph 3. Initialize it with the taken savepoint 4. Suspend the old ExecutionGraph 5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended This closes #5446. --- .../function/BiConsumerWithException.java | 50 ++++ .../flink/runtime/checkpoint/Checkpoints.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 219 ++++++++++++++++-- .../jobmaster/JobMasterConfiguration.java | 12 + .../runtime/jobmaster/JobMasterGateway.java | 28 +++ .../runtime/jobmaster/RescalingBehaviour.java | 49 ++++ .../exceptions/JobMasterException.java | 41 ++++ .../exceptions/JobModificationException.java | 39 ++++ .../utils/TestingJobMasterGateway.java | 11 + 9 files changed, 434 insertions(+), 17 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java new file mode 100644 index 0000000000000..6ab11618b1f22 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.function; + +import java.util.function.BiConsumer; + +/** + * A checked extension of the {@link BiConsumer} interface. + * + * @param type of the first argument + * @param type of the second argument + * @param type of the thrown exception + */ +@FunctionalInterface +public interface BiConsumerWithException extends BiConsumer { + + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument + * @param u the second input argument + * @throws E in case of an error + */ + void acceptWithException(T t, U u) throws E; + + @Override + default void accept(T t, U u) { + try { + acceptWithException(t, u); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index 47efa6facf65c..72b7c53ab9589 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -246,7 +246,7 @@ public static void disposeSavepoint( try (InputStream in = metadataHandle.openInputStream(); DataInputStream dis = new DataInputStream(in)) { - savepoint = loadCheckpointMetadata(dis, classLoader); + savepoint = loadCheckpointMetadata(dis, classLoader); } Exception exception = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2a4b88194820f..22c69f5c4ee5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -26,13 +26,15 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -56,10 +58,12 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway; @@ -106,6 +110,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -115,6 +120,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -174,9 +180,6 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final ClassLoader userCodeLoader; - /** The execution graph of this job. */ - private final ExecutionGraph executionGraph; - private final SlotPool slotPool; private final SlotPoolGateway slotPoolGateway; @@ -201,6 +204,11 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final Map> registeredTaskManagers; + // -------- Mutable fields --------- + + /** The execution graph of this job. */ + private ExecutionGraph executionGraph; + // ------------------------------------------------------------------------ public JobMaster( @@ -268,8 +276,6 @@ public JobMaster( log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.slotPool = new SlotPool( @@ -289,7 +295,7 @@ public JobMaster( scheduledExecutorService, slotPool.getSlotProvider(), userCodeLoader, - checkpointRecoveryFactory, + highAvailabilityServices.getCheckpointRecoveryFactory(), rpcTimeout, restartStrategy, jobMetricGroup, @@ -447,6 +453,165 @@ public CompletableFuture stop(Time timeout) { return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. 
Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. take a savepoint + final CompletableFuture savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } + + throw new CompletionException(failure); + }); + + // 5. 
suspend the current job + final CompletableFuture terminationFuture = executionGraphFuture.thenComposeAsync( + (ExecutionGraph ignored) -> { + currentExecutionGraph.suspend(new FlinkException("Job is being rescaled.")); + return currentExecutionGraph.getTerminationFuture(); + }, + getMainThreadExecutor()); + + final CompletableFuture suspendedFuture = terminationFuture.thenAccept( + (JobStatus jobStatus) -> { + if (jobStatus != JobStatus.SUSPENDED) { + final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); + log.info(msg); + throw new CompletionException(new JobModificationException(msg)); + } + }); + + // 6. resume the new execution graph from the taken savepoint + final CompletableFuture rescalingFuture = suspendedFuture.thenCombineAsync( + executionGraphFuture, + (Void ignored, ExecutionGraph restoredExecutionGraph) -> { + // check if the ExecutionGraph is still the same + //noinspection ObjectEquality + if (executionGraph == currentExecutionGraph) { + executionGraph = restoredExecutionGraph; + + scheduleExecutionGraph(); + + return Acknowledge.get(); + } else { + throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling.")); + } + + }, + getMainThreadExecutor()); + + rescalingFuture.whenComplete( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + // fail the newly created execution graph + newExecutionGraph.failGlobal(new FlinkException("Failed to rescale the job " + jobGraph.getJobID() + '.', throwable)); + } + }); + + return rescalingFuture; + } + /** * Updates the task execution state for a given task. * @@ -912,15 +1077,7 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti } // start scheduling job in another thread - scheduledExecutorService.execute( - () -> { - try { - executionGraph.scheduleForExecution(); - } - catch (Throwable t) { - executionGraph.failGlobal(t); - } - }); + scheduledExecutorService.execute(this::scheduleExecutionGraph); return Acknowledge.get(); } @@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) { return Acknowledge.get(); } + /** + * Schedules the execution of the current {@link ExecutionGraph}. + */ + private void scheduleExecutionGraph() { + try { + executionGraph.scheduleForExecution(); + } + catch (Throwable t) { + executionGraph.failGlobal(t); + } + } + + /** + * Dispose the savepoint stored under the given path. 
+ * + * @param savepointPath path where the savepoint is stored + */ + private void disposeSavepoint(String savepointPath) { + try { + // delete the temporary savepoint + Checkpoints.disposeSavepoint( + savepointPath, + jobMasterConfiguration.getConfiguration(), + userCodeLoader, + log); + } catch (FlinkException | IOException de) { + log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de); + } + } + //---------------------------------------------------------------------------------------------- private void handleFatalError(final Throwable cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index 15a30e26a2b37..5a4e3b37e84df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -36,16 +37,20 @@ public class JobMasterConfiguration { private final Time slotIdleTimeout; + private final String tmpDirectory; + private final Configuration configuration; public JobMasterConfiguration( Time rpcTimeout, Time slotRequestTimeout, Time slotIdleTimeout, + String tmpDirectory, Configuration configuration) { this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout); this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); this.slotIdleTimeout = Preconditions.checkNotNull(slotIdleTimeout); + this.tmpDirectory = Preconditions.checkNotNull(tmpDirectory); this.configuration = Preconditions.checkNotNull(configuration); } @@ -61,6 +66,10 @@ public Time getSlotIdleTimeout() { return slotIdleTimeout; } + public String getTmpDirectory() { + return tmpDirectory; + } + public Configuration getConfiguration() { return configuration; } @@ -78,10 +87,13 @@ public static JobMasterConfiguration fromConfiguration(Configuration configurati final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)); + final String tmpDirectory = ConfigurationUtils.parseTempDirectories(configuration)[0]; + return new JobMasterConfiguration( rpcTimeout, slotRequestTimeout, slotIdleTimeout, + tmpDirectory, configuration); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 0dcf3fb4b687a..fb53237ea8145 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -72,6 +72,34 @@ public interface JobMasterGateway extends */ CompletableFuture stop(@RpcTimeout Time timeout); + /** + * Triggers rescaling of the executed job. 
+ * + * @param newParallelism new parallelism of the job + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + + /** + * Triggers rescaling of the given set of operators. + * + * @param operators set of operators which shall be rescaled + * @param newParallelism new parallelism of the given set of operators + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + /** * Updates the task execution state for a given task. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java new file mode 100644 index 0000000000000..7de956081d8c9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.BiConsumerWithException; + +/** + * Definition of the rescaling behaviour. 
+ */ +public enum RescalingBehaviour implements BiConsumerWithException { + // rescaling is only executed if the operator can be set to the given parallelism + STRICT { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException { + if (jobVertex.getMaxParallelism() < newParallelism) { + throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() + + " because its maximum parallelism " + jobVertex.getMaxParallelism() + + " is smaller than the new parallelism " + newParallelism + '.'); + } else { + jobVertex.setParallelism(newParallelism); + } + } + }, + // the new parallelism will be the minimum of the given parallelism and the maximum parallelism + RELAXED { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) { + jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism)); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java new file mode 100644 index 0000000000000..a7b62e14538b2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.util.FlinkException; + +/** + * Base class for all {@link JobMaster} related exceptions. + */ +public class JobMasterException extends FlinkException { + private static final long serialVersionUID = 2941885469739200908L; + + public JobMasterException(String message) { + super(message); + } + + public JobMasterException(Throwable cause) { + super(cause); + } + + public JobMasterException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java new file mode 100644 index 0000000000000..e08ec625734ec --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +/** + * Base class for all exception which originate from a failed job modification. + */ +public class JobModificationException extends JobMasterException { + + private static final long serialVersionUID = 2374146694058970746L; + + public JobModificationException(String message) { + super(message); + } + + public JobModificationException(Throwable cause) { + super(cause); + } + + public JobModificationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 168b32bf10736..cac7e90bd09b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; @@ -66,6 +67,16 @@ public CompletableFuture stop(Time timeout) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rescaleOperators(Collection operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture updateTaskExecutionState(TaskExecutionState taskExecutionState) { throw new UnsupportedOperationException();
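
Note for reviewers: the following standalone snippet is not part of the patch series. It is a minimal sketch illustrating how the two RescalingBehaviour variants introduced in the third patch differ when the requested parallelism exceeds a vertex's maximum parallelism. The class name RescalingBehaviourExample, the vertex name, and the concrete parallelism values (2, 4, 8) are illustrative assumptions only.

import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.util.FlinkException;

/**
 * Illustrative sketch (not part of the patches above): compares the STRICT and
 * RELAXED rescaling behaviours on a vertex whose maximum parallelism is 4.
 */
public class RescalingBehaviourExample {

	public static void main(String[] args) throws FlinkException {
		final JobVertex vertex = new JobVertex("example-vertex");
		vertex.setParallelism(2);
		vertex.setMaxParallelism(4);

		// RELAXED caps the requested parallelism at the vertex's maximum parallelism.
		RescalingBehaviour.RELAXED.acceptWithException(vertex, 8);
		System.out.println("RELAXED: parallelism is now " + vertex.getParallelism()); // prints 4

		// STRICT refuses to exceed the maximum parallelism and signals this with a FlinkException.
		try {
			RescalingBehaviour.STRICT.acceptWithException(vertex, 8);
		} catch (FlinkException expected) {
			System.out.println("STRICT: " + expected.getMessage());
		}
	}
}

As defined in the patch, RELAXED amounts to jobVertex.setParallelism(Math.min(maxParallelism, newParallelism)), while STRICT only applies the new parallelism when it does not exceed the configured maximum.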