Skip to content

Commit

Permalink
[FLINK-8840] [yarn] Pull YarnClient and YarnConfiguration instantiati…
Browse files Browse the repository at this point in the history
…on out of AbstractYarnClusterClient

For better testability, this commit moves the YarnClient and YarnConfiguration out of
the AbstractYarnClusterDescriptor.
  • Loading branch information
tillrohrmann committed Mar 2, 2018
1 parent 344a477 commit 193386b
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 34 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
Expand All @@ -37,11 +38,18 @@
*/ */
public class TestingYarnClusterDescriptor extends YarnClusterDescriptor { public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {


public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) { public TestingYarnClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory,
YarnClient yarnClient,
boolean sharedYarnClient) {
super( super(
configuration, configuration,
yarnConfiguration,
configurationDirectory, configurationDirectory,
YarnClient.createYarnClient()); yarnClient,
sharedYarnClient);
List<File> filesToShip = new ArrayList<>(); List<File> filesToShip = new ArrayList<>();


File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
Expand Down
Expand Up @@ -110,7 +110,12 @@ public void testMultipleAMKill() throws Exception {
final int numberKillingAttempts = numberApplicationAttempts - 1; final int numberKillingAttempts = numberApplicationAttempts - 1;
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
final Configuration configuration = GlobalConfiguration.loadConfiguration(); final Configuration configuration = GlobalConfiguration.loadConfiguration();
TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(configuration, confDirPath); TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
configuration,
getYarnConfiguration(),
confDirPath,
getYarnClient(),
true);


Assert.assertNotNull("unable to get yarn client", flinkYarnClient); Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
Expand Down
Expand Up @@ -55,12 +55,14 @@ public static void setup() {
public void testPerJobMode() throws Exception { public void testPerJobMode() throws Exception {
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
final YarnClient yarnClient = YarnClient.createYarnClient(); final YarnClient yarnClient = getYarnClient();


try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor( try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
configuration, configuration,
getYarnConfiguration(),
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR), System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
yarnClient)) { yarnClient,
true)) {


flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
Expand Down
Expand Up @@ -231,12 +231,13 @@ public void testJavaAPI() throws Exception {


String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
Configuration configuration = GlobalConfiguration.loadConfiguration(); Configuration configuration = GlobalConfiguration.loadConfiguration();
final YarnClient yarnClient = YarnClient.createYarnClient();


try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, configuration,
getYarnConfiguration(),
confDirPath, confDirPath,
yarnClient)) { getYarnClient(),
true)) {
Assert.assertNotNull("unable to get yarn client", clusterDescriptor); Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
Expand Down
Expand Up @@ -128,7 +128,7 @@ public abstract class YarnTestBase extends TestLogger {
*/ */
protected static File flinkUberjar; protected static File flinkUberjar;


protected static final Configuration YARN_CONFIGURATION; protected static final YarnConfiguration YARN_CONFIGURATION;


/** /**
* lib/ folder of the flink distribution. * lib/ folder of the flink distribution.
Expand Down Expand Up @@ -213,6 +213,14 @@ public void checkClusterEmpty() throws IOException, YarnException {
flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
} }


protected YarnClient getYarnClient() {
return yarnClient;
}

protected static YarnConfiguration getYarnConfiguration() {
return YARN_CONFIGURATION;
}

/** /**
* Locate a file or directory. * Locate a file or directory.
*/ */
Expand Down
Expand Up @@ -118,6 +118,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor


private final YarnClient yarnClient; private final YarnClient yarnClient;


/** True if the descriptor must not shut down the YarnClient. */
private final boolean sharedYarnClient;

private String yarnQueue; private String yarnQueue;


private String configurationDirectory; private String configurationDirectory;
Expand Down Expand Up @@ -145,10 +148,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor


public AbstractYarnClusterDescriptor( public AbstractYarnClusterDescriptor(
Configuration flinkConfiguration, Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
String configurationDirectory, String configurationDirectory,
YarnClient yarnClient) { YarnClient yarnClient,
boolean sharedYarnClient) {


yarnConfiguration = new YarnConfiguration(); this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);


// for unit tests only // for unit tests only
if (System.getenv("IN_TESTS") != null) { if (System.getenv("IN_TESTS") != null) {
Expand All @@ -160,8 +165,7 @@ public AbstractYarnClusterDescriptor(
} }


this.yarnClient = Preconditions.checkNotNull(yarnClient); this.yarnClient = Preconditions.checkNotNull(yarnClient);
yarnClient.init(yarnConfiguration); this.sharedYarnClient = sharedYarnClient;
yarnClient.start();


this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration); this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
userJarInclusion = getUserJarInclusionMode(flinkConfiguration); userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
Expand Down Expand Up @@ -328,7 +332,9 @@ public void setZookeeperNamespace(String zookeeperNamespace) {


@Override @Override
public void close() { public void close() {
yarnClient.stop(); if (!sharedYarnClient) {
yarnClient.stop();
}
} }


// ------------------------------------------------------------- // -------------------------------------------------------------
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


/** /**
* Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
Expand All @@ -39,9 +40,16 @@ public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {


public Flip6YarnClusterDescriptor( public Flip6YarnClusterDescriptor(
Configuration flinkConfiguration, Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
String configurationDirectory, String configurationDirectory,
YarnClient yarnCLient) { YarnClient yarnClient,
super(flinkConfiguration, configurationDirectory, yarnCLient); boolean sharedYarnClient) {
super(
flinkConfiguration,
yarnConfiguration,
configurationDirectory,
yarnClient,
sharedYarnClient);
} }


@Override @Override
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


/** /**
* Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
Expand All @@ -34,9 +35,16 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {


public YarnClusterDescriptor( public YarnClusterDescriptor(
Configuration flinkConfiguration, Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
String configurationDirectory, String configurationDirectory,
YarnClient yarnClient) { YarnClient yarnClient,
super(flinkConfiguration, configurationDirectory, yarnClient); boolean sharedYarnClient) {
super(
flinkConfiguration,
yarnConfiguration,
configurationDirectory,
yarnClient,
sharedYarnClient);
} }


@Override @Override
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -159,6 +160,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId


private final boolean flip6; private final boolean flip6;


private final YarnConfiguration yarnConfiguration;

//------------------------------------ Internal fields ------------------------- //------------------------------------ Internal fields -------------------------
private boolean detachedMode = false; private boolean detachedMode = false;


Expand Down Expand Up @@ -257,16 +260,20 @@ public FlinkYarnSessionCli(
} else { } else {
yarnApplicationIdFromYarnProperties = null; yarnApplicationIdFromYarnProperties = null;
} }

this.yarnConfiguration = new YarnConfiguration();
} }


private AbstractYarnClusterDescriptor createDescriptor( private AbstractYarnClusterDescriptor createDescriptor(
Configuration configuration, Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory, String configurationDirectory,
String defaultApplicationName, String defaultApplicationName,
CommandLine cmd) { CommandLine cmd) {


AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor( AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
configuration, configuration,
yarnConfiguration,
configurationDirectory); configurationDirectory);


// Jar Path // Jar Path
Expand Down Expand Up @@ -440,6 +447,7 @@ public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine command


return createDescriptor( return createDescriptor(
effectiveConfiguration, effectiveConfiguration,
yarnConfiguration,
configurationDirectory, configurationDirectory,
null, null,
commandLine); commandLine);
Expand Down Expand Up @@ -955,12 +963,28 @@ public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFile
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
} }


private AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) { private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
final YarnClient yarnClient = YarnClient.createYarnClient(); final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();

if (flip6) { if (flip6) {
return new Flip6YarnClusterDescriptor(configuration, configurationDirectory, yarnClient); return new Flip6YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
} else { } else {
return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient); return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
} }
} }
} }
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -63,13 +64,22 @@ public void testClusterClientRetrievalOfFinishedYarnApplication() throws Excepti
FinalApplicationStatus.SUCCEEDED); FinalApplicationStatus.SUCCEEDED);


final YarnClient yarnClient = new TestingYarnClient(Collections.singletonMap(applicationId, applicationReport)); final YarnClient yarnClient = new TestingYarnClient(Collections.singletonMap(applicationId, applicationReport));
final YarnConfiguration yarnConfiguration = new YarnConfiguration();
yarnClient.init(yarnConfiguration);
yarnClient.start();


final TestingAbstractYarnClusterDescriptor clusterDescriptor = new TestingAbstractYarnClusterDescriptor( final TestingAbstractYarnClusterDescriptor clusterDescriptor = new TestingAbstractYarnClusterDescriptor(
new Configuration(), new Configuration(),
yarnConfiguration,
temporaryFolder.newFolder().getAbsolutePath(), temporaryFolder.newFolder().getAbsolutePath(),
yarnClient); yarnClient,
false);


clusterDescriptor.retrieve(applicationId); try {
clusterDescriptor.retrieve(applicationId);
} finally {
clusterDescriptor.close();
}
} }


private ApplicationReport createApplicationReport( private ApplicationReport createApplicationReport(
Expand Down Expand Up @@ -121,9 +131,11 @@ private static final class TestingAbstractYarnClusterDescriptor extends Abstract


private TestingAbstractYarnClusterDescriptor( private TestingAbstractYarnClusterDescriptor(
Configuration flinkConfiguration, Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
String configurationDirectory, String configurationDirectory,
YarnClient yarnClient) { YarnClient yarnClient,
super(flinkConfiguration, configurationDirectory, yarnClient); boolean sharedYarnClient) {
super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, sharedYarnClient);
} }


@Override @Override
Expand Down

0 comments on commit 193386b

Please sign in to comment.