From 57f7ac2575d16274d4696a6ecb9752472f8e98af Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 9 Jun 2016 10:47:08 +0100
Subject: [PATCH 1/3] Reduce spark.memory.fraction default to 0.66

---
 .../org/apache/spark/memory/UnifiedMemoryManager.scala | 8 ++++----
 docs/configuration.md                                  | 2 +-
 docs/tuning.md                                         | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index ae747c1d163e8..36f9d46ccc2a3 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -25,9 +25,9 @@ import org.apache.spark.storage.BlockId
  * either side can borrow memory from the other.
  *
  * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
- * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
+ * configurable through `spark.memory.fraction` (default 0.66). The position of the boundary
  * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
- * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
+ * This means the size of the storage region is 0.66 * 0.5 = 0.33 of the heap space by default.
  *
  * Storage can borrow as much execution memory as is free until execution reclaims its space.
  * When this happens, cached blocks will be evicted from memory until sufficient borrowed
@@ -187,7 +187,7 @@ object UnifiedMemoryManager {
   // Set aside a fixed amount of memory for non-storage, non-execution purposes.
   // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
   // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
-  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
+  // the memory used for execution and storage will be (1024 - 300) * 0.66 = 478MB by default.
   private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
@@ -223,7 +223,7 @@ object UnifiedMemoryManager {
       }
     }
     val usableMemory = systemMemory - reservedMemory
-    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
+    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.66)
     (usableMemory * memoryFraction).toLong
   }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 32c3a9266078a..a910270626eb0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -778,7 +778,7 @@ Apart from these, the following properties are also available, and may be useful
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.memory.fraction</code></td>
-  <td>0.75</td>
+  <td>0.66</td>
   <td>
     Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
     more frequently spills and cached data eviction occur. The purpose of this config is to set
diff --git a/docs/tuning.md b/docs/tuning.md
index e73ed69ffbbf8..cfb6c2ea6a2ac 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -115,7 +115,7 @@ Although there are two relevant configurations, the typical user should not need
 as the default values are applicable to most workloads:

 * `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
-(default 0.75). The rest of the space (25%) is reserved for user data structures, internal
+(default 0.66). The rest of the space (34%) is reserved for user data structures, internal
 metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large
 records.
 * `spark.memory.storageFraction` expresses the size of `R` as a fraction of `M` (default 0.5).
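To put the new default in context, the following minimal, self-contained Scala sketch mirrors the arithmetic described in the `UnifiedMemoryManager` comments above: reserve 300MB, then apply `spark.memory.fraction` to what remains. It is an illustration only, not part of the patch, and the object and method names are made up for the example.

```scala
object MemoryFractionSketch {
  // Same fixed 300MB reservation as RESERVED_SYSTEM_MEMORY_BYTES in the patch.
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  // Size of the unified execution + storage region for a given heap size and fraction.
  def sharedRegionBytes(heapBytes: Long, memoryFraction: Double): Long =
    ((heapBytes - ReservedSystemMemoryBytes) * memoryFraction).toLong

  def main(args: Array[String]): Unit = {
    val oneGbHeap = 1024L * 1024 * 1024
    val oldMb = sharedRegionBytes(oneGbHeap, 0.75) / (1024 * 1024) // 543MB with the old default
    val newMb = sharedRegionBytes(oneGbHeap, 0.66) / (1024 * 1024) // 477MB (the comment rounds to 478MB)
    println(s"1GB heap: old default $oldMb MB, new default $newMb MB")
  }
}
```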
From 2692ae30dfd38d55253d7c05ee8a0395db899c2c Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 13 Jun 2016 09:37:38 +0100
Subject: [PATCH 2/3] Default to 0.6 and document interaction with NewRatio

---
 .../spark/memory/UnifiedMemoryManager.scala |  8 ++++----
 docs/configuration.md                       |  7 ++++---
 docs/tuning.md                              | 18 +++++++++++++++++-
 3 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 36f9d46ccc2a3..c7b36be6027a5 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -25,9 +25,9 @@ import org.apache.spark.storage.BlockId
  * either side can borrow memory from the other.
  *
  * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
- * configurable through `spark.memory.fraction` (default 0.66). The position of the boundary
+ * configurable through `spark.memory.fraction` (default 0.6). The position of the boundary
  * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
- * This means the size of the storage region is 0.66 * 0.5 = 0.33 of the heap space by default.
+ * This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
  *
  * Storage can borrow as much execution memory as is free until execution reclaims its space.
  * When this happens, cached blocks will be evicted from memory until sufficient borrowed
@@ -187,7 +187,7 @@ object UnifiedMemoryManager {
   // Set aside a fixed amount of memory for non-storage, non-execution purposes.
   // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
   // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
-  // the memory used for execution and storage will be (1024 - 300) * 0.66 = 478MB by default.
+  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
   private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
@@ -223,7 +223,7 @@ object UnifiedMemoryManager {
       }
     }
     val usableMemory = systemMemory - reservedMemory
-    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.66)
+    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
     (usableMemory * memoryFraction).toLong
   }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index a910270626eb0..fbda91c109626 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -778,14 +778,15 @@ Apart from these, the following properties are also available, and may be useful
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.memory.fraction</code></td>
-  <td>0.66</td>
+  <td>0.6</td>
   <td>
     Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
     more frequently spills and cached data eviction occur. The purpose of this config is to set
     aside memory for internal metadata, user data structures, and imprecise size estimation
     in the case of sparse, unusually large records. Leaving this at the default value is
-    recommended. For more detail, see
-    this description.
+    recommended. For more detail, including important information about correctly tuning JVM
+    garbage collection when increasing this value, see
+    this description.
   </td>
 </tr>
diff --git a/docs/tuning.md b/docs/tuning.md
index cfb6c2ea6a2ac..1ed14091c0546 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -115,12 +115,28 @@ Although there are two relevant configurations, the typical user should not need
 as the default values are applicable to most workloads:

 * `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
-(default 0.66). The rest of the space (34%) is reserved for user data structures, internal
+(default 0.6). The rest of the space (40%) is reserved for user data structures, internal
 metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large
 records.
 * `spark.memory.storageFraction` expresses the size of `R` as a fraction of `M` (default 0.5).
 `R` is the storage space within `M` where cached blocks immune to being evicted by execution.

+The value of `spark.memory.fraction` should be set in order to fit this amount of heap space
+comfortably within the JVM's old or "tenured" generation. Otherwise, when much of this space is
+used for caching and execution, the tenured generation will be full, which causes the JVM to
+significantly increase time spent in garbage collection. See
+Java GC sizing documentation
+for more information.
+
+The tenured generation size is controlled by the JVM's `NewRatio` parameter, which defaults to 2,
+meaning that the tenured generation is 2 times the size of the new generation (the rest of the heap).
+So, by default, the tenured generation occupies 2/3 or about 0.66 of the heap. A value of
+0.6 for `spark.memory.fraction` keeps storage and execution memory within the old generation with
+room to spare. If `spark.memory.fraction` is increased to, say, 0.8, then `NewRatio` may have to
+increase to 6 or more.
+
+`NewRatio` is set as a JVM flag for executors, which means adding
+`spark.executor.extraJavaOptions=-XX:NewRatio=x` to a Spark job's configuration.
+
 ## Determining Memory Consumption
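The tuning.md text added above ties `spark.memory.fraction` to the JVM's `NewRatio`. As a rough sketch of how the two settings would be paired in practice (the specific values 0.8 and 6 are taken from the example numbers in the added text; this is illustrative, not part of the patch):

```scala
import org.apache.spark.SparkConf

// Raising spark.memory.fraction above the new 0.6 default while growing the tenured
// generation to match: -XX:NewRatio=6 makes the old generation 6/7 (~0.86) of the heap,
// leaving headroom for a 0.8 memory fraction. Values are illustrative, not recommendations.
val conf = new SparkConf()
  .set("spark.memory.fraction", "0.8")
  .set("spark.executor.extraJavaOptions", "-XX:NewRatio=6")
```

The same pair of settings can equally be passed as `--conf` flags to `spark-submit`.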
From e30914254e48e865cd226dd3e2fea0943e1baf26 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 13 Jun 2016 15:10:38 +0100
Subject: [PATCH 3/3] Use smaller partitions in test of partitions fitting partially in memory, to account for smaller default max memory size

---
 core/src/test/scala/org/apache/spark/DistributedSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 6e69fc4247079..0515e6e3a6319 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -223,7 +223,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
   test("compute when only some partitions fit in memory") {
     val size = 10000
-    val numPartitions = 10
+    val numPartitions = 20
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
       .set("spark.testing.memory", size.toString)
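For a back-of-the-envelope view of why the partition count doubles: `spark.testing.memory` pins the heap size the memory manager sees at 10000 bytes, so the shared pool shrinks under the new default and each cached partition must be smaller for the "only some partitions fit" scenario to hold. The sketch below assumes the testing code path reserves 0 bytes rather than 300MB, which is an assumption about code not shown in this diff.

```scala
// Rough numbers for the test above (assumes a 0-byte reservation on the testing path).
val testingMemoryBytes = 10000L                       // spark.testing.memory in the test
val oldPoolBytes = (testingMemoryBytes * 0.75).round  // 7500 bytes with the old 0.75 default
val newPoolBytes = (testingMemoryBytes * 0.6).round   // 6000 bytes with the new 0.6 default
// Going from 10 to 20 partitions roughly halves each cached partition, so some partitions
// still fit in the smaller pool while others do not, which is what the test exercises.
```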