Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reimplement push_down_filter to remove global-state #4365

Merged
merged 10 commits into from
Nov 30, 2022

Conversation

jackwener
Copy link
Member

Which issue does this PR close?

Close #4266
Part of #4267

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the optimizer Optimizer rules label Nov 25, 2022
@jackwener jackwener force-pushed the pushdown_filter branch 2 times, most recently from 07931c4 to ee5c9c7 Compare November 25, 2022 07:06
@jackwener
Copy link
Member Author

jackwener commented Nov 25, 2022

The rule re-implementation of this rule was complex and now is completed.

Now all that remains is an equally complex ProjectionPushDown.

@alamb
Copy link
Contributor

alamb commented Nov 26, 2022

I need to set aside time to review this PR carefully given how important FilterPushdown is -- I will try and find some in the upcoming week.

.into_iter()
.zip(on_or_to_left.1)
.for_each(|(expr, cols)| left_state.filters.push((expr, cols)));
let left = optimize(left, left_state)?;
Copy link
Member Author

@jackwener jackwener Nov 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original implementation here is bottom-up, outside is top down. Now unify to top down in all places.

@mingmwang
Copy link
Contributor

This week I have some time and I can take a closer look at this PR carefully.

@andygrove
Copy link
Member

@ayushdg @sarahyurick It would be good to test the impact of this change in Dask SQL to make sure it doesn't cause any regressions for us.

@sarahyurick
Copy link
Contributor

sarahyurick commented Nov 28, 2022

@ayushdg @sarahyurick It would be good to test the impact of this change in Dask SQL to make sure it doesn't cause any regressions for us.

I only did a couple of trials but looks good on my end so far - I'm seeing a performance improvement with the changes in this PR over DF 14.0.0

.unwrap_or_else(Vec::new);

if join.join_type == JoinType::Inner {
// For inner joins, duplicate filters for joined columns so filters can be pushed down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the same logic can apply to LeftSemi and RightSemi join ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for Out joins:
For Left Out join, if we have a query SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON (t1_id = t2_id and t2_id >= 100); t2_id >= 100 can be pushed down to the right side.

For Right Out join, if we have a query SELECT t1_id, t1_name, t2_name FROM t1 Right JOIN t2 ON (t1_id = t2_id and t1_id >= 100); t1_id >= 100 can be pushed down to the left side.

Copy link
Member Author

@jackwener jackwener Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we can.
In fact, SEMI/ANTI JOIN is similar with FILTER
Current code is just for refactor, I don't include extra enhancement.
Related issue to track problems like this #4413

@mingmwang
Copy link
Contributor

It seems that with the LogicalPlan structs, it is not that convenient to write pattern-matching and destructuring code, because of the Arc.

match plan {
  LogicalPlan::Filter(Filter{predicate, LogicalPlan::Sort(sort)}) => {
  }
  LogicalPlan::Filter(Filter{predicate, LogicalPlan::Projection(proj)}) =>{
  }
  LogicalPlan::Filter(Filter{predicate, LogicalPlan::Aggregate(agg)}) =>{
  }
}

@mingmwang
Copy link
Contributor

Why we use Arc to wrap all the inputs? Is it because of nested types and Rust does not support nested struct, must wrap the nested structs with Box or Arc ?

Comment on lines 564 to 565
// Convert both qualified and unqualified fields
[
(field.name().clone(), expr.clone()),
(field.qualified_name(), expr),
]
Copy link
Contributor

@mingmwang mingmwang Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is coming from the existing logic, but could it encounter conflicting unqualified field names here? For example a Projection contains both a.id, b.id

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, original code here exist bug.

Copy link
Member Author

@jackwener jackwener Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has resolved this bug, and add UT, thanks @mingmwang !

The original UT test_crossjoin_with_or_clause can discover this bug there, but it bypassed it with alias.

@jackwener jackwener force-pushed the pushdown_filter branch 3 times, most recently from d50b5d9 to 1f75d1d Compare November 29, 2022 10:58
@alamb
Copy link
Contributor

alamb commented Nov 29, 2022

I plan to test this PR against the IOx test suite later today

@mingmwang
Copy link
Contributor

mingmwang commented Nov 29, 2022

Some part of the Filter can also be pushed down through theLogicalPlan::Window if the filter exprs are part of the partition by keys of the WIndow. The logic should be similar to LogicalPlan::Aggregate group by keys.

@jackwener
Copy link
Member Author

jackwener commented Nov 29, 2022

Regression is fixed in This PR, which is convenient for others test this PR.

@alamb
Copy link
Contributor

alamb commented Nov 29, 2022

I plan to review the code and test failure in this PR later today

@jackwener
Copy link
Member Author

jackwener commented Nov 29, 2022

@mingmwang look like alias can't be in groupby.

sql 1999

<group by clause> Function
Specify a grouped table derived by the application of the <group by clause> to the result of the previously specified clause.
Format
<group by clause> ::=
       GROUP BY <grouping specification>
<grouping specification> ::=
         <grouping column reference>
       | <rollup list>
       | <cube list>
       | <grouping sets list>
       | <grand total>
       | <concatenated grouping>

pg:

-- create
CREATE TABLE EMPLOYEE (
  a INTEGER PRIMARY KEY,
  b INTEGER,
  c INTEGER
);

-- insert
INSERT INTO EMPLOYEE VALUES (0001, 0001, 0001);
INSERT INTO EMPLOYEE VALUES (0002, 0002, 0002);

-- fetch 
SELECT sum(b) FROM EMPLOYEE group by a as hh;

ERROR:  syntax error at or near "as"

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the source of the problem in IOx -- it is exposed but not caused by this change.

After reading through this PR carefully it is really nice work @jackwener 🏅

Thank you so much for the contribution and @mingmwang for the reviews

}
push_down_all_join(predicates, filter.input(), left, right, vec![])?
}
LogicalPlan::TableScan(scan) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the pushdown into scan could be done as part of the physical planning phase. However, since the current filter pushdown happens in the logical planning phase I think it is ok to keep the same behavior in this PR and move the pushdown in some other PR

@mingmwang
Copy link
Contributor

@mingmwang look like alias can't be in groupby.

sql 1999

<group by clause> Function
Specify a grouped table derived by the application of the <group by clause> to the result of the previously specified clause.
Format
<group by clause> ::=
       GROUP BY <grouping specification>
<grouping specification> ::=
         <grouping column reference>
       | <rollup list>
       | <cube list>
       | <grouping sets list>
       | <grand total>
       | <concatenated grouping>

pg:

-- create
CREATE TABLE EMPLOYEE (
  a INTEGER PRIMARY KEY,
  b INTEGER,
  c INTEGER
);

-- insert
INSERT INTO EMPLOYEE VALUES (0001, 0001, 0001);
INSERT INTO EMPLOYEE VALUES (0002, 0002, 0002);

-- fetch 
SELECT sum(b) FROM EMPLOYEE group by a as hh;

ERROR:  syntax error at or near "as"

You can try this:

select (a + b) as c, count(*) from Table_A group by 1

@jackwener
Copy link
Member Author

jackwener commented Nov 30, 2022

You can try this: select (a + b) as c, count(*) from Table_A group by 1

#[test]
fn push_down_filter_groupby_expr_contains_alias() {
    let sql = "SELECT (col_int32 + col_uint32) AS c, count(*) from test group by 1";
    let plan = test_sql(sql).unwrap();
    let expected = "Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
    \n    TableScan: test projection=[col_int32, col_uint32]";
    assert_eq!(expected, format!("{:?}", plan));
}

current implementation will put alias into projection expr

run in integration test. exist bug #4430, run it need to remove bug rule.

#[test]
#[ignore]
// TODO: UnwrapCastInComparison exist bug.
fn push_down_filter_groupby_expr_contains_alias() {
    let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3";
    let plan = test_sql(sql).unwrap();
    let expected = "Projection: c, COUNT(UInt8(1))\
    \n  Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
    \n    Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
    \n      Filter: CAST(test.col_int32 + test.col_uint32 AS Int64) > Int64(3)\
    \n        TableScan: test projection=[col_int32, col_uint32]";
    assert_eq!(expected, format!("{:?}", plan));
}

@mingmwang
Copy link
Contributor

You can try this: select (a + b) as c, count(*) from Table_A group by 1

#[test]
fn push_down_filter_groupby_expr_contains_alias() {
    let sql = "SELECT (col_int32 + col_uint32) AS c, count(*) from test group by 1";
    let plan = test_sql(sql).unwrap();
    let expected = "Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
    \n    TableScan: test projection=[col_int32, col_uint32]";
    assert_eq!(expected, format!("{:?}", plan));
}

current implementation will put alias into projection expr

run in integration test. exist bug #4430, run it need to remove bug rule.

#[test]
#[ignore]
// TODO: UnwrapCastInComparison exist bug.
fn push_down_filter_groupby_expr_contains_alias() {
    let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3";
    let plan = test_sql(sql).unwrap();
    let expected = "Projection: c, COUNT(UInt8(1))\
    \n  Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
    \n    Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
    \n      Filter: CAST(test.col_int32 + test.col_uint32 AS Int64) > Int64(3)\
    \n        TableScan: test projection=[col_int32, col_uint32]";
    assert_eq!(expected, format!("{:?}", plan));
}

Ok, got it.

@mingmwang
Copy link
Contributor

mingmwang commented Nov 30, 2022

Could you please also modify the UT optimize_plan() method and let the rule run twice and see what will happen ?

    fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
        let rule = FilterPushDown::new();
        rule.optimize(plan, &mut OptimizerConfig::new())
            .expect("failed to optimize plan")
    }

I think some of the logic in the rule like extract_or_clauses_for_join() a reasonable optimization, but if the optimizer rules were applied more than once that duplicate Filters might be generated.

@mingmwang
Copy link
Contributor

Before this PR, there is a global state which can help to avoid duplicate Filters been generated and pushed down.
Now the global state is removed. Need to double conform the behavior is unchanged.

@jackwener
Copy link
Member Author

Has added it in UT.

@mingmwang
Copy link
Contributor

mingmwang commented Nov 30, 2022

Except for the LogicalPlan::Window, the others LGTM. You can also add it in the following PR.

extract_or_clauses_for_join(&to_keep.0, left.schema(), left_preserved);
let or_to_right =
extract_or_clauses_for_join(&to_keep.0, right.schema(), right_preserved);
// TODO: we just get, but don't remove them from origin expr.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still a TODO here ?

Copy link
Member Author

@jackwener jackwener Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It existed originally. We can found it in tpch-q19.

Yes, I just add a comment, because I think it's a point that we can improve it in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid they can not be removed from the original exprs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think carefully, yes, we can't remove it.

@jackwener
Copy link
Member Author

All followup enhancement in #4433

Comment on lines +462 to +472
for col in columns.iter() {
for (l, r) in join.on.iter() {
if col == l {
join_cols_to_replace.insert(col, r);
break;
} else if col == r {
join_cols_to_replace.insert(col, l);
break;
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is break to return the inner loop. I think for join conditions: on(a.id = b.id and a.id = b.id2) where b.id = 10, we should be able to infer more equality predicates.

Copy link
Member Author

@jackwener jackwener Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is break to return the inner loop. I think for join conditions: on(a.id = b.id and a.id = b.id2) where b.id = 10, we should be able to infer more equality predicates.

It should be common optimization, infer conditon, Many rule about join need to it.

It should be a future ticket, and I think it's complex, because infer condtion sometime will cause bad effect

@alamb
Copy link
Contributor

alamb commented Nov 30, 2022

I think this PR has been outstanding enough and has had enough feedback and testing. Let's plan on completing the follow on work as other PRs. Thank you so much @jackwener and @mingmwang for your careful reviews and @sarahyurick for your testing

@alamb alamb merged commit 3fe542f into apache:master Nov 30, 2022
@ursabot
Copy link

ursabot commented Nov 30, 2022

Benchmark runs are scheduled for baseline = f2e2c29 and contender = 3fe542f. 3fe542f is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@jackwener jackwener deleted the pushdown_filter branch December 1, 2022 01:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core datafusion crate optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reimplement filter_push_down
7 participants