From 1e22ecb426328077781b116958d70d86af47e8d7 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 17 Jul 2018 13:29:16 +0200 Subject: [PATCH 1/3] [FLINK-9815][yarn][tests] Harden tests against slow job shutdowns --- .../org/apache/flink/yarn/YarnTestBase.java | 64 ++++++++++++------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 514a3d55fff2f..3c2f2f7f7cc5a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; @@ -67,6 +68,7 @@ import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.PrintStream; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -74,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Scanner; import java.util.Set; import java.util.UUID; @@ -186,39 +189,56 @@ public static void populateYarnSecureConfigurations(Configuration conf, String p conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]"); } - /** - * Sleep a bit between the tests (we are re-using the YARN cluster for the tests). - */ - @After - public void sleep() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Assert.fail("Should not happen"); - } - } - @Before - public void checkClusterEmpty() throws IOException, YarnException { + public void checkClusterEmpty() { if (yarnClient == null) { yarnClient = YarnClient.createYarnClient(); yarnClient.init(getYarnConfiguration()); yarnClient.start(); } - List apps = yarnClient.getApplications(); - for (ApplicationReport app : apps) { - if (app.getYarnApplicationState() != YarnApplicationState.FINISHED - && app.getYarnApplicationState() != YarnApplicationState.KILLED - && app.getYarnApplicationState() != YarnApplicationState.FAILED) { - Assert.fail("There is at least one application on the cluster is not finished." + - "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState()); + flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); + + isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()); + } + + /** + * Sleep a bit between the tests (we are re-using the YARN cluster for the tests). + */ + @After + public void sleep() throws IOException, YarnException { + Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); + + boolean isAnyJobRunning = yarnClient.getApplications().stream() + .anyMatch(YarnTestBase::isApplicationRunning); + + while (deadline.hasTimeLeft() && isAnyJobRunning) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Assert.fail("Should not happen"); } + isAnyJobRunning = yarnClient.getApplications().stream() + .anyMatch(YarnTestBase::isApplicationRunning); } - flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); + if (isAnyJobRunning) { + final Optional runningApp = yarnClient.getApplications().stream() + .filter(YarnTestBase::isApplicationRunning) + .findAny(); + if (runningApp.isPresent()) { + final ApplicationReport app = runningApp.get(); + Assert.fail("There is at least one application on the cluster that is not finished." + + "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState()); + } + } + } - isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()); + private static boolean isApplicationRunning(ApplicationReport app) { + final YarnApplicationState yarnApplicationState = app.getYarnApplicationState(); + return yarnApplicationState != YarnApplicationState.FINISHED + && app.getYarnApplicationState() != YarnApplicationState.KILLED + && app.getYarnApplicationState() != YarnApplicationState.FAILED; } @Nullable From d1b0946ddf193b03e8af762d86fe74a7837b30cc Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 18 Jul 2018 13:18:54 +0200 Subject: [PATCH 2/3] list all running apps --- .../java/org/apache/flink/yarn/YarnTestBase.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 3c2f2f7f7cc5a..1a0520f68fe9a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -76,12 +76,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Scanner; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * This base class allows to use the MiniYARNCluster. @@ -223,13 +223,12 @@ public void sleep() throws IOException, YarnException { } if (isAnyJobRunning) { - final Optional runningApp = yarnClient.getApplications().stream() + final List runningApps = yarnClient.getApplications().stream() .filter(YarnTestBase::isApplicationRunning) - .findAny(); - if (runningApp.isPresent()) { - final ApplicationReport app = runningApp.get(); - Assert.fail("There is at least one application on the cluster that is not finished." + - "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState()); + .map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.') + .collect(Collectors.toList()); + if (!runningApps.isEmpty()) { + Assert.fail("There is at least one application on the cluster that is not finished." + runningApps); } } } From ec88fc934947de95d72fb0a8a65c90e33091898f Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 18 Jul 2018 13:27:00 +0200 Subject: [PATCH 3/3] [experiment] shutdown cluster in YarnHA test --- .../flink/yarn/YARNHighAvailabilityITCase.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 18bcfebed329c..9a8f5033f3ff5 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -133,7 +133,7 @@ public void testMultipleAMKill() throws Exception { "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" + "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery"); - ClusterClient yarnCluster = null; + ClusterClient yarnClusterClient = null; final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); @@ -147,10 +147,10 @@ public void testMultipleAMKill() throws Exception { .createClusterSpecification(); try { - yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification); + yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification); highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - yarnCluster.getFlinkConfiguration(), + yarnClusterClient.getFlinkConfiguration(), Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); @@ -201,8 +201,10 @@ protected void run() { }}; } finally { - if (yarnCluster != null) { - yarnCluster.shutdown(); + if (yarnClusterClient != null) { + log.info("Shutting down the Flink Yarn application."); + yarnClusterClient.shutDownCluster(); + yarnClusterClient.shutdown(); } if (highAvailabilityServices != null) {