Skip to content

Commit

Permalink
[FLINK-2298] Add option to pass a custom name for Flink on YARN
Browse files Browse the repository at this point in the history
This closes #876
  • Loading branch information
rmetzger committed Jul 1, 2015
1 parent bd3c8d5 commit a9dc430
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/setup/yarn_setup.md
Expand Up @@ -100,11 +100,13 @@ Usage:
-D <arg> Dynamic properties -D <arg> Dynamic properties
-d,--detached Start detached -d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB] -jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores) -q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue. -qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager -s,--slots <arg> Number of slots per TaskManager
-st,--streaming Start Flink in streaming mode -st,--streaming Start Flink in streaming mode
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB] -tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]

~~~ ~~~


Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration. Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
Expand Down
Expand Up @@ -134,8 +134,6 @@ public class CliFrontend {


private AbstractFlinkYarnCluster yarnCluster; private AbstractFlinkYarnCluster yarnCluster;




/** /**
* *
* @throws Exception Thrown if teh configuration directory was not found, the configuration could not * @throws Exception Thrown if teh configuration directory was not found, the configuration could not
Expand Down Expand Up @@ -744,7 +742,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
// user wants to run Flink in YARN cluster. // user wants to run Flink in YARN cluster.
CommandLine commandLine = options.getCommandLine(); CommandLine commandLine = options.getCommandLine();
AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine); AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);

flinkYarnClient.setName("Flink Application: " + programName);
if (flinkYarnClient == null) { if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
} }
Expand All @@ -763,7 +761,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
} }


try { try {
yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName); yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster(); yarnCluster.connectToCluster();
} }
catch(Exception e) { catch(Exception e) {
Expand Down
Expand Up @@ -74,6 +74,7 @@ public class FlinkYarnSessionCli {
private final Option SLOTS; private final Option SLOTS;
private final Option DETACHED; private final Option DETACHED;
private final Option STREAMING; private final Option STREAMING;
private final Option NAME;


/** /**
* Dynamic properties allow the user to specify additional configuration values with -D, such as * Dynamic properties allow the user to specify additional configuration values with -D, such as
Expand All @@ -97,6 +98,7 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
} }


public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
Expand Down Expand Up @@ -220,6 +222,9 @@ public boolean accept(File dir, String name) {
if (cmd.hasOption(STREAMING.getOpt())) { if (cmd.hasOption(STREAMING.getOpt())) {
flinkYarnClient.setStreamingMode(true); flinkYarnClient.setStreamingMode(true);
} }
if(cmd.hasOption(NAME.getOpt())) {
flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
}
return flinkYarnClient; return flinkYarnClient;
} }


Expand All @@ -244,6 +249,7 @@ private void printUsage() {
opt.addOption(DYNAMIC_PROPERTIES); opt.addOption(DYNAMIC_PROPERTIES);
opt.addOption(DETACHED); opt.addOption(DETACHED);
opt.addOption(STREAMING); opt.addOption(STREAMING);
opt.addOption(NAME);
formatter.printHelp(" ", opt); formatter.printHelp(" ", opt);
} }


Expand Down Expand Up @@ -350,6 +356,7 @@ public void getYARNSessionCLIOptions(Options options) {
options.addOption(DYNAMIC_PROPERTIES); options.addOption(DYNAMIC_PROPERTIES);
options.addOption(DETACHED); options.addOption(DETACHED);
options.addOption(STREAMING); options.addOption(STREAMING);
options.addOption(NAME);
} }


public int run(String[] args) { public int run(String[] args) {
Expand Down Expand Up @@ -393,7 +400,7 @@ public int run(String[] args) {




try { try {
yarnCluster = flinkYarnClient.deploy(null); yarnCluster = flinkYarnClient.deploy();
// only connect to cluster if its not a detached session. // only connect to cluster if its not a detached session.
if(!flinkYarnClient.isDetached()) { if(!flinkYarnClient.isDetached()) {
yarnCluster.connectToCluster(); yarnCluster.connectToCluster();
Expand Down
Expand Up @@ -115,9 +115,8 @@ public abstract class AbstractFlinkYarnClient {
/** /**
* Trigger the deployment to YARN. * Trigger the deployment to YARN.
* *
* @param clusterName Name to be shown in the YARN resource manager overview.
*/ */
public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception; public abstract AbstractFlinkYarnCluster deploy() throws Exception;


/** /**
* @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns * @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns
Expand All @@ -138,4 +137,10 @@ public abstract class AbstractFlinkYarnClient {
* @param streamingMode * @param streamingMode
*/ */
public abstract void setStreamingMode(boolean streamingMode); public abstract void setStreamingMode(boolean streamingMode);

/**
* Set a name for the YARN application
* @param name
*/
public abstract void setName(String name);
} }
Expand Up @@ -121,6 +121,7 @@ public void testDetachedMode() {
"-n", "1", "-n", "1",
"-jm", "768", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"--name", "MyCustomName", // test setting a custom name
"--detached"}, "--detached"},
"Flink JobManager is now running on", RunTypes.YARN_SESSION); "Flink JobManager is now running on", RunTypes.YARN_SESSION);


Expand All @@ -142,7 +143,9 @@ public void testDetachedMode() {
yc.start(); yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running Assert.assertEquals(1, apps.size()); // Only one running
ApplicationId id = apps.get(0).getApplicationId(); ApplicationReport app = apps.get(0);
Assert.assertEquals("MyCustomName", app.getName());
ApplicationId id = app.getApplicationId();
yc.killApplication(id); yc.killApplication(id);


while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) { while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
Expand All @@ -166,6 +169,7 @@ public void testTaskManagerFailure() {
"-n", "1", "-n", "1",
"-jm", "768", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy", "-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3"}, "-Dyarn.maximum-failed-containers=3"},
"Number of connected TaskManagers changed to 1. Slots available: 1", "Number of connected TaskManagers changed to 1. Slots available: 1",
Expand All @@ -180,7 +184,9 @@ public void testTaskManagerFailure() {
yc.start(); yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running Assert.assertEquals(1, apps.size()); // Only one running
String url = apps.get(0).getTrackingUrl(); ApplicationReport app = apps.get(0);
Assert.assertEquals("customName", app.getName());
String url = app.getTrackingUrl();
if(!url.endsWith("/")) { if(!url.endsWith("/")) {
url += "/"; url += "/";
} }
Expand Down Expand Up @@ -615,7 +621,7 @@ public void testJavaAPI() {
// deploy // deploy
AbstractFlinkYarnCluster yarnCluster = null; AbstractFlinkYarnCluster yarnCluster = null;
try { try {
yarnCluster = flinkYarnClient.deploy(null); yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster(); yarnCluster.connectToCluster();
} catch (Exception e) { } catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage()); System.err.println("Error while deploying YARN cluster: "+e.getMessage());
Expand Down
32 changes: 22 additions & 10 deletions flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
Expand Up @@ -143,6 +143,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
private boolean detached; private boolean detached;
private boolean streamingMode; private boolean streamingMode;


private String customName = null;


public FlinkYarnClient() { public FlinkYarnClient() {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
Expand Down Expand Up @@ -314,7 +315,7 @@ public boolean isDetached() {
return detached; return detached;
} }


public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exception { public AbstractFlinkYarnCluster deploy() throws Exception {


UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Expand All @@ -327,11 +328,11 @@ public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exceptio
return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() { return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
@Override @Override
public AbstractFlinkYarnCluster run() throws Exception { public AbstractFlinkYarnCluster run() throws Exception {
return deployInternal(clusterName); return deployInternal();
} }
}); });
} else { } else {
return deployInternal(clusterName); return deployInternal();
} }
} }


Expand All @@ -341,7 +342,7 @@ public AbstractFlinkYarnCluster run() throws Exception {
* This method will block until the ApplicationMaster/JobManager have been * This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN. * deployed on YARN.
*/ */
protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exception { protected AbstractFlinkYarnCluster deployInternal() throws Exception {
isReadyForDepoyment(); isReadyForDepoyment();


LOG.info("Using values:"); LOG.info("Using values:");
Expand Down Expand Up @@ -591,14 +592,17 @@ protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exc
capability.setMemory(jobManagerMemoryMb); capability.setMemory(jobManagerMemoryMb);
capability.setVirtualCores(1); capability.setVirtualCores(1);


if(clusterName == null) { String name;
clusterName = "Flink session with "+taskManagerCount+" TaskManagers"; if(customName == null) {
} name = "Flink session with "+taskManagerCount+" TaskManagers";
if(detached) { if(detached) {
clusterName += " (detached)"; name += " (detached)";
}
} else {
name = customName;
} }


appContext.setApplicationName(clusterName); // application name appContext.setApplicationName(name); // application name
appContext.setApplicationType("Apache Flink"); appContext.setApplicationType("Apache Flink");
appContext.setAMContainerSpec(amContainer); appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability); appContext.setResource(capability);
Expand Down Expand Up @@ -734,6 +738,14 @@ public void setStreamingMode(boolean streamingMode) {
this.streamingMode = streamingMode; this.streamingMode = streamingMode;
} }


@Override
public void setName(String name) {
if(name == null) {
throw new IllegalArgumentException("The passed name is null");
}
customName = name;
}

public static class YarnDeploymentException extends RuntimeException { public static class YarnDeploymentException extends RuntimeException {
public YarnDeploymentException() { public YarnDeploymentException() {
} }
Expand Down

0 comments on commit a9dc430

Please sign in to comment.