From 4d2c79fc304449cd4abe915f593e9905e4a1adc3 Mon Sep 17 00:00:00 2001 From: gyao Date: Fri, 2 Mar 2018 15:11:36 +0100 Subject: [PATCH 1/3] [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one. --- .../client/program/MiniClusterClient.java | 2 +- .../program/rest/RestClusterClient.java | 15 +- .../flink/runtime/dispatcher/Dispatcher.java | 3 +- .../executiongraph/ExecutionGraph.java | 1 + .../flink/runtime/jobmaster/JobMaster.java | 47 +++- .../runtime/jobmaster/JobMasterGateway.java | 1 + .../runtime/minicluster/MiniCluster.java | 11 + .../job/savepoints/SavepointHandlers.java | 3 +- .../SavepointTriggerRequestBody.java | 13 +- .../runtime/webmonitor/RestfulGateway.java | 1 + .../utils/TestingJobMasterGateway.java | 4 +- ...ractAsynchronousOperationHandlersTest.java | 2 +- .../job/savepoints/SavepointHandlersTest.java | 2 +- .../SavepointTriggerRequestBodyTest.java | 4 +- .../webmonitor/TestingRestfulGateway.java | 2 +- .../JobMasterTriggerSavepointIT.java | 219 ++++++++++++++++++ 16 files changed, 307 insertions(+), 23 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index aca75e0c15087..dd99f0dabeb99 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -87,7 +87,7 @@ public void cancel(JobID jobId) throws Exception { @Override public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + return miniCluster.triggerSavepoint(jobId, savepointDirectory, true).get(); } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 976f2a4db31ca..8c90dc988a996 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -353,24 +353,29 @@ public void cancel(JobID jobID) throws Exception { @Override public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { - throw new UnsupportedOperationException("Not implemented yet."); + return triggerSavepoint(jobId, savepointDirectory, true).get(); } @Override public CompletableFuture triggerSavepoint( final JobID jobId, final @Nullable String savepointDirectory) throws FlinkException { + return triggerSavepoint(jobId, savepointDirectory, false); + } + + private CompletableFuture triggerSavepoint( + final JobID jobId, + final @Nullable String savepointDirectory, + final boolean cancelJob) { final SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance(); final SavepointTriggerMessageParameters savepointTriggerMessageParameters = savepointTriggerHeaders.getUnresolvedMessageParameters(); savepointTriggerMessageParameters.jobID.resolve(jobId); - final CompletableFuture responseFuture; - - responseFuture = sendRequest( + final CompletableFuture responseFuture = sendRequest( savepointTriggerHeaders, savepointTriggerMessageParameters, - new SavepointTriggerRequestBody(savepointDirectory)); + new SavepointTriggerRequestBody(savepointDirectory, cancelJob)); return responseFuture.thenCompose(savepointTriggerResponseBody -> { final TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 7a11cf0878418..9b2411c66b789 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -487,11 +487,12 @@ public CompletableFuture getBlobServerPort(Time timeout) { public CompletableFuture triggerSavepoint( final JobID jobId, final String targetDirectory, + final boolean cancelJob, final Time timeout) { if (jobManagerRunners.containsKey(jobId)) { return jobManagerRunners.get(jobId) .getJobManagerGateway() - .triggerSavepoint(targetDirectory, timeout); + .triggerSavepoint(targetDirectory, cancelJob, timeout); } else { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 4e8b972bc0786..ee23884d3a6f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -524,6 +524,7 @@ public void enableCheckpointing( } } + @Nullable public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bfe8aaaa2424a..74f9b656c6f0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -971,14 +971,44 @@ public CompletableFuture requestJob(Time timeout) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + }); + } + + private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + try { + checkpointCoordinator.startCheckpointScheduler(); + } catch (IllegalStateException ignored) { + // Concurrent shut down of the coordinator + } } } @@ -1315,6 +1345,7 @@ private CompletableFuture restoreExecutionGraphFromRescalingSave private CompletableFuture getJobModificationSavepoint(Time timeout) { return triggerSavepoint( null, + false, timeout) .handleAsync( (String savepointPath, Throwable throwable) -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6173a26cd8292..1e1bdda45117a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -263,6 +263,7 @@ CompletableFuture registerTaskManager( */ CompletableFuture triggerSavepoint( @Nullable final String targetDirectory, + final boolean cancelJob, final Time timeout); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 6b5f9b50aecca..39b0192e3486e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -483,6 +483,17 @@ public CompletableFuture cancelJob(JobID jobId) { } } + public CompletableFuture triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob) { + try { + return getDispatcherGateway().triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout); + } catch (LeaderRetrievalException | InterruptedException e) { + return FutureUtils.completedExceptionally( + new FlinkException( + String.format("Could not cancel job %s.", jobId), + e)); + } + } + // ------------------------------------------------------------------------ // running jobs // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java index cb3ff5bb06c46..17e263bec6e63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java @@ -127,8 +127,9 @@ protected CompletableFuture triggerOperation(HandlerRequest requestMultipleJobDetails( default CompletableFuture triggerSavepoint( JobID jobId, String targetDirectory, + boolean cancelJob, @RpcTimeout Time timeout) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index cac7e90bd09b3..0d57a56b2ceb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -48,6 +48,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import javax.annotation.Nullable; + import java.net.InetSocketAddress; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -153,7 +155,7 @@ public CompletableFuture requestJob(Time timeout) { } @Override - public CompletableFuture triggerSavepoint(String targetDirectory, Time timeout) { + public CompletableFuture triggerSavepoint(@Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java index 848e2539d7b07..7ad140e6d20bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java @@ -305,7 +305,7 @@ protected TestingTriggerHandler(CompletableFuture localRestAddress, Gate @Override protected CompletableFuture triggerOperation(HandlerRequest request, RestfulGateway gateway) throws RestHandlerException { - return gateway.triggerSavepoint(new JobID(), null, timeout); + return gateway.triggerSavepoint(new JobID(), null, false, timeout); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java index a8f4b3f275406..06944525a39dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java @@ -200,7 +200,7 @@ private static HandlerRequest( - new SavepointTriggerRequestBody(targetDirectory), + new SavepointTriggerRequestBody(targetDirectory, false), new SavepointTriggerMessageParameters(), Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()), Collections.emptyMap()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java index f7c3973627c89..6b9d4505dae0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java @@ -34,8 +34,8 @@ protected Class getTestRequestClass() { } @Override - protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception { - return new SavepointTriggerRequestBody("/tmp"); + protected SavepointTriggerRequestBody getTestRequestInstance() { + return new SavepointTriggerRequestBody("/tmp", true); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index 5eff5a680e649..b92ba5182bee4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -190,7 +190,7 @@ public CompletableFuture requestOperatorBackP } @Override - public CompletableFuture triggerSavepoint(JobID jobId, String targetDirectory, Time timeout) { + public CompletableFuture triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob, Time timeout) { return triggerSavepointFunction.apply(jobId, targetDirectory); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java new file mode 100644 index 0000000000000..f9edfa6369540 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + +/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + miniClusterResource.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( + Collections.singletonList(vertex.getID()), + Collections.singletonList(vertex.getID()), + Collections.singletonList(vertex.getID()), + new CheckpointCoordinatorConfiguration( + 10, + 60_000, + 10, + 1, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true), + null + )); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testStopJobAfterSavepoint() throws Exception { + final String savepointLocation = cancelWithSavepoint(); + final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); + + assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING)); + + final List savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList()); + assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName())); + } + + @Test + public void testDoNotCancelJobIfSavepointFails() throws Exception { + try { + Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet()); + } catch (IOException e) { + Assume.assumeNoException(e); + } + + try { + cancelWithSavepoint(); + } catch (Exception e) { + assertThat(ExceptionUtils.findThrowable(e, CheckpointTriggerException.class).isPresent(), equalTo(true)); + } + + final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); + assertThat(jobStatus, equalTo(JobStatus.RUNNING)); + + // assert that checkpoints are continued to be triggered + triggerCheckpointLatch = new CountDownLatch(1); + assertThat(triggerCheckpointLatch.await(60, TimeUnit.SECONDS), equalTo(true)); + } + + private void waitForJob() throws Exception { + for (int i = 0; i < 60; i++) { + try { + final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); + assertThat(jobStatus.isGloballyTerminalState(), equalTo(false)); + if (jobStatus == JobStatus.RUNNING) { + return; + } + } catch (ExecutionException ignored) { + // JobManagerRunner is not yet registered in Dispatcher + } + Thread.sleep(1000); + } + throw new AssertionError("Job did not become running within timeout."); + } + + /** + * Invokable which calls {@link CountDownLatch#countDown()} on + * {@link JobMasterTriggerSavepointIT#invokeLatch}, and then blocks afterwards. + */ + public static class NoOpBlockingInvokable extends AbstractInvokable { + + public NoOpBlockingInvokable(final Environment environment) { + super(environment); + } + + @Override + public void invoke() { + invokeLatch.countDown(); + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public boolean triggerCheckpoint(final CheckpointMetaData checkpointMetaData, final CheckpointOptions checkpointOptions) throws Exception { + final TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot(); + checkpointStateHandles.putSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()), + new OperatorSubtaskState()); + + getEnvironment().acknowledgeCheckpoint( + checkpointMetaData.getCheckpointId(), + new CheckpointMetrics(), + checkpointStateHandles); + + triggerCheckpointLatch.countDown(); + + return true; + } + + @Override + public void notifyCheckpointComplete(final long checkpointId) throws Exception { + } + } + + private String cancelWithSavepoint() throws Exception { + return clusterClient.cancelWithSavepoint( + jobGraph.getJobID(), + savepointDirectory.toAbsolutePath().toString()); + } + +} From 3cdd1dfaad9d663ce38e9763e9189f9bdd1b604f Mon Sep 17 00:00:00 2001 From: gyao Date: Fri, 2 Mar 2018 17:07:53 +0100 Subject: [PATCH 2/3] fixup! [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint --- .../job/savepoints/SavepointTriggerRequestBody.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java index a97a4b095418f..feaf00e8dcfa2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java @@ -32,24 +32,23 @@ public class SavepointTriggerRequestBody implements RequestBody { public static final String FIELD_NAME_TARGET_DIRECTORY = "target-directory"; + private static final String FIELD_NAME_CANCEL_JOB = "cancel-job"; + @JsonProperty(FIELD_NAME_TARGET_DIRECTORY) @Nullable private final String targetDirectory; + @JsonProperty(FIELD_NAME_CANCEL_JOB) private final boolean cancelJob; @JsonCreator public SavepointTriggerRequestBody( @Nullable @JsonProperty(FIELD_NAME_TARGET_DIRECTORY) final String targetDirectory, - @JsonProperty(value = "cancel-job", defaultValue = "false") final boolean cancelJob) { + @JsonProperty(value = FIELD_NAME_CANCEL_JOB, defaultValue = "false") final boolean cancelJob) { this.targetDirectory = targetDirectory; this.cancelJob = cancelJob; } - public SavepointTriggerRequestBody(@Nullable final String targetDirectory) { - this(targetDirectory, false); - } - @Nullable public String getTargetDirectory() { return targetDirectory; From a99f1074a378182cbd0b4123c7809c9c9dece0e9 Mon Sep 17 00:00:00 2001 From: gyao Date: Fri, 2 Mar 2018 17:16:56 +0100 Subject: [PATCH 3/3] fixup! [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint --- .../SavepointTriggerRequestBodyTest.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java index 6b9d4505dae0e..f79f8a2682188 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java @@ -20,14 +20,35 @@ import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + import static org.junit.Assert.assertEquals; /** * Tests for {@link SavepointTriggerRequestBody}. */ +@RunWith(Parameterized.class) public class SavepointTriggerRequestBodyTest extends RestRequestMarshallingTestBase { + private final SavepointTriggerRequestBody savepointTriggerRequestBody; + + public SavepointTriggerRequestBodyTest(final SavepointTriggerRequestBody savepointTriggerRequestBody) { + this.savepointTriggerRequestBody = savepointTriggerRequestBody; + } + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{ + {new SavepointTriggerRequestBody("/tmp", true)}, + {new SavepointTriggerRequestBody("/tmp", false)} + }); + } + @Override protected Class getTestRequestClass() { return SavepointTriggerRequestBody.class; @@ -35,7 +56,7 @@ protected Class getTestRequestClass() { @Override protected SavepointTriggerRequestBody getTestRequestInstance() { - return new SavepointTriggerRequestBody("/tmp", true); + return savepointTriggerRequestBody; } @Override