Skip to content

Commit

Permalink
[FLINK-5916] [yarn] make env.java.opts.jobmanager and env.java.opts.t…
Browse files Browse the repository at this point in the history
…askmanager working in YARN mode

minor change and add test case

This closes #3415.
  • Loading branch information
WangTaoTheTonic authored and tillrohrmann committed Mar 7, 2017
1 parent 53fb8f3 commit e9a5c86
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ The configuration files for the TaskManagers can be different, Flink does not as

- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively.

- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.

- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.

- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public class CoreOptions {
.key("env.java.opts")
.defaultValue("");

public static final ConfigOption<String> FLINK_JM_JVM_OPTIONS = ConfigOptions
.key("env.java.opts.jobmanager")
.defaultValue("");

public static final ConfigOption<String> FLINK_TM_JVM_OPTIONS = ConfigOptions
.key("env.java.opts.taskmanager")
.defaultValue("");

public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
.key("parallelism.default")
.defaultValue(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ public static String getTaskManagerShellCommand(
"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
}
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if(hasKrb5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void testGetTaskManagerShellCommand() {
final String java = "$JAVA_HOME/bin/java";
final String jvmmem = "-Xms768m -Xmx768m -XX:MaxDirectMemorySize=256m";
final String jvmOpts = "-Djvm"; // if set
final String tmJvmOpts = "-DtmJvm"; // if set
final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
final String logback =
"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
Expand Down Expand Up @@ -229,13 +230,33 @@ public void testGetTaskManagerShellCommand() {
.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
true, true, true, this.getClass()));

// logback + log4j, with/out krb5, different JVM opts
cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts + " " + tmJvmOpts +
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
true, true, false, this.getClass()));

assertEquals(
java + " " + jvmmem +
" " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
true, true, true, this.getClass()));

// now try some configurations with different yarn.container-start-command-template

cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
assertEquals(
java + " 1 " + jvmmem +
" 2 " + jvmOpts + " " + krb5 + // jvmOpts
" 2 " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
" 3 " + logfile + " " + logback + " " + log4j +
" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
BootstrapTools
Expand All @@ -247,7 +268,7 @@ public void testGetTaskManagerShellCommand() {
assertEquals(
java +
" " + logfile + " " + logback + " " + log4j +
" " + jvmOpts + " " + krb5 + // jvmOpts
" " + jvmOpts + " " + tmJvmOpts + " " + krb5 + // jvmOpts
" " + jvmmem +
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,8 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);

// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
try {
File fp = File.createTempFile(appId.toString(), null);
Expand Down Expand Up @@ -1234,6 +1234,9 @@ protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogb

// respect custom JVM options in the YAML file
String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
}
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void testSetupApplicationMasterContainer() {
final String java = "$JAVA_HOME/bin/java";
final String jvmmem = "-Xmx424m";
final String jvmOpts = "-Djvm"; // if set
final String jmJvmOpts = "-DjmJvm"; // if set
final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
final String logfile =
"-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Expand Down Expand Up @@ -223,13 +224,33 @@ public void testSetupApplicationMasterContainer() {
.setupApplicationMasterContainer(true, true, true)
.getCommands().get(0));

// logback + log4j, with/out krb5, different JVM opts
cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts + " " + jmJvmOpts +
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " "+ redirects,
clusterDescriptor
.setupApplicationMasterContainer(true, true, false)
.getCommands().get(0));

assertEquals(
java + " " + jvmmem +
" " + jvmOpts + " " + jmJvmOpts + " " + krb5 +// jvmOpts
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " "+ redirects,
clusterDescriptor
.setupApplicationMasterContainer(true, true, true)
.getCommands().get(0));

// now try some configurations with different yarn.container-start-command-template

cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
assertEquals(
java + " 1 " + jvmmem +
" 2 " + jvmOpts + " " + krb5 + // jvmOpts
" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
" 3 " + logfile + " " + logback + " " + log4j +
" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
clusterDescriptor
Expand All @@ -241,7 +262,7 @@ public void testSetupApplicationMasterContainer() {
assertEquals(
java +
" " + logfile + " " + logback + " " + log4j +
" " + jvmOpts + " " + krb5 + // jvmOpts
" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
" " + jvmmem +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
Expand Down

0 comments on commit e9a5c86

Please sign in to comment.