Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-9619: [Rust] [DataFusion] Add predicate push-down #7880

Closed
wants to merge 5 commits into from
Closed

ARROW-9619: [Rust] [DataFusion] Add predicate push-down #7880

wants to merge 5 commits into from

Conversation

jorgecarleitao
Copy link
Member

@jorgecarleitao jorgecarleitao commented Aug 2, 2020

This PR adds a new optimizer to push filters down. For example, a plan of the form

Selection: #SUM(c) Gt Int64(10)\
  Selection: #b Gt Int64(10)\
    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
      Projection: #a AS b, #c\
        TableScan: test projection=None"

is converted to

Selection: #SUM(c) Gt Int64(10)\
  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
    Projection: #a AS b, #c\
      Selection: #a Gt Int64(10)\
        TableScan: test projection=None";

(note how the filter expression changed from #b Gt Int64(10) to #a Gt Int64(10), and how only the filter on the key of the aggregate was pushed)

This works by performing two passes on the plan. On the first pass (analyze), it identifies:

  1. all filters on the plan (selections)
  2. all projections on the plan (projections)
  3. all places where a filter on a column cannot be pushed down (break_points)

After this pass, it computes the maximum depth that a filter can be pushed down as well as the new expression that the filter should have, given all the projections that exist.

On the second pass (optimize), it:

  • removes all old filters
  • adds all new filters

See comments on the code for details.

This PR is built on top of #7879 (first two commits).

FYI @andygrove @sunchao

@github-actions
Copy link

github-actions bot commented Aug 2, 2020

@jorgecarleitao
Copy link
Member Author

Any of you @alamb @houqp @nevi-me @paddyhoran could help out here? I think that this does significantly speeds querying for anything more complex, as we run aggregations and projections on much less data.

@alamb
Copy link
Contributor

alamb commented Aug 12, 2020 via email

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All in all I think this is a good initial Selection pushdown optimization.

I spent a while reading the code, and while I can't say I was able to follow the algorithm exactly ( I found the split of logic between the calculation of where to break push the Selection and the actual pushing of them hard to follow), I think the number and breadth of tests is good and in general sufficient to convince me this code does what it says.

One question I had was if you have any particular SQL queriesthis optimization will help with? The one kind I could think of (a HAVING clause which is normally executed after aggregate but could be pushed down) doesn't appear to be implemented yet:

> SELECT status, COUNT(1) FROM http_api_requests_total WHERE path = '/api/v2/write' GROUP BY status HAVING status = '2XX';
NotImplemented("HAVING is not implemented yet")

And typically the WHERE clause in SQL statements ar already starts as far down in the plan as possible:

> explain SELECT status, COUNT(1) FROM http_api_requests_total WHERE path = '/api/v2/write' GROUP BY status;
+--------------+----------------------------------------------------------------+
| plan_type    | plan                                                           |
+--------------+----------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]       |
|              |   Selection: #path Eq Utf8("/api/v2/write")                    |
|              |     TableScan: http_api_requests_total projection=Some([6, 8]) |
+--------------+----------------------------------------------------------------+

Another case that might be interesting would be if you had a Selection of ANDs where one clause could be pushed down and part could not be. For example

  Selection: #b Gt Int64(10) AND SUM(c) Gt Int64(10)\
    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
      Projection: #a AS b, #c\
        TableScan: test projection=None"

Could still be converted into the following (though there are now a different number of LogicalPlan nodes)

Selection: #SUM(c) Gt Int64(10)\
  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
    Projection: #a AS b, #c\
      Selection: #a Gt Int64(10)\
        TableScan: test projection=None";

rust/datafusion/src/optimizer/utils.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/utils.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/utils.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/utils.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/type_coercion.rs Outdated Show resolved Hide resolved
.filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
.build()?;

// not part of the test, just good to know:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@jorgecarleitao
Copy link
Member Author

Thank you very much @alamb for reviewing it!

This optimizer is mostly useful in the table or DataFrame API, on which a view can be declared as a sequence of statements that are not optimized for execution, but optimized for a logical and code organization's point of view.

One example is when we have a dataframe df that was constructed optimally, but we would like to only look at rows whose 'a' > 2. Instead of having to go through the actual code that built that DataFrame and place the filter in the correct place after investigating where we should place it, we can just write df.filter(df['a'] > 2).collect(), and let the optimizer figure it out where to place it.

I incorporated the comments above into #7879 , as IMO they are part of that PR, and rebased the whole thing. I will still address your comment about not full understanding the algorithm by adding a more extended comment and maybe try drawing some ASCII to better explain the idea, so that it is not only on my head.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jorgecarleitao !

rust/datafusion/src/optimizer/type_coercion.rs Outdated Show resolved Hide resolved
@jorgecarleitao
Copy link
Member Author

@alamb , I have added a comment describing the algorithm. Could you take a look and evaluate if it helps at understanding the underlying code?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started reading through this more carefully @jorgecarleitao and there are some additional test cases I want to try (when there are several selections). Thanks for the comments -- they are super helpful

rust/datafusion/src/optimizer/filter_push_down.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/filter_push_down.rs Outdated Show resolved Hide resolved
rust/datafusion/src/optimizer/filter_push_down.rs Outdated Show resolved Hide resolved
Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great work 👍

@alamb
Copy link
Contributor

alamb commented Aug 15, 2020

I have had this nagging sensation that the algorithm isn't quite right when the same column is used multiple times. I finally came up with an example that shows part of what I have been worrying about.

Here is a new test that passes on this branch but I think is incorrect. Specifically, with two Selections for the same variable separated by a Limit, one of the Selections is lost with the algorithm as written. Am I missing something?


    #[test]
    fn filter_2_breaks_limits() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a")])?
            .filter(col("a").lt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
            .limit(1)?
            .project(vec![col("a")])?
            .filter(col("a").gt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
            .build()?;
        // Should be able to move both filters below the projections

        // not part of the test
        assert_eq!(
            format!("{:?}", plan),
            "Selection: #a GtEq Int64(1)\
             \n  Projection: #a\
             \n    Limit: 1\
             \n      Selection: #a LtEq Int64(1)\
             \n        Projection: #a\
             \n          TableScan: test projection=None"
        );

        // This just seems wong: we lost a selection....
        let expected = "\
        Projection: #a\
        \n  Selection: #a GtEq Int64(1)\
        \n    Limit: 1\
        \n      Projection: #a\
        \n        TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

FYI @jorgecarleitao

@jorgecarleitao jorgecarleitao marked this pull request as draft August 15, 2020 18:39
@jorgecarleitao
Copy link
Member Author

@alamb , thank you so much for taking the time to think through this and come up with an example. I agree with you that it is wrong. I will evaluate whether the current approach is able to coupe with this, or whether we will have to scratch it and start from a different direction.

I changed this PR back to draft as it is obviously out of spec.

@jorgecarleitao jorgecarleitao marked this pull request as ready for review August 16, 2020 03:10
@jorgecarleitao
Copy link
Member Author

@alamb, @houqp @andygrove , I think that this is ready to re-review.

I modified the result returned by analyze to ensure that we do not lose relevant information (that lead to the error @alamb found).

I also found and fixed another error related to the placement of two filters in the same depth, that caused filters to be dropped: their expressions are now ANDed instead, which has the added bonus of gobbling filters together whenever possible.

All the changes are in new commits, in case it is easier for the review.

if max_depth.is_none() {
// it is unlikely that the plan is correct without break points as all scans
// adds breakpoints. We just return the plan and let others handle the error
return Ok(plan.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we return error here instead if the plan is not correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is poorly written. What I was trying to say is that we allow the compiler to not return an error on poorly designed plans, in which case it just does not perform any optimization. This way, the user is likely to receive a better error message.

This is a design decision that we need to take wrt to optimizers (error or ignore?). I have no strong opinion about it either: we can also return an error.

Let me know what you prefer that I will change it.

@houqp
Copy link
Member

houqp commented Aug 16, 2020

Something that can be left for future optimization: we can also go the other direction, i.e. break And filters into into individual boolean expressions so these filters can be partially pushed further down the plan.

@jorgecarleitao
Copy link
Member Author

Something that can be left for future optimization: we can also go the other direction, i.e. break And filters into into individual boolean expressions so these filters can be partially pushed further down the plan.

Yeap, good idea. AFAI experienced, spark is not doing this - at least up to spark 2.4.5.

This implements filter pushdown optimization with double pass algorithm.

Currently, "limit" and aggregates block the push down of the filter.

This currently does not push the filter to the scan as we
currently do not support filtered scans.
Big kudos to @alamb for identifying this error.
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking good. I spent a bunch of time trying to come up with counter examples / fool the pushdown logic and I could not. Nice work @jorgecarleitao

Some example plans it handled without issue:

---- optimizer::filter_push_down::tests::filter_2_aggs stdout ----
********Original plan:
Selection: #a Eq Int64(1)
  Projection: #a AS b, #b AS a
    Selection: #a GtEq Int64(1)
      Projection: #a AS b, #b AS a
        Selection: #a LtEq Int64(1)
          Aggregate: groupBy=[[#a, #b]], aggr=[[MIN(#b)]]
            TableScan: test projection=None
********Optimized plan plan:
Projection: #a AS b, #b AS a
  Projection: #a AS b, #b AS a
    Aggregate: groupBy=[[#a, #b]], aggr=[[MIN(#b)]]
      Selection: #a Eq Int64(1) And #b GtEq Int64(1) And #a LtEq Int64(1)
        TableScan: test projection=None
********Original plan:
Selection: #b GtEq Int64(1)
  Selection: #b LtEq Int64(1)
    Aggregate: groupBy=[[#a]], aggr=[[MIN(#b)]]
      Selection: #a GtEq Int64(1)
        Projection: #a AS b, #b AS a
          Selection: #a LtEq Int64(1)
            Aggregate: groupBy=[[#a, #b]], aggr=[[MIN(#b)]]
              TableScan: test projection=None
********Optimized plan:
Selection: #b GtEq Int64(1) And #b LtEq Int64(1)
  Aggregate: groupBy=[[#a]], aggr=[[MIN(#b)]]
    Projection: #a AS b, #b AS a
      Aggregate: groupBy=[[#a, #b]], aggr=[[MIN(#b)]]
        Selection: #b GtEq Int64(1) And #a LtEq Int64(1)
          TableScan: test projection=None

Projection: #a\
\n Selection: #a GtEq Int64(1)\
\n Limit: 1\
\n Projection: #a\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something doesn't quite seem right here: I am surprised that one Projection is left in the plan while another is not (it is fine given that this pass is just supposed to push Selections), this just seems odd

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not understand your comment. I thought that both projections were left in the plan (line 590 and line 593).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right -- I apologize I misread the diff. This looks good to me

@alamb
Copy link
Contributor

alamb commented Aug 17, 2020

Something that can be left for future optimization: we can also go the other direction, i.e. break And filters into into individual boolean expressions so these filters can be partially pushed further down the plan.

Yeap, good idea. AFAI experienced, spark is not doing this - at least up to spark 2.4.5.

I filed https://issues.apache.org/jira/browse/ARROW-9771 to track this suggestion

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be useful to reference the Apache Spark optimizer rules as we implement new rules in this project. Their PredicatePushDown rules starts around line 1100 here https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

I haven't had time to review this PR fully but am happy to approve based on the other reviews.

@houqp
Copy link
Member

houqp commented Aug 18, 2020

One thing I do like about spark's optimizer is all optimization rules share a common plan tree traversal and mutation routine, which made individual optimization rule easier to reason about. I can see us adopting the same pattern in the future to simplify the existing code base.

@andygrove andygrove closed this in 197f903 Aug 19, 2020
emkornfield pushed a commit to emkornfield/arrow that referenced this pull request Sep 8, 2020
This PR adds a new optimizer to push filters down. For example, a plan of the form

```
Selection: #SUM(c) Gt Int64(10)\
  Selection: #b Gt Int64(10)\
    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
      Projection: #a AS b, #c\
        TableScan: test projection=None"
```

is converted to

```
Selection: #SUM(c) Gt Int64(10)\
  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
    Projection: #a AS b, #c\
      Selection: #a Gt Int64(10)\
        TableScan: test projection=None";
```

(note how the filter expression changed from `#b Gt Int64(10)` to `#a Gt Int64(10)`, and how only the filter on the key of the aggregate was pushed)

This works by performing two passes on the plan. On the first pass (analyze), it identifies:

1. all filters on the plan (selections)
2. all projections on the plan (projections)
3. all places where a filter on a column cannot be pushed down (break_points)

After this pass, it computes the maximum depth that a filter can be pushed down as well as the new expression that the filter should have, given all the projections that exist.

On the second pass (optimize), it:

* removes all old filters
* adds all new filters

See comments on the code for details.

This PR is built on top of apache#7879 (first two commits).

FYI @andygrove @sunchao

Closes apache#7880 from jorgecarleitao/filter_push

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
emkornfield pushed a commit to emkornfield/arrow that referenced this pull request Oct 16, 2020
This PR adds a new optimizer to push filters down. For example, a plan of the form

```
Selection: #SUM(c) Gt Int64(10)\
  Selection: #b Gt Int64(10)\
    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
      Projection: #a AS b, #c\
        TableScan: test projection=None"
```

is converted to

```
Selection: #SUM(c) Gt Int64(10)\
  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
    Projection: #a AS b, #c\
      Selection: #a Gt Int64(10)\
        TableScan: test projection=None";
```

(note how the filter expression changed from `#b Gt Int64(10)` to `#a Gt Int64(10)`, and how only the filter on the key of the aggregate was pushed)

This works by performing two passes on the plan. On the first pass (analyze), it identifies:

1. all filters on the plan (selections)
2. all projections on the plan (projections)
3. all places where a filter on a column cannot be pushed down (break_points)

After this pass, it computes the maximum depth that a filter can be pushed down as well as the new expression that the filter should have, given all the projections that exist.

On the second pass (optimize), it:

* removes all old filters
* adds all new filters

See comments on the code for details.

This PR is built on top of apache#7879 (first two commits).

FYI @andygrove @sunchao

Closes apache#7880 from jorgecarleitao/filter_push

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
@jorgecarleitao jorgecarleitao deleted the filter_push branch October 28, 2020 04:17
GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
This PR adds a new optimizer to push filters down. For example, a plan of the form

```
Selection: #SUM(c) Gt Int64(10)\
  Selection: #b Gt Int64(10)\
    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
      Projection: #a AS b, #c\
        TableScan: test projection=None"
```

is converted to

```
Selection: #SUM(c) Gt Int64(10)\
  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
    Projection: #a AS b, #c\
      Selection: #a Gt Int64(10)\
        TableScan: test projection=None";
```

(note how the filter expression changed from `#b Gt Int64(10)` to `#a Gt Int64(10)`, and how only the filter on the key of the aggregate was pushed)

This works by performing two passes on the plan. On the first pass (analyze), it identifies:

1. all filters on the plan (selections)
2. all projections on the plan (projections)
3. all places where a filter on a column cannot be pushed down (break_points)

After this pass, it computes the maximum depth that a filter can be pushed down as well as the new expression that the filter should have, given all the projections that exist.

On the second pass (optimize), it:

* removes all old filters
* adds all new filters

See comments on the code for details.

This PR is built on top of apache#7879 (first two commits).

FYI @andygrove @sunchao

Closes apache#7880 from jorgecarleitao/filter_push

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants