Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ The configuration files for the TaskManagers can be different, Flink does not as

- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively.

- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.

- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.

- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).

Expand Down Expand Up @@ -71,7 +71,7 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to

### Managed Memory

By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.

Expand Down Expand Up @@ -107,12 +107,18 @@ Please make sure to set the maximum ticket life span high long running jobs. The

If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.


### Other

- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).

- `taskmanager.log.path`: The config parameter defining the taskmanager log file location

- `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).

- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface
will copy its static files into the directory. Also uploaded job jars are stored in the directory. By default, the temporary directory is used.

- `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)

- `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
Expand Down Expand Up @@ -228,13 +234,19 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8).

### Resource Manager

The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, ...)

- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port
of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges.


## YARN

- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin.
- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size.

- `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers

- `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system is going to reallocate in case of a failure.

- `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you'll need to set the JM host:port manually. It is recommended to leave this option at 1.
Expand All @@ -243,17 +255,19 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic

- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users)

- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.

- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:

yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
`yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"`

- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.

- `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes.

- `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting.

For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.


## High Availability Mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -373,6 +374,7 @@ public int read(byte[] b, int off, int len) throws IOException {
// Checkpointing
// --------------------------------------------------------------------------------------------

@PublicEvolving
@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.blockBasedInput == null) {
Expand All @@ -385,6 +387,7 @@ public Tuple2<Long, Long> getCurrentState() throws IOException {
);
}

@PublicEvolving
@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.io.Serializable;

/**
* An interface the describes {@link InputFormat}s that allow checkpointing/restoring their state.
* An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state.
*
* @param <S> The type of input split.
* @param <T> The type of the channel state to be checkpointed / included in the snapshot.
Expand All @@ -40,7 +40,7 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
*
* @return The state of the channel.
*
* @throws Exception Thrown if the creation of the state object failed.
* @throws IOException Thrown if the creation of the state object failed.
*/
T getCurrentState() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -629,11 +630,13 @@ private boolean fillBuffer() throws IOException {
// Checkpointing
// --------------------------------------------------------------------------------------------

@PublicEvolving
@Override
public Long getCurrentState() throws IOException {
return this.offset;
}

@PublicEvolving
@Override
public void reopen(FileInputSplit split, Long state) throws IOException {
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,29 +269,27 @@ public final class ConfigConstants {
* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
* for other JVM memory usage.
*/
public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";
public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";

/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
*/
public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";
public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";

/**
* Prefix for passing custom environment variables to Flink's master process.
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
*/
public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env.";
public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this should be CONTAINER_MASTER_ENV_PREFIX = container.master.env.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it.
I think we misunderstood each other. I thought our discussion regarding container / containered / containerized was about the prefix in general, not for the heap cutoff settings alone.


/**
* Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows
* Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the workers (TaskManagers)
*/
public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env.";
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";

// --------------------------Standalone Setup -----------------------------


// ------------------------ YARN Configuration ------------------------

Expand All @@ -302,12 +300,14 @@ public final class ConfigConstants {

/**
* Percentage of heap space to remove from containers started by YARN.
* @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_RATIO}
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";

/**
* Minimum amount of memory to remove from the heap space as a safety margin.
* @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_MIN}
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";
Expand Down Expand Up @@ -355,7 +355,7 @@ public final class ConfigConstants {
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
* @deprecated Please use {@code CONTAINERED_MASTER_ENV_PREFIX}.
* @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}.
*/
@Deprecated
public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";
Expand All @@ -369,7 +369,7 @@ public final class ConfigConstants {
/**
* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables.
* @deprecated Please use {@code CONTAINERED_TASK_MANAGER_ENV_PREFIX}.
* @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}.
*/
@Deprecated
public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,22 @@ public static ContaineredTaskManagerParameters create(
// (1) compute how much memory we subtract from the total memory, to get the Java memory

final float memoryCutoffRatio = config.getFloat(
ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);

final int minCutoff = config.getInteger(
ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN,
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);

if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}

if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}

Expand Down Expand Up @@ -166,7 +166,7 @@ public static ContaineredTaskManagerParameters create(

// (3) obtain the additional environment variables from the configuration
final HashMap<String, String> envVars = new HashMap<>();
final String prefix = ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX;
final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;

for (String key : config.keySet()) {
if (key.startsWith(prefix) && key.length() > prefix.length()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,23 @@ public void testUberjarLocator() {
@Test
public void testHeapCutoff() {
Configuration conf = new Configuration();
conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);
conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);

Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );

// test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));

// test also deprecated keys
Expand All @@ -88,21 +88,21 @@ public void testHeapCutoff() {
@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000");
conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

Expand Down
Loading