From 5cc88ab51cd974a8ad2438019a5e04052c7e2b0b Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Wed, 1 Jul 2015 00:10:07 +0800
Subject: [PATCH] refactor style

---
 .../spark/shuffle/ShuffleMemoryManager.scala  | 22 ++---
 .../collection/ExternalAppendOnlyMap.scala    | 40 ++++-----
 .../util/collection/ExternalSorter.scala      | 84 +++++++++----------
 .../shuffle/ShuffleMemoryManagerSuite.scala   |  5 ++
 4 files changed, 78 insertions(+), 73 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 727063706aa56..cff9b3aa2c05b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -46,30 +46,30 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
   /**
    * release other Spillable's memory of current thread until freeMemory >= requestedMemory
    */
-  def releaseReservedMemory(toGrant: Long, requestedAmount: Long): Long = synchronized {
+  def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized {
     val threadId = Thread.currentThread().getId
-    if (toGrant >= requestedAmount || !threadReservedList.contains(threadId)){
+    if (toGrant >= requestMemory || !threadReservedList.contains(threadId)) {
       toGrant
     } else {
-      //try to spill objs in current thread to make space for new request
-      var addedMemory = toGrant
-      while(addedMemory < requestedAmount && !threadReservedList(threadId).isEmpty ) {
+      // try to release other Spillables' memory in current thread to make space for new request
+      var addMemory = toGrant
+      while (addMemory < requestMemory && !threadReservedList(threadId).isEmpty) {
         val toSpill = threadReservedList(threadId).remove(0)
         val spillMemory = toSpill.forceSpill()
         logInfo(s"Thread $threadId forceSpill $spillMemory bytes to be free")
-        addedMemory += spillMemory
+        addMemory += spillMemory
       }
-      if (addedMemory > requestedAmount) {
-        this.release(addedMemory - requestedAmount)
-        addedMemory = requestedAmount
+      if (addMemory > requestMemory) {
+        this.release(addMemory - requestMemory)
+        addMemory = requestMemory
       }
-      addedMemory
+      addMemory
     }
   }
 
   /**
    * add Spillable to memoryReservedList of current thread, when current thread has
-   * no enough memory, we can release memory of current thread's memory reserved list
+   * not enough memory, we can release memory of the current thread's memoryReservedList
    */
   def addSpillableToReservedList(spill: Spillable) = synchronized {
     val threadId = Thread.currentThread().getId
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c7b71cc00795f..2d9cdf4a31ab6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -35,7 +35,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
 
 /**
  * :: DeveloperApi ::
- * An append-only map that spills sorted content to disk when there is insufficient space for it
+ * An append-only map that spills sorted content to disk when there is insufficient space for it
  * to grow.
 *
 * This map takes two passes over the data:
@@ -160,7 +160,7 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * spill contents of the in-memory map to a temporary file on disk.
    */
-  private[this] def spillMemoryToDisk(it: Iterator[(K, C)]): DiskMapIterator = {
+  private[this] def spillMemoryToDisk(inMemory: Iterator[(K, C)]): DiskMapIterator = {
     val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -181,8 +181,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     var success = false
     try {
-      while (it.hasNext) {
-        val kv = it.next()
+      while (inMemory.hasNext) {
+        val kv = inMemory.next()
         writer.write(kv._1, kv._2)
         objectsWritten += 1
 
@@ -232,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
   }
 
   /**
-   * spill contents of memory map to disk
+   * spill contents of the in-memory map to disk and release its memory
    */
   override def forceSpill(): Long = {
     var freeMemory = 0L
@@ -251,24 +251,24 @@ class ExternalAppendOnlyMap[K, V, C](
     freeMemory
   }
 
-  /**
-   * An iterator that read the elements from the in-memory iterator or the disk iterator after
-   * spilling contents of in-memory iterator to disk.
+  /*
+   * An iterator that reads elements from the in-memory iterator, or from the disk iterator
+   * after the in-memory iterator has been spilled to disk.
    */
-  case class MemoryOrDiskIterator(memIt: Iterator[(K,C)]) extends Iterator[(K,C)] {
+  case class MemoryOrDiskIterator(memIter: Iterator[(K,C)]) extends Iterator[(K,C)] {
 
-    var currentIt = memIt
+    var currentIter = memIter
 
-    override def hasNext: Boolean = currentIt.hasNext
+    override def hasNext: Boolean = currentIter.hasNext
 
-    override def next(): (K, C) = currentIt.next()
+    override def next(): (K, C) = currentIter.next()
 
     def spill() = {
       if (hasNext) {
-        currentIt = spillMemoryToDisk(currentIt)
+        currentIter = spillMemoryToDisk(currentIter)
       } else {
-        //the memory iterator is already drained, release it by giving an empty iterator
-        currentIt = new Iterator[(K,C)]{
+        // the in-memory iterator is already drained, release it by giving an empty iterator
+        currentIter = new Iterator[(K,C)]{
           override def hasNext: Boolean = false
           override def next(): (K, C) = null
        }
@@ -283,7 +283,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A queue that maintains a buffer for each stream we are currently merging
-    // This queue maintains the invariant that it only contains non-empty buffers
+    // This queue maintains the invariant that it only contains non-empty buffers
     private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
@@ -332,7 +332,7 @@ class ExternalAppendOnlyMap[K, V, C](
         val pair = buffer.pairs(i)
         if (pair._1 == key) {
           // Note that there's at most one pair in the buffer with a given key, since we always
-          // merge stuff in a map before spilling, so it's safe to return after the first we find
+          // merge stuff in a map before spilling, so it's safe to return after the first we find
           removeFromBuffer(buffer.pairs, i)
           return mergeCombiners(baseCombiner, pair._2)
         }
@@ -343,7 +343,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     /**
      * Remove the index'th element from an ArrayBuffer in constant time, swapping another element
-     * into its place. This is more efficient than the ArrayBuffer.remove method because it does
+     * into its place. This is more efficient than the ArrayBuffer.remove method because it does
      * not have to shift all the elements in the array over. It works for our array buffers because
      * we don't care about the order of elements inside, we just want to search them for a key.
      */
@@ -385,7 +385,7 @@ class ExternalAppendOnlyMap[K, V, C](
         mergedBuffers += newBuffer
       }
 
-      // Repopulate each visited stream buffer and add it back to the queue if it is non-empty
+      // Repopulate each visited stream buffer and add it back to the queue if it is non-empty
       mergedBuffers.foreach { buffer =>
         if (buffer.isEmpty) {
           readNextHashCode(buffer.iterator, buffer.pairs)
        }
@@ -488,7 +488,7 @@ class ExternalAppendOnlyMap[K, V, C](
     /**
      * Return the next (K, C) pair from the deserialization stream.
      *
-     * If the current batch is drained, construct a stream for the next batch and read from it.
+     * If the current batch is drained, construct a stream for the next batch and read from it.
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 922a1b23fa5e4..cbf5a1aa5c2a3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -50,10 +50,10 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter}
 * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
 * @param serializer serializer to use when spilling to disk
 *
- * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
+ * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
 * want the output keys to be sorted. In a map task without map-side combine for example, you
 * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
- * want to do combining, having an Ordering is more efficient than not having it.
+ * want to do combining, having an Ordering is more efficient than not having it.
 *
 * Users interact with this class in the following way:
 *
 * 1. Instantiate an ExternalSorter.
 *
 * 2. Call insertAll() with a set of records.
 *
- * 3. Request an iterator() back to traverse sorted/aggregated records.
+ * 3. Request an iterator() back to traverse sorted/aggregated records.
 *     - or -
 *    Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
 *    that can be used in Spark's sort shuffle.
@@ -74,12 +74,12 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter}
 *   To avoid calling the partitioner multiple times with each key, we store the partition ID
 *   alongside each record.
 *
- * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
+ * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
 *   by partition ID and possibly second by key or by hash code of the key, if we want to do
 *   aggregation. For each file, we track how many objects were in each partition in memory, so we
 *   don't have to write out the partition ID for every element.
 *
- * - When the user requests an iterator or file output, the spilled files are merged, along with
+ * - When the user requests an iterator or file output, the spilled files are merged, along with
 *   any remaining in-memory data, using the same sort order defined above (unless both sorting
 *   and aggregation are disabled). If we need to aggregate by key, we either use a total ordering
 *   from the ordering parameter, or read the keys with the same hash code and compare them with
@@ -240,7 +240,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   /**
    * Spill our in-memory collection to a sorted file that we can merge later.
-   * We add this file into `spilledFiles` to find it later.
+   * We add this file into `spilledFiles` to find it later.
    *
    * @param collection whichever collection we're using (map or buffer)
    */
@@ -251,12 +251,12 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
-   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
+   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
    * inside each partition. This can be used to either write out a new file or return data to
    * the user.
    *
-   * Returns an iterator over all the data written to this object, grouped by partition. For each
-   * partition we then have an iterator over its contents, and these are expected to be accessed
+   * Returns an iterator over all the data written to this object, grouped by partition. For each
+   * partition we then have an iterator over its contents, and these are expected to be accessed
    * in order (you can't "skip ahead" to one partition without reading the previous one).
    * Guaranteed to return a key-value pair for each partition, in order of partition ID.
    */
@@ -313,7 +313,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   /**
    * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each
-   * iterator is sorted by key with a given comparator. If the comparator is not a total ordering
+   * iterator is sorted by key with a given comparator. If the comparator is not a total ordering
    * (e.g. when we sort objects by hash code and different keys may compare as equal although
    * they're not), we still merge them by doing equality tests for all keys that compare as equal.
    */
@@ -364,8 +364,8 @@ private[spark] class ExternalSorter[K, V, C](
           }
         }
 
-        // Note that we return an iterator of elements since we could've had many keys marked
-        // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
+        // Note that we return an iterator of elements since we could've had many keys marked
+        // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
         keys.iterator.zip(combiners.iterator)
       }
     }.flatMap(i => i)
@@ -468,7 +468,7 @@ private[spark] class ExternalSorter[K, V, C](
      * Return the next (K, C) pair from the deserialization stream and update partitionId,
      * indexInPartition, indexInBatch and such to match its location.
      *
-     * If the current batch is drained, construct a stream for the next batch and read from it.
+     * If the current batch is drained, construct a stream for the next batch and read from it.
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
@@ -539,9 +539,9 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
-   * spill contents of the in-memory map to a temporary file on disk.
+   * spill contents of the in-memory iterator to a temporary file on disk.
    */
-  private def spillMemoryToDisk(it: WritablePartitionedIterator): SpilledFile = {
+  private def spillMemoryToDisk(inMemory: WritablePartitionedIterator): SpilledFile = {
     // Because these files may be read during shuffle, their compression must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -579,9 +579,9 @@ private[spark] class ExternalSorter[K, V, C](
 
     var success = false
     try {
-      while (it.hasNext) {
-        val partitionId = it.nextPartition()
-        it.writeNext(writer)
+      while (inMemory.hasNext) {
+        val partitionId = inMemory.nextPartition()
+        inMemory.writeNext(writer)
         elementsPerPartition(partitionId) += 1
         objectsWritten += 1
 
@@ -614,16 +614,16 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
-   * Spill in-memory iterator to a temporary file on disk.
-   * Return an iterator over a temporary file on disk.
+   * Spill the in-memory iterator to a temporary file on disk.
+   * Return an on-disk iterator over that temporary file.
    */
-  private[this] def spillMemoryToDisk(currentIt: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+  private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
     val it = new WritablePartitionedIterator {
-      private[this] var cur = if (currentIt.hasNext) currentIt.next() else null
+      private[this] var cur = if (iterator.hasNext) iterator.next() else null
 
       def writeNext(writer: BlockObjectWriter): Unit = {
         writer.write(cur._1._2, cur._2)
-        cur = if (currentIt.hasNext) currentIt.next() else null
+        cur = if (iterator.hasNext) iterator.next() else null
       }
 
       def hasNext(): Boolean = cur != null
@@ -635,32 +635,32 @@ private[spark] class ExternalSorter[K, V, C](
 
     (0 until numPartitions).iterator.flatMap { p =>
       val iterator = spillReader.readNextPartition()
-      iterator.map{cur => ((p, cur._1), cur._2)}
+      iterator.map(cur => ((p, cur._1), cur._2))
     }
   }
 
   /**
-   * An iterator that read the elements from the in-memory iterator or the disk iterator after
-   * spilling contents of in-memory iterator to disk.
+   * An iterator that reads elements from the in-memory iterator, or from the disk iterator
+   * after the in-memory iterator has been spilled to disk.
    */
-  case class MemoryOrDiskIterator(memIt: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] {
+  case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] {
 
-    var currentIt = memIt
+    var currentIter = memIter
 
-    override def hasNext: Boolean = currentIt.hasNext
+    override def hasNext: Boolean = currentIter.hasNext
 
-    override def next(): ((Int, K), C) = currentIt.next()
+    override def next(): ((Int, K), C) = currentIter.next()
 
     def spill() = {
       if (hasNext) {
-        currentIt = spillMemoryToDisk(currentIt)
+        currentIter = spillMemoryToDisk(currentIter)
       } else {
-        //the memory iterator is already drained, release it by giving an empty iterator
-        currentIt = new Iterator[((Int, K), C)]{
+        // the in-memory iterator is already drained, release it by giving an empty iterator
+        currentIter = new Iterator[((Int, K), C)]{
          override def hasNext: Boolean = false
          override def next(): ((Int, K), C) = null
        }
-        logInfo("nothing in memory iterator, do nothing")
+        logInfo("nothing in memory iterator, do nothing")
      }
    }
  }
@@ -696,8 +696,8 @@ private[spark] class ExternalSorter[K, V, C](
   }
   /**
-   * Return an iterator over all the data written to this object, grouped by partition and
-   * aggregated by the requested aggregator. For each partition we then have an iterator over its
+   * Return an iterator over all the data written to this object, grouped by partition and
+   * aggregated by the requested aggregator. For each partition we then have an iterator over its
    * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
    * partition without reading the previous one). Guaranteed to return a key-value pair for each
    * partition, in order of partition ID.
    */
@@ -738,7 +738,7 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
-   * Return an iterator over all the data written to this object, aggregated by our aggregator.
+   * Return an iterator over all the data written to this object, aggregated by our aggregator.
    */
   def iterator: Iterator[Product2[K, C]] = {
     isShuffleSort = false
@@ -778,7 +778,7 @@ private[spark] class ExternalSorter[K, V, C](
         lengths(partitionId) = segment.length
       }
     } else {
-      // We must perform merge-sort; get an iterator by partition and write everything directly.
+      // We must perform merge-sort; get an iterator by partition and write everything directly.
       for ((id, elements) <- this.partitionedIterator) {
         if (elements.hasNext) {
           val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
@@ -806,9 +806,9 @@ private[spark] class ExternalSorter[K, V, C](
 
   /**
    * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*,
-   * group together the pairs for each partition into a sub-iterator.
+   * group together the pairs for each partition into a sub-iterator.
    *
-   * @param data an iterator of elements, assumed to already be sorted by partition ID
+   * @param data an iterator of elements, assumed to already be sorted by partition ID
    */
   private def groupByPartition(data: Iterator[((Int, K), C)])
       : Iterator[(Int, Iterator[Product2[K, C]])] =
@@ -818,8 +818,8 @@ private[spark] class ExternalSorter[K, V, C](
 
   /**
-   * An iterator that reads only the elements for a given partition ID from an underlying buffered
-   * stream, assuming this partition is the next one to be read. Used to make it easier to return
+   * An iterator that reads only the elements for a given partition ID from an underlying buffered
+   * stream, assuming this partition is the next one to be read. Used to make it easier to return
    * partitioned iterators from our in-memory collection.
    */
   private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index c31294b4233bc..0b1bb733d44da 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -26,7 +26,9 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.Spillable
 
 class FakeSpillable extends Spillable {
+
   var myMemoryThreshold: Long = 0L
+
   def addMemory(currentMemory: Long) = {
     myMemoryThreshold += currentMemory
   }
@@ -322,11 +324,14 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
 
   test("latter spillable grab full memory of previous spillable") {
     val manager = new ShuffleMemoryManager(1000L)
+
     val spill1 = new FakeSpillable()
     val spill2 = new FakeSpillable()
+
     spill1.addMemory(manager.tryToAcquire(700L))
     spill1.addMemory(manager.tryToAcquire(300L))
     manager.addSpillableToReservedList(spill1)
+
     val granted1 = manager.tryToAcquire(300L)
     assert(300L === granted1, "granted memory")
     val granted2 = manager.tryToAcquire(800L)
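
A minimal usage sketch (reviewer note, not part of the patch): it only re-states the flow of the FakeSpillable test above, and it assumes, from the asserted values, that tryToAcquire() consults releaseReservedMemory() internally; the names and the 1000-byte limit come from the suite, nothing else is implied.

    // Sketch only: mirrors ShuffleMemoryManagerSuite under the assumptions stated above.
    val manager = new ShuffleMemoryManager(1000L)
    val spill1 = new FakeSpillable()

    // spill1 reserves the whole pool and registers itself on this thread's reserved list,
    // so a later request from the same thread is allowed to force it to spill.
    spill1.addMemory(manager.tryToAcquire(700L))
    spill1.addMemory(manager.tryToAcquire(300L))
    manager.addSpillableToReservedList(spill1)

    // With no free memory left, the manager walks the thread's reserved list, calls
    // spill1.forceSpill(), and grants the released bytes to the new request.
    val granted = manager.tryToAcquire(300L)  // 300L expected, per the assertion above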