From 3842385a4c5a4dcaa844b94ba5897d022b780c6e Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 10 Jun 2014 17:34:00 +0200 Subject: [PATCH] Fix for FLINK-708 (Hadoop 2.4 compatibility) and FLINK-887 (YARN Jobmanager heapspace calc) --- .../stratosphere/yarn/ApplicationMaster.java | 6 +-- .../java/eu/stratosphere/yarn/Client.java | 2 +- .../main/java/eu/stratosphere/yarn/Utils.java | 43 +++++++++++++++++-- .../stratosphere-bin/yarn-bin/yarn-session.sh | 2 +- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java index 65ae114b35a51..e208b01879922 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java @@ -62,7 +62,6 @@ public class ApplicationMaster { private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); - private static final int HEAP_LIMIT_CAP = 500; private void run() throws Exception { //Utils.logFilesInCurrentDirectory(LOG); @@ -83,10 +82,7 @@ private void run() throws Exception { final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY)); final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES)); - int heapLimit = (int)((float)memoryPerTaskManager*0.85); - if( (memoryPerTaskManager - heapLimit) > HEAP_LIMIT_CAP) { - heapLimit = memoryPerTaskManager-HEAP_LIMIT_CAP; - } + int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager); if(currDir == null) { throw new RuntimeException("Current directory unknown"); diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java index 9d6e5ad535c9e..4ef1456281be5 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java +++ 
b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java @@ -347,7 +347,7 @@ public boolean accept(File dir, String name) { .newRecord(ContainerLaunchContext.class); String amCommand = "$JAVA_HOME/bin/java" - + " -Xmx"+jmMemory+"M " +javaOpts; + + " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts; if(hasLog4j) { amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties"; } diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java index 551b5daf774a5..da7f8aebadfaa 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java @@ -42,13 +42,13 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.ConverterUtils; import eu.stratosphere.configuration.ConfigConstants; @@ -57,6 +57,7 @@ public class Utils { private static final Log LOG = LogFactory.getLog(Utils.class); + private static final int HEAP_LIMIT_CAP = 500; public static void copyJarContents(String prefix, String pathToJar) throws IOException { @@ -90,6 +91,23 @@ public static void copyJarContents(String prefix, String pathToJar) throws IOExc jar.close(); } + /** + * Calculate the heap size for the JVMs to start in the containers. 
+ * Since JVMs are allocating more than just the heap space, and YARN is very + * fast at killing processes that use memory beyond their limit, we have to come + * up with a good heap size. + * This code takes 85% of the given amount of memory (in MB). If the remaining 15% is + * more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB instead. + * + */ + public static int calculateHeapSize(int memory) { + int heapLimit = (int)((float)memory*0.85); + if( (memory - heapLimit) > HEAP_LIMIT_CAP) { + heapLimit = memory-HEAP_LIMIT_CAP; + } + return heapLimit; + } + public static void getStratosphereConfiguration(String confDir) { GlobalConfiguration.loadConfiguration(confDir); } @@ -106,6 +124,7 @@ private static void addPathToConfig(Configuration conf, File path) { ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader()); conf.setClassLoader(cl); } + private static void setDefaultConfValues(Configuration conf) { if(conf.get("fs.hdfs.impl",null) == null) { conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); @@ -114,6 +133,7 @@ private static void setDefaultConfValues(Configuration conf) { conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); } } + public static Configuration initializeYarnConfiguration() { Configuration conf = new YarnConfiguration(); String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); @@ -157,9 +177,9 @@ public static Configuration initializeYarnConfiguration() { public static void setupEnv(Configuration conf, Map appMasterEnv) { for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); + addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); } - Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*"); + 
addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*"); } @@ -221,4 +241,21 @@ public boolean accept(File dir, String name) { } }); } + + /** + * Method copied from org.apache.hadoop.yarn.util.Apps. + * The original was broken by YARN-1824 (2.4.0) and fixed for 2.4.1 + * by https://issues.apache.org/jira/browse/YARN-1931 + */ + public static void addToEnvironment(Map environment, + String variable, String value) { + String val = environment.get(variable); + if (val == null) { + val = value; + } else { + val = val + File.pathSeparator + value; + } + environment.put(StringInterner.weakIntern(variable), + StringInterner.weakIntern(val)); + } } diff --git a/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh b/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh index 63d665849a2ad..6ce83e93b9bd9 100644 --- a/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh +++ b/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh @@ -49,5 +49,5 @@ CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)` export STRATOSPHERE_CONF_DIR # $log_setting -$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -ship ship/ -confDir $STRATOSPHERE_CONF_DIR -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar $* +$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -ship $bin/../ship/ -confDir $STRATOSPHERE_CONF_DIR -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar $*