From 681b36f5fb63e14dc89e17813894227be9e2324f Mon Sep 17 00:00:00 2001 From: nravi Date: Thu, 8 May 2014 00:05:33 -0700 Subject: [PATCH 01/13] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles The prefix "file:" is missing in the string inserted as key in HashMap --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 7193223addf66..dc5d3261f98e4 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException { ds.close(); HashMap container = new HashMap(); - container.put(tempDirName+"/part-00000", new Text(content1).toString()); - container.put(tempDirName+"/part-00001", new Text(content2).toString()); + container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString()); + container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString()); JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName, 3); List> result = readRDD.collect(); From 5108700230fd70b995e76598f49bdf328c971e77 Mon Sep 17 00:00:00 2001 From: nravi Date: Tue, 3 Jun 2014 15:25:22 -0700 Subject: [PATCH 02/13] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456) --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6547755764dcf..8346d25a29d49 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,7 +126,7 @@ class HadoopRDD[K, V]( private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. - protected def getJobConf(): JobConf = { + protected def getJobConf(): JobConf = synchronized { val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. From 6b840f017870207d23e75de224710971ada0b3d0 Mon Sep 17 00:00:00 2001 From: nravi Date: Tue, 3 Jun 2014 15:34:02 -0700 Subject: [PATCH 03/13] Undo the fix for SPARK-1758 (the problem is fixed) --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index dc5d3261f98e4..7193223addf66 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException { ds.close(); HashMap container = new HashMap(); - container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString()); - container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString()); + container.put(tempDirName+"/part-00000", new Text(content1).toString()); + container.put(tempDirName+"/part-00001", new Text(content2).toString()); JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName, 3); List> result = readRDD.collect(); From df2aeb179fca4fc893803c72a657317f5b5539d7 Mon Sep 17 00:00:00 2001 From: nravi Date: Mon, 9 Jun 2014 12:02:59 -0700 Subject: [PATCH 04/13] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456) --- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 8346d25a29d49..2aa111d600e9b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,7 +126,7 @@ class HadoopRDD[K, V]( private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. - protected def getJobConf(): JobConf = synchronized { + protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. @@ -139,10 +139,13 @@ class HadoopRDD[K, V]( // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - val newJobConf = new JobConf(broadcastedConf.value.value) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf + // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) + broadcastedConf.synchronized { + val newJobConf = new JobConf(broadcastedConf.value.value) + initLocalJobConfFuncOpt.map(f => f(newJobConf)) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + newJobConf + } } } From 2b630f94079b82df3ebae2b26a3743112afcd526 Mon Sep 17 00:00:00 2001 From: nravi Date: Sun, 15 Jun 2014 23:00:31 -0700 Subject: [PATCH 05/13] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark --- .../scala/org/apache/spark/deploy/ClientArguments.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 5da9615c9e9af..39150deab863c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer import org.apache.log4j.Level +import org.apache.spark.util.MemoryParam + /** * Command-line parser for the driver client. */ @@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) { cores = value.toInt parse(tail) - case ("--memory" | "-m") :: value :: tail => - memory = value.toInt + case ("--memory" | "-m") :: MemoryParam(value) :: tail => + memory = value parse(tail) case ("--supervise" | "-s") :: tail => From 715201e21a963c1d435a167d79e25c5b55b73885 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Sat, 12 Jul 2014 22:33:12 -0700 Subject: [PATCH 06/13] Modify default YARN memory_overhead: from an additive constant to a multiplier --- docs/running-on-yarn.md | 4 ++-- .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +- .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 7 ++++--- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +- .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 7 ++++--- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 0362f5a223319..a8ce499bcaee9 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -83,14 +83,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead - 384 + executorMemory * 0.07, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. spark.yarn.driver.memoryOverhead - 384 + driverMemory * 0.07, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index bfdb6232f5113..b2eed31e461d1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -100,7 +100,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp if (minimumMemory > 0) { val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 80e0162e9f277..a8da82934e5ff 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -90,7 +90,7 @@ private[yarn] class YarnAllocationHandler( // Additional memory overhead - in mb. private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor @@ -548,8 +548,9 @@ object YarnAllocationHandler { // request types (like map/reduce in hadoop for example) val PRIORITY = 1 - // Additional memory overhead - in mb - val MEMORY_OVERHEAD = 384 + // Additional memory overhead + val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_MIN = 384 // Host to rack map - saved from allocation requests // We are expecting this not to change. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 556f49342977a..74625361373df 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -67,7 +67,7 @@ trait ClientBase extends Logging { // Additional memory overhead - in mb. protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.amMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 29ccec2adcac3..1261b17c3c94f 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -92,7 +92,7 @@ private[yarn] class YarnAllocationHandler( // Additional memory overhead - in mb. private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. @@ -562,8 +562,9 @@ object YarnAllocationHandler { // request types (like map/reduce in hadoop for example) val PRIORITY = 1 - // Additional memory overhead - in mb. - val MEMORY_OVERHEAD = 384 + // Additional memory overhead + val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_MIN = 384 // Host to rack map - saved from allocation requests. We are expecting this not to change. // Note that it is possible for this to change : and ResurceManager will indicate that to us via From 11a231c629cb3de5ba6bdde1c3989a4f5612ce19 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 17 Jul 2014 11:53:11 -0700 Subject: [PATCH 07/13] Update running-on-yarn.md --- docs/running-on-yarn.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index a8ce499bcaee9..9e054deace29f 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -83,14 +83,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead - executorMemory * 0.07, with minimum of 384 + executorMemory * 0.06, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. spark.yarn.driver.memoryOverhead - driverMemory * 0.07, with minimum of 384 + driverMemory * 0.06, with minimum of 384 The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. From b44845b0a64c7a9b05634d65c4f47c0719385d0c Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 17 Jul 2014 11:53:47 -0700 Subject: [PATCH 08/13] Update YarnAllocationHandler.scala --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index a8da82934e5ff..5b2398a8322c4 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -549,7 +549,7 @@ object YarnAllocationHandler { val PRIORITY = 1 // Additional memory overhead - val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_FACTOR = 0.06 val MEMORY_OVERHEAD_MIN = 384 // Host to rack map - saved from allocation requests From 44ba4daf9fa99676f16a284772bb0a7be7b4a981 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 17 Jul 2014 11:54:25 -0700 Subject: [PATCH 09/13] Update YarnAllocationHandler.scala --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 1261b17c3c94f..4382fac632bed 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -563,7 +563,7 @@ object YarnAllocationHandler { val PRIORITY = 1 // Additional memory overhead - val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_FACTOR = 0.06 val MEMORY_OVERHEAD_MIN = 384 // Host to rack map - saved from allocation requests. We are expecting this not to change. From 3062d6b5abb103e57115837afd028ecaa038525a Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 18 Sep 2014 17:03:04 -0700 Subject: [PATCH 10/13] Update ExecutorLauncher.scala --- .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b2eed31e461d1..d3d8e25621eb7 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -100,7 +100,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp if (minimumMemory > 0) { val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.executorMemory).toInt, + YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { From b8eeefed777b0e0130af25443fe5d88c4a20211d Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 18 Sep 2014 17:03:29 -0700 Subject: [PATCH 11/13] Update YarnAllocationHandler.scala --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5b2398a8322c4..47d67b40875c0 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -90,7 +90,8 @@ private[yarn] class YarnAllocationHandler( // Additional memory overhead - in mb. private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, + YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor From 492c33fd412f4944ac3792d3f33c5f13beca0c31 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 18 Sep 2014 17:03:48 -0700 Subject: [PATCH 12/13] Update ClientBase.scala --- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 74625361373df..689db6299be62 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -67,7 +67,8 @@ trait ClientBase extends Logging { // Additional memory overhead - in mb. protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.amMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * args.amMemory).toInt, + YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { From 5cbe89cdb6cc8702a040ded9f35c0a299818533e Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Thu, 18 Sep 2014 17:04:18 -0700 Subject: [PATCH 13/13] Update YarnAllocationHandler.scala --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 4382fac632bed..c35ed62d77eea 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -92,7 +92,8 @@ private[yarn] class YarnAllocationHandler( // Additional memory overhead - in mb. private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) + math.max((YarnAllocationHandler.MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, + YarnAllocationHandler.MEMORY_OVERHEAD_MIN)) // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster.