[SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change CartesianProductExec, SortMergeJoin, WindowExec to use it #16909

Closed

Conversation


@tejasapatil tejasapatil commented Feb 13, 2017

What issue does this PR address?

Jira: https://issues.apache.org/jira/browse/SPARK-13450

In SortMergeJoinExec, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). Heap dump from a failed job confirms this : https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png . While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily.

Change proposed in this pull request

  • Introduces ExternalAppendOnlyUnsafeRowArray
    • It holds UnsafeRows in memory up to a certain threshold.
    • Once the threshold is hit, it switches to UnsafeExternalSorter, which enables spilling of the rows to disk. It does NOT sort the data.
    • It allows iterating over the array multiple times. However, any alteration to the array (using add or clear) invalidates the existing iterator(s); see the usage sketch after this list.
  • WindowExec was already using UnsafeExternalSorter to support spilling. Changed it to use the new array
  • Changed SortMergeJoinExec to use the new array implementation
    • NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery, so I would rather put up a separate PR for it once this gets in.
  • Changed CartesianProductExec to use the new array implementation
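
For orientation, here is a minimal usage sketch of the new array (hedged: the class is private[sql], so real callers live inside org.apache.spark.sql.execution; the threshold value and helper names below are illustrative, not the PR's actual code):

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

    // Rows are kept in an in-memory buffer until the threshold is crossed;
    // after that, adds go through UnsafeExternalSorter and may spill to disk.
    val buffer = new ExternalAppendOnlyUnsafeRowArray(4096)

    def bufferMatches(matches: Iterator[UnsafeRow]): Unit = {
      buffer.clear()                       // invalidates any previously created iterators
      while (matches.hasNext) {
        buffer.add(matches.next().copy())  // copy because upstream may reuse the row object
      }
    }

    // The buffered rows can be scanned multiple times, e.g. once per row of the streamed side.
    def scanMatches(): Unit = {
      val iter = buffer.generateIterator() // or generateIterator(startIndex) to skip rows
      while (iter.hasNext) {
        val row: UnsafeRow = iter.next()
        // ... join / window-frame processing ...
      }
    }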

Note for reviewers

The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split.

How was this patch tested?

Unit testing

  • Added unit tests for ExternalAppendOnlyUnsafeRowArray to validate all its APIs and access patterns
  • Added unit tests for SortMergeJoinExec
    • With and without spill, for inner join, left outer join and right outer join, to confirm that the spill threshold config behaves as expected and the output is correct (see the test sketch after this list).
    • This PR touches the scanning logic in SortMergeJoinExec for all joins (except FULL OUTER JOIN). However, I expect the existing test cases to confirm that there is no regression in correctness.
  • Added unit test for WindowExec to check behavior of spilling and correctness of results.
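
To make the spill-threshold testing idea concrete, here is a hedged sketch of such a test (assuming a suite built on Spark's QueryTest/SharedSQLContext helpers with testImplicits in scope; the threshold key is the one used in this PR, the data is illustrative):

    // Force the buffered side to spill almost immediately, then verify the join output.
    test("inner join returns correct results when the buffered side spills") {
      withSQLConf(
          "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "2",
          "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
        val left = Seq((1, "a"), (2, "b")).toDF("k", "v1")
        val right = Seq((1, "x"), (1, "y"), (1, "z")).toDF("k", "v2")
        checkAnswer(
          left.join(right, "k"),
          Row(1, "a", "x") :: Row(1, "a", "y") :: Row(1, "a", "z") :: Nil)
      }
    }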

Stress testing

  • Confirmed that OOM is gone by running against a production job which used to OOM
  • Since I cannot share details about the prod workload externally, I created synthetic data to mimic the issue, and ran the query before and after the fix to demonstrate the failure and its resolution with this PR

Generating the synthetic data

./bin/spark-shell --driver-memory=6G

import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()
import hc.implicits._  // brings in .toDF for local collections (already implicit in spark-shell)

hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect
hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect

val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table")

val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table")

Ran this against trunk vs. a local build with this PR. The OOM reproduces with trunk, and with the fix the query runs fine.

./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof"

import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()
hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000")

hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect
hc.sql("""
  CREATE TABLE spark_13450_result
  AS
  SELECT
    a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2,
    b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2
  FROM
    spark_13450_one_row_table a 
  JOIN
    spark_13450_large_table b 
  ON
    a.i=b.i AND
    a.j=b.j
""")

Performance comparison

Macro-benchmark

I ran an SMB join query over two real-world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew, so no spill happened. I saw an improvement in CPU time of 2-4% over the version without this PR. This did not add up, as I was expecting some regression. I think allocating an array of capacity 128 at the start (instead of starting with the default size of 16) is the sole reason for the perf gain : https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43 . I could remove that and rerun, but effectively the change will be deployed in this form and I wanted to see its effect over a large workload.

Micro-benchmark

Two types of benchmarking can be found in ExternalAppendOnlyUnsafeRowArrayBenchmark:

[A] Comparing ExternalAppendOnlyUnsafeRowArray against raw ArrayBuffer when all rows fit in-memory and there is no spill

Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X

Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 19200 / 19206         25.6          39.1       1.0X
ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1          39.8       1.0X

Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X

[B] Comparing ExternalAppendOnlyUnsafeRowArray against raw UnsafeExternalSorter when there is spilling of data

Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X

Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X

@tejasapatil
Contributor Author

ok to test

@tejasapatil
Contributor Author

@rxin : can you please recommend someone who could review this PR?

@tejasapatil tejasapatil changed the title [SPARK-13450] Introduce UnsafeRowExternalArray. Change SortMergeJoin and WindowExec to use it [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change SortMergeJoin and WindowExec to use it Feb 13, 2017
@SparkQA

SparkQA commented Feb 13, 2017

Test build #72806 has started for PR 16909 at commit e9cdd30.

Contributor

@hvanhovell hvanhovell left a comment

@tejasapatil I have taken a quick pass. It looks good overall. I think it might be a good idea to do some benchmarking to get an idea of potential performance implications.

}

def add(entry: InternalRow): Unit = {
val unsafeRow = entry.asInstanceOf[UnsafeRow]
Contributor

This seems tricky. Let's move this cast to the call site (and preferably avoid it).

Contributor Author

Moved cast to client

* [[add()]] or [[clear()]] calls made on this array after creation of the iterator,
* then the iterator is invalidated thus saving clients from getting inconsistent data.
*/
def generateIterator(): Iterator[UnsafeRow] = {
Contributor

Can we also have a version that skips n elements? This will be a little more efficient for the unbounded following window case.

Contributor Author

Done
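
For illustration, a hedged sketch of how an unbounded-following window frame could use the new offset variant instead of iterating from the beginning and discarding rows (valueOrdinal and the summing logic are hypothetical, not the PR's code):

    import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray

    // Sum a long column over rows from `inputIndex` (inclusive) to the end of the partition.
    def sumUnboundedFollowing(
        buffer: ExternalAppendOnlyUnsafeRowArray,
        inputIndex: Int,
        valueOrdinal: Int): Long = {
      var sum = 0L
      val iter = buffer.generateIterator(startIndex = inputIndex)  // skips the first `inputIndex` rows
      while (iter.hasNext) {
        sum += iter.next().getLong(valueOrdinal)
      }
      sum
    }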

var rowBuffer: RowBuffer = null
if (sqlContext == null) {
Contributor

???

Contributor Author

Removed

@@ -285,6 +283,9 @@ case class WindowExec(
val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray

var spillThreshold =
sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt
Contributor

Please make an internal configuration in SQLConf for this.

Contributor Author

created a SQLConf
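
For context, a hedged sketch of what an internal SQLConf entry of this kind typically looks like (builder names follow the SQLConf config builder of the Spark 2.2 era; not necessarily this PR's exact code):

    // In org.apache.spark.sql.internal.SQLConf
    val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
      buildConf("spark.sql.windowExec.buffer.spill.threshold")
        .internal()
        .doc("Threshold for number of rows buffered in the window operator")
        .intConf
        .createWithDefault(4096)

    def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

    // WindowExec can then read it as:
    //   val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold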


// A counter to keep track of total additions made to this array since its creation.
// This helps to invalidate iterators when there are changes done to the backing array.
private var modCount: Long = 0
Contributor

This is only an issue after we have cleared the buffer, right?

Contributor Author

No. There is more than that. Apart from clear(), even add() would be regarded as a modification.

The built-in iterators (from both ArrayBuffer and UnsafeExternalSorter), once created, do not see any newly added data. Clients of ExternalAppendOnlyUnsafeRowArray may not realise this and think that they have read all the data. To prevent that, I need such a mechanism to invalidate existing iterators.

    val buffer = ArrayBuffer.empty[Int]
    buffer.append(1)

    val iterator = buffer.iterator
    assert(iterator.hasNext)
    assert(iterator.next() == 1)

    buffer.append(2)
    assert(iterator.hasNext)       // <-- THIS FAILS

Also, when add() transparently switches the backing storage from ArrayBuffer to UnsafeExternalSorter while there is an open iterator over the ArrayBuffer, it would lead to an IndexOutOfBoundsException. A similar problem shows up with clear():

    val buffer = ArrayBuffer.empty[Int]
    buffer.append(1)
    buffer.append(2)

    val iterator = buffer.iterator
    buffer.clear()
    assert(iterator.hasNext)       // <-- THIS FAILS
  }

Contributor Author

EDIT: I changed the code to not use the built-in iterator of ArrayBuffer and instead use a counter to iterate over the array. However, I still depend on the built-in iterator of UnsafeExternalSorter.
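
To make the invalidation mechanism concrete, here is a minimal self-contained sketch (not the PR's code) of a modCount-guarded, index-based iterator over an append-only buffer:

    import java.util.ConcurrentModificationException
    import scala.collection.mutable.ArrayBuffer

    class AppendOnlyBuffer[T] {
      private val data = new ArrayBuffer[T](128)
      private var modCount: Long = 0  // bumped on every add() and clear()

      def add(value: T): Unit = { data += value; modCount += 1 }
      def clear(): Unit = { data.clear(); modCount += 1 }

      // Iterates by index (not via data.iterator) and fails fast once the buffer changes.
      def generateIterator(startIndex: Int = 0): Iterator[T] = new Iterator[T] {
        private val expectedModCount = modCount
        private var i = startIndex

        private def checkValid(): Unit =
          if (modCount != expectedModCount) {
            throw new ConcurrentModificationException(
              "The backing buffer has been modified since this iterator was created")
          }

        override def hasNext: Boolean = { checkValid(); i < data.length }
        override def next(): T = { checkValid(); val v = data(i); i += 1; v }
      }
    }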

* [[ArrayBuffer]] or [[Array]].
*/
private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
Contributor

Perhaps it is better just to allocate an array, but that might be overkill.

Contributor Author

My intention behind not using a raw Array is to avoid holding on to that memory (if we went this route, one would have to set the spill threshold to a relatively low value to avoid potentially wasting memory).

Before this PR:

  • Initial size:
    • SortMergeJoin started off with an ArrayBuffer of default size (i.e. 16)
    • WindowExec started off with empty ArrayBuffer
  • For both the cases, there was no shrinking of the array so memory is not reclaimed until the operator finishes.

Proposed change:

  • I am switching to new ArrayBuffer(128) for both cases in order to initialize with a decent size rather than starting with an empty array. Allocating space for 128 entries upfront is a trivial memory footprint.
  • Keeping the "no shrinking" behavior the same. A part of me thinks I could do something smarter by shrinking based on a running average of the actual lengths of the array, but it might be over-optimization. I will first focus on getting the basic stuff in.

@@ -97,6 +98,11 @@ case class SortMergeJoinExec(

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val spillThreshold =
sqlContext.conf.getConfString(
"spark.sql.sortMergeJoinExec.buffer.spill.threshold",
Contributor

Let's also move this configuration into SQLConf.

Contributor Author

done

ctx.addMutableState(clsName, matches, s"$matches = new $clsName();")
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

val spillThreshold =
Contributor

Place it in a def/lazy val if we need it more than once?

Contributor Author

moved to a method

private[this] val iter: UnsafeSorterIterator = sorter.getIterator

private[this] val currentRow = new UnsafeRow(numFields)
private[window] class RowBuffer(appendOnlyExternalArray: ExternalAppendOnlyUnsafeRowArray) {
Contributor

Let's just drop RowBuffer in favor of ExternalAppendOnlyUnsafeRowArray; it doesn't make a lot of sense to keep it around. We just need a generateIterator(offset) for the unbounded following case.

Contributor Author

Removed RowBuffer

valueInserted
}

private def checkIfValueExits(iterator: Iterator[UnsafeRow], expectedValue: Long): Unit = {
Member

Hi, @tejasapatil .
checkIfValueExists?

Contributor Author

fixed the typo

private var spillableArray: UnsafeExternalSorter = null
private var numElements = 0

// A counter to keep track of total additions made to this array since its creation.
Member

According to here, total additions -> total modifications?

Contributor Author

changed

} else {
if (spillableArray == null) {
logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}")
Member

@dongjoon-hyun dongjoon-hyun Feb 13, 2017

I understand the intention here, but the log message looks a little bit misleading to me because we are already using ExternalAppendOnlyUnsafeRowArray. Also, technically, we are switching to UnsafeExternalSorter under the hood.

Contributor Author

This was a typo. Thanks for pointing this out. Corrected.

@@ -285,6 +283,9 @@ case class WindowExec(
val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray

var spillThreshold =
Member

var -> val?

Contributor Author

changed

// This helps to invalidate iterators when there are changes done to the backing array.
private var modCount: Long = 0

private var numFieldPerRow = 0
Member

numFieldsPerRow for consistency ?

Contributor Author

renamed

@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Feb 13, 2017

Test build #72827 has finished for PR 16909 at commit e9cdd30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow]

@zhzhan
Contributor

zhzhan commented Feb 14, 2017

@tejasapatil Do you want to fix the BufferedRowIterator for WholeStageCodegenExec as well? For an inner join, the LinkedList currentRows would cause the same issue, as it buffers the rows from the inner join and takes more memory (probably double if the left and right sides have similar size). Also, they could share a similar iterator data structure.

@hvanhovell
Contributor

@zhzhan is this an actual problem? The BufferedRowIterator should not hold a lot of rows in practice. cc @davies

@zhzhan
Contributor

zhzhan commented Feb 14, 2017

@hvanhovell @davies Correct me if I am wrong. My understanding is that the following code will go through all matching rows on the right side and put them into the BufferedRowIterator. If there is an OOM caused by the ArrayList matches in SortMergeJoinExec, the memory usage will be doubled by currentRows in BufferedRowIterator (assuming the left and right sides have the same size).

There are two ways to solve it: one is to consume one row at a time, and the other is to make BufferedRowIterator spillable.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L556

BTW, I used to see that the memory consumption in BufferedRowIterator caused OOM along with matches in SortMergeJoinExec. The former took much more heap memory.

@tejasapatil
Contributor Author

The problem pointed out by @zhzhan is a legit problem and I have seen it personally. However, I would scope this PR to the 2 use cases described and handle other cases separately.

Contributor Author

@tejasapatil tejasapatil left a comment

Thanks for the review comments @hvanhovell and @dongjoon-hyun. I have made the changes.

@hvanhovell : regarding potential performance implications, the updated PR has a benchmark class with numbers. It's an isolated micro-benchmark comparing the new array impl with ArrayBuffer. To see how much this actually adds up at the query level, I am doing an integration test over a large dataset by running an SMB join with this PR and comparing it against the version without this PR. Will get back with those numbers in some time.

}
}

def generateIterator(): Iterator[UnsafeRow] = generateIterator(0)
Contributor Author

Self review: will change this to generateIterator(startIndex = 0)

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72918 has finished for PR 16909 at commit a4c4416.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72920 has started for PR 16909 at commit e653171.

@tejasapatil
Contributor Author

@hvanhovell : Re performance:

I ran an SMB join query over two real-world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew, so no spill happened. I saw an improvement in CPU time of 2-4% over the version without this PR. This did not add up, as I was expecting some regression. I think allocating an array of capacity 128 at the start (instead of starting with the default size of 16) is the sole reason for the perf gain : https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43 . I could remove that and rerun, but effectively the change will be deployed in this form and I wanted to see its effect over a large workload.

For micro-benchmarking, I took care to make the test case allocate array buffers of the same size to keep this from interfering with the results. The numbers there look sane, as there is some regression from using the spillable array.

I see test cases in DataFrameJoinSuite failing on Jenkins but they run fine for me in IntelliJ. Have you seen that before? Just wanted to check with you whether it's a known problem before I invest more time on it.

* This may lead to a performance regression compared to the normal case of using an
* [[ArrayBuffer]] or [[Array]].
*/
private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
Contributor

How does this compare to using UnsafeExternalSorter directly? There is a similar use case here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala#L42

Contributor Author

@davies :

  • See PR description with comparison results.
  • Updated the PR to include CartesianProductExec

Member

@viirya viirya Feb 16, 2017

From the comparison results, UnsafeExternalSorter performs slightly better? If so, any reason not to use UnsafeExternalSorter directly?

Contributor Author

Good point. My original guess before writing the benchmark was that ArrayBuffer would be superior to UnsafeExternalSorter, so I went with an impl which initially behaves like ArrayBuffer but later switches to UnsafeExternalSorter. I won't make this call solely based on the micro-benchmark results, as they might not reflect what actually happens when a query runs, because there are other operations happening while the buffer is populated and accessed.

Contributor Author

Update: I created a test version that removes the ArrayBuffer and lets UnsafeExternalSorter alone back the data. Over prod queries, this turned out to be slightly slower, unlike what the micro-benchmark results showed. I will stick with the current approach.

@tejasapatil
Contributor Author

ok to test

@hvanhovell
Contributor

retest this please

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72941 has finished for PR 16909 at commit e653171.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil tejasapatil changed the title [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change SortMergeJoin and WindowExec to use it [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change CartesianProductExec, SortMergeJoin, WindowExec to use it Feb 15, 2017
@SparkQA

SparkQA commented Feb 16, 2017

Test build #72963 has finished for PR 16909 at commit 1494c83.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class UnsafeCartesianRDD(

@SparkQA

SparkQA commented Feb 16, 2017

Test build #72964 has finished for PR 16909 at commit 5b73e2b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class UnsafeCartesianRDD(

@SparkQA

SparkQA commented Mar 15, 2017

Test build #74603 has finished for PR 16909 at commit 23acc3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

LGTM - merging to master. Thanks!

@asfgit asfgit closed this in 02c274e Mar 15, 2017
@tejasapatil tejasapatil deleted the SPARK-13450_smb_buffer_oom branch March 15, 2017 21:03

// Setup the frames.
var i = 0
while (i < numFrames) {
frames(i).prepare(rowBuffer.copy())
frames(i).prepare(buffer)
Member

No copy?

asfgit pushed a commit that referenced this pull request Jun 30, 2017
…lling

## What changes were proposed in this pull request?
`WindowExec` currently improperly stores complex objects (UnsafeRow, UnsafeArrayData, UnsafeMapData, UTF8String) during aggregation by keeping a reference in the buffer used by `GeneratedMutableProjections` to the actual input data. Things go wrong when the input object (or the backing bytes) is reused for other things. This could happen in window functions when they start spilling to disk. When reading back the spill files, the `UnsafeSorterSpillReader` reuses the buffer to which the `UnsafeRow` points, leading to weird corruption scenarios. Note that this only happens for aggregate functions that preserve (parts of) their input, for example `FIRST`, `LAST`, `MIN` & `MAX`.

This was not seen before, because the spilling logic was not doing actual spills as much and actually used an in-memory page. This page was not cleaned up during window processing and made sure unsafe objects point to their own dedicated memory location. This was changed by #16909, after this PR Spark spills more eagerly.

This PR provides a surgical fix because we are close to releasing Spark 2.2. This change just makes sure that there cannot be any object reuse, at the expense of a little bit of performance. We will follow up with a more subtle solution at a later point.

## How was this patch tested?
Added a regression test to `DataFrameWindowFunctionsSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #18470 from hvanhovell/SPARK-21258.

(cherry picked from commit e2f32ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
asfgit pushed a commit that referenced this pull request Aug 11, 2017
…nalAppendOnlyUnsafeRowArray

## What changes were proposed in this pull request?

[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported that there is excessive spilling to disk due to the default spill threshold for `ExternalAppendOnlyUnsafeRowArray` being quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre #16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk after reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if there was a paucity of memory due to excessive consumers).

Currently, the switch from in-memory to `UnsafeExternalSorter` and the spilling of `UnsafeExternalSorter` to disk for `ExternalAppendOnlyUnsafeRowArray` are controlled by a single threshold. This PR separates them to allow more granular control.

## How was this patch tested?

Added unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes #18843 from tejasapatil/SPARK-21595.

(cherry picked from commit 9443999)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@sheperdh

I have one question regarding this change.
We are doing some tests using spark-sql.
Our SQL uses "Full Outer Join", and we found it's quite easy to hit OOM when there is a large dataset.
After analyzing the heap dump, we found it's because "Full Outer Join" does not use ExternalAppendOnlyUnsafeRowArray.
I am not sure whether there is any special reason why Spark SQL doesn't optimize "Full Outer Join".

@tejasapatil
Can you let me know why you don't want to support "Full Outer Join"?

thanks,
Sheperd

@sujithjay

Hi @sheperdh, the PR author does make a brief note about 'Full Outer Join' in the PR description:

NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery and I will rather put up a separate PR for that once this gets in.

Apparently, this separate PR is pending.
