Eliminating multi-column sort when major column is a one-to-one and monotonic expression #8838

Closed
suremarc opened this issue Jan 11, 2024 · 5 comments · Fixed by #9273
Labels
enhancement New feature or request

Comments

@suremarc
Contributor

suremarc commented Jan 11, 2024

Describe the bug

DataFusion is unable to eliminate multi-column sorts when the major column is a one-to-one and monotonic expression of a sorted input column:

  • In certain cases when the expression is not one-to-one, eliminating the sort is invalid (e.g. sorting by floor(x), y is not equivalent to sorting by x, y, even though floor is monotonic).
  • However, casting from Int32 to Int64 is one-to-one and monotonic, so DataFusion should be able to avoid sorting in such a case. See below.
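
To make the two bullets above concrete, here is a minimal Rust sketch (standard library only, not DataFusion code). It uses ascending order for simplicity, whereas the reproduction below uses DESC. The helper names `is_sorted_lex`, `key_by_floor`, and `key_by_widening_cast` are made up for this illustration.

```rust
/// Returns true when `rows` is in non-decreasing order.
/// For tuples this is lexicographic order (first field, then second).
fn is_sorted_lex<K: PartialOrd>(rows: &[K]) -> bool {
    rows.windows(2).all(|w| w[0] <= w[1])
}

/// Re-keys rows as (floor(x), y). `floor` is monotonic but NOT one-to-one:
/// distinct x values can collapse into the same key.
fn key_by_floor(rows: &[(f64, i32)]) -> Vec<(f64, i32)> {
    rows.iter().map(|&(x, y)| (x.floor(), y)).collect()
}

/// Re-keys rows as (x as i64, y). Widening i32 to i64 is monotonic AND
/// one-to-one: two keys tie only if the original x values tied.
fn key_by_widening_cast(rows: &[(i32, i32)]) -> Vec<(i64, i32)> {
    rows.iter().map(|&(x, y)| (i64::from(x), y)).collect()
}
```

With rows `(1.2, 5), (1.7, 3), (2.1, 0)`, which are sorted by `(x, y)`, `key_by_floor` yields `(1.0, 5), (1.0, 3), (2.0, 0)`: the first two rows now tie on the major key, and `y` is out of order within that tie. With integer rows such as `(1, 5), (2, 3), (3, 0)`, `key_by_widening_cast` never creates new ties, so the existing order still satisfies the two-column sort.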

To Reproduce

Data acquired from parquet_testing/data/delta_encoding_required_column_expect.csv (I couldn't get datafusion-cli to work with the parquet file for some reason).

CREATE EXTERNAL TABLE delta_encoding_required_column (
    c_customer_sk INT NOT NULL,
    c_current_cdemo_sk INT NOT NULL
)
STORED AS CSV WITH HEADER ROW
WITH ORDER (
    c_customer_sk DESC,
    c_current_cdemo_sk DESC
)
LOCATION 'delta_encoding_required_column_expect.csv';
-- sort required here
EXPLAIN
SELECT 
    CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
    c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;

Resulting physical plan has a SortExec:

+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                            |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST                                                                                                                                    |
|               |   Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk                                                                                               |
|               |     TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]                                                                                                                                                    |
| physical_plan | SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]                                                                                                                                                                   |
|               |   SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]                                                                                                                                                                           |
|               |     ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]                                                                                                                      |
|               |       RepartitionExec: partitioning=RoundRobinBatch(32), input_partitions=1                                                                                                                                                                     |
|               |         CsvExec: file_groups={1 group: [[home/suremarc/rust-app-atlas/delta_encoding_required_column.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=true |
|               |                                                                                                                                                                                                                                                 |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- sort NOT required here
-- notice we have a single sorting column
EXPLAIN
SELECT 
    CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC;

Resulting physical plan has no SortExec:

+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                           |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: c_customer_sk_big DESC NULLS FIRST                                                                                                                                                       |
|               |   Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big                                                                                                 |
|               |     TableScan: delta_encoding_required_column projection=[c_customer_sk]                                                                                                                       |
| physical_plan | SortPreservingMergeExec: [c_customer_sk_big@0 DESC]                                                                                                                                            |
|               |   ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big]                                                                                                                   |
|               |     RepartitionExec: partitioning=RoundRobinBatch(32), input_partitions=1                                                                                                                      |
|               |       CsvExec: file_groups={1 group: [[home/suremarc/rust-app-atlas/delta_encoding_required_column.csv]]}, projection=[c_customer_sk], output_ordering=[c_customer_sk@0 DESC], has_header=true |
|               |                                                                                                                                                                                                |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Expected behavior

The first query provided above should not require a sort.

Additional context

I encountered this bug when trying to re-cast the timezone of a table ordered by both timestamp and a secondary ticker column.

@suremarc suremarc added the bug Something isn't working label Jan 11, 2024
@alamb alamb added enhancement New feature or request and removed bug Something isn't working labels Jan 15, 2024
@alamb
Contributor

alamb commented Jan 15, 2024

Thank you @suremarc -- I agree DataFusion should be able to remove the sort in this case.

I marked this as an enhancement rather than a bug because I think it would be new functionality rather than something that used to work (though the distinction may be a bit arbitrary).

@Lordworms
Contributor

I want to try this

@alamb
Contributor

alamb commented Jan 30, 2024

I suggest looking for monotonic rather than disabling by expression -- if the expressions are monotonic functions of the first column then I think that is sufficient

@suremarc
Contributor Author

Just to make sure we're all on the same page, I think the first case (single order by expression) already works due to monotonicity, and I think CAST is considered monotonic. In the original post I mentioned that the following query does not require a sort, due to having only one sorting column:

EXPLAIN
SELECT 
    CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC;

The second case (multiple order by expressions) is the troublesome one, because I think monotonicity alone is not sufficient (that's where one-to-one-ness becomes relevant).
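
One way to picture the distinction, as a rough sketch only: classify each output sort key by how its expression relates to the corresponding input sort column, then count how many leading keys the existing input order still covers. The `OrderProperty` enum and `preserved_prefix_len` function are hypothetical names for this illustration and are not DataFusion APIs. The sketch also assumes every expression is increasing, so sort direction is unchanged.

```rust
/// Hypothetical classification of an output sort key relative to the
/// input sort column it is computed from (not a DataFusion type).
#[derive(Clone, Copy, Debug, PartialEq)]
enum OrderProperty {
    /// No ordering guarantee at all.
    Unordered,
    /// Monotonic, but distinct inputs may map to equal outputs (e.g. floor).
    Monotonic,
    /// Monotonic and one-to-one (e.g. a widening integer cast, or the column itself).
    StrictlyMonotonic,
}

/// Returns how many leading output sort keys are already satisfied by the
/// input ordering, where `props[i]` describes the i-th output key.
fn preserved_prefix_len(props: &[OrderProperty]) -> usize {
    let mut len = 0;
    for p in props {
        match p {
            // Ties in the output match ties in the input, so the next key still applies.
            OrderProperty::StrictlyMonotonic => len += 1,
            // This key is ordered, but it may introduce new ties: stop after it.
            OrderProperty::Monotonic => {
                len += 1;
                break;
            }
            // Nothing is guaranteed from here on.
            OrderProperty::Unordered => break,
        }
    }
    len
}
```

Under these assumptions, `[StrictlyMonotonic, StrictlyMonotonic]` (the CAST plus the plain second column) gives 2, so both sort keys are covered. `[Monotonic, StrictlyMonotonic]` (something like floor as the major column) gives 1, which is enough for a single-column ORDER BY but not for the two-column one.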

@Lordworms
Contributor

No, the reason the second one works and the first one fails is that, when there are two projections, the first one builds a wrong dependency graph:
[image]
That in turn leads to a wrong projection_ordering. I see two ways to fix it:
[image]

  1. Change how the dependency graph is generated.
  2. Change how the projection ordering is generated.

Both approaches need to inspect the specific Expr; in this case it is a CAST expression. I know BuiltinScalarFunction has an is_monotonic function that reports whether a function is monotonic, but I think some other functions are not covered, and we need a way to determine whether func(multi-expression) is monotonic. I don't know the correct way to do that yet and am still struggling with it.

Lordworms added a commit to Lordworms/arrow-datafusion that referenced this issue Feb 5, 2024
fix: issue apache#8838 discard extra sort when sorted element is wrapped

Lordworms added a commit to Lordworms/arrow-datafusion that referenced this issue Feb 19, 2024
fix: issue apache#8838 discard extra sort when sorted element is wrapped

mustafasrepo pushed a commit that referenced this issue Feb 20, 2024
…9127)

* fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

* fix bugs

* fix bugs

* fix bugs

* fix:bugs

* adding tests

* adding cast UTF8 type and diable scalarfunction situation

* fix typo
mustafasrepo added a commit that referenced this issue Feb 21, 2024
* fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

* fix bugs

* fix bugs

* fix bugs

* fix:bugs

* adding tests

* adding cast UTF8 type and diable scalarfunction situation

* fix typo

* Simplifications, add new test

* Make resulting order deterministic after projection

* Add comment to explain ratioanale of using IndexMap, and IndexSet

* Add comment

* Add negative tests

---------

Co-authored-by: Yanxin Xiang <yanxinxiang0917@outlook.com>