From 193386bcd5d0e2003a831138d6282af1880e1ab8 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 2 Mar 2018 15:27:13 +0100 Subject: [PATCH] [FLINK-8840] [yarn] Pull YarnClient and YarnConfiguration instantiation out of AbstractYarnClusterClient For better testability, this commit moves the YarnClient and YarnConfiguration out of the AbstractYarnClusterDescriptor. --- .../yarn/TestingYarnClusterDescriptor.java | 12 ++- .../yarn/YARNHighAvailabilityITCase.java | 7 +- .../org/apache/flink/yarn/YARNITCase.java | 6 +- .../flink/yarn/YARNSessionFIFOITCase.java | 5 +- .../org/apache/flink/yarn/YarnTestBase.java | 10 ++- .../yarn/AbstractYarnClusterDescriptor.java | 16 ++-- .../yarn/Flip6YarnClusterDescriptor.java | 12 ++- .../flink/yarn/YarnClusterDescriptor.java | 12 ++- .../flink/yarn/cli/FlinkYarnSessionCli.java | 30 ++++++- .../flink/yarn/AbstractYarnClusterTest.java | 20 ++++- .../flink/yarn/YarnClusterDescriptorTest.java | 79 ++++++++++++++++--- 11 files changed, 175 insertions(+), 34 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java index ec41d8e12b9bb..4d2aaa02efe34 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java @@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import java.io.File; import java.io.FilenameFilter; @@ -37,11 +38,18 @@ */ public class TestingYarnClusterDescriptor extends YarnClusterDescriptor { - public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) { + public TestingYarnClusterDescriptor( + Configuration configuration, + YarnConfiguration yarnConfiguration, + String configurationDirectory, + YarnClient yarnClient, + boolean sharedYarnClient) { super( configuration, + yarnConfiguration, configurationDirectory, - YarnClient.createYarnClient()); + yarnClient, + sharedYarnClient); List filesToShip = new ArrayList<>(); File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 05be03a175561..f9c03f937849f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -110,7 +110,12 @@ public void testMultipleAMKill() throws Exception { final int numberKillingAttempts = numberApplicationAttempts - 1; String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); final Configuration configuration = GlobalConfiguration.loadConfiguration(); - TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(configuration, confDirPath); + TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor( + configuration, + getYarnConfiguration(), + confDirPath, + getYarnClient(), + true); Assert.assertNotNull("unable to get yarn client", flinkYarnClient); flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index 037e086ae8583..ef6706ad5fe36 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -55,12 +55,14 @@ public static void setup() { public void testPerJobMode() throws Exception { Configuration configuration = new Configuration(); configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); - final YarnClient yarnClient = YarnClient.createYarnClient(); + final YarnClient yarnClient = getYarnClient(); try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor( configuration, + getYarnConfiguration(), System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR), - yarnClient)) { + yarnClient, + true)) { flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index e54518793526d..b3dcaca145965 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -231,12 +231,13 @@ public void testJavaAPI() throws Exception { String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); Configuration configuration = GlobalConfiguration.loadConfiguration(); - final YarnClient yarnClient = YarnClient.createYarnClient(); try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( configuration, + getYarnConfiguration(), confDirPath, - yarnClient)) { + getYarnClient(), + true)) { Assert.assertNotNull("unable to get yarn client", clusterDescriptor); clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 7bca32192489b..b74a1557bb22b 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -128,7 +128,7 @@ public abstract class YarnTestBase extends TestLogger { */ protected static File flinkUberjar; - protected static final Configuration YARN_CONFIGURATION; + protected static final YarnConfiguration YARN_CONFIGURATION; /** * lib/ folder of the flink distribution. @@ -213,6 +213,14 @@ public void checkClusterEmpty() throws IOException, YarnException { flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); } + protected YarnClient getYarnClient() { + return yarnClient; + } + + protected static YarnConfiguration getYarnConfiguration() { + return YARN_CONFIGURATION; + } + /** * Locate a file or directory. */ diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 6b930163896c0..bdb59b11b5fd7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -118,6 +118,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor private final YarnClient yarnClient; + /** True if the descriptor must not shut down the YarnClient. */ + private final boolean sharedYarnClient; + private String yarnQueue; private String configurationDirectory; @@ -145,10 +148,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor public AbstractYarnClusterDescriptor( Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, String configurationDirectory, - YarnClient yarnClient) { + YarnClient yarnClient, + boolean sharedYarnClient) { - yarnConfiguration = new YarnConfiguration(); + this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration); // for unit tests only if (System.getenv("IN_TESTS") != null) { @@ -160,8 +165,7 @@ public AbstractYarnClusterDescriptor( } this.yarnClient = Preconditions.checkNotNull(yarnClient); - yarnClient.init(yarnConfiguration); - yarnClient.start(); + this.sharedYarnClient = sharedYarnClient; this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration); userJarInclusion = getUserJarInclusionMode(flinkConfiguration); @@ -328,7 +332,9 @@ public void setZookeeperNamespace(String zookeeperNamespace) { @Override public void close() { - yarnClient.stop(); + if (!sharedYarnClient) { + yarnClient.stop(); + } } // ------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java index 461dd555a45e7..9860363c00e32 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the @@ -39,9 +40,16 @@ public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor { public Flip6YarnClusterDescriptor( Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, String configurationDirectory, - YarnClient yarnCLient) { - super(flinkConfiguration, configurationDirectory, yarnCLient); + YarnClient yarnClient, + boolean sharedYarnClient) { + super( + flinkConfiguration, + yarnConfiguration, + configurationDirectory, + yarnClient, + sharedYarnClient); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index a5254a0e3a376..8625cee8240c6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. @@ -34,9 +35,16 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { public YarnClusterDescriptor( Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, String configurationDirectory, - YarnClient yarnClient) { - super(flinkConfiguration, configurationDirectory, yarnClient); + YarnClient yarnClient, + boolean sharedYarnClient) { + super( + flinkConfiguration, + yarnConfiguration, + configurationDirectory, + yarnClient, + sharedYarnClient); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index e4e3dbd5566cb..7773600dabd71 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; @@ -159,6 +160,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine