refactor style
lianhuiwang committed Jun 30, 2015
1 parent 02749c1 commit 5cc88ab
Showing 4 changed files with 78 additions and 73 deletions.
@@ -46,30 +46,30 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
/**
* Release memory reserved by the current thread's other Spillables until freeMemory >= requestMemory
*/
def releaseReservedMemory(toGrant: Long, requestedAmount: Long): Long = synchronized {
def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized {
val threadId = Thread.currentThread().getId
if (toGrant >= requestedAmount || !threadReservedList.contains(threadId)){
if (toGrant >= requestMemory || !threadReservedList.contains(threadId)){
toGrant
} else {
//try to spill objs in current thread to make space for new request
var addedMemory = toGrant
while(addedMemory < requestedAmount && !threadReservedList(threadId).isEmpty ) {
// try to release other Spillables' memory in the current thread to make space for the new request
var addMemory = toGrant
while (addMemory < requestMemory && threadReservedList(threadId).nonEmpty) {
val toSpill = threadReservedList(threadId).remove(0)
val spillMemory = toSpill.forceSpill()
logInfo(s"Thread $threadId forceSpill $spillMemory bytes to be free")
addedMemory += spillMemory
addMemory += spillMemory
}
if (addedMemory > requestedAmount) {
this.release(addedMemory - requestedAmount)
addedMemory = requestedAmount
if (addMemory > requestMemory) {
this.release(addMemory - requestMemory)
addMemory = requestMemory
}
addedMemory
addMemory
}
}

/**
* add a Spillable to the memoryReservedList of the current thread; when the current thread has
* no enough memory, we can release memory of current thread's memory reserved list
* not enough memory, we can release memory from the current thread's memoryReservedList
*/
def addSpillableToReservedList(spill: Spillable) = synchronized {
val threadId = Thread.currentThread().getId
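For context, a minimal, self-contained sketch of the per-thread reservation mechanism this hunk adds. The names Spillable, forceSpill, threadReservedList, addSpillableToReservedList and releaseReservedMemory are taken from the diff; the map layout, the release helper and all other details are assumptions made for illustration, not the actual Spark implementation.

import scala.collection.mutable

// Something the memory manager can ask to free memory by spilling to disk.
trait Spillable {
  def forceSpill(): Long // returns the number of bytes freed
}

// Simplified stand-in for ShuffleMemoryManager; only the reservation-list logic is sketched.
class ShuffleMemoryManagerSketch {
  private val threadReservedList =
    mutable.HashMap[Long, mutable.ArrayBuffer[Spillable]]()

  // Register a Spillable so later requests from the same thread can force it to spill.
  def addSpillableToReservedList(spill: Spillable): Unit = synchronized {
    val threadId = Thread.currentThread().getId
    threadReservedList.getOrElseUpdate(threadId, mutable.ArrayBuffer.empty) += spill
  }

  // Hypothetical helper: return surplus bytes to the global pool (no-op here).
  private def release(numBytes: Long): Unit = synchronized { () }

  // Spill the current thread's registered Spillables until the request is covered.
  def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized {
    val threadId = Thread.currentThread().getId
    if (toGrant >= requestMemory || !threadReservedList.contains(threadId)) {
      toGrant
    } else {
      var addMemory = toGrant
      while (addMemory < requestMemory && threadReservedList(threadId).nonEmpty) {
        addMemory += threadReservedList(threadId).remove(0).forceSpill()
      }
      if (addMemory > requestMemory) {
        release(addMemory - requestMemory) // hand back what was over-freed
        addMemory = requestMemory
      }
      addMemory
    }
  }
}

The design trades disk I/O for availability: rather than failing the current request, the manager forces other collections owned by the same thread to spill.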
@@ -35,7 +35,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics

/**
* :: DeveloperApi ::
* An append-only map that spills sorted content to disk when there is insufficient space for it
* An append-only map that spills sorted content to disk when there is insufficient space for it
* to grow.
*
* This map takes two passes over the data:
@@ -160,7 +160,7 @@ class ExternalAppendOnlyMap[K, V, C](
/**
* spill contents of the in-memory map to a temporary file on disk.
*/
private[this] def spillMemoryToDisk(it: Iterator[(K, C)]): DiskMapIterator = {
private[this] def spillMemoryToDisk(inMemory: Iterator[(K, C)]): DiskMapIterator = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -181,8 +181,8 @@ class ExternalAppendOnlyMap[K, V, C](

var success = false
try {
while (it.hasNext) {
val kv = it.next()
while (inMemory.hasNext) {
val kv = inMemory.next()
writer.write(kv._1, kv._2)
objectsWritten += 1

@@ -232,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
}

/**
* spill contents of memory map to disk
* spill contents of the in-memory map to disk and release its memory
*/
override def forceSpill(): Long = {
var freeMemory = 0L
@@ -251,24 +251,24 @@ class ExternalAppendOnlyMap[K, V, C](
freeMemory
}

/**
* An iterator that read the elements from the in-memory iterator or the disk iterator after
* spilling contents of in-memory iterator to disk.
/**
* An iterator that reads elements from the in-memory iterator, or from the disk iterator once
* the in-memory contents have been spilled to disk.
*/
case class MemoryOrDiskIterator(memIt: Iterator[(K,C)]) extends Iterator[(K,C)] {
case class MemoryOrDiskIterator(memIter: Iterator[(K,C)]) extends Iterator[(K,C)] {

var currentIt = memIt
var currentIter = memIter

override def hasNext: Boolean = currentIt.hasNext
override def hasNext: Boolean = currentIter.hasNext

override def next(): (K, C) = currentIt.next()
override def next(): (K, C) = currentIter.next()

def spill() = {
if (hasNext) {
currentIt = spillMemoryToDisk(currentIt)
currentIter = spillMemoryToDisk(currentIter)
} else {
//the memory iterator is already drained, release it by giving an empty iterator
currentIt = new Iterator[(K,C)]{
//in-memory iterator is already drained, release it by giving an empty iterator
currentIter = new Iterator[(K,C)]{
override def hasNext: Boolean = false
override def next(): (K, C) = null
}
@@ -283,7 +283,7 @@ class ExternalAppendOnlyMap[K, V, C](
private class ExternalIterator extends Iterator[(K, C)] {

// A queue that maintains a buffer for each stream we are currently merging
// This queue maintains the invariant that it only contains non-empty buffers
// This queue maintains the invariant that it only contains non-empty buffers
private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

// Input streams are derived both from the in-memory map and spilled maps on disk
@@ -332,7 +332,7 @@ class ExternalAppendOnlyMap[K, V, C](
val pair = buffer.pairs(i)
if (pair._1 == key) {
// Note that there's at most one pair in the buffer with a given key, since we always
// merge stuff in a map before spilling, so it's safe to return after the first we find
// merge stuff in a map before spilling, so it's safe to return after the first we find
removeFromBuffer(buffer.pairs, i)
return mergeCombiners(baseCombiner, pair._2)
}
@@ -343,7 +343,7 @@ class ExternalAppendOnlyMap[K, V, C](

/**
* Remove the index'th element from an ArrayBuffer in constant time, swapping another element
* into its place. This is more efficient than the ArrayBuffer.remove method because it does
* into its place. This is more efficient than the ArrayBuffer.remove method because it does
* not have to shift all the elements in the array over. It works for our array buffers because
* we don't care about the order of elements inside, we just want to search them for a key.
*/
@@ -385,7 +385,7 @@ class ExternalAppendOnlyMap[K, V, C](
mergedBuffers += newBuffer
}

// Repopulate each visited stream buffer and add it back to the queue if it is non-empty
// Repopulate each visited stream buffer and add it back to the queue if it is non-empty
mergedBuffers.foreach { buffer =>
if (buffer.isEmpty) {
readNextHashCode(buffer.iterator, buffer.pairs)
@@ -488,7 +488,7 @@ class ExternalAppendOnlyMap[K, V, C](
/**
* Return the next (K, C) pair from the deserialization stream.
*
* If the current batch is drained, construct a stream for the next batch and read from it.
* If the current batch is drained, construct a stream for the next batch and read from it.
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
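This file and ExternalSorter below each introduce a MemoryOrDiskIterator. A generic sketch of the pattern follows; the type parameter and the spillToDisk function argument are illustrative rather than the actual signatures, and spillToDisk stands in for the real spillMemoryToDisk.

// Serves elements from memory until spill() is called, then transparently
// continues from a disk-backed iterator produced by spillToDisk.
class MemoryOrDiskIteratorSketch[T](
    memIter: Iterator[T],
    spillToDisk: Iterator[T] => Iterator[T]) extends Iterator[T] {

  private var currentIter: Iterator[T] = memIter

  override def hasNext: Boolean = currentIter.hasNext
  override def next(): T = currentIter.next()

  // Called from forceSpill(): move whatever has not been consumed yet to disk.
  def spill(): Unit = {
    if (hasNext) {
      currentIter = spillToDisk(currentIter)
    } else {
      // Already drained; swap in an empty iterator so the in-memory data can be GC'd.
      currentIter = Iterator.empty
    }
  }
}

The key property is that callers holding the iterator never observe the switch; only the backing storage changes.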
@@ -50,18 +50,18 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter}
* @param ordering optional Ordering to sort keys within each partition; should be a total ordering
* @param serializer serializer to use when spilling to disk
*
* Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
* Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
* want the output keys to be sorted. In a map task without map-side combine for example, you
* probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
* want to do combining, having an Ordering is more efficient than not having it.
* want to do combining, having an Ordering is more efficient than not having it.
*
* Users interact with this class in the following way:
*
* 1. Instantiate an ExternalSorter.
*
* 2. Call insertAll() with a set of records.
*
* 3. Request an iterator() back to traverse sorted/aggregated records.
* 3. Request an iterator() back to traverse sorted/aggregated records.
* - or -
* Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
* that can be used in Spark's sort shuffle.
@@ -74,12 +74,12 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter}
* To avoid calling the partitioner multiple times with each key, we store the partition ID
* alongside each record.
*
* - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
* - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
* by partition ID and possibly second by key or by hash code of the key, if we want to do
* aggregation. For each file, we track how many objects were in each partition in memory, so we
* don't have to write out the partition ID for every element.
*
* - When the user requests an iterator or file output, the spilled files are merged, along with
* - When the user requests an iterator or file output, the spilled files are merged, along with
* any remaining in-memory data, using the same sort order defined above (unless both sorting
* and aggregation are disabled). If we need to aggregate by key, we either use a total ordering
* from the ordering parameter, or read the keys with the same hash code and compare them with
@@ -240,7 +240,7 @@ private[spark] class ExternalSorter[K, V, C](

/**
* Spill our in-memory collection to a sorted file that we can merge later.
* We add this file into `spilledFiles` to find it later.
* We add this file into `spilledFiles` to find it later.
*
* @param collection whichever collection we're using (map or buffer)
*/
@@ -251,12 +251,12 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* Merge a sequence of sorted files, giving an iterator over partitions and then over elements
* Merge a sequence of sorted files, giving an iterator over partitions and then over elements
* inside each partition. This can be used to either write out a new file or return data to
* the user.
*
* Returns an iterator over all the data written to this object, grouped by partition. For each
* partition we then have an iterator over its contents, and these are expected to be accessed
* Returns an iterator over all the data written to this object, grouped by partition. For each
* partition we then have an iterator over its contents, and these are expected to be accessed
* in order (you can't "skip ahead" to one partition without reading the previous one).
* Guaranteed to return a key-value pair for each partition, in order of partition ID.
*/
@@ -313,7 +313,7 @@ private[spark] class ExternalSorter[K, V, C](

/**
* Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each
* iterator is sorted by key with a given comparator. If the comparator is not a total ordering
* iterator is sorted by key with a given comparator. If the comparator is not a total ordering
* (e.g. when we sort objects by hash code and different keys may compare as equal although
* they're not), we still merge them by doing equality tests for all keys that compare as equal.
*/
@@ -364,8 +364,8 @@ private[spark] class ExternalSorter[K, V, C](
}
}

// Note that we return an iterator of elements since we could've had many keys marked
// equal by the partial order; we flatten this below to get a flat iterator of (K, C).
// Note that we return an iterator of elements since we could've had many keys marked
// equal by the partial order; we flatten this below to get a flat iterator of (K, C).
keys.iterator.zip(combiners.iterator)
}
}.flatMap(i => i)
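The hunk above ends inside the key-merging logic, whose doc comment notes that the comparator may be only a partial ordering: several distinct keys can compare as equal (for example when compared by hash code), so combiners are matched by true key equality within each group of hash-equal pairs. A sketch of that idea follows; mergeGroup and its parameters are illustrative names, not code from the file.

import scala.collection.mutable.ArrayBuffer

object MergeSketch {
  // Merge combiners for one group of pairs whose keys compare as equal under the
  // (possibly partial) comparator; truly distinct keys in the group stay separate.
  def mergeGroup[K, C](group: Seq[(K, C)], mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    val keys = new ArrayBuffer[K]
    val combiners = new ArrayBuffer[C]
    for ((k, c) <- group) {
      val i = keys.indexOf(k) // true equality, not just "compares as equal"
      if (i >= 0) combiners(i) = mergeCombiners(combiners(i), c)
      else { keys += k; combiners += c }
    }
    keys.iterator.zip(combiners.iterator)
  }
}

For example, MergeSketch.mergeGroup(Seq(("a", 1), ("b", 2), ("a", 3)), (_: Int) + (_: Int)) yields ("a", 4) and ("b", 2).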
@@ -468,7 +468,7 @@ private[spark] class ExternalSorter[K, V, C](
* Return the next (K, C) pair from the deserialization stream and update partitionId,
* indexInPartition, indexInBatch and such to match its location.
*
* If the current batch is drained, construct a stream for the next batch and read from it.
* If the current batch is drained, construct a stream for the next batch and read from it.
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
@@ -539,9 +539,9 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* spill contents of the in-memory map to a temporary file on disk.
* spill contents of the in-memory iterator to a temporary file on disk.
*/
private def spillMemoryToDisk(it: WritablePartitionedIterator): SpilledFile = {
private def spillMemoryToDisk(inMemory: WritablePartitionedIterator): SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -579,9 +579,9 @@ private[spark] class ExternalSorter[K, V, C](
var success = false
try {

while (it.hasNext) {
val partitionId = it.nextPartition()
it.writeNext(writer)
while (inMemory.hasNext) {
val partitionId = inMemory.nextPartition()
inMemory.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1

@@ -614,16 +614,16 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* Spill in-memory iterator to a temporary file on disk.
* Return an iterator over a temporary file on disk.
* Spill the in-memory iterator to a temporary file on disk.
* Return an on-disk iterator over the temporary file.
*/
private[this] def spillMemoryToDisk(currentIt: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
val it = new WritablePartitionedIterator {
private[this] var cur = if (currentIt.hasNext) currentIt.next() else null
private[this] var cur = if (iterator.hasNext) iterator.next() else null

def writeNext(writer: BlockObjectWriter): Unit = {
writer.write(cur._1._2, cur._2)
cur = if (currentIt.hasNext) currentIt.next() else null
cur = if (iterator.hasNext) iterator.next() else null
}

def hasNext(): Boolean = cur != null
@@ -635,32 +635,32 @@ private[spark] class ExternalSorter[K, V, C](

(0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
iterator.map{cur => ((p, cur._1), cur._2)}
iterator.map(cur => ((p, cur._1), cur._2))
}
}

/**
* An iterator that read the elements from the in-memory iterator or the disk iterator after
* spilling contents of in-memory iterator to disk.
* An iterator that reads elements from the in-memory iterator, or from the disk iterator once
* the in-memory contents have been spilled to disk.
*/
case class MemoryOrDiskIterator(memIt: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] {
case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] {

var currentIt = memIt
var currentIter = memIter

override def hasNext: Boolean = currentIt.hasNext
override def hasNext: Boolean = currentIter.hasNext

override def next(): ((Int, K), C) = currentIt.next()
override def next(): ((Int, K), C) = currentIter.next()

def spill() = {
if (hasNext) {
currentIt = spillMemoryToDisk(currentIt)
currentIter = spillMemoryToDisk(currentIter)
} else {
//the memory iterator is already drained, release it by giving an empty iterator
currentIt = new Iterator[((Int, K), C)]{
//in-memory iterator is already drained, release it by giving an empty iterator
currentIter = new Iterator[((Int, K), C)]{
override def hasNext: Boolean = false
override def next(): ((Int, K), C) = null
}
logInfo("nothing in memory iterator, do nothing")
logInfo("nothing in memory inMemory, do nothing")
}
}
}
@@ -696,8 +696,8 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
* partition without reading the previous one). Guaranteed to return a key-value pair for each
* partition, in order of partition ID.
@@ -738,7 +738,7 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* Return an iterator over all the data written to this object, aggregated by our aggregator.
* Return an iterator over all the data written to this object, aggregated by our aggregator.
*/
def iterator: Iterator[Product2[K, C]] = {
isShuffleSort = false
@@ -778,7 +778,7 @@ private[spark] class ExternalSorter[K, V, C](
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
@@ -806,9 +806,9 @@ private[spark] class ExternalSorter[K, V, C](

/**
* Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*,
* group together the pairs for each partition into a sub-iterator.
* group together the pairs for each partition into a sub-iterator.
*
* @param data an iterator of elements, assumed to already be sorted by partition ID
* @param data an iterator of elements, assumed to already be sorted by partition ID
*/
private def groupByPartition(data: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] =
@@ -818,8 +818,8 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
* An iterator that reads only the elements for a given partition ID from an underlying buffered
* stream, assuming this partition is the next one to be read. Used to make it easier to return
* An iterator that reads only the elements for a given partition ID from an underlying buffered
* stream, assuming this partition is the next one to be read. Used to make it easier to return
* partitioned iterators from our in-memory collection.
*/
private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
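Putting the pieces together, a toy Spillable collection built on the Spillable trait and the two sketch classes above: it registers itself with the manager, hands out a memory-or-disk iterator, and redirects that iterator to disk when another request forces a spill. This is illustrative glue only; of the names used here, just forceSpill, addSpillableToReservedList and spill come from the diff.

import scala.collection.mutable.ArrayBuffer

class ToySpillableCollection[T](manager: ShuffleMemoryManagerSketch) extends Spillable {
  private val buffer = ArrayBuffer[T]()
  private var activeIter: Option[MemoryOrDiskIteratorSketch[T]] = None
  private var reservedBytes = 0L

  // Register so that other memory requests on this thread can force us to spill.
  manager.addSpillableToReservedList(this)

  def insert(elem: T, sizeInBytes: Long): Unit = {
    buffer += elem
    reservedBytes += sizeInBytes
  }

  def iterator: Iterator[T] = {
    val it = new MemoryOrDiskIteratorSketch[T](buffer.iterator, spillToDisk)
    activeIter = Some(it)
    it
  }

  // Stand-in for spillMemoryToDisk: a real implementation writes a sorted temp file and
  // returns a reader over it; here the remaining elements are simply materialized.
  private def spillToDisk(mem: Iterator[T]): Iterator[T] = mem.toList.iterator

  // Redirect any active reader to "disk" and report how much memory became reclaimable.
  override def forceSpill(): Long = activeIter match {
    case Some(it) =>
      it.spill()
      val freed = reservedBytes
      reservedBytes = 0L
      freed
    case None =>
      0L // nothing handed out yet; a real implementation would spill the buffer itself
  }
}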