[SPARK-8638] [SQL] Window Function Performance Improvements #7057

Closed
wants to merge 9 commits into from


hvanhovell
Contributor

Description

Performance improvements for Spark Window functions. This PR will also serve as the basis for moving away from Hive UDAFs to Spark UDAFs. See JIRA tickets SPARK-8638 and SPARK-7712 for more information.

Improvements

  • Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current implementation in Spark uses a sliding window approach in these cases. This means that an aggregate is maintained for every row, so space usage is N (N being the number of rows). It also means that all these aggregates need to be updated separately, which takes roughly N*N/2 updates. The running case differs from the sliding case because we are only adding data to an aggregate function (no reset is required): we only need to maintain one aggregate (as in the UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING case), update it for each row, and read the aggregate value after each update. This is what the new implementation does. This approach uses only 1 buffer and requires only N updates; I am currently working on data with window sizes of 500-1000 doing running sums, and this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case also uses this approach, relying on the fact that aggregate operations are commutative, with one twist: it processes the input buffer in reverse.
  • Fewer comparisons in the sliding case. The current implementation determines frame boundaries for every input row. The new implementation makes more use of the fact that the window is sorted, maintains the boundaries, and only moves them when the current row order changes. This is a minor improvement.
  • A single Window node is able to process all types of Frames for the same Partitioning/Ordering. This saves a little time/memory spent buffering and managing partitions. This will be enabled in a follow-up PR.
  • A lot of the staging code is moved from the execution phase to the initialization phase. This gives a minor performance improvement and improves the readability of the execution code.
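To make the running-case arithmetic above concrete, here is a minimal Scala sketch (hypothetical, not code from this PR) that evaluates a running SUM over one already-sorted partition in two ways: rebuilding an aggregate per row, as the sliding approach does, and keeping a single buffer that is updated once per row. The object and method names are made up for illustration, and the update counters exist only to expose the quadratic vs. linear cost.

```scala
// Hypothetical sketch, not the PR's code: a running SUM over one partition
// that is already sorted by the window's ORDER BY expressions.
object RunningFrameSketch {

  // Per-row approach: rebuild a separate aggregate for every row.
  // Row i sums rows 0..i, so N rows cost N*(N+1)/2 additions in total.
  def perRowRunningSum(rows: IndexedSeq[Long]): (IndexedSeq[Long], Long) = {
    var updates = 0L
    val out = rows.indices.map { i =>
      var acc = 0L
      for (j <- 0 to i) {
        acc += rows(j)
        updates += 1
      }
      acc
    }
    (out, updates)
  }

  // Growing-frame approach: one aggregate buffer, each row added exactly once,
  // and the current value read after every update. N rows cost N additions.
  def runningSum(rows: IndexedSeq[Long]): (IndexedSeq[Long], Long) = {
    var updates = 0L
    var acc = 0L
    val out = rows.map { v =>
      acc += v
      updates += 1
      acc
    }
    (out, updates)
  }
}
```

For the rows 1, 2, 3, 4 both methods return the running sums 1, 3, 6, 10, but the per-row version performs 10 additions while the single-buffer version performs 4.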

Benchmarking

I have done a small benchmark using the on-time performance data for the month of April. I used the origin as the partitioning key; as a result, there is considerable variation in window sizes. The code for the benchmark can be found in the JIRA ticket. These are the results per frame type:

Frame           Master   SPARK-8638
Entire Frame    2 s      1 s
Sliding         18 s     1 s
Growing         14 s     0.9 s
Shrinking       13 s     1 s

@JoshRosen
Contributor

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Jun 27, 2015

Test build #35897 has finished for PR 7057 at commit ad7820c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jun 30, 2015

@hvanhovell Thank you for breaking the original PR into multiple ones. I am reviewing this one and will add a comment once I finish my first round.

* value of the order by clause and depends on its ordering. The group must be sorted for this to
* produce sensible output.
* - Shifted: The aggregate is a displaced value relative to the position of the given row.
* Examples are Lead and Lag.
Contributor

It seems we are mixing two concepts here: (1) how a frame updates, and (2) how the frame boundary is determined. Let me summarize them separately.

For the frame boundary, we have two types: row and range.

For how a frame updates, we have four types of frame:

  • Entire partition: The frame is the entire partition, i.e. UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. In this case, the window function takes all rows as input and is evaluated once.
  • Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND .... Every time we move to a new row to process, we add some rows to the frame. We do not remove rows from this frame.
  • Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING. Every time we move to a new row to process, we remove some rows from the frame. We do not add rows to this frame. The frame will originally contain all rows of the partition.
  • Moving frame: Every time we move to a new row to process, we remove some rows from the frame and we add some rows to the frame. Examples are 1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING.

I feel summarizing these two concepts separately can help people understand them. What do you think?
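A minimal Scala sketch of keeping the two concepts apart, assuming a made-up frame model (FrameType, Bound, Frame and FrameKind are illustrative names, not Spark's Catalyst classes): the ROWS/RANGE type is stored alongside the bounds, but the update behaviour is decided from the bounds alone.

```scala
// Hypothetical model of a window frame; these names are illustrative and are
// not Spark's Catalyst classes.
sealed trait FrameType
case object RowFrame extends FrameType
case object RangeFrame extends FrameType

sealed trait Bound
case object UnboundedPreceding extends Bound
case object UnboundedFollowing extends Bound
case object CurrentRow extends Bound
final case class Preceding(n: Int) extends Bound
final case class Following(n: Int) extends Bound

final case class Frame(frameType: FrameType, lower: Bound, upper: Bound)

object FrameKind {
  // How a frame updates depends only on its bounds. The frame type (ROWS vs RANGE)
  // only changes how CURRENT ROW and n PRECEDING/FOLLOWING map to row positions.
  def classify(frame: Frame): String = (frame.lower, frame.upper) match {
    case (UnboundedPreceding, UnboundedFollowing) => "entire partition"
    case (UnboundedPreceding, _)                  => "growing"
    case (_, UnboundedFollowing)                  => "shrinking"
    case _                                        => "moving"
  }
}
```

For example, 1 PRECEDING AND CURRENT ROW classifies as moving and 1 FOLLOWING AND UNBOUNDED FOLLOWING as shrinking, whether the frame is ROWS or RANGE.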

Contributor Author

I agree. There are still a few other documentation inconsistencies, and I'll try to fix those as well.

Contributor

For this PR, one of your main targets is to optimize the growing frame, right? With your optimization, we can just update the aggregation buffer and get the evaluated results instead of creating a buffer for every row.

I see you also try to optimize the shrinking frame by reversing the sort order. Then we have to handle some functions very carefully (as you mentioned, FIRST/LAST). Also, since we reverse the sort order, the window function must be commutative. My concern is that if a user implements a user-defined window function that is not commutative, they will not get correct results. I feel the right way is, before we add this optimization, to first have a separate task that adds this kind of property to the function definition. Then we only optimize functions that are safe to optimize. For example, if a window function is commutative (say, the commutative field of this function is true), we apply this optimization; otherwise, we do not.
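The concern can be illustrated with a small Scala sketch. It assumes a hypothetical Agg interface carrying a commutative flag (the kind of property proposed above; it is not an existing Spark API) and evaluates CURRENT ROW AND UNBOUNDED FOLLOWING either directly or by walking the partition backwards with one buffer. SUM gives the same answer both ways, while a FIRST-style function does not.

```scala
// Hypothetical sketch: evaluating CURRENT ROW AND UNBOUNDED FOLLOWING by walking the
// partition backwards with a single buffer. `Agg` and its `commutative` flag are
// assumptions illustrating the proposed property, not an existing Spark API.
object ShrinkingFrameSketch {
  trait Agg[B] {
    def zero: B
    def update(buf: B, v: Long): B
    def commutative: Boolean
  }

  object SumAgg extends Agg[Long] {
    val zero: Long = 0L
    def update(buf: Long, v: Long): Long = buf + v
    val commutative: Boolean = true
  }

  // FIRST keeps the first value it sees, so its result depends on input order.
  object FirstAgg extends Agg[Option[Long]] {
    val zero: Option[Long] = None
    def update(buf: Option[Long], v: Long): Option[Long] = buf.orElse(Some(v))
    val commutative: Boolean = false
  }

  // Reference semantics: row i aggregates rows i..N-1 in partition order.
  def direct[B](agg: Agg[B], rows: IndexedSeq[Long]): IndexedSeq[B] =
    rows.indices.map(i => rows.drop(i).foldLeft(agg.zero)(agg.update))

  // Reverse trick: one buffer, rows added in the order N-1, N-2, ..., 0.
  // Row i's frame contains the right rows, but they were fed in reverse order.
  def reversed[B](agg: Agg[B], rows: IndexedSeq[Long]): IndexedSeq[B] =
    rows.reverse.scanLeft(agg.zero)(agg.update).tail.reverse

  // Only use the reverse trick when the function declares itself commutative.
  def evaluate[B](agg: Agg[B], rows: IndexedSeq[Long]): IndexedSeq[B] =
    if (agg.commutative) reversed(agg, rows) else direct(agg, rows)
}
```

For the rows 1, 2, 3, the reverse walk returns 6, 5, 3 for SUM (the same as direct evaluation), but Some(3) for every row for FIRST, whereas direct evaluation returns Some(1), Some(2), Some(3). Gating on the flag makes evaluate fall back to the direct path for FIRST.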

Contributor Author

The PR also optimizes the processing of Moving and Shrinking frames:

  • For moving frame processing, the number of comparisons is reduced. This didn't look like the most rewarding improvement, but I was surprised to find that it improved performance by quite a margin.
  • Shrinking frames are indeed processed in reverse order, which makes building them as fast as in the growing case (although it uses more memory). I share your concerns, and solving this at the root (the function itself) would indeed be best. I'll revert this for now and file a JIRA ticket for future reference.
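For the moving-frame point, here is a rough Scala sketch of maintaining boundaries instead of recomputing them for every row. It assumes a RANGE frame over a single integer order key sorted ascending, uses SUM, and its object and method names are hypothetical. Both boundary indices only move forward, and the frame is re-aggregated only when a boundary actually moved, so peer rows with equal keys reuse the previous result. A naive all-pairs version is included for comparison.

```scala
// Hypothetical sketch: SUM over RANGE BETWEEN `before` PRECEDING AND `after` FOLLOWING
// in a partition sorted ascending on a single integer key. The frame boundaries are
// kept between rows and only ever move forward, instead of being recomputed per row.
object MovingFrameSketch {
  def rangeSum(keys: IndexedSeq[Int], values: IndexedSeq[Long], before: Int, after: Int): IndexedSeq[Long] = {
    require(keys.length == values.length, "keys and values must line up")
    require(before >= 0 && after >= 0, "offsets must be non-negative")
    var lo = 0 // index of the first row inside the frame
    var hi = 0 // index of the first row after the frame
    var prevLo = -1
    var prevHi = -1
    var current = 0L
    keys.indices.map { i =>
      val k = keys(i)
      // Keys are sorted, so both boundaries only advance.
      while (lo < keys.length && keys(lo) < k - before) lo += 1
      while (hi < keys.length && keys(hi) <= k + after) hi += 1
      // Re-aggregate only when the frame actually changed (not for peer rows).
      if (lo != prevLo || hi != prevHi) {
        current = (lo until hi).iterator.map(j => values(j)).sum
        prevLo = lo
        prevHi = hi
      }
      current
    }
  }

  // Reference version: compares every row against every other row.
  def rangeSumNaive(keys: IndexedSeq[Int], values: IndexedSeq[Long], before: Int, after: Int): IndexedSeq[Long] =
    keys.indices.map { i =>
      keys.indices.iterator
        .filter(j => keys(j) >= keys(i) - before && keys(j) <= keys(i) + after)
        .map(j => values(j))
        .sum
    }
}
```

With keys 1, 2, 2, 5, 6, values 10 to 50 and a frame of 1 PRECEDING AND 1 FOLLOWING, both versions return 60, 60, 60, 90, 90. The naive version compares every row with every other row, while the incremental one advances each boundary at most N times over the whole partition.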

@SparkQA

SparkQA commented Jul 1, 2015

Test build #36225 timed out for PR 7057 at commit 27b0329 after a configured wait of 175m.

@SparkQA

SparkQA commented Jul 1, 2015

Test build #36267 has finished for PR 7057 at commit 34f63f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* rows from this frame.
* - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING.
* Every time we move to a new row to process, we remove some rows from the frame. We do not add
* rows to this frame. The frame will originally contain all rows of the partition.
Contributor

I made a mistake here. When we have n FOLLOWING AND UNBOUNDED FOLLOWING, the frame will not contain all rows of the partition at the beginning. So we can remove "The frame will originally contain all rows of the partition."
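A tiny Scala sketch (a hypothetical helper, not code from the PR) of the row positions in such a frame shows why the sentence should go: with n > 0, even the first row's frame skips the first n rows.

```scala
// Hypothetical helper: row positions in the frame of row `i` for
// ROWS BETWEEN n FOLLOWING AND UNBOUNDED FOLLOWING in a partition of `size` rows.
object ShrinkingBoundsSketch {
  def frame(i: Int, n: Int, size: Int): Range = {
    require(n >= 0 && i >= 0 && i < size, "row index must lie inside the partition")
    // The frame starts n rows after the current row and runs to the end;
    // it is empty once i + n reaches the partition size.
    math.min(i + n, size) until size
  }
}
```

For a partition of 5 rows and n = 2, row 0's frame is rows 2, 3, 4; only with n = 0 (CURRENT ROW AND UNBOUNDED FOLLOWING) does the first frame cover the whole partition.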

@yhuai
Contributor

yhuai commented Jul 8, 2015

@hvanhovell I have finished my first round. Sorry for taking a long time. I think I understand the new workflow of the operator, and it looks pretty good. It would be great if we could have more comments explaining how it works (especially for some important methods like createBoundOrdering). I will focus more on the readability of the code in my next round.

@hvanhovell
Contributor Author

@yhuai I have updated the PR.

As for the documentation, I will add another section to the general class documentation that explains the inner workings of the operator. Let me know what else needs more documentation.

@SparkQA
Copy link

SparkQA commented Jul 9, 2015

Test build #36850 has finished for PR 7057 at commit 480bb05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jul 14, 2015

ok to test

@SparkQA

SparkQA commented Jul 14, 2015

Test build #37244 has finished for PR 7057 at commit 480bb05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

(windowSpec.orderSpec, projection(), projection())
}
// Use only the first order expression when the offset is non-null.
else if (windowSpec.orderSpec.size == 1) {
Contributor

How about we change the code format to something like the following?

if (...) {
  // Comment to explain we reach here.
  ...
} else if () {
  // Comment to explain we reach here.
  ...
} else {
  // Comment to explain we reach here.
  ...
}
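Applied to the fragment above, the suggested layout could look roughly like this Scala sketch. It is a hypothetical stand-in, not the PR's code: strings replace the order expressions, and the error branch assumes that a bound with a value offset needs exactly one order expression, in line with the fragment's own comment about using only the first order expression.

```scala
// Hypothetical sketch of the suggested layout; strings stand in for order expressions.
object BoundOrderingSketch {
  def orderingFor(offset: Option[Int], orderSpec: Seq[String]): Seq[String] = {
    if (offset.isEmpty) {
      // No offset (e.g. CURRENT ROW): compare on the full ordering.
      orderSpec
    } else if (orderSpec.size == 1) {
      // A non-null offset is applied to the single order expression.
      Seq(orderSpec.head)
    } else {
      // Assumption: a value offset cannot be applied to zero or several order expressions.
      throw new IllegalArgumentException(
        s"A bound with an offset needs exactly one order expression, got ${orderSpec.size}")
    }
  }
}
```

Each branch opens with a comment explaining why control reaches it, which is the point of the suggested format.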

@yhuai
Contributor

yhuai commented Jul 16, 2015

@hvanhovell I remember you have some benchmarking results. Can you add them to the description? Also, does your benchmark include tests for all four kinds of frames (entire partition, growing frame, shrinking frame, and moving frame)? It would be good to have results for all of these kinds of frames so we can make sure there is no performance regression (I think it is unlikely that we introduce a regression, but it is still good to have benchmarking results for the different cases).

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37545 has finished for PR 7057 at commit c3e4287.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor Author

@yhuai the benchmarking results are attached. It might be interesting to see how the operator performs on different datasets.

@SparkQA

SparkQA commented Jul 17, 2015

Test build #37567 has finished for PR 7057 at commit 7207ef5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2015

Test build #37743 has finished for PR 7057 at commit 3bfdc49.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jul 19, 2015

@hvanhovell Overall looks good. I am merging it to master. I will leave a few comments for minor changes. Can you submit a follow-up PR to address them?

@asfgit asfgit closed this in a9a0d0c Jul 19, 2015
*
* TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
*/
class WindowSuite extends QueryTest {
Contributor

It seems we do not need to create a new suite, right? We can just use HiveDataFrameWindowSuite.

4 participants