Add push down sort to the source (table provider) #10433

Open
karlovnv opened this issue May 9, 2024 · 4 comments
Labels
enhancement New feature or request

Comments

@karlovnv

karlovnv commented May 9, 2024

Is your feature request related to a problem or challenge?

Suppose we have a huge data source consisting of many record batches.
Currently it is impossible to get the most recent N rows without a full scan:

SELECT * FROM Events
ORDER BY event_time DESC
LIMIT 1000

The query above performs a full scan starting from the first row. We could avoid that by letting the provider know that it should scan from the end towards the start.

In this example, the TableProvider could know that it only needs to provide the last record batch (or the latest Parquet file in a folder).
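The idea of reading from the tail can be sketched in plain Rust. This is only an illustration under stated assumptions: `Batch` and `scan_latest` are hypothetical names, not DataFusion types, and the sketch assumes batches are stored oldest-first with each batch sorted ascending by event time.

```rust
/// Hypothetical stand-in for a record batch: event times in ascending order.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub event_times: Vec<i64>,
}

/// Returns up to `limit` event times in descending order, together with the
/// number of batches that had to be read.
///
/// Assumes `batches` are stored oldest-first and each batch is sorted
/// ascending, so the newest rows sit at the tail of the last batch.
pub fn scan_latest(batches: &[Batch], limit: usize) -> (Vec<i64>, usize) {
    let mut rows = Vec::new();
    let mut batches_read = 0;

    // Walk the batches from the tail instead of from the start.
    for batch in batches.iter().rev() {
        if rows.len() >= limit {
            break; // enough rows collected: older batches are never touched
        }
        batches_read += 1;
        let remaining = limit - rows.len();
        // Take the newest rows of this batch first.
        rows.extend(batch.event_times.iter().rev().take(remaining).copied());
    }
    (rows, batches_read)
}
```

With three batches of three rows each and a limit of 2, only the last batch is read, which is the saving the request is after.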

Describe the solution you'd like

Currently TableProvider::scan accepts filters and a limit:

async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        // filters and limit can be used here to inject some push-down operations if needed
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {

Let's add a sort expression as well, so that the provider can push it down or at least take it into consideration:

async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        // filters and limit can be used here to inject some push-down operations if needed
        filters: &[Expr],
        // sort expression
        expr: &[PhysicalSortExpr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
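To illustrate how a provider might react to such a parameter, here is a simplified, std-only model. Everything in it (`SortKey`, `ScanPlan`, `SortAwareProvider`, `TimeOrderedTable`) is hypothetical and stands in for the real DataFusion types. One assumption worth stating: if the provider cannot honor the requested sort, it must not apply the limit itself, because the engine still has to sort all rows before truncating.

```rust
/// Hypothetical, simplified sort key (stands in for a physical sort expression).
#[derive(Debug, Clone, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

#[derive(Debug, PartialEq)]
pub enum ScanDirection {
    Forward,
    Reverse,
}

/// Hypothetical description of what the provider decided to do.
#[derive(Debug, PartialEq)]
pub struct ScanPlan {
    pub direction: ScanDirection,
    /// Limit applied at the source; `None` if the engine must apply it later.
    pub limit: Option<usize>,
    /// Whether the output already satisfies the requested sort.
    pub sort_satisfied: bool,
}

pub trait SortAwareProvider {
    fn scan(&self, sort: &[SortKey], limit: Option<usize>) -> ScanPlan;
}

/// A table whose data is physically stored in ascending `time_column` order.
pub struct TimeOrderedTable {
    pub time_column: String,
}

impl SortAwareProvider for TimeOrderedTable {
    fn scan(&self, sort: &[SortKey], limit: Option<usize>) -> ScanPlan {
        match sort {
            // No ordering requested: any direction works, the limit is safe.
            [] => ScanPlan { direction: ScanDirection::Forward, limit, sort_satisfied: true },
            // Ordering on the storage column: choose the read direction.
            [key] if key.column == self.time_column => ScanPlan {
                direction: if key.descending { ScanDirection::Reverse } else { ScanDirection::Forward },
                limit,
                sort_satisfied: true,
            },
            // Anything else: scan everything and let the engine sort and limit.
            _ => ScanPlan { direction: ScanDirection::Forward, limit: None, sort_satisfied: false },
        }
    }
}
```

In this model, a request for `event_time DESC` with a limit of 1000 yields a reverse scan that keeps the limit, while a sort on an unrelated column falls back to a full forward scan.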

Describe alternatives you've considered

Alternatively, we could add a method with_sorting(self: Arc<Self>, ... ) to the ExecutionPlan trait and add the ability to push down sorting during the optimization phase.

But I think that sorting is something fundamental, so it's better to add it to TableProvider::scan.
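For comparison, the optimizer-based alternative could look roughly like the sketch below. It is a std-only model, not the DataFusion API: `PlanNode`, `SourceScan`, `push_down_sort`, and the exact shape of `with_sorting` (returning `None` when the node cannot absorb the sort) are all assumptions made for illustration.

```rust
use std::sync::Arc;

/// Hypothetical, simplified sort key.
#[derive(Debug, Clone, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

/// Hypothetical stand-in for an execution plan node.
pub trait PlanNode {
    /// Returns a rewritten node that already produces `sort` order,
    /// or `None` if this node cannot absorb the sort.
    fn with_sorting(self: Arc<Self>, sort: &[SortKey]) -> Option<Arc<dyn PlanNode>>;
    fn describe(&self) -> String;
}

/// A source stored in ascending `time_column` order that can read backwards.
pub struct SourceScan {
    pub time_column: String,
    pub reverse: bool,
}

impl PlanNode for SourceScan {
    fn with_sorting(self: Arc<Self>, sort: &[SortKey]) -> Option<Arc<dyn PlanNode>> {
        match sort {
            [key] if key.column == self.time_column => {
                let node: Arc<dyn PlanNode> = Arc::new(SourceScan {
                    time_column: self.time_column.clone(),
                    reverse: key.descending,
                });
                Some(node)
            }
            _ => None,
        }
    }

    fn describe(&self) -> String {
        format!("SourceScan(reverse={})", self.reverse)
    }
}

/// Optimizer step: tries to push `sort` into `input`. The returned flag is
/// `true` when the explicit sort operator above `input` can be dropped.
pub fn push_down_sort(input: Arc<dyn PlanNode>, sort: &[SortKey]) -> (Arc<dyn PlanNode>, bool) {
    match Arc::clone(&input).with_sorting(sort) {
        Some(rewritten) => (rewritten, true),
        None => (input, false),
    }
}
```

The trade-off between the two designs is where the decision lives: here the source learns about the sort only during optimization, whereas the `scan` variant passes it in when the plan is first created.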

Additional context

No response

karlovnv added the enhancement label May 9, 2024
@alamb
Contributor

alamb commented May 9, 2024

Possibly related: #7871

@karlovnv
Author

karlovnv commented May 9, 2024

Possibly related: #7871

@alamb Thank you for the reply!

I've read the discussion in #7871 and I think this case is different.

I'm not claiming that MySourceExec can sort better than DF does.
I'd like to tell MySourceExec how it should load the external batches of data: from the beginning or from the tail (in reverse order).

How can I do that without knowing about sorting?

@alamb
Contributor

alamb commented May 9, 2024

In this example, the TableProvider could know that it only needs to provide the last record batch (or the latest Parquet file in a folder).

The provider can tell DataFusion it produces data in some pre-existing order via
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.properties
which can then communicate the sortedness via https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.PlanProperties.html#method.output_ordering

However, that use case is for data that is already always sorted. It doesn't permit DataFusion to say "I won't use the sort order you reported so you don't have to honor the contract" 🤔
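A rough model of how a declared output ordering lets a planner skip a sort is shown below. It is a deliberately simplified sketch: `SortKey` and `ordering_satisfies` are hypothetical, and the prefix check is an assumption standing in for the richer ordering analysis a real planner performs.

```rust
/// Hypothetical, simplified sort key.
#[derive(Debug, Clone, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

impl SortKey {
    pub fn new(column: &str, descending: bool) -> Self {
        SortKey { column: column.to_string(), descending }
    }
}

/// Returns `true` when data already ordered by `declared` also satisfies
/// `required`, i.e. when `required` is a prefix of `declared`.
/// In that case the planner does not need to add a sort.
pub fn ordering_satisfies(declared: &[SortKey], required: &[SortKey]) -> bool {
    declared.starts_with(required)
}
```

Under this model, a source that declares `event_time ASC` satisfies `ORDER BY event_time` but not `ORDER BY event_time DESC`, so the query from the issue would still need a sort (or a reverse read) on top of the declared ordering.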

@alamb
Contributor

alamb commented May 9, 2024

BTW, @NGA-TRAN and I worked on a very similar feature in InfluxDB IOx, and we implemented a special operator that knows how to do this "read only the most recent file" for queries very much like your example above:

SELECT * FROM Events
ORDER BY event_time DESC
LIMIT 1000

There is additional discussion / links on #10313

I think we would be interested in collaborating / pushing some of this logic upstream to datafusion. @matthewmturner and @suremarc may also be interested
