Skip to content

Commit

Permalink
SMJ inner range spill over implementation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zecevicp committed Aug 10, 2018
1 parent 39247ba commit 6a8ff5d
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import java.util.ConcurrentModificationException

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, Queue}

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
Expand All @@ -41,12 +41,16 @@ import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, Unsaf
* - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
* excessive disk writes. This may lead to a performance regression compared to the normal case
* of using an [[ArrayBuffer]] or [[Array]].
*
* If [[asQueue]] is set to true, the class will function as a queue, supporting peek() and
* dequeue() operations.
*/
private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskMemoryManager: TaskMemoryManager,
blockManager: BlockManager,
serializerManager: SerializerManager,
taskContext: TaskContext,
asQueue: Boolean,
initialSize: Int,
pageSizeBytes: Long,
numRowsInMemoryBufferThreshold: Int,
Expand All @@ -58,6 +62,20 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get(),
false,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
numRowsInMemoryBufferThreshold,
numRowsSpillThreshold)
}

def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int, asQueue: Boolean) {
this(
TaskContext.get().taskMemoryManager(),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get(),
asQueue,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
numRowsInMemoryBufferThreshold,
Expand All @@ -67,7 +85,13 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
private val initialSizeOfInMemoryBuffer =
Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)

private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
private val inMemoryQueue = if (asQueue && initialSizeOfInMemoryBuffer > 0) {
new Queue[UnsafeRow]()
} else {
null
}

private val inMemoryBuffer = if (!asQueue && initialSizeOfInMemoryBuffer > 0) {
new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
} else {
null
Expand All @@ -76,6 +100,9 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
private var spillableArray: UnsafeExternalSorter = _
private var numRows = 0

// Used when functioning as a queue to allow skipping 'dequeued' items
private var spillableArrayOffset = 0

// A counter to keep track of total modifications done to this array since its creation.
// This helps to invalidate iterators when there are changes done to the backing array.
private var modificationsCount: Long = 0
Expand All @@ -95,17 +122,60 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
// inside `UnsafeExternalSorter`
spillableArray.cleanupResources()
spillableArray = null
spillableArrayOffset = 0
} else if (inMemoryBuffer != null) {
inMemoryBuffer.clear()
} else if (inMemoryQueue != null) {
inMemoryQueue.clear()
}
numFieldsPerRow = 0
numRows = 0
modificationsCount += 1
}

def dequeue(): Option[UnsafeRow] = {
if (!asQueue) {
throw new IllegalStateException("Not instantiated as a queue!")
}
if (numRows == 0) {
None
}
else if (spillableArray != null) {
val retval = Some(generateIterator().next)
numRows -= 1
modificationsCount += 1
spillableArrayOffset += 1
retval
}
else {
numRows -= 1
modificationsCount += 1
Some(inMemoryQueue.dequeue())
}
}

def peek(): Option[UnsafeRow] = {
if (!asQueue) {
throw new IllegalStateException("Not instantiated as a queue!")
}
if (numRows == 0) {
None
}
else if (spillableArray != null) {
Some(generateIterator().next)
}
else {
Some(inMemoryQueue(0))
}
}

def add(unsafeRow: UnsafeRow): Unit = {
if (numRows < numRowsInMemoryBufferThreshold) {
inMemoryBuffer += unsafeRow.copy()
if (spillableArray == null && numRows < numRowsInMemoryBufferThreshold) {
if (asQueue) {
inMemoryQueue += unsafeRow.copy()
} else {
inMemoryBuffer += unsafeRow.copy()
}
} else {
if (spillableArray == null) {
logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
Expand All @@ -124,8 +194,21 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
numRowsSpillThreshold,
false)

spillableArrayOffset = 0

// populate with existing in-memory buffered rows
if (inMemoryBuffer != null) {
if (asQueue && inMemoryQueue != null) {
inMemoryQueue.foreach(existingUnsafeRow =>
spillableArray.insertRecord(
existingUnsafeRow.getBaseObject,
existingUnsafeRow.getBaseOffset,
existingUnsafeRow.getSizeInBytes,
0,
false)
)
inMemoryQueue.clear()
}
if (!asQueue && inMemoryBuffer != null) {
inMemoryBuffer.foreach(existingUnsafeRow =>
spillableArray.insertRecord(
existingUnsafeRow.getBaseObject,
Expand Down Expand Up @@ -168,7 +251,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
if (spillableArray == null) {
new InMemoryBufferIterator(startIndex)
} else {
new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
new SpillableArrayIterator(spillableArray.getIterator(startIndex + spillableArrayOffset),
numFieldsPerRow)
}
}

Expand Down Expand Up @@ -198,7 +282,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(

override def next(): UnsafeRow = {
throwExceptionIfModified()
val result = inMemoryBuffer(currentIndex)
val result = if (asQueue) inMemoryQueue(currentIndex) else inMemoryBuffer(currentIndex)
currentIndex += 1
result
}
Expand Down

This file was deleted.

0 comments on commit 6a8ff5d

Please sign in to comment.