From 2e4f34e54e92bfc47d817cb6392d89d660401b57 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 23 Dec 2016 04:59:01 +0000 Subject: [PATCH 1/5] Return false when forceSpill is called before the map is asked for iterator. --- .../util/collection/ExternalAppendOnlyMap.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 948cc3b099b18..9c3e27016ac5d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -115,6 +115,7 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() + @volatile private var isReadingIterator: Boolean = false @volatile private var readingIterator: SpillableIterator = null /** @@ -192,12 +193,16 @@ class ExternalAppendOnlyMap[K, V, C]( * It will be called by TaskMemoryManager when there is not enough memory for the task. */ override protected[this] def forceSpill(): Boolean = { - assert(readingIterator != null) - val isSpilled = readingIterator.spill() - if (isSpilled) { - currentMap = null + if (isReadingIterator) { + assert(readingIterator != null) + val isSpilled = readingIterator.spill() + if (isSpilled) { + currentMap = null + } + isSpilled + } else { + false } - isSpilled } /** @@ -261,6 +266,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) + isReadingIterator = true readingIterator } From 03d4dc0afbba0217a322a20a999894640f43aecc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 23 Dec 2016 05:45:54 +0000 Subject: [PATCH 2/5] Add test. --- .../util/collection/ExternalAppendOnlyMapSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 7f0838268a111..b601559197188 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -283,6 +283,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator") { + val size = 1000 + val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) + val map = createExternalMap[String] + val consumer = createExternalMap[String] + map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i))) + assert(map.spill(10000, consumer) == 0L) + } + test("spilling with hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true) From 5645533ede8bb37aa316afbc0d003d042bf5d52d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 9 Jan 2017 07:46:12 +0000 Subject: [PATCH 3/5] Address comment. --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 9c3e27016ac5d..70b243d9a62aa 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -115,7 +115,6 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() - @volatile private var isReadingIterator: Boolean = false @volatile private var readingIterator: SpillableIterator = null /** @@ -193,8 +192,7 @@ class ExternalAppendOnlyMap[K, V, C]( * It will be called by TaskMemoryManager when there is not enough memory for the task. */ override protected[this] def forceSpill(): Boolean = { - if (isReadingIterator) { - assert(readingIterator != null) + if (readingIterator != null) { val isSpilled = readingIterator.spill() if (isSpilled) { currentMap = null @@ -266,7 +264,6 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) - isReadingIterator = true readingIterator } From 5ebf079667ae897cfcefcd523b0476273a67a181 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Jan 2017 13:52:35 +0000 Subject: [PATCH 4/5] Address comment. --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 70b243d9a62aa..fab60d4a8374d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -199,7 +199,9 @@ class ExternalAppendOnlyMap[K, V, C]( } isSpilled } else { - false + spill(currentMap) + currentMap = new SizeTrackingAppendOnlyMap[K, C] + true } } From b1ef9ec749737125d833cd3a64922b4a9f8c32f1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Jan 2017 07:24:40 +0000 Subject: [PATCH 5/5] Only spill if the map has elements. --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index fab60d4a8374d..8aafda5e45d52 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -198,10 +198,12 @@ class ExternalAppendOnlyMap[K, V, C]( currentMap = null } isSpilled - } else { + } else if (currentMap.size > 0) { spill(currentMap) currentMap = new SizeTrackingAppendOnlyMap[K, C] true + } else { + false } }