From 681b36f5fb63e14dc89e17813894227be9e2324f Mon Sep 17 00:00:00 2001 From: nravi Date: Thu, 8 May 2014 00:05:33 -0700 Subject: [PATCH 01/15] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles The prefix "file:" is missing in the string inserted as key in HashMap --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 7193223addf66..dc5d3261f98e4 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException { ds.close(); HashMap container = new HashMap(); - container.put(tempDirName+"/part-00000", new Text(content1).toString()); - container.put(tempDirName+"/part-00001", new Text(content2).toString()); + container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString()); + container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString()); JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName, 3); List> result = readRDD.collect(); From 5108700230fd70b995e76598f49bdf328c971e77 Mon Sep 17 00:00:00 2001 From: nravi Date: Tue, 3 Jun 2014 15:25:22 -0700 Subject: [PATCH 02/15] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456) --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6547755764dcf..8346d25a29d49 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,7 +126,7 @@ class HadoopRDD[K, V]( private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain 
input splits for Hadoop reads. - protected def getJobConf(): JobConf = { + protected def getJobConf(): JobConf = synchronized { val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. From 6b840f017870207d23e75de224710971ada0b3d0 Mon Sep 17 00:00:00 2001 From: nravi Date: Tue, 3 Jun 2014 15:34:02 -0700 Subject: [PATCH 03/15] Undo the fix for SPARK-1758 (the problem is fixed) --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index dc5d3261f98e4..7193223addf66 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException { ds.close(); HashMap container = new HashMap(); - container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString()); - container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString()); + container.put(tempDirName+"/part-00000", new Text(content1).toString()); + container.put(tempDirName+"/part-00001", new Text(content2).toString()); JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName, 3); List> result = readRDD.collect(); From df2aeb179fca4fc893803c72a657317f5b5539d7 Mon Sep 17 00:00:00 2001 From: nravi Date: Mon, 9 Jun 2014 12:02:59 -0700 Subject: [PATCH 04/15] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456) --- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 8346d25a29d49..2aa111d600e9b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,7 +126,7 @@ class HadoopRDD[K, V]( private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. - protected def getJobConf(): JobConf = synchronized { + protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. @@ -139,10 +139,13 @@ class HadoopRDD[K, V]( // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - val newJobConf = new JobConf(broadcastedConf.value.value) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf + // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) + broadcastedConf.synchronized { + val newJobConf = new JobConf(broadcastedConf.value.value) + initLocalJobConfFuncOpt.map(f => f(newJobConf)) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + newJobConf + } } } From 2b630f94079b82df3ebae2b26a3743112afcd526 Mon Sep 17 00:00:00 2001 From: nravi Date: Sun, 15 Jun 2014 23:00:31 -0700 Subject: [PATCH 05/15] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark --- .../scala/org/apache/spark/deploy/ClientArguments.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 5da9615c9e9af..39150deab863c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -21,6 
+21,8 @@ import scala.collection.mutable.ListBuffer import org.apache.log4j.Level +import org.apache.spark.util.MemoryParam + /** * Command-line parser for the driver client. */ @@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) { cores = value.toInt parse(tail) - case ("--memory" | "-m") :: value :: tail => - memory = value.toInt + case ("--memory" | "-m") :: MemoryParam(value) :: tail => + memory = value parse(tail) case ("--supervise" | "-s") :: tail => From ebcde10252e6c45169ea086e8426ec9997d46490 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Sun, 21 Sep 2014 23:44:40 -0700 Subject: [PATCH 06/15] Modify default YARN memory_overhead-- from an additive constant to a multiplier (redone to resolve merge conflicts) --- docs/running-on-yarn.md | 4 ++-- out | 1 + .../org/apache/spark/deploy/yarn/ClientBase.scala | 3 ++- .../apache/spark/deploy/yarn/YarnAllocator.scala | 14 ++++++++------ .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 5 +++-- 5 files changed, 16 insertions(+), 11 deletions(-) create mode 100644 out diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 74bcc2eeb65f6..134edfb93e984 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -79,14 +79,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead - 384 + executorMemory * 0.07, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. spark.yarn.driver.memoryOverhead - 384 + driverMemory * 0.07, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. diff --git a/out b/out new file mode 100644 index 0000000000000..2a7bfed917afe --- /dev/null +++ b/out @@ -0,0 +1 @@ +Already up-to-date. 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index c96f731923d22..b8ed23921aa6a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -61,7 +61,8 @@ trait ClientBase extends Logging { // Additional memory overhead - in mb. protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) + math.max((YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * args.amMemory).toInt, + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)) // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 299e38a5eb9c0..8625aaa8f3fdb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -78,10 +78,6 @@ private[yarn] abstract class YarnAllocator( // Containers to be released in next request to RM private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean] - // Additional memory overhead - in mb. - protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) - // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. private val numPendingAllocate = new AtomicInteger() @@ -97,6 +93,11 @@ private[yarn] abstract class YarnAllocator( protected val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, preferredNodes) + // Additional memory overhead - in mb. 
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + math.max((YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)) + private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, @@ -117,9 +118,10 @@ private[yarn] abstract class YarnAllocator( if (missing > 0) { numPendingAllocate.addAndGet(missing) - logInfo("Will Allocate %d executor containers, each with %d memory".format( + logInfo("Will Allocate %d executor containers, each with %d+%d MB memory".format( missing, - (executorMemory + memoryOverhead))) + executorMemory, + memoryOverhead)) } else { logDebug("Empty allocation request ...") } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4a33e34c3bfc7..c31f6cb2abb46 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -85,8 +85,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } object YarnSparkHadoopUtil { - // Additional memory overhead - in mb. 
- val DEFAULT_MEMORY_OVERHEAD = 384 + // Additional memory overhead + val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_MIN = 384 val ANY_HOST = "*" From 1cf2d1ef57ed6d783df06dad36b9505bc74329fb Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Mon, 22 Sep 2014 01:54:33 -0700 Subject: [PATCH 07/15] Update YarnAllocator.scala --- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 8625aaa8f3fdb..c6e57e3647b7a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -118,9 +118,9 @@ private[yarn] abstract class YarnAllocator( if (missing > 0) { numPendingAllocate.addAndGet(missing) - logInfo("Will Allocate %d executor containers, each with %d+%d MB memory".format( + logInfo("Will allocate %d executor containers, each with %d MB memory including %d MB overhead".format( missing, - executorMemory, + (executorMemory + memoryOverhead), memoryOverhead)) } else { logDebug("Empty allocation request ...") From f00fa311945c1eafa8957eae5c84719521761dcd Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Mon, 22 Sep 2014 16:06:07 -0700 Subject: [PATCH 08/15] Improving logging for AM memoryOverhead --- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index b8ed23921aa6a..8d818011c38f2 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -100,11 +100,14 @@ trait ClientBase extends Logging { 
val amMem = args.amMemory + memoryOverhead if (amMem > maxMem) { - val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster." - .format(amMem, maxMem) + val errorMessage = "Required AM memory (%d+%d MB) is above the max threshold (%d MB) of this cluster." + .format(args.amMemory, memoryOverhead, maxMem) logError(errorMessage) throw new IllegalArgumentException(errorMessage) } + logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( + amMem, + memoryOverhead)) // We could add checks to make sure the entire cluster has enough resources but that involves // getting all the node reports and computing ourselves. From 362da5edfd04bd8bad990fb210a9e11b8494fa62 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 12:56:13 -0700 Subject: [PATCH 09/15] Additional changes for yarn memory overhead --- .../org/apache/spark/deploy/yarn/ClientArguments.scala | 8 +++----- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 8 ++++---- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 8 +++----- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index b2b170180e409..76f7ff7b4a261 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf import org.apache.spark.util.{Utils, IntParam, MemoryParam} - +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! 
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { @@ -45,12 +45,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // Additional memory to allocate to containers // For now, use driver's memory overhead as our AM container's memory overhead val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - math.max((YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * amMemory).toInt, - YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, - YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) validateArgs() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 1f39fa21a1ab7..8fa3cedccf485 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -64,13 +64,13 @@ private[spark] trait ClientBase extends Logging { s"memory capability of the cluster ($maxMem MB per container)") val executorMem = args.executorMemory + executorMemoryOverhead if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory ($args.executorMemmory+$executorMemoryOverhead MB) " + - s"is above the max threshold ($maxMem MB) of this cluster!") + throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } val amMem = args.amMemory + amMemoryOverhead if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory ($args.amMemory+$amMemoryOverhead MB) 
" + - s"is above the max threshold ($maxMem MB) of this cluster!") + throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( amMem, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index c6e57e3647b7a..b96f8e2bfda45 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -115,13 +115,11 @@ private[yarn] abstract class YarnAllocator( // this is needed by alpha, do it here since we add numPending right after this val executorsPending = numPendingAllocate.get() - if (missing > 0) { + val totalExecutorMemory = executorMemory + memoryOverhead; numPendingAllocate.addAndGet(missing) - logInfo("Will allocate %d executor containers, each with %d MB memory including %d MB overhead".format( - missing, - (executorMemory + memoryOverhead), - memoryOverhead)) + logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + + s"memory including $memoryOverhead MB overhead") } else { logDebug("Empty allocation request ...") } From 42c2c3d18862d3632c20931ecfe2c64883c5febf Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 13:02:49 -0700 Subject: [PATCH 10/15] Additional changes for yarn memory overhead issue --- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b96f8e2bfda45..d2de57000aadf 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ object AllocationType extends Enumeration { type AllocationType = Value @@ -95,8 +96,7 @@ private[yarn] abstract class YarnAllocator( // Additional memory overhead - in mb. protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, - YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue From dac1047995c99f5a2670f934eb8d3a4ad9b532c8 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 14:20:38 -0700 Subject: [PATCH 11/15] Additional documentation for yarn memory overhead issue --- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index b8953781cd8be..e1e0144f46fe9 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -85,6 +85,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { object YarnSparkHadoopUtil { // Additional memory overhead + // 7% was arrived at experimentally, in the interest of minimizing memory waste while covering + // the common cases. Memory overhead tends to grow with container size. 
+ val MEMORY_OVERHEAD_FACTOR = 0.07 val MEMORY_OVERHEAD_MIN = 384 From 5ac2ec11629e19030ad5577da1eee2d135cc3d1c Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 14:25:44 -0700 Subject: [PATCH 12/15] Remove out --- out | 1 - 1 file changed, 1 deletion(-) delete mode 100644 out diff --git a/out b/out deleted file mode 100644 index 2a7bfed917afe..0000000000000 --- a/out +++ /dev/null @@ -1 +0,0 @@ -Already up-to-date. From 35daa6498048cabb736316e2f19e565c99243b7e Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 14:59:22 -0700 Subject: [PATCH 13/15] Slight change in the doc for yarn memory overhead --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 5361019b14b32..aa4a831e75be3 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -81,7 +81,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead executorMemory * 0.07, with minimum of 384 - The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. + The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). 
From 8f76c8b46379736aeb7dbe1a4d88729424a041f7 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Thu, 25 Sep 2014 15:03:00 -0700 Subject: [PATCH 14/15] Doc change for yarn memory overhead --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index aa4a831e75be3..695813a2ba881 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -88,7 +88,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.driver.memoryOverhead driverMemory * 0.07, with minimum of 384 - The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. + The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). 
From 636a9ffeb4a4ae0b941edd849dcbabf38821db53 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Tue, 30 Sep 2014 11:33:28 -0700 Subject: [PATCH 15/15] Update YarnAllocator.scala --- .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d2de57000aadf..4f4f1d2aaaade 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -116,7 +116,7 @@ private[yarn] abstract class YarnAllocator( // this is needed by alpha, do it here since we add numPending right after this val executorsPending = numPendingAllocate.get() if (missing > 0) { - val totalExecutorMemory = executorMemory + memoryOverhead; + val totalExecutorMemory = executorMemory + memoryOverhead numPendingAllocate.addAndGet(missing) logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + s"memory including $memoryOverhead MB overhead")