From 793ebaf506d0af8b3e376de8b44dfbed8f1e8e85 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 28 Jun 2016 15:12:05 +0200 Subject: [PATCH 1/5] [FLINK-4127] Check API compatbility for 1.1 in flink-core --- docs/setup/config.md | 18 ++++++++++++++---- .../flink/api/common/io/BinaryInputFormat.java | 3 +++ .../common/io/CheckpointableInputFormat.java | 4 ++-- .../api/common/io/DelimitedInputFormat.java | 3 +++ .../flink/configuration/ConfigConstants.java | 14 +++++++------- .../ContaineredTaskManagerParameters.java | 10 +++++----- .../java/org/apache/flink/yarn/UtilsTest.java | 18 +++++++++--------- .../main/java/org/apache/flink/yarn/Utils.java | 12 ++++++------ .../yarn/YarnApplicationMasterRunner.java | 8 ++++---- 9 files changed, 53 insertions(+), 37 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 46c6c9a3ad1a8..f57cbe0a26893 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false) +- `taskmanager.runtime.large-record-handler`: Whether to use the LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false) + ### Memory and Performance Debugging These options are useful for debugging a Flink application for memory and garbage collection related issues, such as performance and out-of-memory process kills or exceptions. @@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span high long running jobs. The If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool. 
+### Resource Manager + +- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. + ### Other - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir). +- `taskmanager.log.path`: The config parameter defining the taskmanager log file location + - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081). +- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. + - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false) - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false) @@ -230,8 +240,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic ## YARN -- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. 
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. -- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. +- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. +- `container.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. - `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers @@ -243,13 +253,13 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users) -- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. 
For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: +- `container.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. -- `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes. +- `container.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 96e0e0df0085f..14280d93835bc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -373,6 +374,7 @@ public int read(byte[] b, int off, int len) throws IOException { // Checkpointing // -------------------------------------------------------------------------------------------- + @PublicEvolving @Override public Tuple2 getCurrentState() throws IOException { if (this.blockBasedInput == null) { @@ -385,6 +387,7 @@ public Tuple2 getCurrentState() throws IOException { ); } + @PublicEvolving @Override public void reopen(FileInputSplit split, Tuple2 state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java index 17b06251196de..266914bd5d0fd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java @@ -25,7 +25,7 @@ import java.io.Serializable; /** - * An interface the describes {@link InputFormat}s that allow checkpointing/restoring their state. + * An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state. * * @param The type of input split. 
* @param The type of the channel state to be checkpointed / included in the snapshot. @@ -40,7 +40,7 @@ public interface CheckpointableInputFormat= 1 || memoryCutoffRatio <= 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + + ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + memoryCutoffRatio); } if (minCutoff >= containerMemoryMB) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + "'='" + minCutoff + + ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN + "'='" + minCutoff + "' is larger than the total container memory " + containerMemoryMB); } @@ -166,7 +166,7 @@ public static ContaineredTaskManagerParameters create( // (3) obtain the additional environment variables from the configuration final HashMap envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX; + final String prefix = ConfigConstants.CONTAINER_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 784bf24f0c7ff..fff0a8b4bd327 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -57,8 +57,8 @@ public void testUberjarLocator() { @Test public void testHeapCutoff() { Configuration conf = new Configuration(); - conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384); + conf.setDouble(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, 384); Assert.assertEquals(616, 
Utils.calculateHeapSize(1000, conf) ); Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); @@ -66,14 +66,14 @@ public void testHeapCutoff() { // test different configuration Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000"); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, "1000"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "0.1"); Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "0.5"); Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); // test also deprecated keys @@ -88,21 +88,21 @@ public void testHeapCutoff() { @Test(expected = IllegalArgumentException.class) public void illegalArgument() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "1.1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void illegalArgumentNegative() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01"); + conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "-0.01"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void tooMuchCutoff() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000"); + 
conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "6000"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index f56c024dded98..60d125441e16f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -67,23 +67,23 @@ public final class Utils { public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO); + ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO); BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN); + ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN); - float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, + float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, + int minCutoff = conf.getInteger(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + + ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } if (minCutoff > memory) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + + ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 563425f3cbb9b..0cbed58a055d0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -439,19 +439,19 @@ private static Configuration createConfiguration(String baseDirectory, Map Date: Fri, 1 Jul 2016 10:43:45 +0200 Subject: [PATCH 2/5] Address some of the comments --- docs/setup/config.md | 18 +++++++++--------- .../flink/configuration/ConfigConstants.java | 14 +++++++------- .../ContaineredTaskManagerParameters.java | 10 +++++----- .../java/org/apache/flink/yarn/UtilsTest.java | 18 +++++++++--------- .../main/java/org/apache/flink/yarn/Utils.java | 12 ++++++------ .../yarn/YarnApplicationMasterRunner.java | 8 ++++---- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index f57cbe0a26893..8b0d64433dfe1 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -85,8 +85,6 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false) -- `taskmanager.runtime.large-record-handler`: Whether to use the LargeRecordHandler when spilling. This feature is experimental. 
(DEFAULT: false) - ### Memory and Performance Debugging These options are useful for debugging a Flink application for memory and garbage collection related issues, such as performance and out-of-memory process kills or exceptions. @@ -111,7 +109,8 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber ### Resource Manager -- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. +- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port +of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. ### Other @@ -121,7 +120,8 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081). -- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. +- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface +will copy its static files into the directory. Also uploaded job jars are stored in the directory. By default, the temporary directory is used. - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false) @@ -240,8 +240,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic ## YARN -- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. 
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. -- `container.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. +- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN for example. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. +- `containerized.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. - `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers @@ -253,13 +253,13 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. 
This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users) -- `container.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: +- `containerized.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: - yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" + containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. -- `container.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. +- `containerized.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index e431b7d79bc11..f14d80123addf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -269,12 +269,12 @@ public final class ConfigConstants { * Percentage of heap space to remove from containers (YARN / Mesos), to compensate * for other JVM memory usage. */ - public static final String CONTAINER_HEAP_CUTOFF_RATIO = "container.heap-cutoff-ratio"; + public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio"; /** * Minimum amount of heap memory to remove in containers, as a safety margin. */ - public static final String CONTAINER_HEAP_CUTOFF_MIN = "container.heap-cutoff-min"; + public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min"; /** * Prefix for passing custom environment variables to Flink's master process. @@ -282,13 +282,13 @@ public final class ConfigConstants { * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. 
*/ - public static final String CONTAINER_MASTER_ENV_PREFIX = "container.application-master.env."; + public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; /** - * Similar to the {@see CONTAINER_MASTER_ENV_PREFIX}, this configuration prefix allows + * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables for the workers (TaskManagers) */ - public static final String CONTAINER_TASK_MANAGER_ENV_PREFIX = "container.taskmanager.env."; + public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; // --------------------------Standalone Setup ----------------------------- @@ -355,7 +355,7 @@ public final class ConfigConstants { * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. - * @deprecated Please use {@code CONTAINER_MASTER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. */ @Deprecated public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; @@ -369,7 +369,7 @@ public final class ConfigConstants { /** * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables. - * @deprecated Please use {@code CONTAINER_TASK_MANAGER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}.
*/ @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 3cb3b0dc41015..3dc4394bc9a44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -114,22 +114,22 @@ public static ContaineredTaskManagerParameters create( // (1) compute how much memory we subtract from the total memory, to get the Java memory final float memoryCutoffRatio = config.getFloat( - ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); final int minCutoff = config.getInteger( - ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } if (minCutoff >= containerMemoryMB) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN + "'='" + minCutoff + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff + "' is larger than the total container memory " + containerMemoryMB); } @@ -166,7 +166,7 @@ public static ContaineredTaskManagerParameters create( // (3) obtain the additional environment variables from the configuration final HashMap envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINER_TASK_MANAGER_ENV_PREFIX; + final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index fff0a8b4bd327..c7100641b6e4e 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -57,8 +57,8 @@ public void testUberjarLocator() { @Test public void testHeapCutoff() { Configuration conf = new Configuration(); - conf.setDouble(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, 384); + conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); @@ -66,14 +66,14 @@ public void testHeapCutoff() { // test different configuration Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, "1000"); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "0.1"); + 
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1"); Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "0.5"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5"); Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "1"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); // test also deprecated keys @@ -88,21 +88,21 @@ public void testHeapCutoff() { @Test(expected = IllegalArgumentException.class) public void illegalArgument() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "1.1"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void illegalArgumentNegative() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "-0.01"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void tooMuchCutoff() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, "6000"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 60d125441e16f..d5bad2f7914ee 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -67,23 +67,23 @@ public final class Utils { public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO); + ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN); + ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); - float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO, + float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN, + int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINER_HEAP_CUTOFF_RATIO + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } if (minCutoff > memory) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINER_HEAP_CUTOFF_MIN + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 0cbed58a055d0..582910b84c2cf 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -439,19 +439,19 @@ private static Configuration createConfiguration(String baseDirectory, Map Date: Fri, 1 Jul 2016 11:04:14 +0200 Subject: [PATCH 3/5] group config keys --- docs/setup/config.md | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 8b0d64433dfe1..c6b074110e39e 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -107,10 +107,6 @@ Please make sure to set the maximum ticket life span high long running jobs. The If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool. -### Resource Manager - -- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port -of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. ### Other @@ -238,10 +234,22 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. 
Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128). - `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8). -## YARN +### Resource Manager +The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, ...) + +- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port +of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. - `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN for example. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. - `containerized.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. +- `containerized.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: + + containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" + +- `containerized.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. 
+ + +## YARN - `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers @@ -253,14 +261,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users) -- `containerized.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: - - containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" - - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. -- `containerized.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. - - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. 
From e2190df7aefefc83398dd4b300479fa24fb3a6be Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Mon, 4 Jul 2016 15:23:26 +0200 Subject: [PATCH 4/5] Adress Max comment --- docs/setup/config.md | 16 +++++++--------- .../flink/configuration/ConfigConstants.java | 16 ++++++++-------- .../ContaineredTaskManagerParameters.java | 2 +- .../flink/yarn/YarnApplicationMasterRunner.java | 4 ++-- 4 files changed, 18 insertions(+), 20 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index c6b074110e39e..375cff1705e14 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -41,9 +41,9 @@ The configuration files for the TaskManagers can be different, Flink does not as - `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively. -- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. +- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client. -- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. +- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client. - `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). 
@@ -71,7 +71,7 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to ### Managed Memory -By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system. +By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system. The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes. @@ -242,17 +242,15 @@ The configuration keys in this section are independent of the used resource mana of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. - `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN for example. 
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. - `containerized.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. -- `containerized.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: +- `container.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: - containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" + `container.master.env.LD_LIBRARY_PATH: "/usr/lib/native"` -- `containerized.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. +- `container.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. ## YARN -- `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers - - `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system is going to reallocate in case of a failure. - `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. 
Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you'll need to set the JM host:port manually. It is recommended to leave this option at 1. @@ -265,7 +263,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. -For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. + For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. ## High Availability Mode diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index f14d80123addf..3d9c705db31d7 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -279,19 +279,17 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's master process. * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" + * container.master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. 
*/ - public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; + public static final String CONTAINER_MASTER_ENV_PREFIX = "container.master.env."; /** - * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows + * Similar to the {@see CONTAINER_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables for the workers (TaskManagers) */ - public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; + public static final String CONTAINER_TASK_MANAGER_ENV_PREFIX = "container.taskmanager.env."; - // --------------------------Standalone Setup ----------------------------- - // ------------------------ YARN Configuration ------------------------ @@ -302,12 +300,14 @@ public final class ConfigConstants { /** * Percentage of heap space to remove from containers started by YARN. + * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_RATIO} */ @Deprecated public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio"; /** * Minimum amount of memory to remove from the heap space as a safety margin. + * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_MIN} */ @Deprecated public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min"; @@ -355,7 +355,7 @@ public final class ConfigConstants { * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. - * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINER_MASTER_ENV_PREFIX}. */ @Deprecated public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; @@ -369,7 +369,7 @@ public final class ConfigConstants { /** * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables. 
- * @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINER_TASK_MANAGER_ENV_PREFIX}. */ @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 3dc4394bc9a44..f98dc289814ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -166,7 +166,7 @@ public static ContaineredTaskManagerParameters create( // (3) obtain the additional environment variables from the configuration final HashMap envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; + final String prefix = ConfigConstants.CONTAINER_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 582910b84c2cf..4d1ce703d9cf1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -447,11 +447,11 @@ private static Configuration createConfiguration(String baseDirectory, Map Date: Mon, 11 Jul 2016 13:52:28 +0200 Subject: [PATCH 5/5] undo --- docs/setup/config.md | 18 +++++++++++------- .../flink/configuration/ConfigConstants.java | 12 ++++++------ .../ContaineredTaskManagerParameters.java | 2 +- .../yarn/YarnApplicationMasterRunner.java | 4 ++-- 4 files changed, 20 insertions(+), 16 
deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 375cff1705e14..78f51e1817e2d 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -240,17 +240,13 @@ The configuration keys in this section are independent of the used resource mana - `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. -- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN for example. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. -- `containerized.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. -- `container.master.env.`*ENV_VAR1=value* Configuration values prefixed with `containerized.master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: - - `container.master.env.LD_LIBRARY_PATH: "/usr/lib/native"` - -- `container.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. ## YARN +- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. 
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. +- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. + - `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system is going to reallocate in case of a failure. - `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you'll need to set the JM host:port manually. It is recommended to leave this option at 1. @@ -261,6 +257,14 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. +- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: + + `yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"` + +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. 
+ +- `yarn.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes. + - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 3d9c705db31d7..caf45c964d3e6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -279,16 +279,16 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's master process. * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * container.master.env.LD_LIBRARY_PATH: "/usr/lib/native" + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. 
*/ - public static final String CONTAINER_MASTER_ENV_PREFIX = "container.master.env."; + public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; /** - * Similar to the {@see CONTAINER_MASTER_ENV_PREFIX}, this configuration prefix allows + * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables for the workers (TaskManagers) */ - public static final String CONTAINER_TASK_MANAGER_ENV_PREFIX = "container.taskmanager.env."; + public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; // ------------------------ YARN Configuration ------------------------ @@ -355,7 +355,7 @@ public final class ConfigConstants { * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. - * @deprecated Please use {@code CONTAINER_MASTER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. */ @Deprecated public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; @@ -369,7 +369,7 @@ public final class ConfigConstants { /** * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables. - * @deprecated Please use {@code CONTAINER_TASK_MANAGER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}. 
*/ @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index f98dc289814ba..3dc4394bc9a44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -166,7 +166,7 @@ public static ContaineredTaskManagerParameters create( // (3) obtain the additional environment variables from the configuration final HashMap envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINER_TASK_MANAGER_ENV_PREFIX; + final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 4d1ce703d9cf1..582910b84c2cf 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -447,11 +447,11 @@ private static Configuration createConfiguration(String baseDirectory, Map