From e8d6f397c471842dc412c2f8636c7127067a2e44 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 1 Feb 2018 14:37:15 +0100 Subject: [PATCH 1/7] [hotfix] Fix checkstyle violations in ExecutionGraph --- .../executiongraph/ExecutionGraph.java | 56 ++++++++++--------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 22e5c9282f1b8..beb3ead54074b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -131,7 +131,7 @@ * *

Global and local failover

* - * The Execution Graph has two failover modes: global failover and local failover. + *

The Execution Graph has two failover modes: global failover and local failover. * *

A global failover aborts the task executions for all vertices and restarts whole * data flow graph from the last completed checkpoint. Global failover is considered the @@ -154,7 +154,7 @@ public class ExecutionGraph implements AccessExecutionGraph { /** In place updater for the execution graph's current state. Avoids having to use an - * AtomicReference and thus makes the frequent read access a bit faster */ + * AtomicReference and thus makes the frequent read access a bit faster. */ private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); @@ -175,38 +175,38 @@ public class ExecutionGraph implements AccessExecutionGraph { /** Job specific information like the job id, job name, job configuration, etc. */ private final JobInformation jobInformation; - /** Serialized job information or a blob key pointing to the offloaded job information */ + /** Serialized job information or a blob key pointing to the offloaded job information. */ private final Either, PermanentBlobKey> jobInformationOrBlobKey; /** The executor which is used to execute futures. */ private final ScheduledExecutorService futureExecutor; - /** The executor which is used to execute blocking io operations */ + /** The executor which is used to execute blocking io operations. */ private final Executor ioExecutor; /** {@code true} if all source tasks are stoppable. */ private boolean isStoppable = true; - /** All job vertices that are part of this graph */ + /** All job vertices that are part of this graph. */ private final ConcurrentHashMap tasks; - /** All vertices, in the order in which they were created **/ + /** All vertices, in the order in which they were created. **/ private final List verticesInCreationOrder; - /** All intermediate results that are part of this graph */ + /** All intermediate results that are part of this graph. */ private final ConcurrentHashMap intermediateResults; - /** The currently executed tasks, for callbacks */ + /** The currently executed tasks, for callbacks. */ private final ConcurrentHashMap currentExecutions; /** Listeners that receive messages when the entire job switches it status - * (such as from RUNNING to FINISHED) */ + * (such as from RUNNING to FINISHED). */ private final List jobStatusListeners; - /** Listeners that receive messages whenever a single task execution changes its status */ + /** Listeners that receive messages whenever a single task execution changes its status. */ private final List executionListeners; - /** The implementation that decides how to recover the failures of tasks */ + /** The implementation that decides how to recover the failures of tasks. */ private final FailoverStrategy failoverStrategy; /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when @@ -215,28 +215,28 @@ public class ExecutionGraph implements AccessExecutionGraph { * at {@code stateTimestamps[RUNNING.ordinal()]}. */ private final long[] stateTimestamps; - /** The timeout for all messages that require a response/acknowledgement */ + /** The timeout for all messages that require a response/acknowledgement. */ private final Time rpcTimeout; /** The timeout for slot allocations. */ private final Time allocationTimeout; - /** Strategy to use for restarts */ + /** Strategy to use for restarts. */ private final RestartStrategy restartStrategy; - /** The slot provider to use for allocating slots for tasks as they are needed */ + /** The slot provider to use for allocating slots for tasks as they are needed. */ private final SlotProvider slotProvider; - /** The classloader for the user code. Needed for calls into user code classes */ + /** The classloader for the user code. Needed for calls into user code classes. */ private final ClassLoader userClassLoader; /** Registered KvState instances reported by the TaskManagers. */ private final KvStateLocationRegistry kvStateLocationRegistry; - /** Blob writer used to offload RPC messages */ + /** Blob writer used to offload RPC messages. */ private final BlobWriter blobWriter; - /** The total number of vertices currently in the execution graph */ + /** The total number of vertices currently in the execution graph. */ private int numVerticesTotal; // ------ Configuration of the Execution ------- @@ -254,18 +254,18 @@ public class ExecutionGraph implements AccessExecutionGraph { private final AtomicInteger verticesFinished; - /** Current status of the job execution */ + /** Current status of the job execution. */ private volatile JobStatus state = JobStatus.CREATED; - /** A future that completes once the job has reached a terminal state */ + /** A future that completes once the job has reached a terminal state. */ private volatile CompletableFuture terminationFuture; /** On each global recovery, this version is incremented. The version breaks conflicts - * between concurrent restart attempts by local failover strategies */ + * between concurrent restart attempts by local failover strategies. */ private volatile long globalModVersion; /** The exception that caused the job to fail. This is set to the first root exception - * that was not recoverable and triggered job failure */ + * that was not recoverable and triggered job failure. */ private volatile Throwable failureCause; /** The extended failure cause information for the job. This exists in addition to 'failureCause', @@ -281,7 +281,7 @@ public class ExecutionGraph implements AccessExecutionGraph { // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- - /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ + /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */ private CheckpointCoordinator checkpointCoordinator; /** Checkpoint stats tracker separate from the coordinator in order to be @@ -578,7 +578,8 @@ private ExecutionVertex[] collectExecutionVertices(List jobV // -------------------------------------------------------------------------------------------- /** - * Returns a list of BLOB keys referring to the JAR files required to run this job + * Returns a list of BLOB keys referring to the JAR files required to run this job. + * * @return list of BLOB keys referring to the JAR files required to run this job */ public Collection getRequiredJarFiles() { @@ -586,7 +587,8 @@ public Collection getRequiredJarFiles() { } /** - * Returns a list of classpaths referring to the directories/JAR files required to run this job + * Returns a list of classpaths referring to the directories/JAR files required to run this job. + * * @return list of classpaths referring to the directories/JAR files required to run this job */ public Collection getRequiredClasspaths() { @@ -1083,10 +1085,10 @@ public void stop() throws StoppingException { /** * Suspends the current ExecutionGraph. * - * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal + *

The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed. * - * The SUSPENDED state is a local terminal state which stops the execution of the job but does + *

The SUSPENDED state is a local terminal state which stops the execution of the job but does * not remove the job from the HA job store so that it can be recovered by another JobManager. * * @param suspensionCause Cause of the suspension @@ -1533,7 +1535,7 @@ private void onTerminalState(JobStatus status) { /** * Updates the state of one of the ExecutionVertex's Execution attempts. * If the new status if "FINISHED", this also updates the accumulators. - * + * * @param state The state update. * @return True, if the task update was properly applied, false, if the execution attempt was not found. */ From 7e96a248587ddbf2eb0ae445eb3079a4b2e4753f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 1 Feb 2018 18:04:06 +0100 Subject: [PATCH 2/7] [FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended but its clean up has not been done yet. Only after all Executions have been canceled, the ExecutionGraph will enter the SUSPENDED state and complete the termination future accordingly. This closes #5445. --- .../executiongraph/ExecutionGraph.java | 46 ++++++++++----- .../flink/runtime/jobgraph/JobStatus.java | 3 + .../ZooKeeperSubmittedJobGraphStore.java | 2 +- .../handler/legacy/ExecutionGraphCache.java | 16 +++-- .../ArchivedExecutionGraphTest.java | 1 + .../ExecutionGraphSuspendTest.java | 58 ++++++++++++++++--- .../legacy/ExecutionGraphCacheTest.java | 33 ++++++++--- .../legacy/JobAccumulatorsHandlerTest.java | 3 +- .../handler/legacy/JobConfigHandlerTest.java | 3 +- .../handler/legacy/JobDetailsHandlerTest.java | 3 +- .../handler/legacy/JobPlanHandlerTest.java | 3 +- .../JobVertexAccumulatorsHandlerTest.java | 3 +- .../legacy/JobVertexDetailsHandlerTest.java | 3 +- .../utils/ArchivedJobGenerationUtils.java | 2 +- 14 files changed, 133 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index beb3ead54074b..9313466474234 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1085,10 +1085,10 @@ public void stop() throws StoppingException { /** * Suspends the current ExecutionGraph. * - *

The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal + *

The JobStatus will be directly set to SUSPENDING iff the current state is not a terminal * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed. * - *

The SUSPENDED state is a local terminal state which stops the execution of the job but does + *

The SUSPENDING state is a local terminal state which stops the execution of the job but does * not remove the job from the HA job store so that it can be recovered by another JobManager. * * @param suspensionCause Cause of the suspension @@ -1097,10 +1097,10 @@ public void suspend(Throwable suspensionCause) { while (true) { JobStatus currentState = state; - if (currentState.isTerminalState()) { + if (currentState.isTerminalState() || currentState == JobStatus.SUSPENDING) { // stay in a terminal state return; - } else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) { + } else if (transitionState(currentState, JobStatus.SUSPENDING, suspensionCause)) { initFailureCause(suspensionCause); // make sure no concurrent local actions interfere with the cancellation @@ -1112,16 +1112,25 @@ public void suspend(Throwable suspensionCause) { if (ongoingSchedulingFuture != null) { ongoingSchedulingFuture.cancel(false); } + final ArrayList> executionJobVertexTerminationFutures = new ArrayList<>(verticesInCreationOrder.size()); for (ExecutionJobVertex ejv: verticesInCreationOrder) { - ejv.cancel(); + executionJobVertexTerminationFutures.add(ejv.cancelWithFuture()); } - synchronized (progressLock) { - onTerminalState(JobStatus.SUSPENDED); + final ConjunctFuture jobVerticesTerminationFuture = FutureUtils.waitForAll(executionJobVertexTerminationFutures); - LOG.info("Job {} has been suspended.", getJobID()); - } + jobVerticesTerminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Flink could not properly clean up resource after suspension.", throwable); + } + + // the globalModVersion does not play a role because there is no way + // currently to leave the SUSPENDING state + allVerticesInTerminalState(-1L); + LOG.info("Job {} has been suspended.", getJobID()); + }); return; } @@ -1144,6 +1153,7 @@ public void failGlobal(Throwable t) { JobStatus current = state; // stay in these states if (current == JobStatus.FAILING || + current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED || current.isGloballyTerminalState()) { return; @@ -1216,7 +1226,7 @@ public void restart(long expectedGlobalVersion) { } else if (current == JobStatus.FAILED) { LOG.info("Failed job during restart. Aborting restart."); return; - } else if (current == JobStatus.SUSPENDED) { + } else if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) { LOG.info("Suspended job during restart. Aborting restart."); return; } else if (current != JobStatus.RESTARTING) { @@ -1301,7 +1311,13 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() { return null; } - @VisibleForTesting + /** + * Returns the termination future of this {@link ExecutionGraph}. The termination future + * is completed with the terminal {@link JobStatus} once the ExecutionGraph reaches this + * terminal state and all {@link Execution} have been terminated. + * + * @return Termination future of this {@link ExecutionGraph}. + */ public CompletableFuture getTerminationFuture() { return terminationFuture; } @@ -1441,9 +1457,11 @@ else if (current == JobStatus.FAILING) { } // concurrent job status change, let's check again } - else if (current == JobStatus.SUSPENDED) { - // we've already cleaned up when entering the SUSPENDED state - break; + else if (current == JobStatus.SUSPENDING) { + if (transitionState(current, JobStatus.SUSPENDED)) { + onTerminalState(JobStatus.SUSPENDED); + break; + } } else if (current.isGloballyTerminalState()) { LOG.warn("Job has entered globally terminal state without waiting for all " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 4ef86bd0ae7a7..c04528eabaaa7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -47,6 +47,9 @@ public enum JobStatus { /** The job is currently undergoing a reset and total restart */ RESTARTING(TerminalState.NON_TERMINAL), + /** The job has been suspended and is currently waiting for the cleanup to complete */ + SUSPENDING(TerminalState.NON_TERMINAL), + /** * The job has been suspended which means that it has been stopped but not been removed from a * potential HA job store. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index f31c970a8dcc7..a60a40d842622 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -372,7 +372,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) break; case CONNECTION_SUSPENDED: { - LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " + + LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " + "graphs are not monitored (temporarily)."); } break; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index 19186c45b026e..382e87e3a8c99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -91,19 +91,22 @@ public CompletableFuture getExecutionGraph(JobID jobId, Re if (oldEntry != null) { if (currentTime < oldEntry.getTTL()) { - if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) { + final CompletableFuture executionGraphFuture = oldEntry.getExecutionGraphFuture(); + if (executionGraphFuture.isDone() && !executionGraphFuture.isCompletedExceptionally()) { // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph try { - if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) { - return oldEntry.getExecutionGraphFuture(); + final AccessExecutionGraph executionGraph = executionGraphFuture.get(); + if (executionGraph.getState() != JobStatus.SUSPENDING && + executionGraph.getState() != JobStatus.SUSPENDED) { + return executionGraphFuture; } // send a new request to get the ExecutionGraph from the new leader } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e); } - } else if (!oldEntry.getExecutionGraphFuture().isDone()) { - return oldEntry.getExecutionGraphFuture(); + } else if (!executionGraphFuture.isDone()) { + return executionGraphFuture; } // otherwise it must be completed exceptionally } @@ -135,7 +138,8 @@ public CompletableFuture getExecutionGraph(JobID jobId, Re newEntry.getExecutionGraphFuture().complete(executionGraph); // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph - if (executionGraph.getState() == JobStatus.SUSPENDED) { + if (executionGraph.getState() == JobStatus.SUSPENDING || + executionGraph.getState() == JobStatus.SUSPENDED) { // remove the entry in case of suspension --> triggers new request when accessed next time cachedExecutionGraphs.remove(jobId, newEntry); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 8bc5170b38c54..f15dca1fcc9c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -172,6 +172,7 @@ private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, Acc assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING)); + assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDING), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDING)); assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED)); assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index 52d4c818041f0..1b19c53965250 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -49,7 +49,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { /** - * Going into SUSPENDED out of CREATED should immediately cancel everything and + * Going into SUSPENDING out of CREATED should immediately cancel everything and * not send out RPC calls. */ @Test @@ -72,7 +72,7 @@ public void testSuspendedOutOfCreated() throws Exception { } /** - * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of DEPLOYING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfDeploying() throws Exception { @@ -88,15 +88,20 @@ public void testSuspendedOutOfDeploying() throws Exception { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC calls. + * Going into SUSPENDING out of RUNNING vertices should cancel all vertices once with RPC calls. */ @Test public void testSuspendedOutOfRunning() throws Exception { @@ -114,15 +119,21 @@ public void testSuspendedOutOfRunning() throws Exception { eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + validateAllVerticesInState(eg, ExecutionState.CANCELING); + validateCancelRpcCalls(gateway, parallelism); + ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } /** - * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls + * Suspending from FAILING goes to SUSPENDING and sends no additional RPC calls */ @Test public void testSuspendedOutOfFailing() throws Exception { @@ -140,10 +151,14 @@ public void testSuspendedOutOfFailing() throws Exception { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -176,7 +191,7 @@ public void testSuspendedOutOfFailed() throws Exception { } /** - * Suspending from CANCELING goes to SUSPENDED and sends no additional RPC calls. + * Suspending from CANCELING goes to SUSPENDING and sends no additional RPC calls. */ @Test public void testSuspendedOutOfCanceling() throws Exception { @@ -194,10 +209,14 @@ public void testSuspendedOutOfCanceling() throws Exception { // suspend eg.suspend(new Exception("suspend")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + + ensureCannotLeaveSuspendingState(eg, gateway); ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + ensureCannotLeaveSuspendedState(eg, gateway); } @@ -280,6 +299,27 @@ private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManag } } + private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, TaskManagerGateway gateway) { + assertEquals(JobStatus.SUSPENDING, eg.getState()); + reset(gateway); + + eg.failGlobal(new Exception("fail")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.cancel(); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + eg.suspend(new Exception("suspend again")); + assertEquals(JobStatus.SUSPENDING, eg.getState()); + verifyNoMoreInteractions(gateway); + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber()); + } + } + private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) { for (ExecutionVertex ev : eg.getAllExecutionVertices()) { assertEquals(expected, ev.getCurrentExecutionAttempt().getState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index 8bdaff5cd8db2..3afd9fe336bf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -248,7 +248,7 @@ public void testConcurrentAccess() throws Exception { /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in - * state {@link JobStatus#SUSPENDED}. + * state {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -259,9 +259,11 @@ public void testCacheInvalidationIfSuspended() throws Exception { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final ArchivedExecutionGraph suspendingExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDING).build(); final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build(); final ConcurrentLinkedQueue> requestJobAnswers = new ConcurrentLinkedQueue<>(); + requestJobAnswers.offer(CompletableFuture.completedFuture(suspendingExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph)); requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph)); @@ -278,17 +280,21 @@ public void testCacheInvalidationIfSuspended() throws Exception { try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendingExecutionGraph, executionGraphFuture.get()); + + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); - CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); } } /** * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its - * state to {@link JobStatus#SUSPENDED}. + * state to {@link JobStatus#SUSPENDING} or {@link JobStatus#SUSPENDED}. * *

This test can be removed once we no longer request the actual {@link ExecutionGraph} from the * {@link JobManager}. @@ -299,30 +305,39 @@ public void testCacheInvalidationIfSwitchToSuspended() throws Exception { final Time timeToLive = Time.hours(1L); final JobID expectedJobId = new JobID(); + final SuspendableAccessExecutionGraph toBeSuspendingExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( expectedJobId, + CompletableFuture.completedFuture(toBeSuspendingExecutionGraph), CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendingExecutionGraph, executionGraphFuture.get()); + + toBeSuspendingExecutionGraph.setJobStatus(JobStatus.SUSPENDING); + + // retrieve the same job from the cache again --> this should return it and invalidate the cache entry + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); // retrieve the same job from the cache again --> this should return it and invalidate the cache entry - CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - CompletableFuture executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); + executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(expectedExecutionGraph, executionGraphFuture3.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(3)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java index f8122eef1dab9..00829e6bf859b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -38,7 +39,7 @@ /** * Tests for the JobAccumulatorsHandler. */ -public class JobAccumulatorsHandlerTest { +public class JobAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java index d173e0f090e81..2279cd832092d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -39,7 +40,7 @@ /** * Tests for the JobConfigHandler. */ -public class JobConfigHandlerTest { +public class JobConfigHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java index 2980a08cb51c6..dbfa8cc79e880 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -46,7 +47,7 @@ /** * Tests for the JobDetailsHandler. */ -public class JobDetailsHandlerTest { +public class JobDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java index 29e0819c61dea..9edaef1001a65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; @@ -34,7 +35,7 @@ /** * Tests for the JobPlanHandler. */ -public class JobPlanHandlerTest { +public class JobPlanHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java index 97356f457788f..abb22e0eea2f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -40,7 +41,7 @@ /** * Tests for the JobVertexAccumulatorsHandler. */ -public class JobVertexAccumulatorsHandlerTest { +public class JobVertexAccumulatorsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java index 9cc294a719300..0c5217143ab7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -42,7 +43,7 @@ /** * Tests for the JobVertexDetailsHandler. */ -public class JobVertexDetailsHandlerTest { +public class JobVertexDetailsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java index ad3a95f9be718..92b0d8acee372 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java @@ -135,7 +135,7 @@ private static void generateArchivedJob() throws Exception { .setTasks(tasks) .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) .setState(JobStatus.FINISHED) - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) .build(); } From f83e2f770a2ba7da9c9333ef536bbd612d744de2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 13 Feb 2018 16:14:41 +0100 Subject: [PATCH 3/7] [FLINK-8629] [flip6] Allow JobMaster to rescale jobs This commit adds the functionality to rescale a job or parts of it to the JobMaster. In order to rescale a job, the JobMaster does the following: 1. Take a savepoint 2. Create a rescaled ExecutionGraph from the JobGraph 3. Initialize it with the taken savepoint 4. Suspend the old ExecutionGraph 5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended This closes #5446. --- .../function/BiConsumerWithException.java | 50 ++++ .../flink/runtime/checkpoint/Checkpoints.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 219 ++++++++++++++++-- .../jobmaster/JobMasterConfiguration.java | 12 + .../runtime/jobmaster/JobMasterGateway.java | 28 +++ .../runtime/jobmaster/RescalingBehaviour.java | 49 ++++ .../exceptions/JobMasterException.java | 41 ++++ .../exceptions/JobModificationException.java | 39 ++++ .../utils/TestingJobMasterGateway.java | 11 + 9 files changed, 434 insertions(+), 17 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java new file mode 100644 index 0000000000000..6ab11618b1f22 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.function; + +import java.util.function.BiConsumer; + +/** + * A checked extension of the {@link BiConsumer} interface. + * + * @param type of the first argument + * @param type of the second argument + * @param type of the thrown exception + */ +@FunctionalInterface +public interface BiConsumerWithException extends BiConsumer { + + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument + * @param u the second input argument + * @throws E in case of an error + */ + void acceptWithException(T t, U u) throws E; + + @Override + default void accept(T t, U u) { + try { + acceptWithException(t, u); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index 47efa6facf65c..72b7c53ab9589 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -246,7 +246,7 @@ public static void disposeSavepoint( try (InputStream in = metadataHandle.openInputStream(); DataInputStream dis = new DataInputStream(in)) { - savepoint = loadCheckpointMetadata(dis, classLoader); + savepoint = loadCheckpointMetadata(dis, classLoader); } Exception exception = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2a4b88194820f..22c69f5c4ee5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -26,13 +26,15 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -56,10 +58,12 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway; @@ -106,6 +110,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -115,6 +120,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -174,9 +180,6 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final ClassLoader userCodeLoader; - /** The execution graph of this job. */ - private final ExecutionGraph executionGraph; - private final SlotPool slotPool; private final SlotPoolGateway slotPoolGateway; @@ -201,6 +204,11 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final Map> registeredTaskManagers; + // -------- Mutable fields --------- + + /** The execution graph of this job. */ + private ExecutionGraph executionGraph; + // ------------------------------------------------------------------------ public JobMaster( @@ -268,8 +276,6 @@ public JobMaster( log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.slotPool = new SlotPool( @@ -289,7 +295,7 @@ public JobMaster( scheduledExecutorService, slotPool.getSlotProvider(), userCodeLoader, - checkpointRecoveryFactory, + highAvailabilityServices.getCheckpointRecoveryFactory(), rpcTimeout, restartStrategy, jobMetricGroup, @@ -447,6 +453,165 @@ public CompletableFuture stop(Time timeout) { return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. take a savepoint + final CompletableFuture savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } + + throw new CompletionException(failure); + }); + + // 5. suspend the current job + final CompletableFuture terminationFuture = executionGraphFuture.thenComposeAsync( + (ExecutionGraph ignored) -> { + currentExecutionGraph.suspend(new FlinkException("Job is being rescaled.")); + return currentExecutionGraph.getTerminationFuture(); + }, + getMainThreadExecutor()); + + final CompletableFuture suspendedFuture = terminationFuture.thenAccept( + (JobStatus jobStatus) -> { + if (jobStatus != JobStatus.SUSPENDED) { + final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); + log.info(msg); + throw new CompletionException(new JobModificationException(msg)); + } + }); + + // 6. resume the new execution graph from the taken savepoint + final CompletableFuture rescalingFuture = suspendedFuture.thenCombineAsync( + executionGraphFuture, + (Void ignored, ExecutionGraph restoredExecutionGraph) -> { + // check if the ExecutionGraph is still the same + //noinspection ObjectEquality + if (executionGraph == currentExecutionGraph) { + executionGraph = restoredExecutionGraph; + + scheduleExecutionGraph(); + + return Acknowledge.get(); + } else { + throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling.")); + } + + }, + getMainThreadExecutor()); + + rescalingFuture.whenComplete( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + // fail the newly created execution graph + newExecutionGraph.failGlobal(new FlinkException("Failed to rescale the job " + jobGraph.getJobID() + '.', throwable)); + } + }); + + return rescalingFuture; + } + /** * Updates the task execution state for a given task. * @@ -912,15 +1077,7 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti } // start scheduling job in another thread - scheduledExecutorService.execute( - () -> { - try { - executionGraph.scheduleForExecution(); - } - catch (Throwable t) { - executionGraph.failGlobal(t); - } - }); + scheduledExecutorService.execute(this::scheduleExecutionGraph); return Acknowledge.get(); } @@ -963,6 +1120,36 @@ private Acknowledge suspendExecution(final Exception cause) { return Acknowledge.get(); } + /** + * Schedules the execution of the current {@link ExecutionGraph}. + */ + private void scheduleExecutionGraph() { + try { + executionGraph.scheduleForExecution(); + } + catch (Throwable t) { + executionGraph.failGlobal(t); + } + } + + /** + * Dispose the savepoint stored under the given path. + * + * @param savepointPath path where the savepoint is stored + */ + private void disposeSavepoint(String savepointPath) { + try { + // delete the temporary savepoint + Checkpoints.disposeSavepoint( + savepointPath, + jobMasterConfiguration.getConfiguration(), + userCodeLoader, + log); + } catch (FlinkException | IOException de) { + log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de); + } + } + //---------------------------------------------------------------------------------------------- private void handleFatalError(final Throwable cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index 15a30e26a2b37..5a4e3b37e84df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -36,16 +37,20 @@ public class JobMasterConfiguration { private final Time slotIdleTimeout; + private final String tmpDirectory; + private final Configuration configuration; public JobMasterConfiguration( Time rpcTimeout, Time slotRequestTimeout, Time slotIdleTimeout, + String tmpDirectory, Configuration configuration) { this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout); this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); this.slotIdleTimeout = Preconditions.checkNotNull(slotIdleTimeout); + this.tmpDirectory = Preconditions.checkNotNull(tmpDirectory); this.configuration = Preconditions.checkNotNull(configuration); } @@ -61,6 +66,10 @@ public Time getSlotIdleTimeout() { return slotIdleTimeout; } + public String getTmpDirectory() { + return tmpDirectory; + } + public Configuration getConfiguration() { return configuration; } @@ -78,10 +87,13 @@ public static JobMasterConfiguration fromConfiguration(Configuration configurati final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)); + final String tmpDirectory = ConfigurationUtils.parseTempDirectories(configuration)[0]; + return new JobMasterConfiguration( rpcTimeout, slotRequestTimeout, slotIdleTimeout, + tmpDirectory, configuration); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 0dcf3fb4b687a..fb53237ea8145 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -72,6 +72,34 @@ public interface JobMasterGateway extends */ CompletableFuture stop(@RpcTimeout Time timeout); + /** + * Triggers rescaling of the executed job. + * + * @param newParallelism new parallelism of the job + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + + /** + * Triggers rescaling of the given set of operators. + * + * @param operators set of operators which shall be rescaled + * @param newParallelism new parallelism of the given set of operators + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + /** * Updates the task execution state for a given task. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java new file mode 100644 index 0000000000000..7de956081d8c9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.BiConsumerWithException; + +/** + * Definition of the rescaling behaviour. + */ +public enum RescalingBehaviour implements BiConsumerWithException { + // rescaling is only executed if the operator can be set to the given parallelism + STRICT { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException { + if (jobVertex.getMaxParallelism() < newParallelism) { + throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() + + " because its maximum parallelism " + jobVertex.getMaxParallelism() + + " is smaller than the new parallelism " + newParallelism + '.'); + } else { + jobVertex.setParallelism(newParallelism); + } + } + }, + // the new parallelism will be the minimum of the given parallelism and the maximum parallelism + RELAXED { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) { + jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism)); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java new file mode 100644 index 0000000000000..a7b62e14538b2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.util.FlinkException; + +/** + * Base class for all {@link JobMaster} related exceptions. + */ +public class JobMasterException extends FlinkException { + private static final long serialVersionUID = 2941885469739200908L; + + public JobMasterException(String message) { + super(message); + } + + public JobMasterException(Throwable cause) { + super(cause); + } + + public JobMasterException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java new file mode 100644 index 0000000000000..e08ec625734ec --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +/** + * Base class for all exception which originate from a failed job modification. + */ +public class JobModificationException extends JobMasterException { + + private static final long serialVersionUID = 2374146694058970746L; + + public JobModificationException(String message) { + super(message); + } + + public JobModificationException(Throwable cause) { + super(cause); + } + + public JobModificationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 168b32bf10736..cac7e90bd09b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; @@ -66,6 +67,16 @@ public CompletableFuture stop(Time timeout) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rescaleOperators(Collection operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture updateTaskExecutionState(TaskExecutionState taskExecutionState) { throw new UnsupportedOperationException(); From 4756573c257cfbc2390a4fc64e65f4de449a53a5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 13 Feb 2018 16:34:31 +0100 Subject: [PATCH 4/7] [FLINK-8633] [flip6] Expose rescaling of jobs via the Dispatcher This commit exposes the JobMaster#rescaleJob via the Dispatcher. This will allow it to call this functionality from a REST handler. This closes #5452. --- .../flink/runtime/dispatcher/Dispatcher.java | 12 ++++++++++++ .../runtime/webmonitor/RestfulGateway.java | 17 +++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e751bc47b0be7..b2d2b6a8b0cf5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; @@ -313,6 +314,17 @@ public CompletableFuture stopJob(JobID jobId, Time timeout) { } } + @Override + public CompletableFuture rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); + + if (jobManagerRunner == null) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } else { + return jobManagerRunner.getJobManagerGateway().rescaleJob(newParallelism, rescalingBehaviour, timeout); + } + } + @Override public CompletableFuture requestRestAddress(Time timeout) { if (restAddress != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 65a46649a6b70..ed90f37ccd089 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; @@ -169,4 +170,20 @@ default CompletableFuture requestOperatorBack throw new UnsupportedOperationException(); } + /** + * Trigger rescaling of the given job. + * + * @param jobId specifying the job to rescale + * @param newParallelism new parallelism of the job + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + default CompletableFuture rescaleJob( + JobID jobId, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } } From 06acdc1907300862d5faddc4e882f5f6dd670edb Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 2 Feb 2018 11:06:35 +0100 Subject: [PATCH 5/7] [FLINK-8634] [rest] Introduce job rescaling REST handler Add rescaling REST handler as a sub class of the AbstractAsynchronousOperationHandlers. This closes #5451. --- .../async/AsynchronousOperationInfo.java | 64 ++++++++++ .../job/AsynchronousJobOperationKey.java | 74 ++++++++++++ .../rest/handler/job/RescalingHandlers.java | 111 ++++++++++++++++++ .../job/RescalingStatusMessageParameters.java | 39 ++++++ .../RescalingTriggerMessageParameters.java | 40 +++++++ .../job/savepoints/SavepointHandlers.java | 60 +--------- .../RescaleParallelismQueryParameter.java | 41 +++++++ .../RescalingParallelismQueryParameter.java | 41 +++++++ 8 files changed, 416 insertions(+), 54 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java new file mode 100644 index 0000000000000..a46fba90e8765 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.async; + +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer; +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +/** + * Basic information object for asynchronous operations. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AsynchronousOperationInfo { + + private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause"; + + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + @JsonSerialize(using = SerializedThrowableSerializer.class) + @Nullable + private final SerializedThrowable failureCause; + + private AsynchronousOperationInfo( + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + @JsonDeserialize(using = SerializedThrowableDeserializer.class) + @Nullable SerializedThrowable failureCause) { + this.failureCause = failureCause; + } + + @Nullable + public SerializedThrowable getFailureCause() { + return failureCause; + } + + public static AsynchronousOperationInfo completeExceptional(SerializedThrowable serializedThrowable) { + return new AsynchronousOperationInfo(serializedThrowable); + } + + public static AsynchronousOperationInfo complete() { + return new AsynchronousOperationInfo(null); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java new file mode 100644 index 0000000000000..4bb473e9ceb97 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.OperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; + +import javax.annotation.concurrent.Immutable; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based + * collection. + * + * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache + */ +@Immutable +public class AsynchronousJobOperationKey extends OperationKey { + + private final JobID jobId; + + private AsynchronousJobOperationKey(final TriggerId triggerId, final JobID jobId) { + super(triggerId); + this.jobId = requireNonNull(jobId); + } + + public static AsynchronousJobOperationKey of(final TriggerId triggerId, final JobID jobId) { + return new AsynchronousJobOperationKey(triggerId, jobId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + AsynchronousJobOperationKey that = (AsynchronousJobOperationKey) o; + return Objects.equals(jobId, that.jobId); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), jobId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java new file mode 100644 index 0000000000000..6f8895a56963c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler to trigger and poll the rescaling of a running job. + */ +public class RescalingHandlers extends AbstractAsynchronousOperationHandlers { + + /** + * Handler which triggers the rescaling of the specified job. + */ + public class RescalingTriggerHandler extends TriggerHandler { + + protected RescalingTriggerHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, Map responseHeaders, MessageHeaders messageHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture triggerOperation(HandlerRequest request, RestfulGateway gateway) throws RestHandlerException { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final List queryParameter = request.getQueryParameter(RescaleParallelismQueryParameter.class); + + if (queryParameter.isEmpty()) { + throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); + } + + final int newParallelism = queryParameter.get(0); + + final CompletableFuture rescalingFuture = gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout); + + return rescalingFuture; + } + + @Override + protected AsynchronousJobOperationKey createOperationKey(HandlerRequest request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return AsynchronousJobOperationKey.of(new TriggerId(), jobId); + } + } + + /** + * Handler which reports the status of the rescaling operation. + */ + public class RescalingStatusHandler extends StatusHandler { + + protected RescalingStatusHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, Map responseHeaders, MessageHeaders, RescalingStatusMessageParameters> messageHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + + return AsynchronousJobOperationKey.of(triggerId, jobId); + } + + @Override + protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { + return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); + } + + @Override + protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { + return AsynchronousOperationInfo.complete(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java new file mode 100644 index 0000000000000..4821b4f7c27a9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}. + */ +public class RescalingStatusMessageParameters extends JobMessageParameters { + + public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList(jobPathParameter, triggerIdPathParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java new file mode 100644 index 0000000000000..096baa1cdd44d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for triggering the rescaling of a job. + */ +public class RescalingTriggerMessageParameters extends JobMessageParameters { + + public final RescalingParallelismQueryParameter rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter(); + + @Override + public Collection> getQueryParameters() { + return Collections.singleton(rescalingParallelismQueryParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java index 5915d7225068e..cb3ff5bb06c46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; -import org.apache.flink.runtime.rest.handler.async.OperationKey; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.TriggerId; @@ -43,14 +43,10 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; -import static java.util.Objects.requireNonNull; - /** * HTTP handlers for asynchronous triggering of savepoints. * @@ -96,7 +92,7 @@ * } * */ -public class SavepointHandlers extends AbstractAsynchronousOperationHandlers { +public class SavepointHandlers extends AbstractAsynchronousOperationHandlers { @Nullable private final String defaultSavepointDir; @@ -136,9 +132,9 @@ protected CompletableFuture triggerOperation(HandlerRequest request) { + protected AsynchronousJobOperationKey createOperationKey(HandlerRequest request) { final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return SavepointKey.of(new TriggerId(), jobId); + return AsynchronousJobOperationKey.of(new TriggerId(), jobId); } } @@ -156,10 +152,10 @@ public SavepointStatusHandler( } @Override - protected SavepointKey getOperationKey(HandlerRequest request) { + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest request) { final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return SavepointKey.of(triggerId, jobId); + return AsynchronousJobOperationKey.of(triggerId, jobId); } @Override @@ -172,48 +168,4 @@ protected SavepointInfo operationResultResponse(String operationResult) { return new SavepointInfo(operationResult, null); } } - - /** - * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based - * collection. - * - * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache - */ - @Immutable - public static class SavepointKey extends OperationKey { - - private final JobID jobId; - - private SavepointKey(final TriggerId triggerId, final JobID jobId) { - super(triggerId); - this.jobId = requireNonNull(jobId); - } - - private static SavepointKey of(final TriggerId triggerId, final JobID jobId) { - return new SavepointKey(triggerId, jobId); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - if (!super.equals(o)) { - return false; - } - - SavepointKey that = (SavepointKey) o; - return Objects.equals(jobId, that.jobId); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), jobId); - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java new file mode 100644 index 0000000000000..8058ab9acbe60 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. + */ +public class RescaleParallelismQueryParameter extends MessageQueryParameter { + + public static final String KEY = "parallelism"; + + protected RescaleParallelismQueryParameter() { + super(KEY, MessageParameterRequisiteness.MANDATORY); + } + + @Override + public Integer convertValueFromString(String value) { + return Integer.valueOf(value); + } + + @Override + public String convertStringToValue(Integer value) { + return value.toString(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java new file mode 100644 index 0000000000000..9230d790a7b9e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. + */ +public class RescalingParallelismQueryParameter extends MessageQueryParameter { + + public static final String KEY = "parallelism"; + + public RescalingParallelismQueryParameter() { + super(KEY, MessageParameterRequisiteness.MANDATORY); + } + + @Override + public Integer convertValueFromString(String value) { + return Integer.valueOf(value); + } + + @Override + public String convertStringToValue(Integer value) { + return value.toString(); + } +} From 4e8409796a8fb80293db39d62745fbbcce6447cd Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sun, 11 Feb 2018 19:50:46 +0100 Subject: [PATCH 6/7] [FLINK-8635] [rest] Register rescaling handlers at web endpoint This closes #5454. --- ...hronousOperationTriggerMessageHeaders.java | 3 +- .../{ => rescaling}/RescalingHandlers.java | 42 +++++++--- .../job/rescaling/RescalingStatusHeaders.java | 78 +++++++++++++++++++ .../RescalingStatusMessageParameters.java | 5 +- .../rescaling/RescalingTriggerHeaders.java | 70 +++++++++++++++++ .../RescalingTriggerMessageParameters.java | 2 +- .../RescaleParallelismQueryParameter.java | 41 ---------- .../webmonitor/WebMonitorEndpoint.java | 22 +++++- 8 files changed, 205 insertions(+), 58 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/{ => rescaling}/RescalingHandlers.java (77%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/{ => rescaling}/RescalingStatusMessageParameters.java (87%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/{ => rescaling}/RescalingTriggerMessageParameters.java (96%) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java index b317028259ef7..5baa5add0fb8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java @@ -28,7 +28,8 @@ * @param type of the request * @param type of the message parameters */ -public abstract class AsynchronousOperationTriggerMessageHeaders implements MessageHeaders { +public abstract class AsynchronousOperationTriggerMessageHeaders + implements MessageHeaders { @Override public Class getResponseClass() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java similarity index 77% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java index 6f8895a56963c..3e4ae5a4e35e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.job; +package org.apache.flink.runtime.rest.handler.job.rescaling; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; @@ -26,12 +26,10 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; -import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; -import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter; +import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -54,14 +52,23 @@ public class RescalingHandlers extends AbstractAsynchronousOperationHandlers { - protected RescalingTriggerHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, Map responseHeaders, MessageHeaders messageHeaders) { - super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + public RescalingTriggerHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + RescalingTriggerHeaders.getInstance()); } @Override protected CompletableFuture triggerOperation(HandlerRequest request, RestfulGateway gateway) throws RestHandlerException { final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - final List queryParameter = request.getQueryParameter(RescaleParallelismQueryParameter.class); + final List queryParameter = request.getQueryParameter(RescalingParallelismQueryParameter.class); if (queryParameter.isEmpty()) { throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); @@ -69,7 +76,11 @@ protected CompletableFuture triggerOperation(HandlerRequest rescalingFuture = gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout); + final CompletableFuture rescalingFuture = gateway.rescaleJob( + jobId, + newParallelism, + RescalingBehaviour.STRICT, + timeout); return rescalingFuture; } @@ -86,8 +97,17 @@ protected AsynchronousJobOperationKey createOperationKey(HandlerRequest { - protected RescalingStatusHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, Map responseHeaders, MessageHeaders, RescalingStatusMessageParameters> messageHeaders) { - super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + public RescalingStatusHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + RescalingStatusHeaders.getInstance()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java new file mode 100644 index 0000000000000..2d5babb17fc7d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for polling the status of an ongoing rescaling operation. + */ +public class RescalingStatusHeaders extends + AsynchronousOperationStatusMessageHeaders { + + private static final RescalingStatusHeaders INSTANCE = new RescalingStatusHeaders(); + + private static final String URL = String.format( + "/jobs/:%s/rescaling/:%s", + JobIDPathParameter.KEY, + TriggerIdPathParameter.KEY); + + private RescalingStatusHeaders() {} + + @Override + protected Class getValueClass() { + return AsynchronousOperationInfo.class; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public RescalingStatusMessageParameters getUnresolvedMessageParameters() { + return new RescalingStatusMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static RescalingStatusHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java similarity index 87% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java index 4821b4f7c27a9..938a3639d3224 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.job; +package org.apache.flink.runtime.rest.handler.job.rescaling; import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; @@ -26,7 +27,7 @@ import java.util.Collection; /** - * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}. + * {@link MessageParameters} for polling the status of a rescaling operation. */ public class RescalingStatusMessageParameters extends JobMessageParameters { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java new file mode 100644 index 0000000000000..9567410df342a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for triggering the rescaling of a job. + */ +public class RescalingTriggerHeaders extends + AsynchronousOperationTriggerMessageHeaders { + + private static final RescalingTriggerHeaders INSTANCE = new RescalingTriggerHeaders(); + + private static final String URL = String.format( + "/jobs/:%s/rescaling", + JobIDPathParameter.KEY); + + private RescalingTriggerHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public RescalingTriggerMessageParameters getUnresolvedMessageParameters() { + return new RescalingTriggerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.PATCH; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static RescalingTriggerHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java index 096baa1cdd44d..4b5d307527a61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.job; +package org.apache.flink.runtime.rest.handler.job.rescaling; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageParameters; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java deleted file mode 100644 index 8058ab9acbe60..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -/** - * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. - */ -public class RescaleParallelismQueryParameter extends MessageQueryParameter { - - public static final String KEY = "parallelism"; - - protected RescaleParallelismQueryParameter() { - super(KEY, MessageParameterRequisiteness.MANDATORY); - } - - @Override - public Integer convertValueFromString(String value) { - return Integer.valueOf(value); - } - - @Override - public String convertStringToValue(Integer value) { - return value.toString(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 427332f763309..10a3650ec2323 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -61,6 +61,9 @@ import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; @@ -448,8 +451,21 @@ protected List> initiali SubtaskCurrentAttemptDetailsHeaders.getInstance(), executionGraphCache, executor, - metricFetcher - ); + metricFetcher); + + final RescalingHandlers rescalingHandlers = new RescalingHandlers(); + + final RescalingHandlers.RescalingTriggerHandler rescalingTriggerHandler = rescalingHandlers.new RescalingTriggerHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + + final RescalingHandlers.RescalingStatusHandler rescalingStatusHandler = rescalingHandlers.new RescalingStatusHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); JobVertexBackPressureHandler jobVertexBackPressureHandler = new JobVertexBackPressureHandler( restAddressFuture, @@ -532,6 +548,8 @@ protected List> initiali handlers.add(Tuple2.of(JobVertexBackPressureHeaders.getInstance(), jobVertexBackPressureHandler)); handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), jobVertexDetailsHandler)); + handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), rescalingTriggerHandler)); + handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), rescalingStatusHandler)); // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); From 0651876ae4d0c790904f60b1728e9009fd4e52a4 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 13 Feb 2018 17:29:32 +0100 Subject: [PATCH 7/7] [FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs Jobs can now be rescaled by calling flink modify -p . Internally, the CliFrontend will send the corresponding REST call and poll for status updates. This closes #5487. --- .../apache/flink/client/cli/CliFrontend.java | 59 ++++++++ .../flink/client/cli/CliFrontendParser.java | 27 ++++ .../flink/client/program/ClusterClient.java | 11 ++ .../program/rest/RestClusterClient.java | 49 +++++++ .../client/cli/CliFrontendModifyTest.java | 137 ++++++++++++++++++ .../function/BiConsumerWithException.java | 4 +- .../flink/runtime/jobmaster/JobMaster.java | 14 +- .../async/AsynchronousOperationInfo.java | 6 +- 8 files changed, 299 insertions(+), 8 deletions(-) create mode 100644 flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index a849de13cb792..92d2ccb8c3571 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -84,6 +84,9 @@ import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION; + /** * Implementation of a simple command line frontend for executing programs. */ @@ -98,6 +101,7 @@ public class CliFrontend { private static final String ACTION_CANCEL = "cancel"; private static final String ACTION_STOP = "stop"; private static final String ACTION_SAVEPOINT = "savepoint"; + private static final String ACTION_MODIFY = "modify"; // configuration dir parameters private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; @@ -714,6 +718,58 @@ private void disposeSavepoint(ClusterClient clusterClient, String savepointPa logAndSysout("Savepoint '" + savepointPath + "' disposed."); } + protected void modify(String[] args) throws CliArgsException, FlinkException { + LOG.info("Running 'modify' command."); + + final Options commandOptions = CliFrontendParser.getModifyOptions(); + + final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); + + final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); + + if (commandLine.hasOption(HELP_OPTION.getOpt())) { + CliFrontendParser.printHelpForModify(customCommandLines); + } + + final JobID jobId; + final String[] modifyArgs = commandLine.getArgs(); + + if (modifyArgs.length > 0) { + jobId = parseJobId(modifyArgs[0]); + } else { + throw new CliArgsException("Missing JobId"); + } + + final int newParallelism; + if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) { + try { + newParallelism = Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt())); + } catch (NumberFormatException e) { + throw new CliArgsException("Could not parse the parallelism which is supposed to be an integer.", e); + } + } else { + throw new CliArgsException("Missing new parallelism."); + } + + final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); + + logAndSysout("Modify job " + jobId + '.'); + runClusterAction( + activeCommandLine, + commandLine, + clusterClient -> { + CompletableFuture rescaleFuture = clusterClient.rescaleJob(jobId, newParallelism); + + try { + rescaleFuture.get(); + } catch (Exception e) { + throw new FlinkException("Could not rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e)); + } + logAndSysout("Rescaled job " + jobId + ". Its new parallelism is " + newParallelism + '.'); + } + ); + } + // -------------------------------------------------------------------------------------------- // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- @@ -977,6 +1033,9 @@ public int parseParameters(String[] args) { case ACTION_SAVEPOINT: savepoint(params); return 0; + case ACTION_MODIFY: + modify(params); + return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index e5d550fc9c4a4..eb6826436a5c8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -95,6 +95,8 @@ public class CliFrontendParser { "directory is optional. If no directory is specified, the configured default " + "directory (" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used."); + static final Option MODIFY_PARALLELISM_OPTION = new Option("p", "parallelism", true, "New parallelism for the specified job."); + static { HELP_OPTION.setRequired(false); @@ -134,6 +136,9 @@ public class CliFrontendParser { CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false); CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory"); CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true); + + MODIFY_PARALLELISM_OPTION.setRequired(false); + MODIFY_PARALLELISM_OPTION.setArgName("newParallelism"); } private static final Options RUN_OPTIONS = getRunCommandOptions(); @@ -198,6 +203,12 @@ static Options getSavepointCommandOptions() { return options.addOption(JAR_OPTION); } + static Options getModifyOptions() { + final Options options = buildGeneralOptions(new Options()); + options.addOption(MODIFY_PARALLELISM_OPTION); + return options; + } + // -------------------------------------------------------------------------------------------- // Help // -------------------------------------------------------------------------------------------- @@ -247,6 +258,7 @@ public static void printHelp(Collection> customCommandLines printHelpForStop(customCommandLines); printHelpForCancel(customCommandLines); printHelpForSavepoint(customCommandLines); + printHelpForModify(customCommandLines); System.out.println(); } @@ -339,6 +351,21 @@ public static void printHelpForSavepoint(Collection> custom System.out.println(); } + public static void printHelpForModify(Collection> customCommandLines) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"modify\" modifies a running job (e.g. change of parallelism)."); + System.out.println("\n Syntax: modify [OPTIONS]"); + formatter.setSyntaxPrefix(" \"modify\" action options:"); + formatter.printHelp(" ", getModifyOptions()); + + printCustomCliOptions(customCommandLines, formatter, false); + + System.out.println(); + } + /** * Prints custom cli options. * @param formatter The formatter to use for printing diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 62be23415b31f..e2efbac523ceb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -984,4 +984,15 @@ public Configuration getFlinkConfiguration() { protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException; + /** + * Rescales the specified job such that it will have the new parallelism. + * + * @param jobId specifying the job to modify + * @param newParallelism specifying the new parallelism of the rescaled job + * @return Future which is completed once the rescaling has been completed + */ + public CompletableFuture rescaleJob(JobID jobId, int newParallelism) { + throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support rescaling."); + } + } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 28bd2b542b73b..8ad571f375817 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -38,8 +38,14 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters; import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; @@ -401,6 +407,49 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc timeout); } + @Override + public CompletableFuture rescaleJob(JobID jobId, int newParallelism) { + + final RescalingTriggerHeaders rescalingTriggerHeaders = RescalingTriggerHeaders.getInstance(); + final RescalingTriggerMessageParameters rescalingTriggerMessageParameters = rescalingTriggerHeaders.getUnresolvedMessageParameters(); + rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId); + rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism)); + + final CompletableFuture rescalingTriggerResponseFuture = sendRequest( + rescalingTriggerHeaders, + rescalingTriggerMessageParameters, + EmptyRequestBody.getInstance()); + + final CompletableFuture rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose( + (TriggerResponse triggerResponse) -> { + final TriggerId triggerId = triggerResponse.getTriggerId(); + + return pollResourceAsync( + () -> { + final RescalingStatusHeaders rescalingStatusHeaders = RescalingStatusHeaders.getInstance(); + final RescalingStatusMessageParameters rescalingStatusMessageParameters = rescalingStatusHeaders.getUnresolvedMessageParameters(); + + rescalingStatusMessageParameters.jobPathParameter.resolve(jobId); + rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId); + return sendRetryableRequest( + rescalingStatusHeaders, + rescalingStatusMessageParameters, + EmptyRequestBody.getInstance(), + isConnectionProblemException()); + } + ); + }); + + return rescalingOperationFuture.thenApply( + (AsynchronousOperationInfo asynchronousOperationInfo) -> { + if (asynchronousOperationInfo.getFailureCause() == null) { + return Acknowledge.get(); + } else { + throw new CompletionException(asynchronousOperationInfo.getFailureCause()); + } + }); + } + /** * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java new file mode 100644 index 0000000000000..50d87bacf753e --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.util.MockedCliFrontend; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the modify command. + */ +public class CliFrontendModifyTest extends TestLogger { + + @Test + public void testModifyJob() throws Exception { + final JobID jobId = new JobID(); + final int parallelism = 42; + String[] args = {jobId.toString(), "-p", String.valueOf(parallelism)}; + + Tuple2 jobIdParallelism = callModify(args); + + assertThat(jobIdParallelism.f0, Matchers.is(jobId)); + assertThat(jobIdParallelism.f1, Matchers.is(parallelism)); + } + + @Test + public void testMissingJobId() throws Exception { + final int parallelism = 42; + final String[] args = {"-p", String.valueOf(parallelism)}; + + try { + callModify(args); + fail("Expected CliArgsException"); + } catch (CliArgsException expected) { + // expected + } + } + + @Test + public void testMissingParallelism() throws Exception { + final JobID jobId = new JobID(); + final String[] args = {jobId.toString()}; + + try { + callModify(args); + fail("Expected CliArgsException"); + } catch (CliArgsException expected) { + // expected + } + } + + @Test + public void testUnparsableParalllelism() throws Exception { + final JobID jobId = new JobID(); + final String[] args = {jobId.toString(), "-p", "foobar"}; + + try { + callModify(args); + fail("Expected CliArgsException"); + } catch (CliArgsException expected) { + // expected + } + } + + @Test + public void testUnparsableJobId() throws Exception { + final int parallelism = 42; + final String[] args = {"foobar", "-p", String.valueOf(parallelism)}; + + try { + callModify(args); + fail("Expected CliArgsException"); + } catch (CliArgsException expected) { + // expected + } + } + + private Tuple2 callModify(String[] args) throws Exception { + final CompletableFuture> rescaleJobFuture = new CompletableFuture<>(); + final TestingClusterClient clusterClient = new TestingClusterClient(rescaleJobFuture); + final MockedCliFrontend cliFrontend = new MockedCliFrontend(clusterClient); + + cliFrontend.modify(args); + + assertThat(rescaleJobFuture.isDone(), Matchers.is(true)); + + return rescaleJobFuture.get(); + } + + private static final class TestingClusterClient extends StandaloneClusterClient { + + private final CompletableFuture> rescaleJobFuture; + + public TestingClusterClient(CompletableFuture> rescaleJobFuture) throws Exception { + super(new Configuration(), new TestingHighAvailabilityServices()); + + this.rescaleJobFuture = rescaleJobFuture; + } + + @Override + public CompletableFuture rescaleJob(JobID jobId, int newParallelism) { + rescaleJobFuture.complete(Tuple2.of(jobId, newParallelism)); + + return CompletableFuture.completedFuture(Acknowledge.get()); + } + } + +} diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java index 6ab11618b1f22..5864c8a985d39 100644 --- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java +++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java @@ -18,6 +18,8 @@ package org.apache.flink.util.function; +import org.apache.flink.util.ExceptionUtils; + import java.util.function.BiConsumer; /** @@ -44,7 +46,7 @@ default void accept(T t, U u) { try { acceptWithException(t, u); } catch (Throwable e) { - throw new RuntimeException(e); + ExceptionUtils.rethrow(e); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 22c69f5c4ee5b..dd2a7eaef6411 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -473,11 +473,17 @@ public CompletableFuture rescaleOperators( int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + + if (newParallelism <= 0) { + return FutureUtils.completedExceptionally( + new JobModificationException("The target parallelism of a rescaling operation must be larger than 0.")); + } + // 1. Check whether we can rescale the job & rescale the respective vertices for (JobVertexID jobVertexId : operators) { final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); - // update max parallelism in case that it has not been configure + // update max parallelism in case that it has not been configured final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); if (executionJobVertex != null) { @@ -595,7 +601,7 @@ public CompletableFuture rescaleOperators( return Acknowledge.get(); } else { - throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling.")); + throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the rescaling.")); } }, @@ -1145,8 +1151,8 @@ private void disposeSavepoint(String savepointPath) { jobMasterConfiguration.getConfiguration(), userCodeLoader, log); - } catch (FlinkException | IOException de) { - log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de); + } catch (FlinkException | IOException e) { + log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, e); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java index a46fba90e8765..7d75bf6bf7f8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java @@ -43,9 +43,9 @@ public class AsynchronousOperationInfo { private final SerializedThrowable failureCause; private AsynchronousOperationInfo( - @JsonProperty(FIELD_NAME_FAILURE_CAUSE) - @JsonDeserialize(using = SerializedThrowableDeserializer.class) - @Nullable SerializedThrowable failureCause) { + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + @JsonDeserialize(using = SerializedThrowableDeserializer.class) + @Nullable SerializedThrowable failureCause) { this.failureCause = failureCause; }