Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and improve CommonSubexprEliminate rule #10396

Merged
merged 6 commits into from
May 8, 2024

Conversation

peter-toth
Copy link
Contributor

@peter-toth peter-toth commented May 6, 2024

Which issue does this PR close?

Part of #9873.

Rationale for this change

This PR started as part of #9873 to reduce number of Expr clones but after some investigation it shifted to be a fix for the rule's correctness issues.

  1. The current CommonSubexprEliminate was refactored in fix(9870): common expression elimination optimization, should always re-find the correct expression during re-write. #9871 to remove the IdArray cache and simplify the identifier of expresions. Unfortunately that change doesn't seem to be correct. The source of the issue is that an idenfifier needs to represent an expression subtreee and the newly chosen "stringified expr" as identifier doesn't seem to fulfill that purpose. E.g. an identifier shouldn't belong to 2 different expressions:

    println!("{}", ExprSet::expr_identifier(&(col("a") + col("b"))));    // a + b
    println!("{}", ExprSet::expr_identifier(&(col("a + b"))));           // a + b
    

    Sidenote:
    Actually I wanted to show that correctness issue of the current CommonSubexprEliminate in a test, but when I wrote a test with colliding column names I run into a different issue, that DataFusion resolves the col("a") + col("b") expression as if it was col("a + b") if an a + b field exists in the schema.
    This is a different issue (not related to CommonSubexprEliminate at all) and can be easily reproduced:

    DataFusion CLI v37.1.0
    > select a + b from (select 1 as a, 2 as b, 1 as "a + b");
    +-------+
    | a + b |
    +-------+
    | 1     | <- Should be 3
    +-------+
    1 row(s) fetched.
    Elapsed 0.009 seconds.
    

    So in this the first commit of PR I revert fix(9870): common expression elimination optimization, should always re-find the correct expression during re-write. #9871.

  2. Then I investigated what is the actual purpose of Identifiers, why don't we use a simple HashMap<Expr, (usize, DataType, Identifier)> as ExprSet? It is clear that we need to generate a unique alias for the extracted common expressions, but why is the key of the map is an Identifier and not &Expr or Expr itself. And actually it turned out that the reasons are already explained in the comments.

    /// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no
    /// collision (as low as possible)"
    

    If we used Expr as the key of the map computing the hash() of the keys would require traversing on the whole Expr, which can be very costly as Exprs contain indirections to subexpressions as Box<Expr> or Vec<Expr>.

    Using special identifiers to represent Expr trees and caching those identifiers by the preorder visit indexes in IdArray should significantly speed up the second top-down traversal that does the actual expression rewrite.

    Sidenote: the current long String identifiers are also not a good choice. We need to revisit this in a follow-up PR and choose something like (usize, &Expr) tuple as identifiers. The first element of a tuple is a pre-calculated hash() of an expression tree, that is built-up during the first bottom-up traversal. And the referece to expression is there to implement the eq().

  3. The second commit is a refactor and fix of the algorithm as reverting Stop copying Exprs and LogicalPlans so much during Common Subexpression Elimination #9873 caused the Common expression elimination should also re-find the correct expression, during re-write. #9870 issue to resurface. This is a major refactor but I think the code of ExprIdentifierVisitor and CommonSubexprRewriter became much cleaner.

  4. The 3rd commit eliminates some Expr clones in ExprSets.

  5. The 4th and 5th commit contain only renames and docs fixes. I think ExprStats is a better name for ExprSet as the purpose of that data structure is store the counts. Also, IMO CommonExpr / common_exprs is a better name for affected_id to store the common expressions that got extracted.

What changes are included in this PR?

Please see above.

Are these changes tested?

Yes, with existing UTs.

Are there any user-facing changes?

No.

… always re-find the correct expression during re-write. (apache#9871)"

This reverts commit cd7a00b.
…ds, better JumpMark handling, better variable names, code cleanup, some new todos
@peter-toth
Copy link
Contributor Author

cc @alamb, @erratic-pattern and @MohamedAbdeen21 as this PR is related to your recent comments/PRs.

@alamb
Copy link
Contributor

alamb commented May 6, 2024

Possibly related #10333

match expr_set.get(&id) {
Some((expr, _, _, symbol)) => {
// todo: check `nullable`
agg_exprs.push(expr.clone().alias(symbol.as_str()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this clone is eliminated in the 3rd commit

// todo: check `nullable`
let field = Field::new(&id, data_type.clone(), true);
fields_set.insert(field.name().to_owned());
project_exprs.push(expr.clone().alias(symbol.as_str()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this clone is also eliminated


self.expr_set
.entry(curr_expr_identifier)
.or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this clone is also eliminated

@MohamedAbdeen21
Copy link
Contributor

MohamedAbdeen21 commented May 6, 2024

This seems to be a major conflict with #10333

The source of the issue is that an idenfifier needs to represent an expression subtreee and the newly chosen "stringified expr" as identifier doesn't seem to fulfill that purpose. E.g. an identifier shouldn't belong to 2 different expressions

I agree that an identifier shouldn't belong to 2 different expressions, but why does it have to represent a subtree? The expr IS the subtree itself. If we use an identifier like #{expr} that should be good enough.

And actually it turned out that the reasons are already explained in the comments

AFAIK, we only need the identifier to be unique (no collision) for correctness, I don't see why we require the other traits.

Edit:

Just took a look at the tests, we are basically trying to do the same thing, although your PR is probably more efficient. We only differ on the subtree part I mentioned above. I tried to do #{expr}, you're trying {expr|subtree}; which I think is unnecessary and unreadable. I would even argue that something like #1 is enough.

@alamb
Copy link
Contributor

alamb commented May 6, 2024

I will review this one carefully tomorrow morning.

@alamb
Copy link
Contributor

alamb commented May 6, 2024

cc @waynexia and @wiedld for your information

@peter-toth
Copy link
Contributor Author

peter-toth commented May 6, 2024

The source of the issue is that an idenfifier needs to represent an expression subtreee and the newly chosen "stringified expr" as identifier doesn't seem to fulfill that purpose. E.g. an identifier shouldn't belong to 2 different expressions

I agree that an identifier shouldn't belong to 2 different expressions, but why does it have to represent a subtree? The expr IS the subtree itself. If we use an identifier like #{expr} that should be good enough.

In the first traversal we need to count how many times we encountered with an Expr (expression subtree). We either need to use Expr as the key of the map to store the counts or an use an identifier that uniquely identifies an Expr.

Edit:

Just took a look at the tests, we are basically trying to do the same thing, although your PR is probably more efficient. We only differ on the subtree part I mentioned above. I tried to do #{expr}, you're trying {expr|subtree}; which I think is unnecessary and unreadable. I would even argue that something like #1 is enough.

The issue is not about the aliases we assign to the extracted common expressions, it is about the key of the map where we store the counts. Your PR can be a good improvement to the aliases after this PR, but we need need to fix the key of map first.

@MohamedAbdeen21
Copy link
Contributor

MohamedAbdeen21 commented May 6, 2024

Ok yeah, I see what you mean.

But the example you mention with a + b. Doesn't that go away if we fix the side note you mentioned?

col("a + b") should be interpreted as table."a + b" and not test.a + test.b (and vice versa for the SQL example), meaning that expr would never collide in the map, right or am I missing something?

@peter-toth
Copy link
Contributor Author

peter-toth commented May 6, 2024

But the example you mention with a + b. Doesn't that go away if you fix the side note you mentioned?

col("a + b") should be interpreted as table."a + b" and not test.a + test.b, meaning that expr would never collide in the map, right or am I missing something?

There are multiple questions here and I don't have the answers for.

  • Can we fix the string representation? Do we always have a table reference to prefix the columns with? And actually, is the resolution bug related to the string representation of expressions at all?
  • And even if the string representation of Expr can be fixed, using a Strings as a key of the map is not a good choice as building a string represatation will very likely require traversing the whole expression. Think of that we need to get an identifier for an expression, and for all its subtrees, and for the subtrees' subtrees... So a good identifier would be something that we can build-up incrementally from the identifiers of the subtrees (in the first traversal's bottom-up phase).

I think the best we can do now is to revert #9871 and return to the old chained string representation. (This PR improves the identifier readability a bit by adding {} around it and | as separator of elements.) And then in a follow-up PR replace the string representation to an alternative identifier like the one I mentioned in the PR description.

@MohamedAbdeen21
Copy link
Contributor

Although I'd like to find answers to these questions before giving more opinions, I don't mind merging this for now.

} else {
Ok(Transformed::no(expr))
}
let (up_index, expr_id) = &self.id_array[self.down_index];
Copy link
Contributor Author

@peter-toth peter-toth May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewers might wonder why the code of CommonSubexprRewriter after the #9871 revert doesn't resemble to the code before that PR: https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L754-L809?
And why the 2 halting conditions (https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L757-L761) are missing from the new code. This is because it turned out that:

  • the self.curr_index >= self.id_array.len() bound check is poinless as we already indexed the array with self.curr_index: https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L754.
    (Note: curr_index is called down_index in the new PR.)
  • self.max_series_number > *series_number is also pointless (and actually there is no point in storing max_series_number in the rewriter at all) as *series_number (called up_index in the new code, that is basically the postorder index of the node) can never be smaller than the last replaced expression's postorder index (max_series_number). This is because in the rewriter we are doing a preorder traversal and a smaller postorder index could appear only in a replaced expression's subtree or in a subtree of a previous child of the node's ancestors.
    (BTW, I verified this with adding a panic! to the halting condition and running all tests.)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @peter-toth -- I went over this PR carefully and it looks like a significant improvement to me. In my opinion as long as it doesn't regress performance (I am running benchmarks now) I think we should merge it in as it adds additional test coverage

I also ran our test suite in InfluxDB IOx with this change and all the tests passed .

Doesn't seem to fix reported bug 🤔

I also filed #10413 to track the bug you found (🦅 👁️ ). However, this PR doesn't seem to fix it yet 🤔 . I pushed a test to show this and also tried it manually:

    Finished `dev` profile [unoptimized + debuginfo] target(s) in 35.29s
     Running `target/debug/datafusion-cli`
DataFusion CLI v37.1.0
> select a + b from (select 1 as a, 2 as b, 1 as "a + b");
+-------+
| a + b |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.018 seconds.

{} vs #{}

EDIT -- now I see @peter-toth 's #10396 (comment) and so maybe we should postpone the identifier update until a follow on PR

/// (a `String`) as `Identifier`.
/// Note that the current implementation contains:
/// - the `Display` of an expression (a `String`) and
/// - the identifiers of the childrens of the expression
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to form such complicated identifiers -- what if we simply use #{} like in #10333 as suggested by @MohamedAbdeen21 in #10396 (comment)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment here: #10396 (comment) that identifiers as keys of the ExprStats map is a must have but identifiers as aliases of exracted common expressions is not.

/// - The number of occurrences and
/// - The DataType
/// of an expression.
type ExprStats = HashMap<Identifier, (usize, DataType)>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit the code might be easier to follow if this a proper struct and move the manipulation functions (like to_arrays for example) into methods

Maybe as a follow on PR

///
/// `affected_id` is updated with any sub expressions that were replaced
fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier {
format!("{{{expr}{sub_expr_identifier}}}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comments about identifiers elsewhere

// Alias this `Column` expr to it original "expr name",
// `projection_push_down` optimizer use "expr name" to eliminate useless
// projections.
// TODO: do we really need to alias here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I removed the alias and several tests failed:

$ cargo test --test sqllogictests
   Compiling datafusion-optimizer v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/optimizer)
warning: unused variable: `expr_name`
   --> datafusion/optimizer/src/common_subexpr_eliminate.rs:802:17
    |
802 |             let expr_name = expr.display_name()?;
    |                 ^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_expr_name`
    |
    = note: `#[warn(unused_variables)]` on by default

   Compiling datafusion v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/core)
warning: `datafusion-optimizer` (lib) generated 1 warning
...
Running "map.slt"
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "log(unsigned_integers.b)". Valid fields are a, "log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})", "log(Int64(10),unsigned_integers.b)".
[SQL] select log(a, 64) a, log(b), log(10, b) from unsigned_integers;
at test_files/scalar.slt:592

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "aggregate_test_100.c2 % Int64(2) = Int64(0)". Valid fields are "{CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0)|{Int64(0)}|{CAST(aggregate_test_100.c2 AS Int64) % Int64(2)|{Int64(2)}|{CAST(aggregate_test_100.c2 AS Int64)|{aggregate_test_100.c2}}}}", "FIRST_VALUE(aggregate_test_100.c2) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]", "FIRST_VALUE(aggregate_test_100.c3 - Int64(100)) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]".
[SQL] SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY c2 % 2 = 0, c3 DESC;
at test_files/distinct_on.slt:116

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "acos(round(Float64(1) / doubles.f64))". Valid fields are doubles.f64, i64_1, "acos({round(Float64(1) / doubles.f64)|{Float64(1) / doubles.f64|{doubles.f64}|{Float64(1)}}})".
[SQL] select f64, round(1.0 / f64) as i64_1, acos(round(1.0 / f64)) from doubles;
at test_files/expr.slt:2272

External error: query result mismatch:
[SQL] explain select a/2, a/2 + 1 from t
[Diff] (-expected|+actual)
-   logical_plan
-   01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-   02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
-   03)----TableScan: t projection=[a]
at test_files/subquery.slt:1071

External error: query result mismatch:
[SQL] EXPLAIN SELECT x/2, x/2+1 FROM t;
[Diff] (-expected|+actual)
-   logical_plan
-   01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
-   02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
-   03)----TableScan: t projection=[x]
-   physical_plan
-   01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
-   02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
-   03)----MemoryExec: partitions=1, partition_sizes=[1]
at test_files/select.slt:1425

External error: query result mismatch:
[SQL] EXPLAIN SELECT c3,
    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
    FROM aggregate_test_100
    LIMIT 5
[Diff] (-expected|+actual)
    logical_plan
    01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
    02)--Limit: skip=0, fetch=5
-   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    04)------Projection: {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
    07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
    physical_plan
    01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
    02)--GlobalLimitExec: skip=0, fetch=5
    03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)), is_causal: false }]
    04)------ProjectionExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
    05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
    06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
    07)------------SortExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
    08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as c2, c3@1 as c3, c9@3 as c9]
    09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
    10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
at test_files/window.slt:1680

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "hits.ClientIP - Int64(1)". Valid fields are hits."ClientIP", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(1)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(2)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(3)", "COUNT(*)".
[SQL] SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
at test_files/clickbench.slt:241

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "value_dict.x_dict % Int64(2)". Valid fields are "{CAST(value_dict.x_dict AS Int64)|{value_dict.x_dict}} % Int64(2)", "SUM(value_dict.x_dict)".
[SQL] select sum(x_dict) from value_dict group by x_dict % 2 order by sum(x_dict);
at test_files/aggregate.slt:2696

External error: query result mismatch:
[SQL] EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
[Diff] (-expected|+actual)
    logical_plan
-   01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
-   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
-   03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS alias1]], aggr=[[]]
-   04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
-   05)--------TableScan: t1 projection=[x, y]
+   01)Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
+   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS SUM(DISTINCT t1.x), MAX(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS MAX(DISTINCT t1.x)]]
+   03)----Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
+   04)------TableScan: t1 projection=[x, y]
    physical_plan
-   01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
-   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
+   01)ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
+   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
    03)----CoalesceBatchesExec: target_batch_size=2
    04)------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
-   05)--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
-   06)----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
-   07)------------CoalesceBatchesExec: target_batch_size=2
-   08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
-   09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-   10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
-   11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
-   12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
+   05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+   06)----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+   07)------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+   08)--------------MemoryExec: partitions=1, partition_sizes=[1]
at test_files/group_by.slt:4184

Error: Execution("9 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/andrewlamb/Software/datafusion2/target/debug/deps/sqllogictests-ce3a36cfeab74789` (exit status: 1)

Copy link
Contributor Author

@peter-toth peter-toth May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see that is needed but it looks weird to me. I mean if we consider the example:

select a + b as x, a + b as y 
from ...

that should be eliminated to:

select common as x, common as y
from (
  select a + b as common
  from ...
)

I.e. we need to add aliases the extracted common expressions, but with this alias here we alias common and this is what the rule does:

select common as "a + b" as x, common as "a + b" as y
from (
  select a + b as common
  from ...
)

I wanted to understand why exactly we need to alias at both places.

BTW I left a few other TODOs in the code but none of them means that code should be worse than it was before #9871. Maybe the longer identifiers due to the extra {, } and | can make it a bit slower. I left the TODOs there to myself and others to remember where is some room for improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a stashed PR that addresses this issue.

I'm not 100% sure about why the second alias is needed, but there was a comment saying that the second alias is used by another rule, so removing it inside CSE seems to break other rules (or CSE itself sometimes (??) idk, I'm still looking into it).

Anyway, the solution I have at the moment is calling .unalias() on the expr when applying a new alias through expr.alias(). That way, when the true alias x is later (I assume after all the rules finished) given to the expr common as a + b, it will become common as x.

It passes the tests, but I really want to understand why it works before pushing the PR + I hope maybe I can find a better way by removing the second alias added by CSE all together, even if it means changing other rules.

@@ -789,6 +861,74 @@ mod test {
assert_eq!(expected, formatted_plan);
}

#[test]
fn id_array_visitor() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really nice to see more of how this code works 👍

@@ -1613,6 +1613,14 @@ select count(1) from v;
----
1

# Ensure CSE resolves columns correctly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this test but it still resolves the expr as 1 (not 3) 🤔

@peter-toth
Copy link
Contributor Author

I also filed #10413 to track the bug you found (🦅 👁️ ). However, this PR doesn't seem to fix it yet 🤔 . I pushed a test to show this and also tried it manually:

No, this PR doesn't fix that issue at all. That issue is a resolution issue (#10413) and has nothing to do with CSE. The example I gave in the description doesn't contain any subexpressions to eliminate and CommonSubexprEliminate has no effect on the query plan.

The reason I mentioned the resolution issue is because of that issue I couldn't add a test case to this PR which would illustrate the issue of colflicting identifiers in CommonSubexprEliminate after #9871.

Once #10413 is solved I can add a test case here.

I believe @MohamedAbdeen21 used #{expr} in #10333 to follow what is done by DuckDB -- perhaps we could do so too in this PR (I also think #{} is slightly easier to notice visually than {})

I fully aggree that the current alias is very hard to read and this is because the identifiers are used for aliases as well.
But there are 2 different things here:

  1. The identifier must be unique for each unique expression when it is used as the key of the ExprStats map that stores the counts.
  2. The aliases we gave to extracted common expressions shouldn't collide.

Currently for both 1. and 2. we use the identifier and I'm sure that in 1. we have touse the identifier. In 2. I'm not sure and @MohamedAbdeen21's PR can be a good follow-up improvement.

@alamb
Copy link
Contributor

alamb commented May 7, 2024

No, this PR doesn't fix that issue at all. That issue is a resolution issue (#10413) and has nothing to do with CSE. The example I gave in the description doesn't contain any subexpressions to eliminate and CommonSubexprEliminate has no effect on the query plan.

Sorry -- I missed that -- updated #10413 to match

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is an improvement:

My performance benchmarks show it appears to be slightly slower than main

logical_plan_tpcds_all                        1.00    157.7±1.80ms        ? ?/sec    1.03    162.6±2.85ms        ? ?/sec
logical_plan_tpch_all                         1.00     16.8±0.19ms        ? ?/sec    1.02     17.2±0.30ms        ? ?/sec

However I expect we'll make this up on subsequent PRs as we fix this rule to avoid the copies

Details

++ critcmp main refactor-commonsubexpreliminate
group                                         main                                   refactor-commonsubexpreliminate
-----                                         ----                                   -------------------------------
logical_aggregate_with_join                   1.00  1198.0±10.58µs        ? ?/sec    1.03  1231.8±65.81µs        ? ?/sec
logical_plan_tpcds_all                        1.00    157.7±1.80ms        ? ?/sec    1.03    162.6±2.85ms        ? ?/sec
logical_plan_tpch_all                         1.00     16.8±0.19ms        ? ?/sec    1.02     17.2±0.30ms        ? ?/sec
logical_select_all_from_1000                  1.05     18.8±0.10ms        ? ?/sec    1.00     18.0±0.10ms        ? ?/sec
logical_select_one_from_700                   1.01    826.5±7.83µs        ? ?/sec    1.00   817.5±21.62µs        ? ?/sec
logical_trivial_join_high_numbered_columns    1.00   763.6±11.20µs        ? ?/sec    1.00   761.5±10.77µs        ? ?/sec
logical_trivial_join_low_numbered_columns     1.00    745.5±8.27µs        ? ?/sec    1.00    745.3±7.96µs        ? ?/sec
physical_plan_tpcds_all                       1.00  1351.3±16.05ms        ? ?/sec    1.00  1345.2±13.72ms        ? ?/sec
physical_plan_tpch_all                        1.00     91.3±1.75ms        ? ?/sec    1.04     94.5±1.87ms        ? ?/sec
physical_plan_tpch_q1                         1.00      5.0±0.06ms        ? ?/sec    1.05      5.3±0.08ms        ? ?/sec
physical_plan_tpch_q10                        1.02      4.5±0.06ms        ? ?/sec    1.00      4.4±0.10ms        ? ?/sec
physical_plan_tpch_q11                        1.03      4.0±0.08ms        ? ?/sec    1.00      3.9±0.09ms        ? ?/sec
physical_plan_tpch_q12                        1.07      3.2±0.05ms        ? ?/sec    1.00      3.0±0.08ms        ? ?/sec
physical_plan_tpch_q13                        1.00      2.2±0.04ms        ? ?/sec    1.00      2.2±0.05ms        ? ?/sec
physical_plan_tpch_q14                        1.04      2.8±0.05ms        ? ?/sec    1.00      2.7±0.05ms        ? ?/sec
physical_plan_tpch_q16                        1.00      3.8±0.05ms        ? ?/sec    1.03      3.9±0.05ms        ? ?/sec
physical_plan_tpch_q17                        1.00      3.6±0.12ms        ? ?/sec    1.01      3.6±0.05ms        ? ?/sec
physical_plan_tpch_q18                        1.00      3.9±0.07ms        ? ?/sec    1.04      4.0±0.07ms        ? ?/sec
physical_plan_tpch_q19                        1.01      6.2±0.09ms        ? ?/sec    1.00      6.2±0.09ms        ? ?/sec
physical_plan_tpch_q2                         1.00      7.8±0.09ms        ? ?/sec    1.01      7.8±0.09ms        ? ?/sec
physical_plan_tpch_q20                        1.00      4.6±0.09ms        ? ?/sec    1.06      4.8±0.08ms        ? ?/sec
physical_plan_tpch_q21                        1.00      6.2±0.11ms        ? ?/sec    1.00      6.2±0.12ms        ? ?/sec
physical_plan_tpch_q22                        1.01      3.4±0.08ms        ? ?/sec    1.00      3.3±0.08ms        ? ?/sec
physical_plan_tpch_q3                         1.00      3.1±0.05ms        ? ?/sec    1.06      3.3±0.08ms        ? ?/sec
physical_plan_tpch_q4                         1.00      2.3±0.03ms        ? ?/sec    1.11      2.5±0.04ms        ? ?/sec
physical_plan_tpch_q5                         1.00      4.7±0.15ms        ? ?/sec    1.00      4.7±0.04ms        ? ?/sec
physical_plan_tpch_q6                         1.03  1629.0±90.38µs        ? ?/sec    1.00  1578.4±32.11µs        ? ?/sec
physical_plan_tpch_q7                         1.00      5.9±0.08ms        ? ?/sec    1.00      5.9±0.06ms        ? ?/sec
physical_plan_tpch_q8                         1.00      7.5±0.09ms        ? ?/sec    1.00      7.5±0.08ms        ? ?/sec
physical_plan_tpch_q9                         1.02      5.7±0.08ms        ? ?/sec    1.00      5.6±0.06ms        ? ?/sec
physical_select_all_from_1000                 1.04     61.4±0.28ms        ? ?/sec    1.00     59.1±0.37ms        ? ?/sec
physical_select_one_from_700                  1.07      3.8±0.03ms        ? ?/sec    1.00      3.6±0.04ms        ? ?/sec

@peter-toth
Copy link
Contributor Author

Thanks for the benchmarks @alamb! Maybe the longer identifiers can explain that gap.

@peter-toth
Copy link
Contributor Author

peter-toth commented May 7, 2024

@alamb, IMO if this PR can be merged then the next steps should be:

  1. Fix Incorrect results with expression resolution #10413 as that is correctness bug.
  2. Continue Rewrite CommonSubexprEliminate to avoid copies using TreeNode #10067 efforts to refactor the CommonSubexprEliminate rule to rewrite as that could improve a lot on Stop copying Exprs and LogicalPlans so much during Common Subexpression Elimination #9873.
  3. Rebase @MohamedAbdeen21's make common expression alias human-readable #10333 on the top of this PR as probably we don't need to use the current string identifiers in aliases and we could improve readablity.
  4. Revisit the identifiers as using these string identifiers as the keys of ExprStats was not the best choice. (Please note that this was not my choice but this is how CSE has been working since the feature was added initially.) See my comment on this in the PR description.

I'm happy to take 4 as I already worked on it a bit, but unfortunately I have very little time to work on this project lately so I can't take 1. and 2.

@MohamedAbdeen21
Copy link
Contributor

I'll rebase my PR this weekend.

I do have other changes in mind regarding plan readability. If 1 is still available by the time I'm done, I'll be happy to take a look at it.

@alamb
Copy link
Contributor

alamb commented May 8, 2024

@alamb, IMO if this PR can be merged then the next steps should be:

  1. Fix Incorrect results with expression resolution #10413 as that is correctness bug.

Agree -- this is now tracked as its own issue and we can deal with it separately

  1. Continue Rewrite CommonSubexprEliminate to avoid copies using TreeNode #10067 efforts to refactor the CommonSubexprEliminate rule to rewrite as that could improve a lot on Stop copying Exprs and LogicalPlans so much during Common Subexpression Elimination #9873.

I will do this

  1. Rebase @MohamedAbdeen21's make common expression alias human-readable #10333 on the top of this PR as probably we don't need to use the current string identifiers in aliases and we could improve readablity.

Sounds like @MohamedAbdeen21 is going to do this maybe this weekend

  1. Revisit the identifiers as using these string identifiers as the keys of ExprStats was not the best choice. (Please note that this was not my choice but this is how CSE has been working since the feature was added initially.) See my comment on this in the PR description.

👍

I'm happy to take 4 as I already worked on it a bit, but unfortunately I have very little time to work on this project lately so I can't take 1. and 2.

That would be amazing -- thank you @peter-toth -- I filed #10426 to track

@alamb
Copy link
Contributor

alamb commented May 8, 2024

All right, I think we have our next steps outlined and tracked with tickets. 🚀 !

@alamb alamb merged commit 0d283a4 into apache:main May 8, 2024
25 checks passed
@alamb
Copy link
Contributor

alamb commented May 8, 2024

Thanks again @peter-toth and @MohamedAbdeen21

@peter-toth
Copy link
Contributor Author

Thanks for the review!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions optimizer Optimizer rules sqllogictest
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants