[SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray

## What changes were proposed in this pull request?

[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported excessive spilling to disk because the default spill threshold for `ExternalAppendOnlyUnsafeRowArray` is quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre apache#16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk only upon reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if memory ran short due to other consumers).

Currently, both the switch from the in-memory array to `UnsafeExternalSorter` and the spilling of `UnsafeExternalSorter` to disk are controlled by a single threshold in `ExternalAppendOnlyUnsafeRowArray`. This PR separates the two thresholds to allow more granular control.
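
For illustration, here is a minimal sketch (not part of the diff; the values are arbitrary examples) of how the two thresholds can now be tuned independently, using the window operator's new internal configs from the `SQLConf` changes below:

```scala
// Keep up to 16K rows of a window partition in a plain in-memory array...
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", 16384)
// ...then hand rows to UnsafeExternalSorter, which force-spills only after
// 4M rows (or earlier under memory pressure).
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", 4 * 1024 * 1024)
```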

## How was this patch tested?

Added unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes apache#18843 from tejasapatil/SPARK-21595.
tejasapatil authored and hvanhovell committed Aug 11, 2017
1 parent 0377338 commit 9443999
Showing 9 changed files with 155 additions and 70 deletions.
@@ -844,24 +844,47 @@ object SQLConf {
     .stringConf
     .createWithDefaultFunction(() => TimeZone.getDefault.getID)

+  val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
+      .intConf
+      .createWithDefault(4096)
+
   val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.windowExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in window operator")
+      .doc("Threshold for number of rows to be spilled by window operator")
       .intConf
-      .createWithDefault(4096)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
+        "join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .doc("Threshold for number of rows to be spilled by sort merge join operator")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

+  val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
+        "product operator")
+      .intConf
+      .createWithDefault(4096)
+
   val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .doc("Threshold for number of rows to be spilled by cartesian product operator")
       .intConf
       .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

@@ -1137,11 +1160,19 @@ class SQLConf extends Serializable with Logging {

   def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)

+  def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

+  def sortMergeJoinExecBufferInMemoryThreshold: Int =
+    getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def sortMergeJoinExecBufferSpillThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

+  def cartesianProductExecBufferInMemoryThreshold: Int =
+    getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def cartesianProductExecBufferSpillThreshold: Int =
     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
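
For context, this is how an operator is expected to consume a pair of these accessors — a minimal sketch mirroring the `WindowExec` change further down (`sqlContext` is assumed to be in scope, as it is inside a physical operator):

```scala
val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
// Rows stay in a plain array up to the first threshold; beyond it they go to
// an UnsafeExternalSorter that spills once the second threshold is reached.
val buffer = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
```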

@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}

 /**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
+ * excessive memory consumption). Setting these threshold involves following trade-offs:
  *
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- *   available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- *   This may lead to a performance regression compared to the normal case of using an
- *   [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
  */
 private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskMemoryManager: TaskMemoryManager,
@@ -49,21 +49,23 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskContext: TaskContext,
     initialSize: Int,
     pageSizeBytes: Long,
+    numRowsInMemoryBufferThreshold: Int,
     numRowsSpillThreshold: Int) extends Logging {

-  def this(numRowsSpillThreshold: Int) {
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
     this(
       TaskContext.get().taskMemoryManager(),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
       TaskContext.get(),
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      numRowsInMemoryBufferThreshold,
       numRowsSpillThreshold)
   }

   private val initialSizeOfInMemoryBuffer =
-    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)

   private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
     new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
@@ -102,11 +104,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
   }

   def add(unsafeRow: UnsafeRow): Unit = {
-    if (numRows < numRowsSpillThreshold) {
+    if (numRows < numRowsInMemoryBufferThreshold) {
       inMemoryBuffer += unsafeRow.copy()
     } else {
       if (spillableArray == null) {
-        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+        logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
           s"${classOf[UnsafeExternalSorter].getName}")

         // We will not sort the rows, so prefixComparator and recordComparator are null
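
A usage sketch of the resulting two-phase behaviour (assumes a running Spark task, so the convenience constructor can obtain `TaskContext` and `SparkEnv`; `inputRows` is a hypothetical `Iterator[UnsafeRow]`):

```scala
val buffer = new ExternalAppendOnlyUnsafeRowArray(
  numRowsInMemoryBufferThreshold = 4096, // rows kept in the ArrayBuffer
  numRowsSpillThreshold = 1 << 22)       // rows before UnsafeExternalSorter spills
inputRows.foreach(buffer.add)            // each row is copied on insert
val it = buffer.generateIterator()       // replays rows in insertion order
```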
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
     left : RDD[UnsafeRow],
     right : RDD[UnsafeRow],
     numFieldsOfRight: Int,
+    inMemoryBufferThreshold: Int,
     spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

   override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
-    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+    val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)

     val partition = split.asInstanceOf[CartesianPartition]
     rdd2.iterator(partition.s2, context).foreach(rowArray.add)
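
The elided remainder of `compute` pairs every left row with the buffered right side; a hedged sketch of that shape (the exact elided code may differ):

```scala
// Stream the left partition past the buffered right partition.
rdd1.iterator(partition.s1, context).flatMap { leftRow =>
  rowArray.generateIterator().map(rightRow => (leftRow, rightRow))
}
```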
@@ -71,9 +72,12 @@ case class CartesianProductExec(
     val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
     val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]

-    val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
-
-    val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
+    val pair = new UnsafeCartesianRDD(
+      leftResults,
+      rightResults,
+      right.output.size,
+      sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
+      sqlContext.conf.cartesianProductExecBufferSpillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
       val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
       val filtered = if (condition.isDefined) {
@@ -130,9 +130,14 @@ case class SortMergeJoinExec(
     sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
   }

+  private def getInMemoryThreshold: Int = {
+    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       val boundCondition: (InternalRow) => Boolean = {
         condition.map { cond =>
@@ -158,6 +163,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         RowIterator.fromScala(leftIter),
         RowIterator.fromScala(rightIter),
+        inMemoryThreshold,
         spillThreshold
       )
       private[this] val joinRow = new JoinedRow
@@ -201,6 +207,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         streamedIter = RowIterator.fromScala(leftIter),
         bufferedIter = RowIterator.fromScala(rightIter),
+        inMemoryThreshold,
         spillThreshold
       )
       val rightNullRow = new GenericInternalRow(right.output.length)
@@ -214,6 +221,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         streamedIter = RowIterator.fromScala(rightIter),
         bufferedIter = RowIterator.fromScala(leftIter),
+        inMemoryThreshold,
         spillThreshold
       )
       val leftNullRow = new GenericInternalRow(left.output.length)
@@ -247,6 +255,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         RowIterator.fromScala(leftIter),
         RowIterator.fromScala(rightIter),
+        inMemoryThreshold,
         spillThreshold
       )
       private[this] val joinRow = new JoinedRow
@@ -281,6 +290,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         RowIterator.fromScala(leftIter),
         RowIterator.fromScala(rightIter),
+        inMemoryThreshold,
         spillThreshold
       )
       private[this] val joinRow = new JoinedRow
@@ -322,6 +332,7 @@ case class SortMergeJoinExec(
         keyOrdering,
         RowIterator.fromScala(leftIter),
         RowIterator.fromScala(rightIter),
+        inMemoryThreshold,
         spillThreshold
       )
       private[this] val joinRow = new JoinedRow
@@ -420,8 +431,10 @@ case class SortMergeJoinExec(
     val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold

-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);")
+    ctx.addMutableState(clsName, matches,
+      s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
     // Copy the left keys as class members so they could be used in next function call.
     val matchedKeyVars = copyKeys(ctx, leftKeyVars)
@@ -626,14 +639,18 @@ case class SortMergeJoinExec(
  * @param streamedIter an input whose rows will be streamed.
  * @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
  *                     have the same join key.
+ * @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
+ *                          internal buffer
+ * @param spillThreshold Threshold for number of rows to be spilled by internal buffer
  */
 private[joins] class SortMergeJoinScanner(
     streamedKeyGenerator: Projection,
     bufferedKeyGenerator: Projection,
     keyOrdering: Ordering[InternalRow],
     streamedIter: RowIterator,
     bufferedIter: RowIterator,
-    bufferThreshold: Int) {
+    inMemoryThreshold: Int,
+    spillThreshold: Int) {
   private[this] var streamedRow: InternalRow = _
   private[this] var streamedRowKey: InternalRow = _
   private[this] var bufferedRow: InternalRow = _
@@ -644,7 +661,8 @@ private[joins] class SortMergeJoinScanner(
    */
   private[this] var matchJoinKey: InternalRow = _
   /** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
-  private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold)
+  private[this] val bufferedMatches =
+    new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

   // Initialization (note: do _not_ want to advance streamed here).
   advancedBufferedToRowWithNullFreeJoinKey()
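
For context, a hedged sketch of the (elided) match-buffering step that fills `bufferedMatches`: all buffered-side rows sharing the current join key are collected, now subject to both thresholds (`bufferedRowKey` is defined in elided code):

```scala
private def bufferMatchingRows(): Unit = {
  matchJoinKey = streamedRowKey.copy() // key may come from a mutable projection
  bufferedMatches.clear()
  do {
    bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
    advancedBufferedToRowWithNullFreeJoinKey()
  } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
}
```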
@@ -292,6 +292,7 @@ case class WindowExec(
     // Unwrap the expressions and factories from the map.
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
     val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold

     // Start processing.
@@ -322,7 +323,8 @@ case class WindowExec(
       val inputFields = child.output.length

       val buffer: ExternalAppendOnlyUnsafeRowArray =
-        new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+        new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+
       var bufferIterator: Iterator[UnsafeRow] = _

       val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
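
A hedged sketch of how the (elided) per-partition processing uses this buffer — each window partition group is re-buffered, then replayed for frame evaluation (`nextRow`, `nextRowAvailable`, `samePartitionGroup` and `fetchNextRow` are illustrative stand-ins for elided members):

```scala
buffer.clear()                           // reuse the buffer for the next group
while (nextRowAvailable && samePartitionGroup(nextRow)) {
  buffer.add(nextRow)                    // spills beyond the thresholds above
  fetchNextRow()
}
bufferIterator = buffer.generateIterator()
```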
3 changes: 2 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -665,7 +665,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {

test("test SortMergeJoin (with spill)") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
"spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") {
"spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
"spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {

assertSpilled(sparkContext, "inner join") {
checkAnswer(
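
The same forcing pattern should carry over to the other operators; for instance, a hedged sketch (not from this diff) of an analogous window test built on the new configs:

```scala
test("test WindowExec (with spill)") {
  withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "0",
    "spark.sql.windowExec.buffer.spill.threshold" -> "1") {

    assertSpilled(sparkContext, "window") {
      sql("SELECT key, count(*) OVER (PARTITION BY key % 2) FROM testData").collect()
    }
  }
}
```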
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(
+          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
+          numSpillThreshold)
+
         rows.foreach(x => array.add(x))

         val iterator = array.generateIterator()
@@ -143,7 +146,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
         rows.foreach(x => array.add(x))

         val iterator = array.generateIterator()