Skip to content

Commit

Permalink
[FLINK-1950] Change safety margin computation for YARN containers
Browse files Browse the repository at this point in the history
This closes #637
  • Loading branch information
rmetzger committed Apr 28, 2015
1 parent b0df4f4 commit f2daa10
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
20 changes: 16 additions & 4 deletions docs/setup/config.md
Expand Up @@ -304,13 +304,25 @@ input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).

## YARN

Please note that all ports used by Flink in a YARN session are offset by the YARN application ID
to avoid duplicate port allocations when running multiple YARN sessions in parallel.

So if `yarn.am.rpc.port` is configured to `10245` and the session's application ID is `application_1406629969999_0002`, then the actual port being used is 10245 + 2 = 10247.
- `yarn.heap-cutoff-ratio`: (Default 0.15) Ratio (between 0 and 1) of heap space to remove from containers started by YARN.
When a user requests a certain amount of memory for each TaskManager container (for example 4 GB),
we cannot pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM
also allocates memory outside the heap. YARN is very strict about killing containers that use
more memory than requested.
Therefore, by default we remove 15% of the requested memory from the heap as a safety margin.
- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off from the requested heap size.

- `yarn.heap-cutoff-ratio`: Percentage of heap space to remove from containers started by YARN.
- `yarn.reallocate-failed`: (Default 'true') Controls whether YARN should reallocate failed containers.

- `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system
is going to reallocate in case of a failure.

- `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. Note that the entire Flink cluster
will restart and the YARN Client will lose the connection. Also, the JobManager address will change and you'll need
to set the JM host:port manually. It is recommended to leave this option at 1.

- `yarn.heartbeat-delay` (Default: 5 seconds). Time between heartbeats with the ResourceManager.

## Background

Expand Down
Expand Up @@ -197,9 +197,18 @@ public final class ConfigConstants {
* Upper bound for heap cutoff on YARN.
* The "yarn.heap-cutoff-ratio" is removing a certain ratio from the heap.
* This value is limiting this cutoff to an absolute value.
*
* THE VALUE IS NO LONGER IN USE.
*/
@Deprecated
public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";

/**
* Minimum amount of memory to remove from the heap space as a safety margin.
*/
public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";


/**
* Reallocate failed YARN containers.
*/
Expand Down Expand Up @@ -512,6 +521,13 @@ public final class ConfigConstants {
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;

// ------------------------ YARN Configuration ------------------------


public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 384;

public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.15f;


// ------------------------ File System Behavior ------------------------
Expand Down
Expand Up @@ -95,7 +95,7 @@ public void cancel() {
// kill all tasks currently running in this slot
Execution exec = this.executedTask;
if (exec != null && !exec.isFinished()) {
exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager)."));
exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager). Instance:"+getInstance()));
}
}
}
Expand Down
Expand Up @@ -118,9 +118,9 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
*/
private int slots = -1;

private int jobManagerMemoryMb = 512;
private int jobManagerMemoryMb = 1024;

private int taskManagerMemoryMb = 512;
private int taskManagerMemoryMb = 1024;

private int taskManagerCount = 1;

Expand Down
29 changes: 14 additions & 15 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Expand Up @@ -51,33 +51,32 @@ public class Utils {

private static final Logger LOG = LoggerFactory.getLogger(Utils.class);

private static final int DEFAULT_HEAP_LIMIT_CAP = 700;
private static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.8f;

/**
* Calculate the heap size for the JVMs to start in the containers.
* Since JVMs are allocating more than just the heap space, and YARN is very
* fast at killing processes that use memory beyond their limit, we have to come
* up with a good heapsize.
* This code takes 85% of the given amount of memory (in MB). If the amount we removed by these 85%
* more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB.
*
* See documentation
*/
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, DEFAULT_YARN_HEAP_CUTOFF_RATIO);
int heapLimitCap = conf.getInteger(ConfigConstants.YARN_HEAP_LIMIT_CAP, DEFAULT_HEAP_LIMIT_CAP);
float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF);

if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
throw new IllegalArgumentException("The configuration value '"+ConfigConstants.YARN_HEAP_CUTOFF_RATIO+"' must be between 0 and 1. Value given="+memoryCutoffRatio);
}
if (minCutoff > memory) {
throw new IllegalArgumentException("The configuration value '"+ConfigConstants.YARN_HEAP_CUTOFF_MIN +"' is higher ("+minCutoff+") than the requested amount of memory "+memory);
}

int heapLimit = (int)((float)memory * memoryCutoffRatio);
if( (memory - heapLimit) > heapLimitCap) {
heapLimit = memory-heapLimitCap;
if (heapLimit < minCutoff) {
heapLimit = minCutoff;
}
return heapLimit;
return memory - heapLimit;
}


public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
for (String c: conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
}
}
Expand Down
45 changes: 37 additions & 8 deletions flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
Expand Up @@ -24,21 +24,50 @@

public class UtilsTests {


/**
* Remove 15% of the heap, at least 384MB.
*
*/
@Test
public void testHeapCutoff() {
Configuration conf = new Configuration();
// ASSUMES DEFAULT Configuration values.
Assert.assertEquals(800, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(9300, Utils.calculateHeapSize(10000, conf) );

Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );

// test different configuration
Assert.assertEquals(3300, Utils.calculateHeapSize(4000, conf) );
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf) );

conf.setString(ConfigConstants.YARN_HEAP_LIMIT_CAP, "1000");
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.3");
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.YARN_HEAP_LIMIT_CAP, "6000000");
Assert.assertEquals(1200, Utils.calculateHeapSize(4000, conf));
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
}

0 comments on commit f2daa10

Please sign in to comment.