Skip to content

Conversation

@mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Apr 10, 2023

Which issue does this PR close?

Closes #.

Rationale for this change

Sometimes PipelineFixer will receive executors that cannot run on unbounded data. In these cases currently, we generate error. However, during optimization stages these pipeline breaking executors may be removed, may be changed, etc. Hence, PipelineFixer shouldn't give pre-mature decisions.

For instance, If window expression is in the form
SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 it would be calculated with WindowAggExec. However, it can be converted to the equivalent form
SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1
Second form can be calculated with BoundedWindowAggExec without breaking the pipeline (given that its input is already ordered with ts ASC).
If source is unbounded, during optimization we may convert expression in the form 1 to in the form 2 which is not breaking pipeline.

What changes are included in this PR?

With this PR, PipelineFixer doesn't generate an error, when some executor cannot be run unbounded data (Those cases may be fixed during optimization.) If those executors cannot be fixed, we generate error in PipelineChecker anyway.
Util codes to construct test table are moved from window.rs to under test_util/mod.rs to be able to use utils in other files.

Are these changes tested?

Yes test_source_sorted_unbounded_source is added to check this behaviour.

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Apr 10, 2023
@mustafasrepo mustafasrepo marked this pull request as draft April 10, 2023 13:38
@mustafasrepo mustafasrepo changed the title Add window reversal sub rule to pipeline fixer. Move error check from pipeline fixer to pipeline checker Apr 10, 2023
@mustafasrepo mustafasrepo marked this pull request as ready for review April 10, 2023 14:54
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks for taking this, why the error message changed to Sort Error, I dont see the respective logic change..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this happens because when the task of erroring out is left to PipelineChecker, which is the last rule, sorts are already in-place. While checking, the algorithm first encounters the sort (before the window), which is is pipeline-breaking. Hence the error messsage.

@alamb
Copy link
Contributor

alamb commented Apr 11, 2023

Thank you @mustafasrepo @ozankabak and @comphead for the review

@alamb alamb merged commit a6dcc2d into apache:main Apr 11, 2023
korowa pushed a commit to korowa/arrow-datafusion that referenced this pull request Apr 13, 2023
* Add window reversal sub rule to pipeline fixer.

* update test

* Remove an unnecessary clone, avoid object construction by using mutable binding

* Propagate error to pipeline checker

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
@mustafasrepo mustafasrepo deleted the feature/window_swap_rule branch April 26, 2023 07:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants