diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 18bcfebed329c..9a8f5033f3ff5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -133,7 +133,7 @@ public void testMultipleAMKill() throws Exception {
 			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 
-		ClusterClient yarnCluster = null;
+		ClusterClient yarnClusterClient = null;
 
 		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
@@ -147,10 +147,10 @@ public void testMultipleAMKill() throws Exception {
 			.createClusterSpecification();
 
 		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
+			yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				yarnCluster.getFlinkConfiguration(),
+				yarnClusterClient.getFlinkConfiguration(),
 				Executors.directExecutor(),
 				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
@@ -201,8 +201,10 @@ protected void run() {
 
 			}};
 		} finally {
-			if (yarnCluster != null) {
-				yarnCluster.shutdown();
+			if (yarnClusterClient != null) {
+				log.info("Shutting down the Flink Yarn application.");
+				yarnClusterClient.shutDownCluster();
+				yarnClusterClient.shutdown();
 			}
 
 			if (highAvailabilityServices != null) {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 514a3d55fff2f..1a0520f68fe9a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
@@ -67,6 +68,7 @@
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -79,6 +81,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * This base class allows to use the MiniYARNCluster.
@@ -186,39 +189,55 @@ public static void populateYarnSecureConfigurations(Configuration conf, String p
 		conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
 	}
 
-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
-	 */
-	@After
-	public void sleep() {
-		try {
-			Thread.sleep(500);
-		} catch (InterruptedException e) {
-			Assert.fail("Should not happen");
-		}
-	}
-
 	@Before
-	public void checkClusterEmpty() throws IOException, YarnException {
+	public void checkClusterEmpty() {
 		if (yarnClient == null) {
 			yarnClient = YarnClient.createYarnClient();
 			yarnClient.init(getYarnConfiguration());
 			yarnClient.start();
 		}
 
-		List<ApplicationReport> apps = yarnClient.getApplications();
-		for (ApplicationReport app : apps) {
-			if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
-				&& app.getYarnApplicationState() != YarnApplicationState.KILLED
-				&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
-				Assert.fail("There is at least one application on the cluster is not finished." +
-					"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
+		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+
+		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	}
+
+	/**
+	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
+	 */
+	@After
+	public void sleep() throws IOException, YarnException {
+		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+
+		boolean isAnyJobRunning = yarnClient.getApplications().stream()
+			.anyMatch(YarnTestBase::isApplicationRunning);
+
+		while (deadline.hasTimeLeft() && isAnyJobRunning) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+				Assert.fail("Should not happen");
 			}
+			isAnyJobRunning = yarnClient.getApplications().stream()
+				.anyMatch(YarnTestBase::isApplicationRunning);
 		}
 
-		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+		if (isAnyJobRunning) {
+			final List<String> runningApps = yarnClient.getApplications().stream()
+				.filter(YarnTestBase::isApplicationRunning)
+				.map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
+				.collect(Collectors.toList());
+			if (!runningApps.isEmpty()) {
+				Assert.fail("There is at least one application on the cluster that is not finished." + runningApps);
+			}
+		}
+	}
 
-		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	private static boolean isApplicationRunning(ApplicationReport app) {
+		final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
+		return yarnApplicationState != YarnApplicationState.FINISHED
+			&& app.getYarnApplicationState() != YarnApplicationState.KILLED
+			&& app.getYarnApplicationState() != YarnApplicationState.FAILED;
 	}
 
 	@Nullable
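
For reference, the reworked sleep() hook above boils down to a poll-until-deadline loop: query the running applications, wait 500 ms, and re-query until either nothing is running or 10 seconds have passed, then fail with a list of the offenders. Below is a minimal, self-contained sketch of that pattern using only JDK types; the runningApps supplier and all class/method names here are illustrative stand-ins for the yarnClient.getApplications() query and are not part of the patch or of the Flink/YARN APIs.

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the poll-until-deadline pattern used by the new sleep() hook,
// expressed with plain JDK types (hypothetical names).
public final class PollUntilClusterIsQuiet {

	/** Re-checks every 500 ms until no application is reported running or the timeout expires. */
	static List<String> waitUntilQuiet(Supplier<List<String>> runningApps, Duration timeout)
			throws InterruptedException {
		final Instant deadline = Instant.now().plus(timeout);
		List<String> running = runningApps.get();
		while (!running.isEmpty() && Instant.now().isBefore(deadline)) {
			Thread.sleep(500);            // same 500 ms pause the old sleep() used
			running = runningApps.get();  // re-query, like yarnClient.getApplications()
		}
		return running;                   // empty list means the cluster is quiet
	}

	public static void main(String[] args) throws InterruptedException {
		// Dummy stand-in that never finishes, to show the failure path.
		Supplier<List<String>> stuckApp =
			() -> List.of("App application_0001 is in state RUNNING.");

		List<String> leftovers = waitUntilQuiet(stuckApp, Duration.ofSeconds(2));
		if (!leftovers.isEmpty()) {
			// The test itself calls Assert.fail with a message of this shape.
			throw new AssertionError(
				"There is at least one application on the cluster that is not finished." + leftovers);
		}
	}
}

The actual hook uses Flink's org.apache.flink.api.common.time.Deadline instead of tracking an Instant by hand, but the control flow is the same.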