From 7270e0d45e55113e8a3de0e08c93d31c4c93efba Mon Sep 17 00:00:00 2001
From: Jim Lim
Date: Fri, 12 Sep 2014 11:37:01 -0700
Subject: [PATCH 1/4] SPARK-2761 refactor #maybeSpill into Spillable

---
 .../collection/ExternalAppendOnlyMap.scala    |  46 ++------
 .../util/collection/ExternalSorter.scala      |  63 +++--------
 .../spark/util/collection/Spillable.scala     | 102 ++++++++++++++++++
 3 files changed, 129 insertions(+), 82 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a015c1d26a96..934ae47be53d7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -66,23 +66,19 @@ class ExternalAppendOnlyMap[K, V, C](
     mergeCombiners: (C, C) => C,
     serializer: Serializer = SparkEnv.get.serializer,
     blockManager: BlockManager = SparkEnv.get.blockManager)
-  extends Iterable[(K, C)] with Serializable with Logging {
+  extends Iterable[(K, C)]
+  with Serializable
+  with Logging
+  with Spillable[SizeTrackingAppendOnlyMap[K, C]] {

   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
-  private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager

   // Number of pairs inserted since last spill; note that we count them even if a value is merged
   // with a previous key in case we're doing something like groupBy where the result grows
-  private var elementsRead = 0L
-
-  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
-  private val trackMemoryThreshold = 1000
-
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  protected[this] var elementsRead = 0L

   /**
    * Size of object batches when reading/writing from serializers.
@@ -95,11 +91,7 @@ class ExternalAppendOnlyMap[K, V, C](
    */
  private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)

-  // How many times we have spilled so far
-  private var spillCount = 0
-
   // Number of bytes spilled in total
-  private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L

   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
@@ -136,20 +128,7 @@ class ExternalAppendOnlyMap[K, V, C](

     while (entries.hasNext) {
       curEntry = entries.next()
-      if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-          currentMap.estimateSize() >= myMemoryThreshold)
-      {
-        // Claim up to double our current memory from the shuffle memory pool
-        val currentMemory = currentMap.estimateSize()
-        val amountToRequest = 2 * currentMemory - myMemoryThreshold
-        val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
-        myMemoryThreshold += granted
-        if (myMemoryThreshold <= currentMemory) {
-          // We were granted too little memory to grow further (either tryToAcquire returned 0,
-          // or we already had more memory than myMemoryThreshold); spill the current collection
-          spill(currentMemory)  // Will also release memory back to ShuffleMemoryManager
-        }
-      }
+      currentMap = maybeSpill(currentMap)
       currentMap.changeValue(curEntry._1, update)
       elementsRead += 1
     }
@@ -171,8 +150,8 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
-  private def spill(mapSize: Long): Unit = {
-    spillCount += 1
+  protected[this] def spill[A <: SizeTrackingAppendOnlyMap[K, C]](collection: A): A = {
+    val mapSize = collection.estimateSize()
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
@@ -231,18 +210,13 @@ class ExternalAppendOnlyMap[K, V, C](
       }
     }

-    currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
-
-    // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
-
     elementsRead = 0
-    _memoryBytesSpilled += mapSize
+
+    new SizeTrackingAppendOnlyMap[K, C].asInstanceOf[A]
   }

-  def memoryBytesSpilled: Long = _memoryBytesSpilled
   def diskBytesSpilled: Long = _diskBytesSpilled

   /**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 782b979e2e93d..1634ce93ee1ba 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -79,14 +79,14 @@ private[spark] class ExternalSorter[K, V, C](
     aggregator: Option[Aggregator[K, V, C]] = None,
     partitioner: Option[Partitioner] = None,
     ordering: Option[Ordering[K]] = None,
-    serializer: Option[Serializer] = None) extends Logging {
+    serializer: Option[Serializer] = None)
+  extends Logging with Spillable[SizeTrackingPairCollection[(Int, K), C]] {

   private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
   private val shouldPartition = numPartitions > 1

   private val blockManager = SparkEnv.get.blockManager
   private val diskBlockManager = blockManager.diskBlockManager
-  private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
   private val ser = Serializer.getSerializer(serializer)
   private val serInstance = ser.newInstance()
@@ -115,22 +115,14 @@ private[spark] class ExternalSorter[K, V, C](
   // Number of pairs read from input since last spill; note that we count them even if a value is
   // merged with a previous key in case we're doing something like groupBy where the result grows
-  private var elementsRead = 0L
-
-  // What threshold of elementsRead we start estimating map size at.
-  private val trackMemoryThreshold = 1000
+  protected[this] var elementsRead = 0L

   // Total spilling statistics
-  private var spillCount = 0
-  private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L

   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _

-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
-
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
   // at the end. This avoids doing serialization and deserialization twice to merge together the
@@ -232,35 +224,18 @@ private[spark] class ExternalSorter[K, V, C](
       return
     }

-    val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
-
-    // TODO: factor this out of both here and ExternalAppendOnlyMap
-    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        collection.estimateSize() >= myMemoryThreshold)
-    {
-      // Claim up to double our current memory from the shuffle memory pool
-      val currentMemory = collection.estimateSize()
-      val amountToRequest = 2 * currentMemory - myMemoryThreshold
-      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
-      myMemoryThreshold += granted
-      if (myMemoryThreshold <= currentMemory) {
-        // We were granted too little memory to grow further (either tryToAcquire returned 0,
-        // or we already had more memory than myMemoryThreshold); spill the current collection
-        spill(currentMemory, usingMap)  // Will also release memory back to ShuffleMemoryManager
-      }
+    if (usingMap) {
+      map = maybeSpill(map)
+    } else {
+      buffer = maybeSpill(buffer)
     }
   }

   /**
    * Spill the current in-memory collection to disk, adding a new file to spills, and clear it.
-   *
-   * @param usingMap whether we're using a map or buffer as our current in-memory collection
    */
-  private def spill(memorySize: Long, usingMap: Boolean): Unit = {
-    val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
+  protected[this] def spill[A <: SizeTrackingPairCollection[(Int, K), C]](collection: A): A = {
+    val memorySize = collection.estimateSize()
-
-    spillCount += 1
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
       .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
@@ -271,19 +246,17 @@ private[spark] class ExternalSorter[K, V, C](
       spillToMergeableFile(collection)
     }

-    if (usingMap) {
-      map = new SizeTrackingAppendOnlyMap[(Int, K), C]
-    } else {
-      buffer = new SizeTrackingPairBuffer[(Int, K), C]
-    }
-
-    // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0
-
-    _memoryBytesSpilled += memorySize
+    newCollection[A](collection)
   }

+  private def newCollection[A](t: A): A =
+    t match {
+      case _: AppendOnlyMap[_, _] =>
+        new SizeTrackingAppendOnlyMap[(Int, K), C].asInstanceOf[A]
+      case _ =>
+        new SizeTrackingPairBuffer[(Int, K), C].asInstanceOf[A]
+    }
+
   /**
    * Spill our in-memory collection to a sorted file that we can merge later (normal code path).
    * We add this file into spilledFiles to find it later.
@@ -804,8 +777,6 @@ private[spark] class ExternalSorter[K, V, C](
     }
   }

-  def memoryBytesSpilled: Long = _memoryBytesSpilled
-
   def diskBytesSpilled: Long = _diskBytesSpilled

   /**
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
new file mode 100644
index 0000000000000..5ec3fb4efbdda
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.apache.spark.SparkEnv
+import scala.language.reflectiveCalls
+
+/**
+ * Spills contents of an in-memory collection to disk when the memory threshold
+ * has been exceeded.
+ *
+ * @tparam C collection type that provides a size estimate
+ */
+private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
+
+  // Number of elements read from input since last spill
+  protected[this] var elementsRead: Long
+
+  // Memory manager that can be used to acquire/release memory
+  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
+
+  // What threshold of elementsRead we start estimating collection size at
+  private[this] val trackMemoryThreshold = 1000
+
+  // How much of the shared memory pool this collection has claimed
+  private[this] var myMemoryThreshold = 0L
+
+  // Number of bytes spilled in total
+  private[this] var _memoryBytesSpilled = 0L
+
+  // Number of spills
+  private[this] var _spillCount = 0
+
+  /**
+   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
+   * memory before spilling.
+   *
+   * @tparam A type of collection to be spilled
+   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
+   */
+  protected[this] def maybeSpill[A <: C](collection: A): A = {
+    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
+        collection.estimateSize() >= myMemoryThreshold) {
+      // Claim up to double our current memory from the shuffle memory pool
+      val currentMemory = collection.estimateSize()
+      val amountToRequest = 2 * currentMemory - myMemoryThreshold
+      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
+      myMemoryThreshold += granted
+      if (myMemoryThreshold <= currentMemory) {
+        // We were granted too little memory to grow further (either tryToAcquire returned 0,
+        // or we already had more memory than myMemoryThreshold); spill the current collection
+        _spillCount += 1
+        val empty = spill[A](collection)
+        _memoryBytesSpilled += currentMemory
+        release()
+        return empty
+      }
+    }
+    collection
+  }
+
+  /**
+   * Spills the current in-memory collection to disk, and releases the memory.
+   *
+   * @param collection collection to spill to disk
+   * @return new, empty collection
+   */
+  protected[this] def spill[A <: C](collection: A): A
+
+  /**
+   * @return total number of times this collection was spilled
+   */
+  protected[this] def spillCount: Int = _spillCount
+
+  /**
+   * @return number of bytes spilled in total
+   */
+  def memoryBytesSpilled: Long = _memoryBytesSpilled
+
+  /**
+   * Release our memory back to the shuffle pool so that other threads can grab it.
+   */
+  private[this] def release(): Unit = {
+    shuffleMemoryManager.release(myMemoryThreshold)
+    myMemoryThreshold = 0L
+  }
+}
+ */ + private[this] def release(): Unit = { + shuffleMemoryManager.release(myMemoryThreshold) + myMemoryThreshold = 0L + } +} From e75a24e1a9bd451ce4c0e917175af45b7e2ddc18 Mon Sep 17 00:00:00 2001 From: Jim Lim Date: Fri, 26 Sep 2014 08:58:03 -0700 Subject: [PATCH 2/4] SPARK-2761 use protected over protected[this] --- .../org/apache/spark/util/collection/Spillable.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 5ec3fb4efbdda..ec41f2e7d757a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -29,7 +29,7 @@ import scala.language.reflectiveCalls private[spark] trait Spillable[C <: { def estimateSize(): Long }] { // Number of elements read from input since last spill - protected[this] var elementsRead: Long + protected var elementsRead: Long // Memory manager that can be used to acquire/release memory private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager @@ -53,7 +53,7 @@ private[spark] trait Spillable[C <: { def estimateSize(): Long }] { * @tparam A type of collection to be spilled * @return if spilled, a new empty collection instance; otherwise, the same collection instance */ - protected[this] def maybeSpill[A <: C](collection: A): A = { + protected def maybeSpill[A <: C](collection: A): A = { if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && collection.estimateSize() >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool @@ -80,12 +80,12 @@ private[spark] trait Spillable[C <: { def estimateSize(): Long }] { * @param collection collection to spill to disk * @return new, empty collection */ - protected[this] def spill[A <: C](collection: A): A + protected def spill[A <: C](collection: A): A /** * @return total number of times this collection was spilled */ - protected[this] def spillCount: Int = _spillCount + protected def spillCount: Int = _spillCount /** * @return number of bytes spilled in total From f94d522f85274c3562c25235936e74536cd3daa7 Mon Sep 17 00:00:00 2001 From: Jim Lim Date: Fri, 26 Sep 2014 09:58:26 -0700 Subject: [PATCH 3/4] SPARK-2761 refactor Spillable to simplify sig - move spilling logging into Spillable - remove spillCount method --- .../collection/ExternalAppendOnlyMap.scala | 14 +++--- .../util/collection/ExternalSorter.scala | 31 +++++-------- .../spark/util/collection/Spillable.scala | 45 +++++++++++-------- 3 files changed, 42 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 934ae47be53d7..0c088da46aa5e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -69,7 +69,7 @@ class ExternalAppendOnlyMap[K, V, C]( extends Iterable[(K, C)] with Serializable with Logging - with Spillable[SizeTrackingAppendOnlyMap[K, C]] { + with Spillable[SizeTracker] { private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] @@ -128,7 +128,9 @@ class ExternalAppendOnlyMap[K, V, C]( while (entries.hasNext) { curEntry = entries.next() - currentMap = maybeSpill(currentMap) + if 
From f94d522f85274c3562c25235936e74536cd3daa7 Mon Sep 17 00:00:00 2001
From: Jim Lim
Date: Fri, 26 Sep 2014 09:58:26 -0700
Subject: [PATCH 3/4] SPARK-2761 refactor Spillable to simplify sig

- move spilling logging into Spillable
- remove spillCount method
---
 .../collection/ExternalAppendOnlyMap.scala    | 14 +++---
 .../util/collection/ExternalSorter.scala      | 31 +++++--------
 .../spark/util/collection/Spillable.scala     | 45 +++++++++++--------
 3 files changed, 42 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 934ae47be53d7..0c088da46aa5e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -69,7 +69,7 @@ class ExternalAppendOnlyMap[K, V, C](
   extends Iterable[(K, C)]
   with Serializable
   with Logging
-  with Spillable[SizeTrackingAppendOnlyMap[K, C]] {
+  with Spillable[SizeTracker] {

   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
@@ -128,7 +128,9 @@ class ExternalAppendOnlyMap[K, V, C](

     while (entries.hasNext) {
       curEntry = entries.next()
-      currentMap = maybeSpill(currentMap)
+      if (maybeSpill(currentMap, currentMap.estimateSize())) {
+        currentMap = new SizeTrackingAppendOnlyMap[K, C]
+      }
       currentMap.changeValue(curEntry._1, update)
       elementsRead += 1
     }
@@ -150,11 +152,7 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
-  protected[this] def spill[A <: SizeTrackingAppendOnlyMap[K, C]](collection: A): A = {
-    val mapSize = collection.estimateSize()
-    val threadId = Thread.currentThread().getId
-    logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
-      .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+  override protected[this] def spill(collection: SizeTracker): Unit = {
     val (blockId, file) = diskBlockManager.createTempBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
@@ -213,8 +211,6 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
     elementsRead = 0
-
-    new SizeTrackingAppendOnlyMap[K, C].asInstanceOf[A]
   }

   def diskBytesSpilled: Long = _diskBytesSpilled
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 1634ce93ee1ba..0a152cb97ad9e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -201,7 +201,7 @@ private[spark] class ExternalSorter[K, V, C](
         elementsRead += 1
         kv = records.next()
         map.changeValue((getPartition(kv._1), kv._1), update)
-        maybeSpill(usingMap = true)
+        maybeSpillCollection(usingMap = true)
       }
     } else {
       // Stick values into our buffer
@@ -209,7 +209,7 @@ private[spark] class ExternalSorter[K, V, C](
         elementsRead += 1
         val kv = records.next()
         buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
-        maybeSpill(usingMap = false)
+        maybeSpillCollection(usingMap = false)
       }
     }
   }
@@ -219,44 +219,33 @@ private[spark] class ExternalSorter[K, V, C](
    *
    * @param usingMap whether we're using a map or buffer as our current in-memory collection
    */
-  private def maybeSpill(usingMap: Boolean): Unit = {
+  private def maybeSpillCollection(usingMap: Boolean): Unit = {
     if (!spillingEnabled) {
       return
     }

     if (usingMap) {
-      map = maybeSpill(map)
+      if (maybeSpill(map, map.estimateSize())) {
+        map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+      }
     } else {
-      buffer = maybeSpill(buffer)
+      if (maybeSpill(buffer, buffer.estimateSize())) {
+        buffer = new SizeTrackingPairBuffer[(Int, K), C]
+      }
     }
   }

   /**
    * Spill the current in-memory collection to disk, adding a new file to spills, and clear it.
    */
-  protected[this] def spill[A <: SizeTrackingPairCollection[(Int, K), C]](collection: A): A = {
-    val memorySize = collection.estimateSize()
-    val threadId = Thread.currentThread().getId
-    logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
-      .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
-
+  override protected[this] def spill(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     if (bypassMergeSort) {
       spillToPartitionFiles(collection)
     } else {
       spillToMergeableFile(collection)
     }
-
-    newCollection[A](collection)
   }

-  private def newCollection[A](t: A): A =
-    t match {
-      case _: AppendOnlyMap[_, _] =>
-        new SizeTrackingAppendOnlyMap[(Int, K), C].asInstanceOf[A]
-      case _ =>
-        new SizeTrackingPairBuffer[(Int, K), C].asInstanceOf[A]
-    }
-
   /**
    * Spill our in-memory collection to a sorted file that we can merge later (normal code path).
    * We add this file into spilledFiles to find it later.
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index ec41f2e7d757a..b5a67fef9b4f3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -17,16 +17,17 @@

 package org.apache.spark.util.collection

+import org.apache.spark.Logging
 import org.apache.spark.SparkEnv
 import scala.language.reflectiveCalls

 /**
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
- *
- * @tparam C collection type that provides a size estimate
  */
-private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
+private[spark] trait Spillable[C] {
+
+  this: Logging =>

   // Number of elements read from input since last spill
   protected var elementsRead: Long
@@ -50,14 +51,12 @@ private[spark] trait Spillable[C] {
    * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    * memory before spilling.
    *
-   * @tparam A type of collection to be spilled
    * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    */
-  protected def maybeSpill[A <: C](collection: A): A = {
+  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
     if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        collection.estimateSize() >= myMemoryThreshold) {
+        currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
-      val currentMemory = collection.estimateSize()
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
       val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
       myMemoryThreshold += granted
@@ -65,13 +64,17 @@ private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
         // We were granted too little memory to grow further (either tryToAcquire returned 0,
         // or we already had more memory than myMemoryThreshold); spill the current collection
         _spillCount += 1
-        val empty = spill[A](collection)
+        logSpillage(currentMemory)
+
+        spill(collection)
+
+        // Keep track of spills, and release memory
         _memoryBytesSpilled += currentMemory
-        release()
-        return empty
+        releaseMemoryForThisThread()
+        return true
       }
     }
-    collection
+    false
   }
@@ -80,12 +83,7 @@ private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    * @param collection collection to spill to disk
    * @return new, empty collection
    */
-  protected def spill[A <: C](collection: A): A
-
-  /**
-   * @return total number of times this collection was spilled
-   */
-  protected def spillCount: Int = _spillCount
+  protected def spill(collection: C): Unit

   /**
    * @return number of bytes spilled in total
@@ -95,8 +93,19 @@ private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
   /**
    * Release our memory back to the shuffle pool so that other threads can grab it.
    */
-  private[this] def release(): Unit = {
+  private[this] def releaseMemoryForThisThread(): Unit = {
     shuffleMemoryManager.release(myMemoryThreshold)
     myMemoryThreshold = 0L
   }
+
+  /**
+   * Prints a standard log message detailing spillage.
+   *
+   * @param size number of bytes spilled
+   */
+  @inline private[this] def logSpillage(size: Long) {
+    val threadId = Thread.currentThread().getId
+    logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
+      .format(threadId, size / (1024 * 1024), _spillCount, if (_spillCount > 1) "s" else ""))
+  }
 }
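
One way to see what the maybeSpill arithmetic above does: it asks the shuffle memory pool for enough to double the collection's current footprint, net of what has already been claimed (amountToRequest = 2 * currentMemory - myMemoryThreshold), and it spills only when the pool cannot bring the claim above the current size. Below is a standalone numeric trace of that logic; the plain counter here merely stands in for ShuffleMemoryManager.tryToAcquire, whose real behavior may grant any amount up to the request.

    object MaybeSpillArithmetic {
      def main(args: Array[String]): Unit = {
        var myMemoryThreshold = 0L
        var poolRemaining = 6L * 1024 * 1024   // pretend only 6 MB are left in the pool

        // Stand-in for shuffleMemoryManager.tryToAcquire: grants at most what is left.
        def tryToAcquire(request: Long): Long = {
          val granted = math.min(request, poolRemaining)
          poolRemaining -= granted
          granted
        }

        // First check: the map has grown to 5 MB, threshold is still 0.
        var currentMemory = 5L * 1024 * 1024
        var amountToRequest = 2 * currentMemory - myMemoryThreshold   // ask for 10 MB
        myMemoryThreshold += tryToAcquire(amountToRequest)            // only 6 MB granted
        println(s"threshold = $myMemoryThreshold, spill = ${myMemoryThreshold <= currentMemory}")
        // threshold = 6291456, spill = false: 6 MB > 5 MB, so the collection may keep growing

        // Later check: the map has grown to 7 MB, but the pool is now empty.
        currentMemory = 7L * 1024 * 1024
        amountToRequest = 2 * currentMemory - myMemoryThreshold       // ask for 8 MB more
        myMemoryThreshold += tryToAcquire(amountToRequest)            // nothing granted
        println(s"threshold = $myMemoryThreshold, spill = ${myMemoryThreshold <= currentMemory}")
        // threshold = 6291456, spill = true: time to spill and release the 6 MB claim
      }
    }
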
+ * + * @param collection collection to spill to disk + */ + protected def spill(collection: C): Unit + // Number of elements read from input since last spill protected var elementsRead: Long @@ -51,7 +57,9 @@ private[spark] trait Spillable[C] { * Spills the current in-memory collection to disk if needed. Attempts to acquire more * memory before spilling. * - * @return if spilled, a new empty collection instance; otherwise, the same collection instance + * @param collection collection to spill to disk + * @param currentMemory estimated size of the collection in bytes + * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && @@ -77,14 +85,6 @@ private[spark] trait Spillable[C] { false } - /** - * Spills the current in-memory collection to disk, and releases the memory. - * - * @param collection collection to spill to disk - * @return new, empty collection - */ - protected def spill(collection: C): Unit - /** * @return number of bytes spilled in total */ @@ -93,7 +93,7 @@ private[spark] trait Spillable[C] { /** * Release our memory back to the shuffle pool so that other threads can grab it. */ - private[this] def releaseMemoryForThisThread(): Unit = { + private def releaseMemoryForThisThread(): Unit = { shuffleMemoryManager.release(myMemoryThreshold) myMemoryThreshold = 0L } @@ -103,7 +103,7 @@ private[spark] trait Spillable[C] { * * @param size number of bytes spilled */ - @inline private[this] def logSpillage(size: Long) { + @inline private def logSpillage(size: Long) { val threadId = Thread.currentThread().getId logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)" .format(threadId, size / (1024 * 1024), _spillCount, if (_spillCount > 1) "s" else ""))