Optimize "per partition" top-k : ROW_NUMBER < 5 / TopK #6899

Open
alamb opened this issue Jul 10, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jul 10, 2023

Is your feature request related to a problem or challenge?

DataFusion optimizes queries like ... ORDER BY value LIMIT 10 by keeping only the top 10 ("limit") rows while sorting, which is great!
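To make the idea concrete, here is a minimal sketch of a limit-aware sort, assuming plain i64 values and ascending order. It is not DataFusion's actual implementation; the function name top_k_smallest is hypothetical. It keeps a bounded max-heap so that at most k values are held at any time.

```rust
use std::collections::BinaryHeap;

/// Return the `k` smallest values in ascending order, as
/// `ORDER BY value LIMIT k` would, without sorting the whole input.
/// (Hypothetical helper for illustration only.)
fn top_k_smallest(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    // Max-heap: the root is the largest of the values kept so far.
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k);
    for v in values {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(&largest) = heap.peek() {
            // Replace the current largest only if the new value beats it.
            if v < largest {
                heap.pop();
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec()
}
```

Each input value costs at most O(log k) heap work, so memory stays proportional to k rather than to the input size.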

Another common pattern, which we also have in IOx (https://github.com/influxdata/influxdb_iox/pull/8187/files#r1257834347), is queries like the following that select the top N values "per partition":

SELECT ...
FROM (
  SELECT ..., ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) AS rn
  FROM ...
)
WHERE
  rn < 10

Currently the plan will be something like:

Filter(rn < 10)
  WindowExec(ROW_NUMBER...)
    Sort(value1, value2)

The problem with this plan is that it sorts (and copies) the ENTIRE input even though the query only needs the first few rows of each partition (those with rn < 10).
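The cost of that plan can be seen in a small model of it. The sketch below is an illustration only, assuming rows are (value1, value2) pairs held in memory; the Row alias and the function name are hypothetical and do not correspond to DataFusion operators.

```rust
/// (value1, value2): partition key and ordering key.
type Row = (&'static str, i64);

/// Mimics Sort -> WindowExec(ROW_NUMBER) -> Filter(rn < limit).
/// Returns the surviving rows together with their row numbers.
fn naive_top_per_partition(mut rows: Vec<Row>, limit: u64) -> Vec<(Row, u64)> {
    // Sort(value1, value2): touches every row, O(n log n).
    rows.sort();

    // WindowExec: number rows within each run of equal value1, starting at 1.
    let mut numbered = Vec::with_capacity(rows.len());
    let mut rn = 0u64;
    let mut current: Option<&str> = None;
    for row in rows {
        if current != Some(row.0) {
            current = Some(row.0);
            rn = 0;
        }
        rn += 1;
        numbered.push((row, rn));
    }

    // Filter(rn < limit): only at this point are most rows discarded.
    numbered.into_iter().filter(|(_, rn)| *rn < limit).collect()
}
```

Every row is sorted and numbered before the filter throws most of them away, which is the wasted work described above.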

Describe the solution you'd like

It would be awesome to optimize this case so that it does not need to sort the entire input and instead keeps only the top N values per partition. I am not sure how easy this would be to do in the sort.

Describe alternatives you've considered

Maybe we could at least teach the window operator to emit only the top N values per partition when there is a row number predicate, which would save some of that work. The sort would still be required, but the window operator would do less work.
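A rough sketch of this alternative, assuming the input has already been sorted by (value1, value2) and the predicate has the form rn < limit. The names are hypothetical; this only models the idea of folding the filter into the row-numbering step.

```rust
/// (value1, value2): partition key and ordering key.
type Row = (&'static str, i64);

/// Assign ROW_NUMBER over input already sorted by (value1, value2),
/// emitting only rows whose row number is below `limit`.
fn row_number_with_limit(sorted: &[Row], limit: u64) -> Vec<(Row, u64)> {
    let mut out = Vec::new();
    let mut rn = 0u64;
    let mut current: Option<&str> = None;
    for &row in sorted {
        if current != Some(row.0) {
            // A new value1 starts a new window partition.
            current = Some(row.0);
            rn = 0;
        }
        rn += 1;
        // Rows past the cutoff are never copied into the output.
        if rn < limit {
            out.push((row, rn));
        }
    }
    out
}
```

The full sort still happens upstream; the saving is only in what the window step materializes and passes on.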

Additional context

No response

alamb added the enhancement (New feature or request) label on Jul 10, 2023
@ozankabak
Contributor

I think we need an optimization step that transforms the plan you gave to one that uses a fetching sort and does away with the filter. It seems to me the window operator would still be used as is.

@alamb
Contributor Author

alamb commented Jul 11, 2023

> I think we need an optimization step that transforms the plan you gave to one that uses a fetching sort and does away with the filter. It seems to me the window operator would still be used as is.

I agree the window operator should probably remain as is.

Maybe we could use a specialized sort operator like:

Filter(rn < 10)
  WindowExec(ROW_NUMBER...)
    PartitionedSort(order_by={value1, value2}, prefix={value1}, fetch = 10)

Here the PartitionedSort semantics would be to output only the top 10 rows for each distinct value of some prefix of the sort key (in this case, each distinct value of value1).
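One way to read those semantics is sketched below, assuming in-memory (value1, value2) rows with value1 as the prefix. PartitionedSort does not exist in DataFusion as far as this thread shows; the function name and the use of one bounded heap per prefix are assumptions made for illustration.

```rust
use std::collections::{BTreeMap, BinaryHeap};

/// (value1, value2): prefix key and remaining sort key.
type Row = (&'static str, i64);

/// Hypothetical PartitionedSort: output is ordered by (value1, value2),
/// with at most `fetch` rows per distinct value1.
fn partitioned_sort(rows: impl IntoIterator<Item = Row>, fetch: usize) -> Vec<Row> {
    // One bounded max-heap of value2 per prefix; BTreeMap keeps prefixes ordered.
    let mut heaps: BTreeMap<&'static str, BinaryHeap<i64>> = BTreeMap::new();
    for (value1, value2) in rows {
        let heap = heaps.entry(value1).or_default();
        heap.push(value2);
        if heap.len() > fetch {
            heap.pop(); // drop the largest, keeping the `fetch` smallest
        }
    }
    heaps
        .into_iter()
        .flat_map(|(value1, heap)| {
            heap.into_sorted_vec()
                .into_iter()
                .map(move |value2| (value1, value2))
        })
        .collect()
}
```

Memory in this sketch grows with the number of distinct prefixes times fetch rather than with the input size, so it helps most when partitions are large relative to fetch.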

🤔

@comphead
Contributor

Spark does something similar: it sorts and limits the data within each partition, then sends the output to a single partition where the final sort/limit is performed. Spark encapsulates this logic in a separate operator; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L311

It also contains optimizations: for example, if the tuples are already ordered it skips the redundant sort, and the same applies to the projection.
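The two-phase shape described above can be sketched as follows. This is a simplified model, not Spark's code: "partition" here means a chunk of the input data (not a window PARTITION BY group), values are plain i64, and the function name is hypothetical.

```rust
/// Two-phase top-k over input that is already split into data partitions.
fn two_phase_top_k(partitions: Vec<Vec<i64>>, k: usize) -> Vec<i64> {
    // Phase 1: each partition independently sorts and keeps its k smallest values.
    let mut merged: Vec<i64> = partitions
        .into_iter()
        .flat_map(|mut part| {
            part.sort_unstable();
            part.truncate(k);
            part
        })
        .collect();

    // Phase 2: a single partition receives at most k rows per input partition
    // and performs the final sort and limit.
    merged.sort_unstable();
    merged.truncate(k);
    merged
}
```

The final stage sees at most k rows from each input partition, which is what keeps the single-partition step cheap.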

@yyy1000
Contributor

yyy1000 commented Feb 2, 2024

I'd like to help with this (it doesn't look too difficult, I think).
Currently, my thoughts are:

  1. Implement a PartitionedSort physical plan that sorts within each partition and merges the results once every partition is sorted.
  2. When there is a PARTITION BY (which I think can be detected via input.output_partitioning().partition_count()), map LogicalPlan::Sort to PartitionedSort in https://github.com/apache/arrow-datafusion/blob/a6ef1bec480872f15f83628a7fb8c9bb2722cd49/datafusion/core/src/physical_planner.rs#L938-L950

One open question is whether the fetch in LogicalPlan is what PartitionedSort needs (it seems not). I could try it.
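The "sort each partition, then merge" step in point 1 could look roughly like the k-way merge below. This is a sketch under simplifying assumptions (i64 values, partitions already sorted and fully in memory); the function name is hypothetical and it is not taken from DataFusion.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge individually sorted partitions into one sorted output.
fn merge_sorted(partitions: &[Vec<i64>]) -> Vec<i64> {
    // Min-heap of (next value, partition index, offset within partition).
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&first) = part.first() {
            heap.push(Reverse((first, p, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((value, p, i))) = heap.pop() {
        out.push(value);
        // Refill the heap with the next value from the same partition, if any.
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}
```

A per-prefix fetch would have to be applied on top of such a merge, which is where the open question about the LogicalPlan fetch comes in.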

@alamb
Contributor Author

alamb commented Feb 2, 2024

This is one where I would recommend hacking up a prototype that works well enough to show some performance results, and then getting feedback on it before spending too much time polishing. I think this one could easily turn into a large project.

@yyy1000
Contributor

yyy1000 commented Feb 2, 2024

@alamb Thanks for your reply! I'd like to work on it once I have enough knowledge. 😎

alamb changed the title from 'Optimize "per partition" top-k : ROW_NUMBER < 5' to 'Optimize "per partition" top-k : ROW_NUMBER < 5 / TopK' on Jul 22, 2024
4 participants