
[SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL #6444

Closed
wants to merge 67 commits into master from JoshRosen:sql-external-sort

Conversation

JoshRosen
Contributor

This patch adds a cache-friendly external sorter which operates on serialized bytes and uses this sorter to implement a new sort operator for Spark SQL and DataFrames.

Overview of the new sorter

The new sorter design is inspired by Alphasort and implements a key-prefix optimization in order to improve the cache friendliness of the sort. In naive sort implementations, the sorting algorithm operates on an array of record pointers. To compare two records for ordering, the sorter must dereference these pointers, which likely involves random memory access, then compare the objects themselves.

[image: https://cloud.githubusercontent.com/assets/50748/8611390/3b1402ae-2675-11e5-8308-1a10bf347e6e.png]

In a key-prefix sort, the sort operates on an array which stores the record pointer alongside a prefix of the record's key. When comparing two records for ordering, the sorter first compares the stored key prefixes. If the ordering can be determined from the key prefixes (i.e. the prefixes are unequal), then the sort can avoid directly comparing the records, avoiding random memory accesses and full record comparisons. For example, if we're sorting a list of strings then we can store the first 8 bytes of the UTF-8 encoded string as the key-prefix and can perform unsigned byte-at-a-time comparisons to determine the ordering of strings based on their prefixes, only resorting to full comparisons for strings that share a common prefix. In cases where the sort key can fit entirely in the space allotted for the key prefix (e.g. the sorting key is an integer), we completely avoid direct record comparison.
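
To make this concrete, here is a minimal, self-contained sketch of the idea (the class names are illustrative, not Spark's actual sorter, and it assumes ASCII keys so that byte-wise prefix order agrees with String ordering):

```java
import java.util.Arrays;
import java.util.Comparator;

// Illustrative model of a key-prefix sort: each array entry carries a record
// "pointer" plus an 8-byte key prefix, and full records are only touched when
// two prefixes tie.
final class PrefixSortSketch {
  static final class Entry {
    final int recordPointer;  // stand-in for a real memory address
    final long keyPrefix;     // prefix of the record's sort key
    Entry(int recordPointer, long keyPrefix) {
      this.recordPointer = recordPointer;
      this.keyPrefix = keyPrefix;
    }
  }

  static void sort(Entry[] entries, String[] records) {
    Comparator<Entry> cmp = (a, b) -> {
      // Cheap comparison on the in-array prefixes: no pointer dereference.
      int byPrefix = Long.compareUnsigned(a.keyPrefix, b.keyPrefix);
      if (byPrefix != 0) {
        return byPrefix;
      }
      // Prefix tie: fall back to a full record comparison (random access).
      return records[a.recordPointer].compareTo(records[b.recordPointer]);
    };
    Arrays.sort(entries, cmp);
  }
}
```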

In this patch's implementation of key-prefix sorting, our sorter's internal array stores a 64-bit long and 64-bit pointer for each record being sorted. The key prefixes are generated by the user when inserting records into the sorter, which uses a user-defined comparison function for comparing them. The PrefixComparators object implements a set of comparators for many common types, including primitive numeric types and UTF-8 strings.
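
For example, a string prefix can be built by packing the first 8 UTF-8 bytes into a long, most significant byte first, so that unsigned long comparison matches unsigned byte-at-a-time comparison. A hedged sketch of this (not the actual PrefixComparators code):

```java
import java.nio.charset.StandardCharsets;

// Sketch of an 8-byte string prefix: pack the first 8 UTF-8 bytes into a long
// (big-endian, zero-padded) and compare prefixes as unsigned longs.
final class StringPrefixSketch {
  static long stringPrefix(String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    long prefix = 0L;
    int n = Math.min(bytes.length, 8);
    for (int i = 0; i < n; i++) {
      prefix |= (bytes[i] & 0xFFL) << (56 - 8 * i);  // most significant byte first
    }
    return prefix;
  }

  static int comparePrefixes(long a, long b) {
    // Unsigned comparison so that byte 0xFF sorts after byte 0x00.
    return Long.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    // "apple" vs. "apply" differ within the first 8 bytes, so the prefixes alone
    // decide the ordering and no full comparison is needed.
    System.out.println(comparePrefixes(stringPrefix("apple"), stringPrefix("apply")) < 0);  // true
  }
}
```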

The actual sorting is implemented by UnsafeInMemorySorter. Most consumers will not use this directly, but instead will use UnsafeExternalSorter, a class which implements a sort that can spill to disk in response to memory pressure. Internally, UnsafeExternalSorter creates UnsafeInMemorySorters to perform sorting and uses UnsafeSortSpillReader/Writer to spill and read back runs of sorted records and UnsafeSortSpillMerger to merge multiple sorted spills into a single sorted iterator. This external sorter integrates with Spark's existing ShuffleMemoryManager for controlling spilling.
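
The spill-merging step is essentially a k-way merge of sorted runs. The following is a simplified model of what UnsafeSortSpillMerger does, with each run reduced to a plain sorted iterator of longs standing in for (prefix, record) pairs:

```java
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of merging several already-sorted spill runs into one sorted iterator.
final class SpillMergeSketch {
  static Iterator<Long> merge(List<Iterator<Long>> sortedRuns) {
    // Priority queue over the head element of each run.
    PriorityQueue<PeekingRun> queue = new PriorityQueue<>(
        (a, b) -> Long.compare(a.head, b.head));
    for (Iterator<Long> run : sortedRuns) {
      if (run.hasNext()) {
        queue.add(new PeekingRun(run.next(), run));
      }
    }
    return new Iterator<Long>() {
      @Override public boolean hasNext() { return !queue.isEmpty(); }
      @Override public Long next() {
        PeekingRun smallest = queue.poll();
        long value = smallest.head;
        if (smallest.rest.hasNext()) {
          // Advance the run we just consumed from and put it back in the queue.
          smallest.head = smallest.rest.next();
          queue.add(smallest);
        }
        return value;
      }
    };
  }

  private static final class PeekingRun {
    long head;
    final Iterator<Long> rest;
    PeekingRun(long head, Iterator<Long> rest) { this.head = head; this.rest = rest; }
  }
}
```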

Many parts of this sorter's design are based on / copied from the more specialized external sort implementation that I designed for the new UnsafeShuffleManager write path; see #5868 for more details on that patch.

Sorting rows in Spark SQL

For now, UnsafeExternalSorter is only used by Spark SQL, which uses it to implement a new sort operator, UnsafeExternalSort. This sort operator uses a SQL-specific class called UnsafeExternalRowSorter that configures an UnsafeExternalSorter to use prefix generators and comparators that operate on rows encoded in the UnsafeRow format that was designed for Project Tungsten.

I used some interesting unit-testing techniques to test this patch's SQL-specific components. UnsafeExternalSortSuite uses the SQL random data generators introduced in #7176 to test the UnsafeSort operator with all atomic types both with and without nullability and in both ascending and descending sort orders. PrefixComparatorsSuite contains a cool use of ScalaCheck + ScalaTest's GeneratorDrivenPropertyChecks in order to test UTF8String prefix comparison.

Misc. additional improvements made in this patch

This patch made several miscellaneous improvements to related code in Spark SQL:

  • The logic for selecting physical sort operator implementations, which was partially duplicated in both Exchange and SparkStrategies, has now been consolidated into a getSortOperator() helper function in SparkStrategies.
  • The SparkPlanTest unit testing helper trait has been extended with new methods for comparing the output produced by two different physical plans. This makes it easy to write tests which assert that two physical operator implementations should produce the same output. I also added a method for disabling the implicit sorting of outputs prior to comparing them, a change which is necessary in order to be able to write proper SparkPlan tests for sort operators.

Tasks deferred to followup patches

While most of this patch's features are reasonably well-tested and complete, there are a number of tasks that are intentionally being deferred to followup patches:

  • Add tests which mock the ShuffleMemoryManager to check that memory pressure properly triggers spilling (there are examples of this type of test in [SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort #5868).
  • Add tests to ensure that spill files are properly cleaned up after errors. I'd like to do this in the context of a patch which introduces more general metrics for ensuring proper cleanup of tasks' temporary files; see https://issues.apache.org/jira/browse/SPARK-8966 for more details.
  • Metrics integration: there are some open questions regarding how to track / report spill metrics for non-shuffle operations, so I've deferred most of the IO / shuffle metrics integration for now.
  • Performance profiling.

Review on Reviewable: https://reviewable.io/reviews/apache/spark/6444

@SparkQA

SparkQA commented May 27, 2015

Test build #33615 has finished for PR 6444 at commit 2c6a389.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 27, 2015

Test build #33624 has finished for PR 6444 at commit 6261ded.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34166 has finished for PR 6444 at commit cc59ddd.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2015

Test build #34333 has finished for PR 6444 at commit aeed5eb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34334 has finished for PR 6444 at commit 4dbd4ce.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2015

Test build #34723 has finished for PR 6444 at commit c13b7ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 14, 2015

Test build #34857 has finished for PR 6444 at commit c125027.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen JoshRosen changed the title [WIP] Binary processing external sort for SQL's sort-merge join [WIP] Binary processing sort for Spark SQL Jun 18, 2015
@SparkQA

SparkQA commented Jun 18, 2015

Test build #35152 has finished for PR 6444 at commit 07d3c04.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Override
public void loadNext() throws IOException {
keyPrefix = din.readLong();
ByteStreams.readFully(in, arr, 0, nextRecordLength);
Contributor Author

Note to self: we need to grow the buffer dynamically in case we're reading really big records. Add a test for this.
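
A hedged sketch of that fix, reusing the names from the snippet above (the surrounding class, the initial capacity, and the doubling growth policy are assumptions):

```java
import java.io.DataInputStream;
import java.io.IOException;
import com.google.common.io.ByteStreams;

// Illustrative only: grow the read buffer before ByteStreams.readFully() when the
// next record is larger than the current buffer.
final class GrowableRecordBuffer {
  private byte[] arr = new byte[1024];  // initial capacity is an assumption
  private long keyPrefix;

  /** Reads one [len][prefix][data] record, growing the buffer for large records. */
  void loadNext(DataInputStream din) throws IOException {
    int nextRecordLength = din.readInt();
    keyPrefix = din.readLong();
    if (arr.length < nextRecordLength) {
      // Grow geometrically so repeated large records do not reallocate every time.
      arr = new byte[Math.max(nextRecordLength, arr.length * 2)];
    }
    ByteStreams.readFully(din, arr, 0, nextRecordLength);
  }
}
```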

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35182 has finished for PR 6444 at commit 5c64837.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35187 has finished for PR 6444 at commit 6663172.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35198 has finished for PR 6444 at commit 5140be0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 22, 2015

Test build #35476 has finished for PR 6444 at commit 526a1bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

@zsxwing, there's still a lot that I have to implement here, but if you have a chance could you review the pieces that are here now? Most of this code is substantially similar to the earlier Tungsten shuffle sorter patch that you reviewed, so hopefully most of it is easy to follow and looks okay. There's a Reviewable link in the pull request description if you prefer to use that to manage the long review.

@SparkQA

SparkQA commented Jun 28, 2015

Test build #35926 has finished for PR 6444 at commit c74851b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
Contributor

remove global =?

Contributor

It's good to have it, nvm

@JoshRosen
Contributor Author

I'm in the process of updating this to reflect @davies' recent changes to support arbitrary column types in UnsafeRow. One bit of trickiness is the fact that UnsafeExternalSorter, which manages the spilling, doesn't know anything about SQL and thus can't directly implement the logic for spilling and reading ObjectPools back during the merge. In the worst case, I guess that the merging step might need to keep a bunch of ObjectPools in memory at the same time, which could end up causing us to OOM, but in those cases we would have OOM'd before, too.

@JoshRosen
Contributor Author

To address the object spilling case, I'm going to need to write a failing test case first. To do that, I'm going to have to borrow some of the manual spill-triggering test interfaces that I added for UnsafeShuffle so that I can write deterministic unit tests which hit the spill paths with small inputs.

@JoshRosen
Contributor Author

I'm in the process of writing a real PR description, so please don't merge prematurely.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #36992 has finished for PR 6444 at commit 2f48777.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Jul 10, 2015

LGTM

@SparkQA

SparkQA commented Jul 10, 2015

Test build #36998 has finished for PR 6444 at commit 5135200.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

@davies, there's one minor change in SparkPlanTest that I want to partially undo: the sorting prior to output comparison should continue to be opt-out, not opt-in, since you want sorting most of the time. Let me push another commit for this real quick.

@JoshRosen JoshRosen changed the title [SPARK-7078] [SPARK-7079] [WIP] Binary processing sort for Spark SQL [SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL Jul 10, 2015
@SparkQA

SparkQA commented Jul 10, 2015

Test build #37018 has finished for PR 6444 at commit 35dad9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37015 has finished for PR 6444 at commit 5135200.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37045 has finished for PR 6444 at commit 35dad9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

This is failing tests because my change to SparkPlanTest introduced multiple overloaded methods with default arguments, which isn't legal in Scala. Let me see if I can find a clean way to fix this.

@JoshRosen
Contributor Author

I hacked around the default args. issue by removing many overloaded methods in SparkPlanTest. Please take a look.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #37051 has finished for PR 6444 at commit 6beb467.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Jul 10, 2015

LGTM. The last failure is caused by a known flaky test; I think this one is good to go.

@SparkQA

SparkQA commented Jul 10, 2015

Test build #1041 has finished for PR 6444 at commit 6beb467.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jul 10, 2015

Thanks - I'm going to merge this and review more next week. We can address feedback incrementally for this one.

JoshRosen added a commit that referenced this pull request Jul 11, 2015
[SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6444 from JoshRosen/sql-external-sort and squashes the following commits:

6beb467 [Josh Rosen] Remove a bunch of overloaded methods to avoid default args. issue
2bbac9c [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
35dad9f [Josh Rosen] Make sortAnswers = false the default in SparkPlanTest
5135200 [Josh Rosen] Fix spill reading for large rows; add test
2f48777 [Josh Rosen] Add test and fix bug for sorting empty arrays
d1e28bc [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
cd05866 [Josh Rosen] Fix scalastyle
3947fc1 [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
d13ac55 [Josh Rosen] Hacky approach to copying of UnsafeRows for sort followed by limit.
845bea3 [Josh Rosen] Remove unnecessary zeroing of row conversion buffer
c56ec18 [Josh Rosen] Clean up final row copying code.
d31f180 [Josh Rosen] Re-enable NullType sorting test now that SPARK-8868 is fixed
844f4ca [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
293f109 [Josh Rosen] Add missing license header.
f99a612 [Josh Rosen] Fix bugs in string prefix comparison.
9d00afc [Josh Rosen] Clean up prefix comparators for integral types
88aff18 [Josh Rosen] NULL_PREFIX has to be negative infinity for floating point types
613e16f [Josh Rosen] Test with larger data.
1d7ffaa [Josh Rosen] Somewhat hacky fix for descending sorts
08701e7 [Josh Rosen] Fix prefix comparison of null primitives.
b86e684 [Josh Rosen] Set global = true in UnsafeExternalSortSuite.
1c7bad8 [Josh Rosen] Make sorting of answers explicit in SparkPlanTest.checkAnswer().
b81a920 [Josh Rosen] Temporarily enable only the passing sort tests
5d6109d [Josh Rosen] Fix inconsistent handling / encoding of record lengths.
87b6ed9 [Josh Rosen] Fix critical issues in test which led to false negatives.
8d7fbe7 [Josh Rosen] Fixes to multiple spilling-related bugs.
82e21c1 [Josh Rosen] Force spilling in UnsafeExternalSortSuite.
88b72db [Josh Rosen] Test ascending and descending sort orders.
f27be09 [Josh Rosen] Fix tests by binding attributes.
0a79d39 [Josh Rosen] Revert "Undo part of a SparkPlanTest change in #7162 that broke my test."
7c3c864 [Josh Rosen] Undo part of a SparkPlanTest change in #7162 that broke my test.
9969c14 [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
5822e6f [Josh Rosen] Fix test compilation issue
939f824 [Josh Rosen] Remove code gen experiment.
0dfe919 [Josh Rosen] Implement prefix sort for strings (albeit inefficiently).
66a813e [Josh Rosen] Prefix comparators for float and double
b310c88 [Josh Rosen] Integrate prefix comparators for Int and Long (others coming soon)
95058d9 [Josh Rosen] Add missing SortPrefixUtils file
4c37ba6 [Josh Rosen] Add tests for sorting on all primitive types.
6890863 [Josh Rosen] Fix memory leak on empty inputs.
d246e29 [Josh Rosen] Fix consideration of column types when choosing sort implementation.
6b156fb [Josh Rosen] Some WIP work on prefix comparison.
7f875f9 [Josh Rosen] Commit failing test demonstrating bug in handling objects in spills
41b8881 [Josh Rosen] Get UnsafeInMemorySorterSuite to pass (WIP)
90c2b6a [Josh Rosen] Update test name
6d6a1e6 [Josh Rosen] Centralize logic for picking sort operator implementations
9869ec2 [Josh Rosen] Clean up Exchange code a bit
82bb0ec [Josh Rosen] Fix IntelliJ complaint due to negated if condition
1db845a [Josh Rosen] Many more changes to harmonize with shuffle sorter
ebf9eea [Josh Rosen] Harmonization with shuffle's unsafe sorter
206bfa2 [Josh Rosen] Add some missing newlines at the ends of files
26c8931 [Josh Rosen] Back out some Hive changes that aren't needed anymore
62f0bb8 [Josh Rosen] Update to reflect SparkPlanTest changes
21d7d93 [Josh Rosen] Back out of BlockObjectWriter change
7eafecf [Josh Rosen] Port test to SparkPlanTest
d468a88 [Josh Rosen] Update for InternalRow refactoring
269cf86 [Josh Rosen] Back out SMJ operator change; isolate changes to selection of sort op.
1b841ca [Josh Rosen] WIP towards copying
b420a71 [Josh Rosen] Move most of the existing SMJ code into Java.
dfdb93f [Josh Rosen] SparkFunSuite change
73cc761 [Josh Rosen] Fix whitespace
9cc98f5 [Josh Rosen] Move more code to Java; fix bugs in UnsafeRowConverter length type.
c8792de [Josh Rosen] Remove some debug logging
dda6752 [Josh Rosen] Commit some missing code from an old git stash.
58f36d0 [Josh Rosen] Merge in a sketch of a unit test for the new sorter (now failing).
2bd8c9a [Josh Rosen] Import my original tests and get them to pass.
d5d3106 [Josh Rosen] WIP towards external sorter for Spark SQL.
@JoshRosen
Contributor Author

Looks like this was merged but GitHub didn't auto-close.

@JoshRosen JoshRosen closed this Jul 11, 2015
*
* [# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
*/
final class UnsafeSorterSpillWriter {
Contributor

Should this class declare 'implements Closeable'?
That way, its callers would be able to use the try-with-resources construct.
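
A sketch of that suggestion, writing the [# of records][len][prefix][data] layout described above (the class is illustrative, not Spark's actual UnsafeSorterSpillWriter):

```java
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustrative Closeable spill writer so that callers can use try-with-resources.
final class CloseableSpillWriterSketch implements Closeable {
  private final DataOutputStream out;

  CloseableSpillWriterSketch(File file, int numRecords) throws IOException {
    this.out = new DataOutputStream(new FileOutputStream(file));
    out.writeInt(numRecords);      // header: [# of records (int)]
  }

  void write(long prefix, byte[] record) throws IOException {
    out.writeInt(record.length);   // [len (int)]
    out.writeLong(prefix);         // [prefix (long)]
    out.write(record);             // [data (bytes)]
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}

// Usage: the writer is closed even if writing throws.
//
//   try (CloseableSpillWriterSketch writer =
//            new CloseableSpillWriterSketch(new File("spill.bin"), 1)) {
//     writer.write(42L, new byte[]{1, 2, 3});
//   }
```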

@JoshRosen JoshRosen deleted the sql-external-sort branch March 10, 2017 03:16