diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index a0c995d210f1d..8ae42c8ed6fa3 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -115,7 +115,6 @@ private int run(String[] args) throws FlinkException, CliArgsException {
                                         kubernetesClusterClientFactory.getClusterSpecification(
                                                 configuration))
                                 .getClusterClient();
-                clusterId = clusterClient.getClusterId();
             }
 
             try {
@@ -131,7 +130,7 @@ private int run(String[] args) throws FlinkException, CliArgsException {
                                 e);
                     }
                     if (continueRepl.f1) {
-                        kubernetesClusterDescriptor.killCluster(clusterId);
+                        clusterClient.shutDownCluster();
                     }
                 }
                 clusterClient.close();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5aae4cdbc64a6..267dc648cfc0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -692,7 +692,14 @@ public CompletableFuture<Acknowledge> shutDownCluster() {
     @Override
     public CompletableFuture<Acknowledge> shutDownCluster(
             final ApplicationStatus applicationStatus) {
-        shutDownFuture.complete(applicationStatus);
+        // Graceful shutdown will not clean up the HA data when dispatcher still has running jobs
+        if (runningJobs.isEmpty()) {
+            shutDownFuture.complete(applicationStatus);
+        } else {
+            shutDownFuture.completeExceptionally(
+                    new NotAllJobsFinishedException(
+                            "Not all jobs finished. Running jobs: " + runningJobs.keySet()));
+        }
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NotAllJobsFinishedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NotAllJobsFinishedException.java
new file mode 100644
index 0000000000000..229bb7db9b15e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NotAllJobsFinishedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+/** Exception indicating that not all jobs in the dispatcher have finished. */
+public class NotAllJobsFinishedException extends DispatcherException {
+    private static final long serialVersionUID = 1L;
+
+    public NotAllJobsFinishedException(String message) {
+        super(message);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 69b090d15e6da..c92eedb39f2c6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.dispatcher.NotAllJobsFinishedException;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
@@ -266,7 +267,9 @@ private void runCluster(Configuration configuration, PluginManager pluginManager
                             (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                 if (throwable != null) {
                                     shutDownAsync(
-                                            ApplicationStatus.UNKNOWN,
+                                            throwable instanceof NotAllJobsFinishedException
+                                                    ? ApplicationStatus.CANCELED
+                                                    : ApplicationStatus.UNKNOWN,
                                             ShutdownBehaviour.STOP_APPLICATION,
                                             ExceptionUtils.stringifyException(throwable),
                                             false);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index ddf8c936fa46b..80b90b13b31d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -44,6 +44,7 @@
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.NotAllJobsFinishedException;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -80,6 +81,7 @@
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
@@ -436,8 +438,19 @@ private void setupDispatcherResourceManagerComponents(
                 dispatcherResourceManagerComponents) {
             final CompletableFuture<ApplicationStatus> shutDownFuture =
                     dispatcherResourceManagerComponent.getShutDownFuture();
-            FutureUtils.assertNoException(
-                    shutDownFuture.thenRun(dispatcherResourceManagerComponent::closeAsync));
+            shutDownFuture.whenComplete(
+                    (ignored, throwable) -> {
+                        // Close the component on a regular shutdown as well as on the expected
+                        // NotAllJobsFinishedException (shutdown with running jobs); any other
+                        // failure is passed to the fatal exit handler.
+                        if (throwable == null
+                                || throwable instanceof NotAllJobsFinishedException) {
+                            dispatcherResourceManagerComponent.closeAsync();
+                        } else {
+                            FutureUtils.handleUncaughtException(
+                                    shutDownFuture, FatalExitExceptionHandler.INSTANCE);
+                        }
+                    });
             shutDownFutures.add(shutDownFuture);
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 4cc63e0cdaa98..a0eb3ce927377 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -130,12 +131,12 @@
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -751,6 +752,31 @@ public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
         dispatcher.getShutDownFuture().get();
     }
 
+    @Test
+    public void testShutDownClusterWithRunningJobsShouldCompleteShutDownFutureExceptionally()
+            throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        dispatcherGateway.shutDownCluster().get();
+
+        try {
+            dispatcher.getShutDownFuture().get();
+            // fail() throws an AssertionError, which the catch (Exception) below does not swallow
+            fail("Expected the shut down future to complete exceptionally.");
+        } catch (Exception e) {
+            assertThat(e, FlinkMatchers.containsCause(NotAllJobsFinishedException.class));
+            assertThat(e, FlinkMatchers.containsMessage("Not all jobs finished"));
+        }
+    }
+
     @Test
     public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
         final CompletableFuture<JobID> removeJobGraphFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 121adcebb5f33..08c56a7212f7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.NotAllJobsFinishedException;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
 import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
@@ -48,6 +49,7 @@
 import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
 
@@ -63,6 +65,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -229,6 +232,86 @@ public void testCloseAsyncShouldBeExecutedInShutdownHook() throws Exception {
         }
     }
 
+    @Test
+    public void testShutDownFutureCompleteExceptionallyShouldNotCleanUpHAData() throws Exception {
+        testShutDownCluster(
+                dispatcherShutDownFuture ->
+                        dispatcherShutDownFuture.completeExceptionally(
+                                new FlinkException("Exception")),
+                ApplicationStatus.UNKNOWN,
+                false);
+    }
+
+    @Test
+    public void testShutDownFutureCompleteWithNotAllJobsFinishedExceptionShouldNotCleanUpHAData()
+            throws Exception {
+        testShutDownCluster(
+                dispatcherShutDownFuture ->
+                        dispatcherShutDownFuture.completeExceptionally(
+                                new NotAllJobsFinishedException("Not all jobs finished.")),
+                ApplicationStatus.CANCELED,
+                false);
+    }
+
+    @Test
+    public void testShutDownFutureCompleteNormallyShouldCleanUpHAData() throws Exception {
+        testShutDownCluster(
+                dispatcherShutDownFuture ->
+                        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED),
+                ApplicationStatus.SUCCEEDED,
+                true);
+    }
+
+    /**
+     * Starts a testing entrypoint whose dispatcher shut down future is completed by the given
+     * consumer (invoked from the resource manager's initialize callback), then asserts the
+     * reported application status and which of the two HA services futures (close vs. close and
+     * clean up all data) is done.
+     */
+    private void testShutDownCluster(
+            Consumer<CompletableFuture<ApplicationStatus>> dispatcherShutDownFutureCompleteConsumer,
+            ApplicationStatus expectedApplicationStatus,
+            boolean shouldCleanUpHAData)
+            throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        .setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInitializeConsumer(
+                                (ignore) ->
+                                        dispatcherShutDownFutureCompleteConsumer.accept(
+                                                dispatcherShutDownFuture))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(expectedApplicationStatus));
+        assertThat(closeFuture.isDone(), is(!shouldCleanUpHAData));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(shouldCleanUpHAData));
+    }
+
     private CompletableFuture<ApplicationStatus> startClusterEntrypoint(
             TestingEntryPoint testingEntryPoint) throws Exception {
         testingEntryPoint.startCluster();