[SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change CartesianProductExec, SortMergeJoin, WindowExec to use it

## What issue does this PR address?

Jira: https://issues.apache.org/jira/browse/SPARK-13450

In `SortMergeJoinExec`, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). Heap dump from a failed job confirms this : https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png . While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily.

## Change proposed in this pull request

- Introduces `ExternalAppendOnlyUnsafeRowArray` (a usage sketch follows this list)
  - It holds `UnsafeRow`s in-memory up to a certain threshold.
  - Once the threshold is hit, it switches to `UnsafeExternalSorter`, which enables spilling rows to disk. It does NOT sort the data.
  - Allows iterating the array multiple times. However, any alteration to the array (via `add` or `clear`) invalidates existing iterator(s).
- `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array.
- Changed `SortMergeJoinExec` to use the new array implementation.
  - NOTE: I have not changed FULL OUTER JOIN to use the new array. Changing that will need more surgery, and I would rather put up a separate PR for it once this one gets in.
- Changed `CartesianProductExec` to use the new array implementation.
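
As a quick orientation, here is a minimal sketch (not part of the PR itself) of the access pattern the new class supports. It must run inside a Spark task, since the convenience constructor pulls its dependencies from `TaskContext.get()` and `SparkEnv`; `rowIterator` stands in for any `Iterator[UnsafeRow]`:

```scala
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

// Rows are held in-memory up to the threshold; beyond it, the backing storage
// transparently switches to an UnsafeExternalSorter that can spill to disk.
val buffer = new ExternalAppendOnlyUnsafeRowArray(10000)
rowIterator.foreach(buffer.add)   // each row is copied on insert

// The buffered rows can be scanned multiple times, optionally from an offset.
val firstPass = buffer.generateIterator()
val secondPass = buffer.generateIterator(startIndex = 5)

// Any mutation (`add` or `clear`) invalidates live iterators.
buffer.clear()
```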

#### Note for reviewers

The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split.

## How was this patch tested?

#### Unit testing
- Added unit tests for `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns
- Added unit tests for `SortMergeJoinExec`
  - with and without spill for inner join, left outer join, and right outer join, to confirm that the spill threshold config behaves as expected and the output is correct.
  - This PR touches the scanning logic in `SortMergeJoinExec` for _all_ joins (except FULL OUTER JOIN). However, I expect the existing test cases to confirm that there is no regression in correctness.
- Added unit test for `WindowExec` to check behavior of spilling and correctness of results.

#### Stress testing
- Confirmed that the OOM is gone by running against a production job which used to OOM
- Since I cannot share details about the prod workload externally, I created synthetic data to mimic the issue, and ran the query before and after the fix to demonstrate the problem and show that it succeeds with this PR

Generating the synthetic data:

```
./bin/spark-shell --driver-memory=6G

import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()

hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect
hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect

val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table")

val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table")
```

I ran this against trunk vs. a local build with this PR: the OOM reproduces on trunk, and with the fix the query runs fine.

```
./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof"

import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()
hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000")

hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect
hc.sql("""
  CREATE TABLE spark_13450_result
  AS
  SELECT
    a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2,
    b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2
  FROM
    spark_13450_one_row_table a
  JOIN
    spark_13450_large_table b
  ON
    a.i=b.i AND
    a.j=b.j
""")
```

## Performance comparison

### Macro-benchmark

I ran an SMB join query over two real-world tables (2 trillion rows / 40 TB and 6 million rows / 120 GB). Note that this dataset has no skew, so no spill happened. I saw a 2-4% improvement in CPU time over the version without this PR. This did not add up, as I was expecting some regression. I think allocating an array with capacity 128 at the start (instead of starting with the default size of 16) is the sole reason for the perf gain: https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43. I could remove that and rerun, but the change will effectively be deployed in this form, and I wanted to see its effect on a large workload.
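
To illustrate the hypothesis (plain Scala; the numbers are only illustrative): an `ArrayBuffer` created with the default size starts at capacity 16 and doubles as it fills, so buffering even a modest match group incurs several grow-and-copy steps that a pre-sized buffer avoids:

```scala
import scala.collection.mutable.ArrayBuffer

// Default sizing: capacity starts at 16 and doubles on overflow, copying the
// backing array each time (16 -> 32 -> ... -> 1024: six reallocations).
val grown = new ArrayBuffer[Int]()
(0 until 1000).foreach(grown += _)

// Pre-sized to 128, mirroring DefaultInitialSizeOfInMemoryBuffer in
// ExternalAppendOnlyUnsafeRowArray: only three reallocations for the same load.
val presized = new ArrayBuffer[Int](128)
(0 until 1000).foreach(presized += _)
```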

### Micro-benchmark

Two benchmarks can be found in `ExternalAppendOnlyUnsafeRowArrayBenchmark`:

[A] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `ArrayBuffer` when all rows fit in-memory and there is no spill

```
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X

Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 19200 / 19206         25.6          39.1       1.0X
ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1          39.8       1.0X

Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X
```
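
For context, the access pattern that [A] times is roughly the following (a schematic, not the actual benchmark code; the real harness lives in `ExternalAppendOnlyUnsafeRowArrayBenchmark` and runs inside a task):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

// Append all rows, then scan them back once. With the threshold at
// Int.MaxValue nothing spills, so this exercises the in-memory path only.
def appendAndScan(rows: Seq[UnsafeRow]): Unit = {
  val array = new ExternalAppendOnlyUnsafeRowArray(Int.MaxValue)
  rows.foreach(array.add)
  val iterator = array.generateIterator()
  while (iterator.hasNext) {
    iterator.next()
  }
  array.clear()
}
```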

[B] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `UnsafeExternalSorter` when there is spilling of data

```
Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X

Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X
```
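
[B] exercises the same append-and-scan pattern, but with a spill threshold smaller than the number of rows, so every run crosses over into the `UnsafeExternalSorter`-backed path. Schematically (again not the actual benchmark code; `rows` as in the sketch above, threshold chosen for illustration):

```scala
// With e.g. 100000 rows and a threshold of 10000, the first 10000 adds stay
// in-memory and the next add forces the switch to the spillable backing store.
val spillingArray = new ExternalAppendOnlyUnsafeRowArray(10000)
rows.foreach(spillingArray.add)
spillingArray.generateIterator().foreach(_ => ())
spillingArray.clear()
```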

Author: Tejas Patil <tejasp@fb.com>

Closes #16909 from tejasapatil/SPARK-13450_smb_buffer_oom.
tejasapatil authored and hvanhovell committed Mar 15, 2017
1 parent 7387126 commit 02c274e
Showing 11 changed files with 1,187 additions and 292 deletions.
`SQLConf.scala`:

```diff
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -715,6 +716,27 @@
       .stringConf
       .createWithDefault(TimeZone.getDefault().getID())
 
+  val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.threshold")
+      .internal()
+      .doc("Threshold for number of rows buffered in window operator")
+      .intConf
+      .createWithDefault(4096)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
+      .internal()
+      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
+      .internal()
+      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .intConf
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -945,6 +967,14 @@ class SQLConf extends Serializable with Logging
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
 
+  def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
+
+  def sortMergeJoinExecBufferSpillThreshold: Int =
+    getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
+
+  def cartesianProductExecBufferSpillThreshold: Int =
+    getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
+
   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
 
   /** ********************** SQLConf functionality methods ************ */
```
`ExternalAppendOnlyUnsafeRowArray.scala` (new file):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.util.ConcurrentModificationException

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
import org.apache.spark.storage.BlockManager
import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}

/**
 * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
 * threshold of rows is reached.
 *
 * Setting the spill threshold faces the following trade-off:
 *
 * - If the spill threshold is too high, the in-memory array may occupy more memory than is
 *   available, resulting in OOM.
 * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
 *   This may lead to a performance regression compared to the normal case of using an
 *   [[ArrayBuffer]] or [[Array]].
 */
private[sql] class ExternalAppendOnlyUnsafeRowArray(
    taskMemoryManager: TaskMemoryManager,
    blockManager: BlockManager,
    serializerManager: SerializerManager,
    taskContext: TaskContext,
    initialSize: Int,
    pageSizeBytes: Long,
    numRowsSpillThreshold: Int) extends Logging {

  def this(numRowsSpillThreshold: Int) {
    this(
      TaskContext.get().taskMemoryManager(),
      SparkEnv.get.blockManager,
      SparkEnv.get.serializerManager,
      TaskContext.get(),
      1024,
      SparkEnv.get.memoryManager.pageSizeBytes,
      numRowsSpillThreshold)
  }

  private val initialSizeOfInMemoryBuffer =
    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)

  private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
    new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
  } else {
    null
  }

  private var spillableArray: UnsafeExternalSorter = _
  private var numRows = 0

  // A counter to keep track of total modifications done to this array since its creation.
  // This helps to invalidate iterators when there are changes done to the backing array.
  private var modificationsCount: Long = 0

  private var numFieldsPerRow = 0

  def length: Int = numRows

  def isEmpty: Boolean = numRows == 0

  /**
   * Clears up resources (e.g. memory) held by the backing storage
   */
  def clear(): Unit = {
    if (spillableArray != null) {
      // The last `spillableArray` of this task will be cleaned up via task completion listener
      // inside `UnsafeExternalSorter`
      spillableArray.cleanupResources()
      spillableArray = null
    } else if (inMemoryBuffer != null) {
      inMemoryBuffer.clear()
    }
    numFieldsPerRow = 0
    numRows = 0
    modificationsCount += 1
  }

  def add(unsafeRow: UnsafeRow): Unit = {
    if (numRows < numRowsSpillThreshold) {
      inMemoryBuffer += unsafeRow.copy()
    } else {
      if (spillableArray == null) {
        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
          s"${classOf[UnsafeExternalSorter].getName}")

        // We will not sort the rows, so prefixComparator and recordComparator are null
        spillableArray = UnsafeExternalSorter.create(
          taskMemoryManager,
          blockManager,
          serializerManager,
          taskContext,
          null,
          null,
          initialSize,
          pageSizeBytes,
          numRowsSpillThreshold,
          false)

        // populate with existing in-memory buffered rows
        if (inMemoryBuffer != null) {
          inMemoryBuffer.foreach(existingUnsafeRow =>
            spillableArray.insertRecord(
              existingUnsafeRow.getBaseObject,
              existingUnsafeRow.getBaseOffset,
              existingUnsafeRow.getSizeInBytes,
              0,
              false)
          )
          inMemoryBuffer.clear()
        }
        numFieldsPerRow = unsafeRow.numFields()
      }

      spillableArray.insertRecord(
        unsafeRow.getBaseObject,
        unsafeRow.getBaseOffset,
        unsafeRow.getSizeInBytes,
        0,
        false)
    }

    numRows += 1
    modificationsCount += 1
  }

  /**
   * Creates an [[Iterator]] for the current rows in the array starting from a user provided index
   *
   * If there are subsequent [[add()]] or [[clear()]] calls made on this array after creation of
   * the iterator, then the iterator is invalidated thus saving clients from thinking that they
   * have read all the data while there were new rows added to this array.
   */
  def generateIterator(startIndex: Int): Iterator[UnsafeRow] = {
    if (startIndex < 0 || (numRows > 0 && startIndex > numRows)) {
      throw new ArrayIndexOutOfBoundsException(
        "Invalid `startIndex` provided for generating iterator over the array. " +
          s"Total elements: $numRows, requested `startIndex`: $startIndex")
    }

    if (spillableArray == null) {
      new InMemoryBufferIterator(startIndex)
    } else {
      new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex)
    }
  }

  def generateIterator(): Iterator[UnsafeRow] = generateIterator(startIndex = 0)

  private[this]
  abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] {
    private val expectedModificationsCount = modificationsCount

    protected def isModified(): Boolean = expectedModificationsCount != modificationsCount

    protected def throwExceptionIfModified(): Unit = {
      if (expectedModificationsCount != modificationsCount) {
        throw new ConcurrentModificationException(
          s"The backing ${classOf[ExternalAppendOnlyUnsafeRowArray].getName} has been modified " +
            s"since the creation of this Iterator")
      }
    }
  }

  private[this] class InMemoryBufferIterator(startIndex: Int)
    extends ExternalAppendOnlyUnsafeRowArrayIterator {

    private var currentIndex = startIndex

    override def hasNext(): Boolean = !isModified() && currentIndex < numRows

    override def next(): UnsafeRow = {
      throwExceptionIfModified()
      val result = inMemoryBuffer(currentIndex)
      currentIndex += 1
      result
    }
  }

  private[this] class SpillableArrayIterator(
      iterator: UnsafeSorterIterator,
      numFieldPerRow: Int,
      startIndex: Int)
    extends ExternalAppendOnlyUnsafeRowArrayIterator {

    private val currentRow = new UnsafeRow(numFieldPerRow)

    def init(): Unit = {
      var i = 0
      while (i < startIndex) {
        if (iterator.hasNext) {
          iterator.loadNext()
        } else {
          throw new ArrayIndexOutOfBoundsException(
            "Invalid `startIndex` provided for generating iterator over the array. " +
              s"Total elements: $numRows, requested `startIndex`: $startIndex")
        }
        i += 1
      }
    }

    // Traverse up to the given [[startIndex]]
    init()

    override def hasNext(): Boolean = !isModified() && iterator.hasNext

    override def next(): UnsafeRow = {
      throwExceptionIfModified()
      iterator.loadNext()
      currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
      currentRow
    }
  }
}

private[sql] object ExternalAppendOnlyUnsafeRowArray {
  val DefaultInitialSizeOfInMemoryBuffer = 128
}
```
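
To make the invalidation contract above concrete, a hypothetical snippet (inside a task; `row1`–`row3` stand for arbitrary `UnsafeRow`s):

```scala
val buf = new ExternalAppendOnlyUnsafeRowArray(2)  // tiny threshold for illustration
buf.add(row1)
buf.add(row2)
val it = buf.generateIterator()

buf.add(row3)   // third add crosses the threshold, switches to UnsafeExternalSorter,
                // and bumps the modification counter

it.hasNext      // false: the iterator sees the modification
it.next()       // throws java.util.ConcurrentModificationException
```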
`CartesianProductExec.scala`:

```diff
@@ -19,65 +19,39 @@ package org.apache.spark.sql.execution.joins
 import org.apache.spark._
 import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
-import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD,
  * will be much faster than building the right partition for every row in left RDD, it also
  * materialize the right RDD (in case of the right RDD is nondeterministic).
  */
-class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
+class UnsafeCartesianRDD(
+    left : RDD[UnsafeRow],
+    right : RDD[UnsafeRow],
+    numFieldsOfRight: Int,
+    spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
-    // We will not sort the rows, so prefixComparator and recordComparator are null.
-    val sorter = UnsafeExternalSorter.create(
-      context.taskMemoryManager(),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      context,
-      null,
-      null,
-      1024,
-      SparkEnv.get.memoryManager.pageSizeBytes,
-      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
-      false)
+    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
 
     val partition = split.asInstanceOf[CartesianPartition]
-    for (y <- rdd2.iterator(partition.s2, context)) {
-      sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0, false)
-    }
+    rdd2.iterator(partition.s2, context).foreach(rowArray.add)
 
-    // Create an iterator from sorter and wrapper it as Iterator[UnsafeRow]
-    def createIter(): Iterator[UnsafeRow] = {
-      val iter = sorter.getIterator
-      val unsafeRow = new UnsafeRow(numFieldsOfRight)
-      new Iterator[UnsafeRow] {
-        override def hasNext: Boolean = {
-          iter.hasNext
-        }
-        override def next(): UnsafeRow = {
-          iter.loadNext()
-          unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)
-          unsafeRow
-        }
-      }
-    }
+    // Create an iterator from rowArray
+    def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()
 
     val resultIter =
       for (x <- rdd1.iterator(partition.s1, context);
            y <- createIter()) yield (x, y)
     CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
-      resultIter, sorter.cleanupResources())
+      resultIter, rowArray.clear())
   }
 }
@@ -97,7 +71,9 @@ case class CartesianProductExec(
     val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
     val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
 
-    val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
+    val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
+
+    val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
       val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
       val filtered = if (condition.isDefined) {
```