
[SPARK-32096][SQL] Improve sorting performance of Spark SQL window function by removing window partition key from sort order #29725

Closed
wants to merge 8 commits

Conversation

@xuzikun2003 (Author):

What changes were proposed in this pull request?

Spark SQL's rank window function needs to sort the data in each window partition, and it relies on the SortExec operator to do the sort. During sorting, the window partition key is placed at the front of the sort order, which brings unnecessary comparisons on the partition key. Instead, we can group the rows by the partition key first, and inside each group sort the rows without comparing the partition key.
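For illustration, consider a hypothetical rank query (the data, the column names i and j, and the plan sketch in the comments are illustrative, not taken from the benchmark):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

// Assuming `spark` is an active SparkSession.
val df = spark.range(1000000L).selectExpr("id % 100 AS i", "id AS j")
val w = Window.partitionBy("i").orderBy("j")
val ranked = df.withColumn("rnk", rank().over(w))
// Plan today:      WindowExec <- SortExec(sortOrder = [i ASC, j ASC])
// With this patch: WindowExec <- WindowSortExec(partition = [i], order = [j ASC])
```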

We use a HashMap to store the mapping from a partition key to a sorter. All rows with the same partition key are inserted into the same sorter, and each sorter sorts its own rows. The partition keys stored in the HashMap are also sorted at the end. When the sort operator is ready to return rows to the window operator, it visits the sorters in partition-key order, and each sorter returns its rows in the window order specified by the SQL ORDER BY clause.
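A minimal, self-contained sketch of this grouping in plain Scala, with an ArrayBuffer standing in for each UnsafeExternalRowSorter (Row and its fields are made-up stand-ins, not the patch's types):

```scala
import scala.collection.mutable

case class Row(partitionKey: Int, orderKey: Long)

// One "sorter" (here just a buffer) per distinct partition key.
val windowSorterMap = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Row]]

def insert(row: Row): Unit =
  windowSorterMap.getOrElseUpdate(row.partitionKey, mutable.ArrayBuffer.empty) += row

// Visit the groups in partition-key order; within a group, sort by the window
// ORDER BY alone, so the partition key never enters a per-row comparison.
def mapOutput(): Iterator[Row] =
  windowSorterMap.keys.toSeq.sorted.iterator
    .flatMap(k => windowSorterMap(k).sortBy(_.orderKey))
```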

Since we cannot store an unlimited number of key-value pairs in the HashMap, we set an upper bound on the number of pairs. Once the number of distinct keys in the HashMap reaches this limit, new rows with unseen keys are inserted into a main sorter, which sorts its rows by the partition key followed by the window order. As long as the number of distinct keys stays under the limit, the main sorter remains empty.
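Continuing the sketch above, the overflow rule might look like this; windowSorterMapMaxSize = 1 matches the constant visible in the patch (see the review below), and the helper names are mine:

```scala
val windowSorterMapMaxSize = 1
val mainSorter = mutable.ArrayBuffer.empty[Row]

def insertWithBound(row: Row): Unit =
  if (windowSorterMap.contains(row.partitionKey) ||
      windowSorterMap.size < windowSorterMapMaxSize) insert(row)
  else mainSorter += row // overflow for keys beyond the limit

// The main sorter orders by the partition key first, then the window order.
def mainOutput(): Iterator[Row] =
  mainSorter.sortBy(r => (r.partitionKey, r.orderKey)).iterator
```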

When there are two sequences of sorted rows, one in the HashMap and one in the main sorter, we return the rows with a merge sort: we compare the next row ready to be returned from the HashMap with the next row ready to be returned from the main sorter, and always return the one that comes first in the sort order.
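A sketch of that merge over the two iterators from the snippets above, both of which already yield rows in (partition key, window order):

```scala
def key(r: Row): (Int, Long) = (r.partitionKey, r.orderKey)

def merge(a: Iterator[Row], b: Iterator[Row]): Iterator[Row] = {
  val (l, r) = (a.buffered, b.buffered)
  new Iterator[Row] {
    def hasNext: Boolean = l.hasNext || r.hasNext
    def next(): Row =
      if (!r.hasNext) l.next()
      else if (!l.hasNext) r.next()
      else if (Ordering[(Int, Long)].lteq(key(l.head), key(r.head))) l.next()
      else r.next()
  }
}

val output = merge(mapOutput(), mainOutput())
```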

Why are the changes needed?

This is the related JIRA
https://issues.apache.org/jira/browse/SPARK-32096?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17147504

This change brings a performance improvement for window functions. Running q67 of the TPCDS 1TB benchmark:

| Query | Time in seconds (master) | Time in seconds (perf patch) |
| --- | --- | --- |
| 67-v2.4 | 450.515 | 226.124 |

Running q67 of the TPCDS 10TB benchmark:

| Query | Time in seconds (master) | Time in seconds (perf patch) |
| --- | --- | --- |
| 67-v2.4 | 2486.404 | 1168.709 |

While this change improves query 67, it brings no performance regression to the other queries of TPCDS 1TB or 10TB.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  1. Existing unit tests
  2. Newly added unit tests

@xuzikun2003 xuzikun2003 changed the title Improve sorting performance of Spark SQL window function by removing window partition key from sort order [SPARK-32096] [SQL] Improve sorting performance of Spark SQL window function by removing window partition key from sort order Sep 11, 2020
@xuzikun2003 xuzikun2003 changed the title [SPARK-32096] [SQL] Improve sorting performance of Spark SQL window function by removing window partition key from sort order [SPARK-32096][SQL] Improve sorting performance of Spark SQL window function by removing window partition key from sort order Sep 11, 2020
@HyukjinKwon (Member):

cc @hvanhovell FYI

@hvanhovell (Contributor):

ok to test

@SparkQA commented Sep 28, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33798/

@SparkQA commented Sep 28, 2020:

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33798/

@SparkQA commented Sep 28, 2020:

Test build #129184 has finished for PR 29725 at commit 77d615a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 5, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34014/

@SparkQA commented Oct 5, 2020:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34014/

@SparkQA commented Oct 5, 2020:

Test build #129407 has finished for PR 29725 at commit 6b8ca20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 11, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34240/

@SparkQA commented Oct 11, 2020:

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34240/

@SparkQA commented Oct 11, 2020:

Test build #129636 has finished for PR 29725 at commit 7604f53.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 11, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34245/

@SparkQA commented Oct 11, 2020:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34245/

@SparkQA commented Oct 11, 2020:

Test build #129641 has finished for PR 29725 at commit 32a6714.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuzikun2003 (Author):

@hvanhovell, have you had a chance to review this pull request?

(Supplier<RecordComparator>)null,
prefixComparatorInWindow,
prefixComputerInWindow,
pageSizeBytes/totalNumSorters,
Contributor:

Why is this page size related to totalNumSorters?
Consider the situation where there are 100 different partition keys in one task: the page size will be 1/100 of the original, which will lead to 100 times as many page allocations.
Could you please explain why you want to reduce the page size here?

private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
private final boolean canUseRadixSortInWindow;
private final long pageSizeBytes;
private static final int windowSorterMapMaxSize = 1;
Contributor:

Why is windowSorterMapMaxSize 1?
So windowSorterMap can only hold one sorter?

Member:

I have the same question. Could you parameterize it via SQLConf?

@xuzikun2003 (Author) commented Nov 18, 2020:

@maropu, @opensky142857, here are the reasons why we set windowSorterMapMaxSize to 1 and why we reduce the page size of each sorter.

Each UnsafeExternalRowSorter uses its own memory consumer. When the first row is inserted into an UnsafeExternalRowSorter, the sorter's memory consumer allocates a whole page to it. In our perf run of TPCDS 100TB, the default page size is 64MB. If we insert only a few rows into a sorter corresponding to a window, a lot of memory is wasted, and the unnecessary memory allocation also brings significant performance overhead. That is why we do two things in this PR (a rough cost sketch follows the list):

  1. Keep the number of window sorters small
  2. Decrease the page size of each window sorter.
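To make the first point concrete, here is a rough, illustrative calculation (the 100-key count is hypothetical; 64 MB is the default page size from our run):

```scala
// First-page cost if every distinct partition key got its own sorter.
val pageSizeBytes = 64L << 20      // 64 MB default page size
val numWindowSorters = 100         // hypothetical distinct keys in one task
val firstPagesBytes = numWindowSorters * pageSizeBytes
println(s"${firstPagesBytes >> 20} MB pinned before any data is sorted")  // 6400 MB
```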

To address this problem, there are actually two directions we could take.

One direction is to let the window sorters share the same memory consumer, so we would not allocate many big pages that receive very few rows. But this direction requires a lot of engineering effort to refactor UnsafeExternalSorter.

The second direction is to keep only one window sorter for each physical partition.

Here is why we chose the second direction. When we run TPCDS 100TB, we do not see the Spark engine being slow at sorting many windows in a physical partition. We see it being slow at sorting a single window in a single physical partition (q67 is such a case), where the executor does a lot of unnecessary comparisons on the window partition key. To address the slowness we observe, we follow the second direction and keep only one window sorter for each physical partition. This single window sorter does not need to compare the window partition key, so it runs almost twice as fast.

Perhaps I can rename these parameters to avoid confusion. What do you think?

@opensky142857 (Contributor) commented Nov 18, 2020:

Did you consider reducing the page size by a fixed ratio instead of optimizing only for one partition per task?
In my understanding, it is not a rare case that one task handles several partition keys.

@xuzikun2003 (Author) commented Nov 18, 2020:

In our current setting, we have one main sorter and one window sorter. If there is only one window partition key in a physical partition, all rows go to the window sorter and the main sorter stays empty; if there is more than one window partition key in a physical partition, one partition key goes to the window sorter and the remaining keys go to the main sorter.

We only reduce the original page size by half across these two sorters. We observe that halving the page size makes no performance difference in the overall TPCDS 100TB run. The advantage is that if very few rows are inserted into the window sorter, less memory is wasted in its first allocated page, and hence less overhead is incurred by allocating that page.

@opensky142857, you are right that it is not rare for one task to handle several partition keys, but reducing the page size by half would not make much difference: the default page size is 64MB, and there is no performance difference between 64MB and 32MB pages. We could also keep the page size of the main sorter unchanged.

Contributor:

This PR looks to me like a TPCDS-specific optimization if we make windowSorterMapMaxSize = 1, since it works best when the number of partition keys is smaller than the number of tasks.

But the code itself looks like a general optimization.

Have you considered setting windowSorterMapMaxSize to 10 or 100 and reducing the page size at the same time? Then we could accept some limited memory waste in exchange for the ability to handle more general cases.

Member:

In my current impression, we need to try the first direction to achieve this optimization without the high memory pressure you pointed out above. Probably, we need to implement (or extend?) a BytesToBytesMap-like data structure (values in the map need to be sorted in an output iterator) instead of using a HashMap.


private final long pageSizeBytes;
private static final int windowSorterMapMaxSize = 1;
private static final int totalNumSorters = windowSorterMapMaxSize + 1;
private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> windowSorterMap;
Member:

nit: HashMap<UnsafeRow, AbstractUnsafeExternalRowSorter>

private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() throws IOException {
UnsafeExternalRowSorter sorter = null;
try {
if (this.orderingInWindow == null) {
Member:

When orderingInWindow == null, do we need WindowSortExec?

prefixComparatorInWindow,
prefixComputerInWindow,
pageSizeBytes/totalNumSorters,
false);
Member:

nit: pageSizeBytes/totalNumSorters, -> pageSizeBytes / totalNumSorters,

}

/**
* Returns an UnsafeExternalRowWindowSorter object.
Member:

nit: wrong format.

partitionSpec: Seq[Expression],
sortOrderInWindow: Seq[SortOrder],
sortOrderAcrossWindows: Seq[SortOrder],
global: Boolean,
Member:

It seems we don't need global for this node.

Author:

You are right, we can remove this parameter.

override def compare(prefix1: Long, prefix2: Long): Int = 0
}

if (sortOrderInWindow == null || sortOrderInWindow.size == 0) {
Member:

When sortOrderInWindow == null or sortOrderInWindow.size == 0, do we need WindowSortExec?

Author:

We don't run WindowSortExec when sortOrderInWindow == null or sortOrderInWindow.size == 0. We run the original SortExec when there is no need to sort within each group.

Member:

If so, it seems we don't need this if section?

Comment on lines +192 to +207
val enableRadixSort = sqlContext.conf.enableRadixSort

lazy val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
lazy val ordering = RowOrdering.create(sortOrder, output)
lazy val sortPrefixExpr = SortPrefix(boundSortExpression)

// The comparator for comparing prefix
lazy val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)

// The generator for prefix
lazy val prefixComputer = createPrefixComputer(sortPrefixExpr)

lazy val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)

lazy val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
Member:

Please add protected for the variables above.

Author:

Sure, I will add it.

}

/**
* Performs (external) sorting for multiple windows.
Member:

Could you leave some comments about the differences from SortExec?

import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter {
Member:

Could you leave some comments about the differences from UnsafeExternalRowSorter?

SortExec(requiredOrdering, global = false, child = child)
operator match {
case WindowExec(_, partitionSpec, orderSpec, _)
if (!partitionSpec.isEmpty && !orderSpec.isEmpty) =>
Member:

nit: isEmpty -> nonEmpty

Author:

Thanks, will fix it.

global,
child,
testSpillFrequency) {

Member:

Could you add assert(partitionSpec.nonEmpty && sortOrderInWindow.nonEmpty, "XXX") here?

AbstractUnsafeExternalRowSorter sorter = createUnsafeExternalRowSorterForWindow();

if (sorter == null) {
this.mainSorter.spill();
Contributor:

If we fail to create the new sorter, why do we need to spill the main sorter?

@github-actions (bot):

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 28, 2021
@github-actions github-actions bot closed this Mar 1, 2021