Support sort pushdown #7871

Closed
Tracked by #10313
backkem opened this issue Oct 19, 2023 · 17 comments
Labels
enhancement New feature or request


@backkem
Contributor

backkem commented Oct 19, 2023

Is your feature request related to a problem or challenge?

It seems there is no pushdown for sort order in the TableProvider yet. This could also lead to wrong results if filters and a limit are pushed down without the sort order being specified.

Describe the solution you'd like

Either extend supports_filters_pushdown to allow indicating whether sorting is supported, or add a new negotiation method specifically for sorting. I'm looking for guidance on a solid approach.

Describe alternatives you've considered

No response

Additional context

I wonder whether there has been any thinking about other pushdown options that could be interesting. Down the line, full query/Substrait pushdown also seems interesting, although it's less clear how to negotiate partial support in that case.

@backkem added the enhancement label (New feature or request) on Oct 19, 2023
@alamb
Contributor

alamb commented Oct 20, 2023

Thank you for bringing this up, @backkem.

Sort Pushdown?

When you say "sort pushdown", do you mean providing a sort order to TableProvider::scan?

Is the idea that the table providers have some faster way to sort than what is built into DataFusion?

Most of the sort-based optimizations are done after TableProvider::scan has been called, so I am not sure how useful a pushed-down sort would be.

Existing ordering

There is a similar idea (maybe it is what you mean by sort order pushdown) where sources can tell DataFusion about any pre-existing sort orders the data may have (e.g. because a parquet file was written with some sort order).

The TableProvider trait does not have any way to communicate this information directly, as it is not used until physical planning, but the ExecutionPlan returned by TableProvider::scan does allow setting this via ExecutionPlan::output_ordering.

You can see an example of how this is hooked up in the built-in ListingTable TableProvider (via ListingTableConfig, via ListingOptions, via file_sort_order).
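A minimal sketch of how a declared ordering gets used, written against a toy plan model rather than DataFusion's real types: `SortKey`, `Plan`, `ordering_satisfies` and `enforce_ordering` are all hypothetical names. The scan declares the order its output already has (the role `ExecutionPlan::output_ordering` plays), and a sort is added only when that order does not already satisfy the required one. The real optimizer logic is more involved than this simple prefix check.

```rust
/// Hypothetical sort key: a column name plus a direction.
#[derive(Clone, Debug, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub ascending: bool,
}

pub fn asc(column: &str) -> SortKey {
    SortKey { column: column.to_string(), ascending: true }
}

pub fn desc(column: &str) -> SortKey {
    SortKey { column: column.to_string(), ascending: false }
}

/// Data ordered by `existing` is also ordered by `required` when `required`
/// is a prefix of `existing` (e.g. [ts, id] satisfies [ts], but not [id]).
pub fn ordering_satisfies(existing: &[SortKey], required: &[SortKey]) -> bool {
    required.len() <= existing.len()
        && existing.iter().zip(required).all(|(e, r)| e == r)
}

/// Toy physical plan: a scan that declares its output ordering, or a sort.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Scan { table: String, output_ordering: Vec<SortKey> },
    Sort { keys: Vec<SortKey>, input: Box<Plan> },
}

impl Plan {
    /// Plays the role of `ExecutionPlan::output_ordering` in this sketch.
    pub fn output_ordering(&self) -> &[SortKey] {
        match self {
            Plan::Scan { output_ordering, .. } => output_ordering.as_slice(),
            Plan::Sort { keys, .. } => keys.as_slice(),
        }
    }
}

/// Insert a Sort only if the input does not already satisfy `required`.
pub fn enforce_ordering(input: Plan, required: Vec<SortKey>) -> Plan {
    if ordering_satisfies(input.output_ordering(), &required) {
        input
    } else {
        Plan::Sort { keys: required, input: Box::new(input) }
    }
}
```

With a scan declared as ordered by `[ts ASC, id ASC]`, requiring `[ts ASC]` leaves the plan unchanged, while requiring `[ts DESC]` wraps it in a `Sort`.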

@backkem
Contributor Author

backkem commented Oct 20, 2023

I think there is merit to both cases. My use case falls more under the former. My table provider wraps a remote DB, and I want to fetch only part of the table for a simple pagination case. Combining a filter and a limit significantly reduces how much data needs to be transferred from the DB before it is combined with other data in-process. Without the limit, I exceed my latency goals. In this case, the rows that should be returned depend on the sort order, since the limit is restrictive enough that the result would otherwise be arbitrary.
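The pagination case can be illustrated by rendering a pushed-down request to SQL. This is a sketch under stated assumptions: `RemoteScan` and `to_sql` are hypothetical, predicates arrive as pre-rendered SQL strings, and no quoting or escaping is done. What it shows is that a LIMIT can be sent to the remote DB on its own only when no ordering is required; if the query orders its results, the ORDER BY has to travel with the LIMIT, or the DB may return an arbitrary subset.

```rust
/// Hypothetical pushdown request for one remote table scan; none of these
/// names come from DataFusion.
pub struct RemoteScan {
    pub table: String,
    /// Predicates already rendered as SQL text (no quoting/escaping here).
    pub filters: Vec<String>,
    /// ORDER BY keys the query needs above this scan: (column, ascending).
    pub order_by: Vec<(String, bool)>,
    pub limit: Option<usize>,
    /// Whether this provider is able to send ORDER BY to the remote DB.
    pub supports_sort: bool,
}

impl RemoteScan {
    pub fn to_sql(&self) -> String {
        let mut sql = format!("SELECT * FROM {}", self.table);
        if !self.filters.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&self.filters.join(" AND "));
        }
        let sort_pushed = self.supports_sort && !self.order_by.is_empty();
        if sort_pushed {
            let keys: Vec<String> = self
                .order_by
                .iter()
                .map(|(col, asc)| format!("{} {}", col, if *asc { "ASC" } else { "DESC" }))
                .collect();
            sql.push_str(" ORDER BY ");
            sql.push_str(&keys.join(", "));
        }
        // A LIMIT is only correct when no ordering is required, or when the
        // ordering is pushed with it; otherwise the remote DB may return an
        // arbitrary subset of the matching rows.
        if self.order_by.is_empty() || sort_pushed {
            if let Some(n) = self.limit {
                sql.push_str(&format!(" LIMIT {}", n));
            }
        }
        sql
    }
}
```

If the provider cannot push the sort, it has to fall back to fetching every filtered row and letting DataFusion sort and limit locally, which is exactly the latency problem described above.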

I know this may be a somewhat atypical use case. Happy to hear whether it can be accommodated or not.

When working with remote DBs there are also other pushdown opportunities, for example pushing down merges across tables in the same DB. That is why I was pondering the direction of 'execution plan federation'. I've seen similar ideas mentioned in a Substrait talk.

@backkem
Contributor Author

backkem commented Oct 20, 2023

I just ran across #970, which touches on query federation. While I'd be happy to explore that further, support for sort would already be a decent start.

@alamb
Contributor

alamb commented Oct 21, 2023

The sort/limit use case makes sense. Thank you for the explanation.

One thing you could do is add custom optimizer passes (OptimizerRule and PhysicalOptimizerRule) that propagate the sort/limit information down into your scan.
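A rough sketch of what such a pass could do, written against a toy plan enum instead of DataFusion's `OptimizerRule`/`PhysicalOptimizerRule` traits (whose exact signatures have changed across releases). `Plan` and `push_sort_limit` are hypothetical; the sketch assumes every `Scan` belongs to a source that can evaluate ORDER BY and LIMIT itself, and that sort keys are ascending column names.

```rust
/// Toy logical plan; column names stand in for ascending sort keys.
#[derive(Clone, Debug, PartialEq)]
pub enum Plan {
    Limit { n: usize, input: Box<Plan> },
    Sort { keys: Vec<String>, input: Box<Plan> },
    /// A scan over a source assumed to accept pushed-down ORDER BY / LIMIT.
    Scan { table: String, sort: Vec<String>, limit: Option<usize> },
}

/// Hypothetical rewrite rule: fold `Limit(Sort(Scan))` into the scan when the
/// scan has not already absorbed a sort or limit; recurse everywhere else.
pub fn push_sort_limit(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { n, input } => match *input {
            Plan::Sort { keys, input: inner } => match *inner {
                Plan::Scan { table, sort, limit } if sort.is_empty() && limit.is_none() => {
                    Plan::Scan { table, sort: keys, limit: Some(n) }
                }
                other => Plan::Limit {
                    n,
                    input: Box::new(Plan::Sort { keys, input: Box::new(push_sort_limit(other)) }),
                },
            },
            other => Plan::Limit { n, input: Box::new(push_sort_limit(other)) },
        },
        Plan::Sort { keys, input } => Plan::Sort { keys, input: Box::new(push_sort_limit(*input)) },
        scan @ Plan::Scan { .. } => scan,
    }
}
```

A real rule would also need to check that the source actually supports the given sort expressions, and might keep a `Sort`/`Limit` above the scan when it cannot be sure the source honours them.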

@backkem
Contributor Author

backkem commented Nov 6, 2023

It looks like we'll need to support joins over remote sources after all. Maybe we'll try to implement an ExecutionPlan based on the DataLoader concept. Hopefully that will help us meet latency requirements when a restrictive limit is set.

Regarding sort: I do agree (reading between the lines) that this pushdown direction feels like a slippery slope toward feature creep. The query federation approach mentioned above seems more flexible. I'll close this for now.

@backkem closed this as completed on Nov 6, 2023
@lewiszlw
Member

The TableProvider trait does not have any way to communicate this information directly as it is not used until physical planning, but the ExecutionPlan returned by TableProvider::scan does allow setting this via ExecutionPlan::output_ordering

@alamb ExecutionPlan::output_ordering does not seem to support both asc and desc for one column, because TableProvider doesn't pass a sort option to the scan execution plan.

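#[async_trait]
pub trait TableProvider: Sync + Send {
    // Other required methods (as_any, schema, table_type, ...) omitted.
    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
        // Proposed addition: the requested sort order (not in the existing trait).
        sort: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>>;
}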

We could pass sort exprs to scan; then users could construct the ExecutionPlan based on those sort exprs.
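To illustrate why a requested sort would help a provider that can produce either direction, here is a sketch of a hypothetical `IndexedTable` with one index it can read forwards or backwards. Given the proposed `sort` argument (modelled as a slice of hypothetical `SortKey`s rather than `Expr`), it decides per query which direction to read and which ordering to declare. Without that argument, the provider never learns at `scan` time which direction the query wants.

```rust
/// Hypothetical sort key: column name plus direction.
#[derive(Clone, Debug, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub ascending: bool,
}

/// How the hypothetical scan would read its index.
#[derive(Debug, PartialEq)]
pub enum IndexRead {
    Forward,
    Backward,
}

/// A hypothetical provider with a single-column index readable in either direction.
pub struct IndexedTable {
    pub index_column: String,
}

impl IndexedTable {
    /// Given the requested sort, return the index read direction and the
    /// ordering the scan would declare, or None if DataFusion must sort.
    /// Only a single-key sort on the indexed column is handled here.
    pub fn plan_scan(&self, sort: &[SortKey]) -> Option<(IndexRead, Vec<SortKey>)> {
        match sort {
            [key] if key.column == self.index_column => {
                let dir = if key.ascending { IndexRead::Forward } else { IndexRead::Backward };
                Some((dir, vec![key.clone()]))
            }
            _ => None,
        }
    }
}
```

If `None` is returned, the scan would declare no ordering and DataFusion would add its own sort as usual.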

@alamb
Contributor

alamb commented Nov 29, 2023

We could pass sort exprs to scan, then users can construct ExecutionPlan based on sort exprs.

I think the challenge is that DataFusion currently treats the sort order from an ExecutionPlan as "if it has a sort order, I will try to use it" rather than "I will try to push Sort into the scan".

Instead, DataFusion will introduce a SortExec to re-sort the data if that is necessary to answer the query.

In order to "push" sorts into ExecutionPlans/scans, we would need some way to help DataFusion decide whether it should push the sort into the scan or use a SortExec afterwards.

For example, it is not clear which of the following plans is better, as it depends on how the sort within the ExecutionPlan was implemented:

SortExec
  Filter
    Scan (no sort)

vs

Filter
  Scan (Sort in the Scan)

Depending on how selective the filter is, it may be better to do the scan/filter first and then sort.

Of course, in this case the filter is likely pushed down to the scan too, but I think the same issue still applies in general.
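The trade-off between the two plans can be made concrete with a toy cost model. It is purely illustrative and not how DataFusion estimates costs: sorting `n` rows is assumed to cost about `n * log2(n)`, scanning costs one unit per row, filter cost is ignored, the filter is assumed not to be pushed into the scan, and `source_sort_factor` is a made-up knob for how cheaply the source can sort.

```rust
/// Rough comparison-count estimate for sorting `n` rows: n * log2(n).
fn sort_cost(n: f64) -> f64 {
    if n <= 1.0 { 0.0 } else { n * n.log2() }
}

/// Plan A (hypothetical costs): scan all rows, filter, then sort only the
/// rows that survive the filter.
pub fn cost_filter_then_sort(rows: f64, selectivity: f64) -> f64 {
    rows + sort_cost(rows * selectivity)
}

/// Plan B (hypothetical costs): the scan sorts every row before the filter.
/// `source_sort_factor` scales the source's sort cost: below 1.0 if it sorts
/// faster than DataFusion, 0.0 if it reads from an index already in order.
pub fn cost_sort_in_scan(rows: f64, source_sort_factor: f64) -> f64 {
    rows + source_sort_factor * sort_cost(rows)
}
```

With a very selective filter the sort-after-filter plan wins, while a source that can already return rows in order (factor `0.0`) wins even against a moderately selective filter.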

For this use case, I suggest adding a custom optimizer pass that does the sort pushdown you want and can take advantage of the details of the underlying source to make these choices.

@lewiszlw
Member

Your analysis makes sense, thanks for the explanation!

@Dandandan
Contributor

Dandandan commented Nov 30, 2023

We have a similar use case for pushing TopK down to a column.
The idea is to reduce the (S3) scan by only fetching/decoding the rows that match the TopK rows.

   TopK
      Scan

->

   Scan (RowFilter=column IN (TopK column))
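A minimal in-memory sketch of this two-phase TopK idea, assuming `ORDER BY col DESC LIMIT k` over an `i64` column. Slices stand in for columns; there is no Parquet or S3 I/O, and `top_k_values` / `rows_matching` are hypothetical helpers. Phase one reads only the sort column and keeps the `k` largest values in a bounded min-heap; phase two evaluates the `column IN (...)` predicate to find which rows need their remaining columns fetched.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

/// Phase 1 (hypothetical): scan only the sort column and keep the `k`
/// largest values using a min-heap bounded to `k` entries.
pub fn top_k_values(column: &[i64], k: usize) -> Vec<i64> {
    let mut heap = BinaryHeap::with_capacity(k + 1);
    for &v in column {
        heap.push(Reverse(v));
        if heap.len() > k {
            heap.pop(); // drop the current smallest value
        }
    }
    let mut out: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable_by(|a, b| b.cmp(a)); // descending, like ORDER BY ... DESC
    out
}

/// Phase 2 (hypothetical): indices of rows that a `column IN (top values)`
/// row filter would let through; only these rows need their other columns
/// fetched and decoded.
pub fn rows_matching(column: &[i64], top: &[i64]) -> Vec<usize> {
    let wanted: HashSet<i64> = top.iter().copied().collect();
    column
        .iter()
        .enumerate()
        .filter(|&(_, v)| wanted.contains(v))
        .map(|(i, _)| i)
        .collect()
}
```

Because the `IN` filter matches values rather than positions, ties at the boundary can let more than `k` rows through (for example, a column of identical values), so a final TopK or limit is still needed above the scan.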

@backkem
Contributor Author

backkem commented Nov 30, 2023

The core problem here is: how far do you want to go with allowing the TableProvider to express compute ability versus a more full-blown form of query federation. The question is: where do you draw the line in expanding the TableProvider trait. Do we allow pushing down sort, aggregation, UDFs, etc.?

It may be a good idea to come up with a clear rule of where to draw this line.

One option would be to fully lean into the query federation idea: provide a good framework for that, provide basic implementation with filter/limit/sort pushdown out of the box and foster development of more complex federation cases (such as for remote DBMSs) out of tree.

One notable case regarding the complexity of this kind of pushdown is when you start combining joins with limit/sort pushdown. Not all tables may have the needed columns to sort/filter on. In that case you'd need something like what the Velox docs call Dynamic Filter Pushdown to avoid full table scans.
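Dynamic filter pushdown in its simplest form can be sketched for an inner hash join. This illustrates the general idea only, not Velox's or DataFusion's implementation: `Row` and `join_with_dynamic_filter` are hypothetical, and in-memory slices stand in for the two table scans. Once the build side has been hashed, its key set is a filter that only exists at runtime, and the probe-side scan can apply it so that rows that cannot match never reach the join.

```rust
use std::collections::HashMap;

/// Hypothetical row: (join key, payload).
pub type Row = (u32, &'static str);

/// Inner hash join where the build side's key set is pushed into the probe
/// scan. Returns the joined rows as (key, build payload, probe payload) and
/// how many probe rows survived the pushed-down filter.
pub fn join_with_dynamic_filter(
    build: &[Row],
    probe: &[Row],
) -> (Vec<(u32, &'static str, &'static str)>, usize) {
    // Build phase: hash the (small) build side by key.
    let mut table: HashMap<u32, Vec<&'static str>> = HashMap::new();
    for &(k, v) in build {
        table.entry(k).or_default().push(v);
    }
    // The build keys form a filter known only at runtime; the probe "scan"
    // applies it so non-matching rows are skipped before the join.
    let scanned: Vec<Row> = probe
        .iter()
        .copied()
        .filter(|(k, _)| table.contains_key(k))
        .collect();
    // Probe phase over the reduced input.
    let mut out = Vec::new();
    for &(k, probe_val) in &scanned {
        for &build_val in &table[&k] {
            out.push((k, build_val, probe_val));
        }
    }
    (out, scanned.len())
}
```

In a real system the key set might be summarised (for example as a min/max range or a Bloom filter) before being pushed into a remote scan; the exact set is used here for simplicity.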

@alamb
Contributor

alamb commented Nov 30, 2023

In that case you'd need something like what the Velox docs call Dynamic Filter Pushdown to avoid full table scans.

I think this is similar to what is described in #7955.

@alamb
Contributor

alamb commented Nov 30, 2023

The core problem here is: how far do you want to go with allowing the TableProvider to express compute ability versus a more full-blown form of query federation. The question is: where do you draw the line in expanding the TableProvider trait. Do we allow pushing down sort, aggregation, UDFs, etc.?

It may be a good idea to come up with a clear rule of where to draw this line.

One option would be to fully lean into the query federation idea: provide a good framework for that, provide basic implementation with filter/limit/sort pushdown out of the box and foster development of more complex federation cases (such as for remote DBMSs) out of tree.

I agree that this sounds like a good idea to explore further.

@karlovnv

karlovnv commented May 9, 2024

Is the idea that the table providers have some faster way to sort than what is built into DataFusion?

As I understand it, @backkem wanted to load data from an external data source and delegate some sorting operations to it.
Executing a query like ORDER BY col LIMIT 10 will load all of the data from the external DB into DataFusion.
This could be avoided by providing information about the sort order to the source.

@backkem
Contributor Author

backkem commented May 9, 2024

Indeed, my use case is querying across remote DBMSs. For now I'm experimenting with another approach in datafusion-federation.

@karlovnv

karlovnv commented May 9, 2024

For now I'm experimenting with another approach in datafusion-federation.
@backkem Could you please provide more details on how you handled the sort issue with it?

@backkem
Contributor Author

backkem commented May 9, 2024

The federation repo turns (part of) the query plan back into SQL. In the simple case, the query only uses table providers of one remote DBMS. In that case the entire query will be forwarded. In more complex cases, the federation repo splits the plan in pieces that can be sent to each corresponding remote DB. DataFusion pieces together the results to resolve the final query.
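The splitting step can be sketched as finding the largest subtrees whose scans all belong to one remote source. This is not datafusion-federation's actual code: `Plan`, `single_source` and `federated_pieces` are hypothetical, and the step that unparses each piece back into SQL is left out.

```rust
/// Hypothetical logical plan where each scan is tagged with its remote source.
pub enum Plan {
    Scan { source: String, table: String },
    Filter { predicate: String, input: Box<Plan> },
    Join { on: String, left: Box<Plan>, right: Box<Plan> },
}

/// Some(source) if every scan under `plan` uses the same source, meaning the
/// whole subtree could be sent to that DBMS as a single query.
pub fn single_source(plan: &Plan) -> Option<&str> {
    match plan {
        Plan::Scan { source, .. } => Some(source.as_str()),
        Plan::Filter { input, .. } => single_source(input),
        Plan::Join { left, right, .. } => {
            let l = single_source(left)?;
            let r = single_source(right)?;
            if l == r { Some(l) } else { None }
        }
    }
}

/// Collect the source of each maximal single-source subtree: these are the
/// pieces that would be pushed down; DataFusion combines the rest locally.
pub fn federated_pieces<'a>(plan: &'a Plan, out: &mut Vec<&'a str>) {
    if let Some(src) = single_source(plan) {
        out.push(src);
        return;
    }
    match plan {
        Plan::Scan { .. } => unreachable!("a scan always has a single source"),
        Plan::Filter { input, .. } => federated_pieces(input, out),
        Plan::Join { left, right, .. } => {
            federated_pieces(left, out);
            federated_pieces(right, out);
        }
    }
}
```

For a join of two tables from one DBMS with a table from another, the same-source join becomes one piece and the other scan a second piece; DataFusion would then run the top-level join over their results.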

@alamb
Contributor

alamb commented Jul 12, 2024

A use case from Discord: https://discord.com/channels/885562378132000778/1166447479609376850/1261096613565304884

Basically, if you have multiple indexes that can provide the data in sorted order, there is no way to have DataFusion tell you what a "useful order" might be.

I think one of the biggest challenges for providing multiple potential sort orders is that there is currently no mechanism in DataFusion for tracking and costing multiple different plans (e.g. if one of your sort orders allowed a Sort-Merge-Join and another allowed a streaming grouping, DataFusion would somehow have to pick one of the orders to use; in other systems this is done with a cost model that estimates the cost of each plan).

So I guess I am saying that this isn't just a matter of plumbing some exprs down into the TableProvider trait; more is required.
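To make the missing piece concrete, here is a toy version of the cost-based choice described above, with made-up costs and hypothetical names (`Requirement`, `choose_ordering`). Each candidate ordering that an index could provide is scored by the cost of the sorts still needed for operators it does not satisfy (using a simple prefix check), and the cheapest candidate wins. It ignores how intermediate operators change the ordering, which a real cost model would have to track.

```rust
/// Hypothetical operator requirement: the ordering it needs (column names,
/// ascending assumed) and the cost of sorting if that ordering is missing.
pub struct Requirement {
    pub name: &'static str,
    pub ordering: Vec<&'static str>,
    pub sort_cost: f64,
}

/// `available` satisfies `required` when `required` is a prefix of it.
fn satisfies(available: &[&str], required: &[&str]) -> bool {
    required.len() <= available.len()
        && available.iter().zip(required).all(|(a, r)| a == r)
}

/// Index of the candidate ordering with the lowest total cost of the sorts
/// that would still be required, or None if there are no candidates.
pub fn choose_ordering(candidates: &[Vec<&'static str>], reqs: &[Requirement]) -> Option<usize> {
    let cost = |cand: &Vec<&'static str>| -> f64 {
        reqs.iter()
            .filter(|r| !satisfies(cand, &r.ordering))
            .map(|r| r.sort_cost)
            .sum()
    };
    (0..candidates.len())
        .min_by(|&a, &b| cost(&candidates[a]).partial_cmp(&cost(&candidates[b])).unwrap())
}
```

With a Sort-Merge-Join needing `[a]` (sort cost 100) and a streaming grouping needing `[b]` (sort cost 40), the index ordered by `[a]` is chosen, since it leaves only the cheaper grouping sort.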

Projects
None yet
Development

No branches or pull requests

5 participants