
[SPARK-30536][CORE][SQL] Sort-merge join operator spilling performance improvements #27246

Closed
wants to merge 1 commit into from

Conversation

siknezevic

@siknezevic siknezevic commented Jan 17, 2020

What changes were proposed in this pull request?

The following list of changes will improve SQL execution performance when data is spilled to disk:

  1. Implement lazy initialization of UnsafeSorterSpillReader - the iterator on top of spilled rows:
    ... During SortMergeJoin (Left Semi Join) execution, the iterator over the spilled data is created but no iteration over the data is done.
    ... Lazy initialization of UnsafeSorterSpillReader enables efficient processing of SortMergeJoin even if data is spilled onto disk, and unnecessary I/O is avoided.
  2. Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1MB to 1KB:
    ... The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1MB memory read buffer.
    ... The code already has logic to grow the memory read buffer when a record does not fit, so decreasing the initial size to 1KB is safe and has a positive performance impact.
  3. Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArray:
    ... In the current implementation, when spilling is enabled, an UnsafeExternalSorter object is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and the ExternalAppendOnlyUnsafeRowArray object is then emptied. Just before the ExternalAppendOnlyUnsafeRowArray object is emptied, both objects hold the same data in memory. That requires double the memory and duplicates the data. This can be avoided.
    ... In the proposed solution, when spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new rows into the ExternalAppendOnlyUnsafeRowArray object stops. An UnsafeExternalSorter object is created and new rows are added into it, while the ExternalAppendOnlyUnsafeRowArray object retains all rows already added. This approach enables better memory utilization and avoids unnecessary movement of data from one object into another (a minimal sketch of the lazy spill reader and this handoff follows the list).
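
To make items 1 and 3 concrete, here is a minimal, self-contained Scala sketch of the intended behaviour. It is illustrative only: ThresholdBuffer, SimpleSpillStore and mergedIterator are hypothetical names invented for this example, not Spark classes, and the real implementation works with UnsafeRow data and UnsafeExternalSorter. Rows up to the threshold stay in the in-memory buffer, later rows go straight to the spill-backed store, and the spill reader is only initialized if a caller actually iterates past the in-memory rows.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a spill-backed store such as UnsafeExternalSorter.
// Opening its reader is the "expensive" step we want to defer (lazy initialization).
class SimpleSpillStore[T] {
  private val spilled = ArrayBuffer.empty[T]
  def add(row: T): Unit = { spilled += row }
  def openReader(): Iterator[T] = {
    println("spill reader initialized (file I/O would happen here)")
    spilled.iterator
  }
}

// Sketch of the proposed handoff: rows up to the threshold stay in memory,
// later rows go straight to the spill store; nothing is copied or re-inserted.
class ThresholdBuffer[T](inMemoryThreshold: Int) {
  private val inMemory = ArrayBuffer.empty[T]
  private var spillStore: Option[SimpleSpillStore[T]] = None

  def add(row: T): Unit = {
    if (inMemory.size < inMemoryThreshold) {
      inMemory += row
    } else {
      if (spillStore.isEmpty) spillStore = Some(new SimpleSpillStore[T])
      spillStore.get.add(row)
    }
  }

  // Merged iterator: serves in-memory rows first and initializes the spill
  // reader only when the caller iterates past them.
  def mergedIterator: Iterator[T] = new Iterator[T] {
    private var index = 0
    private var spillIter: Iterator[T] = null

    def hasNext: Boolean = {
      if (index < inMemory.size) {
        true
      } else {
        if (spillIter == null && spillStore.isDefined) {
          spillIter = spillStore.get.openReader()
        }
        spillIter != null && spillIter.hasNext
      }
    }

    def next(): T = {
      if (index < inMemory.size) { val row = inMemory(index); index += 1; row }
      else spillIter.next()
    }
  }
}

object ThresholdBufferDemo extends App {
  val buffer = new ThresholdBuffer[Int](inMemoryThreshold = 3)
  (1 to 5).foreach(buffer.add)
  // Consuming only the first two rows never touches the spill store,
  // so the "spill reader initialized" message is not printed.
  println(buffer.mergedIterator.take(2).toList) // List(1, 2)
  // Consuming everything initializes the spill reader exactly once.
  println(buffer.mergedIterator.toList)         // List(1, 2, 3, 4, 5)
}
```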

Why are the changes needed?

Testing with the TPC-DS 100 TB benchmark data set showed that some SQL queries (for example, query 14) are not able to run even with extremely large Spark executor memory. The Spark spilling feature has to be enabled in order to process these queries, but their processing becomes extremely slow when spilling is enabled. Testing this solution with query 14 and spilling to disk enabled showed a 500X performance improvement, and it did not degrade the performance of the other queries from the TPC-DS benchmark.

The testing was done with a micro-benchmark that simulates a typical join scenario for 100000 rows when spilling is enabled. In this micro-benchmark the spilling creates three spill files. The test covered two scenarios: one where the patch is NOT applied, so the micro-benchmark runs the existing Spark code, and one where the patch is applied. The average processing time of 100000 joined rows without the patch is 943989 milliseconds. The average processing time of 100000 joined rows with the patch applied is 426 milliseconds. Comparing these two micro-benchmarks shows a more than 2000X (943898/426 = 2215.72X) difference in performance. If the spilling produces more than three spill files, the performance difference would be even greater.

Does this PR introduce any user-facing change?

No

How was this patch tested?

By running TPC-DS queries against different data sets (10 TB and 100 TB).
By running all Spark tests.

@HyukjinKwon HyukjinKwon changed the title [SPARK-30536][CORE,SQL] Sort-merge join operator spilling performance improvements [SPARK-30536][CORE][SQL] Sort-merge join operator spilling performance improvements Jan 17, 2020
@JoshRosen JoshRosen added the SQL label Jan 18, 2020
@kiszk
Member

kiszk commented Jan 22, 2020

@siknezevic Thank you for your contribution. Could you please reformat the PR description?
Which test cases can cover these new features? If not, could you please add test cases?

@siknezevic
Author

@kiszk Thank you for the questions. I hope that I reformatted the PR description correctly.
There are existing tests that cover this feature. This is the list:
org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
forcedSpillingWithReadIterator()
org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
Thank you

@maropu
Member

maropu commented Jan 23, 2020

Does this change only have a good effect on large data? For example, how about a small dataset, e.g., the TPC-DS sf=10 and cores=1 case?

@siknezevic
Author

A small data set will fit into executor memory, so there is no need to spill. I tested with just a few records and it works fine. This solution will also be faster for small data sets compared with the current Spark spilling. My focus is bigger data sets, and this solution will greatly improve spilling performance in those cases.

@JoshRosen
Contributor

JoshRosen commented Jan 23, 2020

Regarding the "Implement lazy initialization of UnsafeSorterSpillReader" portion of the changes:

It looks like @liutang123 previously proposed a similar change in 2018 in #20184 / SPARK-22987. I stumbled across that PR earlier this year and left some comments: overall, I think that lazy initialization of the individual spill readers makes sense.

In order to enable incremental review and merge of these changes, do you think it could make sense to split the lazy spill reader initialization changes into a separate PR? I haven't compared the implementations in detail, so I haven't formed any opinion on the approach here vs. the one in #20184.

Contributor

@JoshRosen JoshRosen left a comment


Regarding the ExternalAppendOnlyUnsafeRowArray change:

  • It looks like there are some benchmarks for this class in ExternalAppendOnlyUnsafeRowArrayBenchmark; we should probably re-run those after this change.
  • It seems like a lot of the complexity in ExternalAppendOnlyUnsafeRowArray comes from the fact that we want to define a "do not spill" threshold in terms of an absolute row count. If it wasn't for this, I think we could just directly use an UnsafeExternalSorter here (the performance overhead inserting a record when you don't spill should be roughly the same AFAIK, since both paths are more-or-less just copying the row's memory into the end of buffer). It looks like this was previously discussed at [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change CartesianProductExec, SortMergeJoin, WindowExec to use it #16909 (comment), where it sounds like the current design was motivated by performance considerations.
  • Your idea of having a "merged iterator" between the in-memory and spill iterators might make sense; I left some feedback / comments on the current implementation of that idea (I think I've spotted a couple of bugs).

@JoshRosen
Contributor

Regarding decreasing the initial memory read buffer size in UnsafeSorterSpillReader from 1MB to 1KB:

Intuitively this also makes sense to me: AFAIK this buffer only needs to be large enough to hold a single record, and records >= 1MB are probably relatively rare. I'm not sure what the optimal threshold here is, though, since I don't really have data on median record sizes for representative real-world workloads.
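
As a rough illustration of the grow-on-demand pattern referred to above, here is a small Scala sketch; it is a hypothetical helper written for this comment, not the actual UnsafeSorterSpillReader code (which is Java). The read buffer starts at 1 KB and is reallocated only when a record does not fit, so the rare oversized record costs an occasional reallocation instead of every reader paying for a 1 MB allocation up front.

```scala
import java.io.DataInputStream

// Hypothetical grow-on-demand read buffer: start small, reallocate only when
// a record is larger than the current buffer.
final class GrowableRecordBuffer(initialSize: Int = 1024) {
  private var buffer = new Array[Byte](initialSize)

  def readRecord(in: DataInputStream, recordLength: Int): Array[Byte] = {
    if (recordLength > buffer.length) {
      // Double until the record fits; with a 1 KB start this only triggers
      // for records of 1 KB or more, which are assumed to be rare.
      var newSize = buffer.length
      while (newSize < recordLength) newSize *= 2
      buffer = new Array[Byte](newSize)
    }
    in.readFully(buffer, 0, recordLength)
    buffer
  }
}
```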

@siknezevic
Author

Regarding the "Implement lazy initialization of UnsafeSorterSpillReader" portion of the changes:

It looks like @liutang123 previously proposed a similar change in 2018 in #20184 / SPARK-22987. I stumbled across that PR earlier this year and left some comments: overall, I think that lazy initialization of the individual spill readers makes sense.

In order to enable incremental review and merge of these changes, do you think it could make sense to split the lazy spill reader initialization changes into a separate PR? I haven't compared the implementations in detail, so I haven't formed any opinion on the approach here vs. the one in #20184.

Thank you for the comments.
Yes, I am OK with submitting a new PR only for lazy spill reader initialization.
I looked at #20184 and I can see there is still I/O in the constructor, at line 69: numRecords = numRecordsRemaining = dis.readInt();
But I am open to suggestions. Please let me know which approach would make more sense.

@siknezevic
Author

siknezevic commented Jan 23, 2020

Regarding decreasing the initial memory read buffer size in UnsafeSorterSpillReader from 1MB to 1KB:

Intuitively this also makes sense to me: AFAIK this buffer only needs to be large enough to hold a single record, and records >= 1MB are probably relatively rare. I'm not sure what the optimal threshold here is, though, since I don't really have data on median record sizes for representative real-world workloads.

I just want to mention that the 1 MB buffer has a 10X negative performance impact. The constructor of UnsafeSorterSpillReader takes extra time to create the 1 MB buffer. The benchmark tests with a 1 KB buffer showed good performance.

@siknezevic
Author

Regarding the ExternalAppendOnlyUnsafeRowArray change:

  • It looks like there are some benchmarks for this class in ExternalAppendOnlyUnsafeRowArrayBenchmark; we should probably re-run those after this change.
  • It seems like a lot of the complexity in ExternalAppendOnlyUnsafeRowArray comes from the fact that we want to define a "do not spill" threshold in terms of an absolute row count. If it wasn't for this, I think we could just directly use an UnsafeExternalSorter here (the performance overhead inserting a record when you don't spill should be roughly the same AFAIK, since both paths are more-or-less just copying the row's memory into the end of buffer). It looks like this was previously discussed at #16909 (comment), where it sounds like the current design was motivated by performance considerations.
  • Your idea of having a "merged iterator" between the in-memory and spill iterators might make sense; I left some feedback / comments on the current implementation of that idea (I think I've spotted a couple of bugs).

I agree it would be good if we could use UnsafeExternalSorter only. The ExternalAppendOnlyUnsafeRowArray class has the problem that it does not have protection from OutOfMemory (OOM) errors the way UnsafeExternalSorter does. The OOM (and the resulting Spark executor loss) happens at the point where the capacity of the ExternalAppendOnlyUnsafeRowArray object is doubled: the object gets full and needs to expand, and the OOM occurs while allocating memory for the new, double-capacity backing object (ArrayBuffer).
The OOM happens in ResizableArray.scala, in ensureSize() at line 103: val newArray: Array[AnyRef] = new Array(newSize.toInt). I have seen that over and over while testing with the 100 TB data set and query 14. To avoid the OOM you have to spill, and then you run into the “spilling performance” issue.
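
For context, here is a simplified Scala sketch of the doubling growth pattern described above; it is an assumption-level illustration of the behaviour, not the actual ResizableArray source. The key point is that a new array of double the capacity must be allocated while the old, full array is still live, and that allocation is where the OOM surfaces.

```scala
// Simplified illustration of array-doubling growth (not the Scala library code).
final class DoublingArray[T <: AnyRef](initialCapacity: Int = 16) {
  private var elements = new Array[AnyRef](initialCapacity)
  private var count = 0

  def append(value: T): Unit = {
    if (count == elements.length) {
      // Peak-memory moment: the old array (size N) and the new array (size 2N)
      // are both reachable until the copy finishes; this `new Array` is where
      // an OutOfMemoryError would be thrown for a very large buffer.
      val newArray = new Array[AnyRef](elements.length * 2)
      System.arraycopy(elements, 0, newArray, 0, count)
      elements = newArray
    }
    elements(count) = value
    count += 1
  }

  def length: Int = count
}
```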

@siknezevic
Author

I fixed the issues in ExternalAppendOnlyUnsafeRowArray. In the coming days I will push a new PR for lazy spill reader initialization.

@siknezevic
Author

I ran a test to see the impact on spilling performance without the changes in the ExternalAppendOnlyUnsafeRowArray class. The design of MergerSorterIterator.hasNext() in ExternalAppendOnlyUnsafeRowArray is critical for good performance.

Below is the execution stack for code that has lazy initialization of the spill reader but no changes in ExternalAppendOnlyUnsafeRowArray. As you can see, the invocation of SpillableArrayIterator.hasNext() causes initialization of UnsafeSorterSpillReader and produces I/O.

The invocation of MergerSorterIterator.hasNext() will not cause initialization of UnsafeSorterSpillReader, so no I/O happens. The startIndex passed to MergerSorterIterator is 0, and startIndex initializes currentIndex, so we have currentIndex < numRowBufferedInMemory (0 < numRowBufferedInMemory). This means that MergerSorterIterator.hasNext() will not invoke the iterator.hasNext() method, which is what SpillableArrayIterator.hasNext() does, and the invocation of iterator.hasNext() is what produces the I/O.

Could you please let me know whether I should still submit a PR for lazy spill reader initialization?
Such a PR, without the changes in the ExternalAppendOnlyUnsafeRowArray class, would not improve spilling performance. It would make sense only as a “transition” PR, followed by a PR for the ExternalAppendOnlyUnsafeRowArray changes.

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:235)
org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:276)
org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:255)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.readFile(UnsafeSorterSpillReader.java:175)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.getNumRecords(UnsafeSorterSpillReader.java:72)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.initializeNumRecords(UnsafeExternalSorter.java:733)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator.hasNext(UnsafeExternalSorter.java:700)
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray$SpillableArrayIterator.hasNext(ExternalAppendOnlyUnsafeRowArray.scala:214)
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:278)
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.agg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage12.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
org.apache.spark.scheduler.Task.run(Task.scala:123)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
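
To spell out the difference between the two hasNext() paths discussed above, here is a compact, illustrative Scala sketch; the class and variable names are stand-ins chosen for this example, not the actual Spark implementations. The spill-only iterator must consult the spill reader on every hasNext() call, which forces the reader (and its file I/O) to be initialized, whereas the merged iterator answers from the in-memory row count alone while buffered rows remain.

```scala
// Illustrative only: simplified stand-ins for the two iterator styles above.
class SpillOnlyIterator[T](spillReader: () => Iterator[T]) extends Iterator[T] {
  private lazy val reader = spillReader() // forces reader creation (and I/O) on first hasNext()
  def hasNext: Boolean = reader.hasNext   // always touches the spill reader
  def next(): T = reader.next()
}

class MergedIterator[T](inMemory: IndexedSeq[T], spillReader: () => Iterator[T])
    extends Iterator[T] {
  private var currentIndex = 0
  private lazy val reader = spillReader()
  def hasNext: Boolean =
    currentIndex < inMemory.length || reader.hasNext // reader untouched while in-memory rows remain
  def next(): T =
    if (currentIndex < inMemory.length) { val row = inMemory(currentIndex); currentIndex += 1; row }
    else reader.next()
}
```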

@shanyu
Contributor

shanyu commented Feb 18, 2020

Can we get this PR in please?

@jiangxb1987
Contributor

OK to test

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 29, 2020
@siknezevic
Author

Could you please reopen this PR and remove the Stale tag? This PR brings significant performance improvements when spilling is enabled. Thank you

@maropu maropu removed the Stale label May 29, 2020
@maropu
Member

maropu commented May 29, 2020

ok to test

@SparkQA

SparkQA commented May 29, 2020

Test build #123264 has finished for PR 27246 at commit e48a936.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented May 29, 2020

retest this please

@siknezevic siknezevic closed this Jun 23, 2020
@maropu
Member

maropu commented Jun 23, 2020

Why was this closed?

@maropu maropu reopened this Jun 23, 2020
@maropu
Member

maropu commented Jun 23, 2020

I added micro-benchmark results to the "Why are the changes needed?" section of the PR, with a description of the performance difference.

Could you reformat the description so it is easy to read, instead of just copying the benchmark result? That's because the description will be used as the commit log message.

@siknezevic
Author

Why was this closed?

Sorry, I added info about what I did and thought the task was done. Apparently I was wrong.

@siknezevic
Author

I added micro-benchmark results to the "Why are the changes needed?" section of the PR, with a description of the performance difference.

Could you reformat the description so it is easy to read, instead of just copying the benchmark result? That's because the description will be used as the commit log message.

I changed the description. Could you please review it? Thank you for the help!

@maropu
Member

maropu commented Jun 25, 2020

Looks okay to me. Could anyone else check this? @cloud-fan @dongjoon-hyun @JoshRosen @jiangxb1987

@siknezevic
Author

Just checking to see if there is anything on my side that I should do to move this PR forward. Thank you

@maropu
Member

maropu commented Jul 15, 2020

kindly ping: @cloud-fan @dongjoon-hyun @JoshRosen @jiangxb1987

@gatorsmile
Member

ok to test

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126860 has finished for PR 27246 at commit 2e91826.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126900 has started for PR 27246 at commit b824828.

@maropu
Member

maropu commented Aug 1, 2020

What changes were proposed in this pull request?

The following list of changes will improve SQL execution performance when data is spilled to disk:
1) Implement lazy initialization of UnsafeSorterSpillReader - the iterator on top of spilled rows:
... During SortMergeJoin (Left Semi Join) execution, the iterator over the spilled data is created but no iteration over the data is done.
... Lazy initialization of UnsafeSorterSpillReader enables efficient processing of SortMergeJoin even if data is spilled onto disk, and unnecessary I/O is avoided.
2) Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1MB to 1KB:
... The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1MB memory read buffer.
... The code already has logic to grow the memory read buffer when a record does not fit, so decreasing the initial size to 1KB is safe and has a positive performance impact.
3) Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArray
... In the current implementation, when spilling is enabled, an UnsafeExternalSorter object is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and the ExternalAppendOnlyUnsafeRowArray object is then emptied. Just before the ExternalAppendOnlyUnsafeRowArray object is emptied, both objects hold the same data in memory. That requires double the memory and duplicates the data. This can be avoided.
... In the proposed solution, when spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new rows into the ExternalAppendOnlyUnsafeRowArray object stops. An UnsafeExternalSorter object is created and new rows are added into it, while the ExternalAppendOnlyUnsafeRowArray object retains all rows already added. This approach enables better memory utilization and avoids unnecessary movement of data from one object into another.

Why are the changes needed?

Testing with the TPC-DS 100 TB benchmark data set showed that some SQL queries (for example, query 14) are not able to run even with extremely large Spark executor memory.
The Spark spilling feature has to be enabled in order to process these queries, but their processing becomes extremely slow when spilling is enabled.
Testing this solution with query 14 and spilling to disk enabled showed a 500X performance improvement, and it did not degrade the performance of the other queries from the TPC-DS benchmark.

Does this PR introduce any user-facing change?

No

How was this patch tested?

By running TPC-DS queries against different data sets (10 TB and 100 TB).
By running all Spark tests.
@SparkQA

SparkQA commented Aug 1, 2020

Test build #126922 has finished for PR 27246 at commit 9d5bd99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Aug 22, 2020

retest this please

@SparkQA

SparkQA commented Aug 22, 2020

Test build #127776 has finished for PR 27246 at commit 9d5bd99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Sep 9, 2020

retest this please

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128448 has finished for PR 27246 at commit 9d5bd99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 19, 2020
@github-actions github-actions bot closed this Dec 20, 2020
@caican00
Contributor

@siknezevic Hi, is there any progress on this PR?

@siknezevic
Author

No.
