Skip to content

Conversation

@waynexia
Copy link
Member

Which issue does this PR close?

Resolve #566.

Rationale for this change

This pull request trying to implement the common subexpression eliminate optimization. The current implementation only supports detecting & eliminating common subexpressions under one logical plan, not cross plans.

This optimizes is consists of two parts: detecting and replacing. It first scans all expressions we are going to rewrite (all expressions under one plan here), generates identifiers for each Expr node for later comparison. Then for the same group of expressions, we extract common expressions, put them into a Projection plan, and replace the original expression with a column Expr.

This implementation doesn't cover the situation of eliminating the whole plan's common subexpression. This requires considering not only the expression itself, but also sharing data with different plans or some plan will modify the data (e.g. filter) or other aspects I haven't realize. In the beginning I plan to make it cross plans but it seems better to me to do them in another PR.

What changes are included in this PR?

Add optimize rule CommonSubexprEliminate.

Are there any user-facing changes?

I think there is, as the optimization rule will rewrite input SQL.

Status

This is still a work in progress. I have only covered part of plans and am trying to make it work with the entire execution procedure.

@Dandandan
Copy link
Contributor

Great @waynexia let me know when it's ready for review!

In the TCP-H benchmarks there are also a few that could benefit from CSE, so might make sense to inspect a few of those plans if it's implemented. Although it might not be affecting performance that much for those queries maybe.

@waynexia
Copy link
Member Author

Thanks! I'm also curious to see how much impact CSE will bring. I'll try to gather the TCP-H bench result once I finish this.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking cool @waynexia - I didn't have a chance to go through the identifier bit but it looks reasonable.

@waynexia
Copy link
Member Author

waynexia commented Aug 1, 2021

The implementation is almost finished. I plan to add more tests and make all cases pass next.

I tried to run the tpc-h benchmark and got a bad result 😕

with cse:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 5, concurrency: 2, batch_size: 4096, path: "./data", file_format: "tbl", mem_table: false, partitions: 8 }
Query 1 iteration 0 took 2901.0 ms
Query 1 iteration 1 took 2767.9 ms
Query 1 iteration 2 took 3011.2 ms
Query 1 iteration 3 took 2861.0 ms
Query 1 iteration 4 took 2955.2 ms
Query 1 avg time: 2899.27 ms

master:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 5, concurrency: 2, batch_size: 4096, path: "./data", file_format: "tbl", mem_table: false, partitions: 8 }
Query 1 iteration 0 took 2751.7 ms
Query 1 iteration 1 took 2703.5 ms
Query 1 iteration 2 took 2787.9 ms
Query 1 iteration 3 took 2863.1 ms
Query 1 iteration 4 took 2831.3 ms
Query 1 avg time: 2787.50 ms

@Dandandan
Copy link
Contributor

The implementation is almost finished. I plan to add more tests and make all cases pass next.

I tried to run the tpc-h benchmark and got a bad result 😕

with cse:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 5, concurrency: 2, batch_size: 4096, path: "./data", file_format: "tbl", mem_table: false, partitions: 8 }
Query 1 iteration 0 took 2901.0 ms
Query 1 iteration 1 took 2767.9 ms
Query 1 iteration 2 took 3011.2 ms
Query 1 iteration 3 took 2861.0 ms
Query 1 iteration 4 took 2955.2 ms
Query 1 avg time: 2899.27 ms

master:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 5, concurrency: 2, batch_size: 4096, path: "./data", file_format: "tbl", mem_table: false, partitions: 8 }
Query 1 iteration 0 took 2751.7 ms
Query 1 iteration 1 took 2703.5 ms
Query 1 iteration 2 took 2787.9 ms
Query 1 iteration 3 took 2863.1 ms
Query 1 iteration 4 took 2831.3 ms
Query 1 avg time: 2787.50 ms

Could be within the noise. Maybe you can check / log whether the optimization is applied at all and for which query?

@waynexia
Copy link
Member Author

waynexia commented Aug 1, 2021

This is the result of Q1, run with this command:

cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./data --format tbl --query 1 --batch-size 4096

I've run it a couple of times and can observe a stable difference of ~100ms.
I checked the optimized logical plan with debug log and make sure this optimization is applied:

=== Optimized logical plan ===
Sort: #lineitem.l_returnflag ASC NULLS FIRST, #lineitem.l_linestatus ASC NULLS FIRST
Projection: #lineitem.l_returnflag, #lineitem.l_linestatus, #SUM(lineitem.l_quantity) AS sum_qty, #SUM(lineitem.l_extendedprice) AS sum_base_price, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS sum_disc_price, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount Multiply Int64(1) Plus lineitem.l_tax) AS sum_charge, #AVG(lineitem.l_quantity) AS avg_qty, #AVG(lineitem.l_extendedprice) AS avg_price, #AVG(lineitem.l_discount) AS avg_disc, #COUNT(UInt8(1)) AS count_order
Aggregate: groupBy=[[#lineitem.l_returnflag, #lineitem.l_linestatus]], aggr=[[SUM(#lineitem.l_quantity), SUM(#lineitem.l_extendedprice), SUM(#BinaryExpr-*BinaryExpr--Column-lineitem.l_discountLiteral1Column-lineitem.l_extendedprice AS lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount), SUM(#BinaryExpr-*BinaryExpr--Column-lineitem.l_discountLiteral1Column-lineitem.l_extendedprice AS lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount Multiply Int64(1) Plus #lineitem.l_tax), AVG(#lineitem.l_quantity), AVG(#lineitem.l_extendedprice), AVG(#lineitem.l_discount), COUNT(UInt8(1))]]
Projection: #lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount, #l_quantity, #l_extendedprice, #l_discount, #l_tax, #l_returnflag, #l_linestatus
Filter: #lineitem.l_shipdate LtEq Date32("10471")
TableScan: lineitem projection=Some([4, 5, 6, 7, 8, 9, 10])

@houqp houqp added the enhancement New feature or request label Aug 2, 2021
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
} => Expr::InList {
expr: rewrite_boxed(expr, rewriter)?,
list,
list: rewrite_vec(list, rewriter)?,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an accident.

Since this PR now contains two changes about "expr" (this one and RewriteRecursion below), perhaps it would be clearer to move these changes to another PR?

cc @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree two PRs is almost always clearer :)

please do let me know when you would like another look at this PR

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
@waynexia
Copy link
Member Author

Sorry for the late update. I'm afraid I got some problems when trying to fix the last two failed cases (Logs here) .

They failed in the same place where the result table's column name doesn't match. The optimized will produce a column with qualified name which isn't expected. Like test.c1 PLUS test.c2 but the expected is c1 PLUS c2.

When performing common subexpression eliminate optimization, the common expression will be replaced by a single Column expression which references to the pre-calculated column in the input plan. This Column's name is a generated "identifier" that is used to identify common subexpressions.

And in many other places we will use an expr's name generated by Expr::name(). Like in the projection push-down optimizer, we use expr's name to represent a column. Therefore there should have an Alias expr after the Column expr, or we will get a strange string (the "identifier") when trying to naming the rewrote expr.

Everything works as expected so far. It becomes wired when comes to physical plan phase. In this phase, Expr will be converted to PhysicalExpr with a new name generated by physical_name(). This new name will become the result column's name, like c1 PLUS c2 mentioned before. But for those who are rewrote, the expr is just a col().alias(), not a real expr. Its physical name is directly obtained from the alias, which is generated by Expr::name().

And for the root cause here, Expr::name() gives Column expr flat name (test.c1) and physical_name() gives unqualified name (c1). Finally we got test.c1 PLUS test.c2 rather than c1 PLUS c2.

I've tried a few ways to work around this but they end up breaking other parts. I haven't come up with an elegant solution so far. It's really appreciated if someone could put up some discussion on this.

@waynexia
Copy link
Member Author

Let me propose some relatively "big" changes:

  • Make every result columns' name become the qualified name. (If we have any invariant on the column's name please let me know.)
  • Add a new internal Expr for this case. Like AliasWithExpr(Expr) that can keep the original expression.
  • Calculate the result table's header from query SQL. There might be some situations that are impossible to do this.

I haven't tried these, just put them into the discussion as some of them might be unrealistic.

@alamb
Copy link
Contributor

alamb commented Aug 19, 2021

Hi @waynexia -- I plan to review your question/comments carefully tomorrow

@alamb
Copy link
Contributor

alamb commented Aug 20, 2021

Hi @waynexia -- I noticed that the failures in window_partition_by_order_by seem to be all related to window functions. Since window functions were added about the same time as we reworked the fully qualified names, I wonder if perhaps the bug you are chasing is something related to do with window functions specifically rather than a more general problem.

I think @houqp might be the one with the most knowledge of aliasing / fully qualified column names at the moment.

@houqp
Copy link
Member

houqp commented Aug 20, 2021

I will take a closer look at it this weekend, my hunch on this is we only want to strip qualifier to direct column projections, i.e. SELECT test.c1. For compound projections like test.c1 PLUS test.c2, it might be preferable to include qualifiers in the name for better readability.

@waynexia
Copy link
Member Author

Thank you @alamb and @houqp !

I wonder if perhaps the bug you are chasing is something related to do with window functions specifically rather than a more general problem.

AFAIK, It's general problem. A SQL without window function can also run into this problem, like

SELECT AVG(c1 + 1), SUM(c1 + 1) FROM test GROUP BY c2;

it is reproducible if there is some common sub-expression for this optimizer to eliminate, without the need to have a window expression.

For compound projections like test.c1 PLUS test.c2, it might be preferable to include qualifiers in the name for better readability.

The current implementation of physical_name() seems to be stripping all qualifiers. I agree that the behavior can vary depending on the situation.

@houqp
Copy link
Member

houqp commented Aug 23, 2021

The current implementation of physical_name() seems to be stripping all qualifiers. I agree that the behavior can vary depending on the situation.

Yes, this is done mainly for direct column projections so relations are not saved as part of the output. Take the following query as an example:

SELECT table1.id FROM table1

If we don't strip out the relation from the output, saving the output to a csv file will result in a column named table1.id. Now if we try to load that csv file as table2 in downstream queries, we will end up with a column table2.table1.id, which is not expected.

However, for projection on compound expressions, this is not a problem because SUM(table1.id) is still an unqualified field, so we should be good here.

In short, I think it should be fine to include qualifiers in compound projection columns. The only thing is we need to make sure is the behavior should be consistent before and after the subexpression elimination rule has been applied.

@waynexia
Copy link
Member Author

In short, I think it should be fine to include qualifiers in compound projection columns. The only thing is we need to make sure the behavior should be consistent before and after the subexpression elimination rule has been applied.

Agree.

This optimize won't touch a single column expr, so only those with extra operators are affected by rewriting. And since a column name like SUM(table1.id) is acceptable, I propose that we can change the column name in nested expr to qualified form. I'd like to file a PR for this (in a few days :D)

@houqp
Copy link
Member

houqp commented Aug 24, 2021

Sounds good @waynexia , thanks for taking care of this! If you don't mind, it would be great if you could also add the new proposed semantic to the specification at https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md.

@waynexia waynexia marked this pull request as ready for review September 3, 2021 16:22
@houqp houqp requested review from Dandandan and alamb September 4, 2021 23:34
@alamb
Copy link
Contributor

alamb commented Sep 10, 2021

Hi @waynexia -- I am sorry -- I have been on vacation for a while and am now catching up. I will try and review this over the next few days. I think the python test has been fixed, so if you rebase this PR against master the tests should all pass now

@waynexia
Copy link
Member Author

Thanks, @alamb! No problem, hope you had a great vacation 😃

I have merged the master, let's see how CI goes. I'm not sure if the document and test are concrete. Please let me know if anything is not clear :)

@waynexia
Copy link
Member Author

Well, it still fails... 😢

@houqp
Copy link
Member

houqp commented Sep 13, 2021

@waynexia the failure is not related to your change

Actually, never mind, it is related :P

@houqp
Copy link
Member

houqp commented Sep 13, 2021

OK, so now we are running into a limitation in datafusion. Datafusion doesn't support filtering on unprojected columns yet. The rule here optimized the compound expressions into a single dummy projected column, which removes the individual column projection. I bet if you change it to the following it would work:

        df = df.select(
            f.col("a"),
            f.col("a") + f.col("b"),
            f.col("a") - f.col("b"),
        ).filter(f.col("a") > f.lit(2)) 

This means the impact of this change is bigger than we expected and could break queries that are working today. The design and implementation of this change is correct, it's just it exposed a limitation of datafusion to broader range of queries :(

@waynexia
Copy link
Member Author

waynexia commented Sep 14, 2021

Sorry for the late reply. I haven't set up the test environment for python yet so I tried a SQL like this and got the expected result.

SELECT c1 + c2, c1 * c2 from test where c1 > 2;

I'll run the python test on local later.

But for this case, I'm not sure whether it is related to this change. As a+b and a-b aren't "common expression". The optimizer should leave the logical plan unchanged. Please correct me if I missed something :D

@houqp
Copy link
Member

houqp commented Sep 15, 2021

You are right @waynexia , the optimization rule shouldn't have kicked in for this query 🤔

@houqp
Copy link
Member

houqp commented Sep 15, 2021

It's also interesting that the adding an explicit column a projection does fix the test error:

        df = df.select(
            f.col("a") + f.col("b"),
            f.col("a") - f.col("b"),
            f.col("a"),
        ).filter(f.col("a") > f.lit(2)) 

I am not able to reproduce this error with the Rust SQL API though.

@houqp
Copy link
Member

houqp commented Sep 15, 2021

@waynexia I forced a stack trace using panic to pin point the issue:

thread '<unnamed>' panicked at 'explicit panic', /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/dfschema.rs:173:17                                            
stack backtrace:  
   0: std::panicking::begin_panic      
             at /rustc/1698e3cac54aa8691d4e9e207567672af8231cb6/library/std/src/panicking.rs:543:12
   1: datafusion::logical_plan::dfschema::DFSchema::index_of_column_by_name
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/dfschema.rs:173:17
   2: datafusion::logical_plan::dfschema::DFSchema::field_with_qualified_name
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/dfschema.rs:242:19
   3: datafusion::logical_plan::dfschema::DFSchema::field_from_column
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/dfschema.rs:249:24
   4: datafusion::logical_plan::expr::Expr::get_type
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/expr.rs:375:35
   5: datafusion::logical_plan::expr::Expr::get_type
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/logical_plan/expr.rs:425:18
   6: datafusion::optimizer::common_subexpr_eliminate::optimize
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/optimizer/common_subexpr_eliminate.rs:109:29
   7: <datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate as datafusion::optimizer::optimizer::OptimizerRule>::optimize
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/optimizer/common_subexpr_eliminate.rs:61:9
   8: datafusion::execution::context::ExecutionContext::optimize_internal
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/execution/context.rs:618:24
   9: datafusion::execution::context::ExecutionContext::optimize
             at /home/houqp/Documents/code/delta/arrow-datafusion/datafusion/src/execution/context.rs:499:13
  10: datafusion::dataframe::DataFrame::collect 
             at ./src/dataframe.rs:125:20
  11: datafusion::dataframe::<impl pyo3::class::impl_::PyMethods<datafusion::dataframe::DataFrame> for pyo3::class::impl_::PyClassImplCollector<datafusion::dataframe::DataFrame>>::py_method
s::METHODS::__wrap::{{closure}}
             at ./src/dataframe.rs:49:1
  12: pyo3::callback::handle_panic::{{closure}} 
             at /home/houqp/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-0.14.5/src/callback.rs:247:9
  13: std::panicking::try::do_call
             at /rustc/1698e3cac54aa8691d4e9e207567672af8231cb6/library/std/src/panicking.rs:403:40
  14: __rust_try            
  15: std::panicking::try     
             at /rustc/1698e3cac54aa8691d4e9e207567672af8231cb6/library/std/src/panicking.rs:367:19
  16: std::panic::catch_unwind
             at /rustc/1698e3cac54aa8691d4e9e207567672af8231cb6/library/std/src/panic.rs:129:14
  17: pyo3::callback::handle_panic
             at /home/houqp/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-0.14.5/src/callback.rs:245:24
  18: datafusion::dataframe::<impl pyo3::class::impl_::PyMethods<datafusion::dataframe::DataFrame> for pyo3::class::impl_::PyClassImplCollector<datafusion::dataframe::DataFrame>>::py_method
s::METHODS::__wrap                                                                            
             at ./src/dataframe.rs:49:1      

More specifically, this line:

let data_type = predicate.get_type(input.schema())?;

I think we can't just use the input schema here. For this failing query, the input (the projection plan) only contains those two compound expressions. You may want to use plan.all_schemas() instead. Here is what the input of the filter plan looks like:

Projection: #S7BgsxQKDc.a Plus #S7BgsxQKDc.b, #S7BgsxQKDc.a Minus #S7BgsxQKDc.b
  TableScan: S7BgsxQKDc projection=None

Would be good if we can create a reproducible test case using Rust's dataframe API first.

@waynexia
Copy link
Member Author

Brilliant job @houqp! Appreciate for pining this down. I'll look at this tonight.

@waynexia
Copy link
Member Author

Hi @houqp, here are some updates.

The reproducer with rust dataframe API is here. IMO the problem we are facing is that Filter plan doesn't have its own schema. And it doesn't require following right after the plan it's going to filter too (a bit wired to me... I haven't inspected how other systems act), which caused this problem. Taking the reproducer for example, the optimizer use "input plan" 's schema to query expr's type and got nothing. The Filter is actually performed in the table scan plan, and that's where to get the schema. Later in another optimizer FilterPushdown the Filter plan is moved after the table scan plan.

I changed the behavior of getting schema in 64330bd. It now will get all the schemas under the Filter plan and merge them into one for querying. This can pass the tests (locally) but I'm wondering whether there are some other approaches to achieve this. One in my mind is to place this optimizer after FilterPushDown, which may solve this problem since Filter is in the "right" place now.

@houqp
Copy link
Member

houqp commented Sep 17, 2021

Sorry for the delay @waynexia . I think going through all the schemas from the input tree is the right thing to do here. Filter and other plans like Projection should be able to access all fields from its full input query plan tree, not just the immediate query plan node. I do agree that we currently don't have a convenient way to access the full schema without asking the user to manually write a schema merge operation or changing the consumer code to work with the schema slice instead of merged schema. I think this ergonomics problem is something we can address in a separate PR.

On top of this, I think it would be beneficial to also reorder the optimizer run to run the filter push down before this optimization rule for a slight performance gain. But I think the subexpression elimination optimizer should work by itself too without having a hard dependency on the filter push down optimizer rule.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an epic PR @waynexia - very nicely done. I went through the changes and tests carefully and they look good to me.

I also tried running IOx (and its tests) against this branch and they all passed which is increases my confidence for it being correct.

🏅

@alamb alamb merged commit 61834d4 into apache:master Sep 18, 2021
@waynexia
Copy link
Member Author

Thank you all! You do help me a lot in carrying this out ❤️

@waynexia waynexia deleted the cse2 branch September 18, 2021 15:50
@houqp
Copy link
Member

houqp commented Sep 18, 2021

Thank you @waynexia for being patient with this PR, this is a really cool optimization rule.

@Dandandan
Copy link
Contributor

Thanks @waynexia !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request performance Make DataFusion faster

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Common subexpression elimination

4 participants