
[SPARK-40045][SQL] Optimize the order of filtering predicates #37479

Closed

Conversation

caican00
Contributor

@caican00 caican00 commented Aug 11, 2022

Why are the changes needed?

select id, data FROM testcat.ns1.ns2.table
where id = 2
and md5(data) = '8cde774d6f7333752ed72cacddb05126'
and trim(data) = 'a' 

Based on the SQL, we currently get the filters in the following order:

// `(md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126) AND (trim(data#23, None) = a)` comes before `(id#22L = 2)`
== Physical Plan ==
 *(1) Project [id#22L, data#23]
 +- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)) AND (id#22L = 2))
    +- BatchScan[id#22L, data#23] class org.apache.spark.sql.connector.InMemoryTable$InMemoryBatchScan

With this predicate order, every row has to be evaluated against the expensive md5 and trim predicates, even rows that the cheap `(id#22L = 2)` predicate would already reject, and this can make Spark tasks run slowly.

So I think the predicates that cannot be pushed down should automatically be placed to the far right, so that rows which fail the cheaper predicates are never evaluated against the expensive ones.

As shown below:

// `(id#22L = 2)` comes before `(md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126) AND (trim(data#23, None) = a)`
== Physical Plan == 
*(1) Project [id#22L, data#23]
 +- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (id#22L = 2) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)))
    +- BatchScan[id#22L, data#23] class org.apache.spark.sql.connector.InMemoryTable$InMemoryBatchScan
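For reference, a minimal sketch (hypothetical table name `t`, assuming a running SparkSession and a table with columns id: bigint and data: string) that prints the physical plan so the conjunct order in the Filter node can be inspected:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("filter-order").getOrCreate()
// Hypothetical table `t`; the original report used testcat.ns1.ns2.table.
spark.sql(
  """SELECT id, data FROM t
    |WHERE id = 2
    |  AND md5(data) = '8cde774d6f7333752ed72cacddb05126'
    |  AND trim(data) = 'a'""".stripMargin)
  .explain()  // the Filter node in the physical plan shows the conjunct order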

How was this patch tested?

  1. Added a new test.
  2. Manual test: the stage execution time for reading data dropped from 6min+ to 24s.

[screenshots of the stage execution times before and after the change omitted]

@caican00
Contributor Author

Gently ping @rdblue @cloud-fan.
Could you help review this patch?

@AmplabJenkins

Can one of the admins verify this patch?

@@ -65,7 +65,7 @@ object PushDownUtils extends PredicateHelper {
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
- (Left(r.pushedFilters()), (untranslatableExprs ++ postScanFilters).toSeq)
+ (Left(r.pushedFilters()), (postScanFilters ++ untranslatableExprs).toSeq)
Contributor

This order switching makes sense to me. I think the translated filters (postScanFilters) are simple filters that can be evaluated faster, while the untranslated filters are normally complicated filters that take more time to evaluate, so we want to evaluate the postScanFilters filters first.

Contributor Author

> This order switching makes sense to me. I think the translated filters (postScanFilters) are simple filters that can be evaluated faster, while the untranslated filters are normally complicated filters that take more time to evaluate, so we want to evaluate the postScanFilters filters first.

Thank you for your reply. That's exactly what I was thinking.

Contributor

I think this simple heuristic should be safe; it can't optimize all cases, but it won't make things worse.

Contributor

Can we add a comment here to state that untranslatableExprs needs to be on the right side and also briefly explain the reason?
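For example, a sketch of the reordered return value with such a comment, reusing the names from the diff above (not necessarily the final wording):

val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
  DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
// Keep untranslatableExprs on the right: translated post-scan filters are typically
// cheap attribute comparisons, while untranslatable expressions (UDFs, md5, trim, ...)
// are usually more expensive, so the cheap ones should be evaluated first.
(Left(r.pushedFilters()), (postScanFilters ++ untranslatableExprs).toSeq)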

@zinking

zinking commented Aug 12, 2022

The point is clear and valid.

What about this case: day(ts) = 1 versus col1 > 13, where ts is a partition column while col1 is not, and some v2 data source implementation is capable of pushing that day function down? What will happen?

I think the fix should be more complicated, though.

@huaxingao
Contributor

@zinking

> What about this case: day(ts) = 1 versus col1 > 13, where ts is a partition column while col1 is not, and some v2 data source implementation is capable of pushing that day function down? What will happen?

Seems to me the current implementation can only push down filters which are in the form of `attribute cmp literal`. Could you copy and paste the plan that pushes down day(ts) = 1?
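For illustration only (not part of this PR): the V1 pushdown Filter API expresses attribute-versus-literal comparisons, so a predicate that wraps the column in a function call, such as day(ts) = 1, has no representation there and remains a post-scan filter.

import org.apache.spark.sql.sources.{EqualTo, GreaterThan}

val f1 = EqualTo("id", 2)         // id = 2    -> translatable, can be pushed to the source
val f2 = GreaterThan("col1", 13)  // col1 > 13 -> translatable
// day(ts) = 1 applies a function to the column, so it cannot be expressed with
// these Filter classes and is evaluated by Spark after the scan.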

@caican00
Contributor Author

> @caican00 Could you change the filters order in case r: SupportsPushDownV2Filters too?

@huaxingao OK, I will optimize this case.

@caican00 caican00 reopened this Aug 18, 2022
@caican00
Contributor Author

> @caican00 Could you change the filters order in case r: SupportsPushDownV2Filters too?

@huaxingao Updated

test("SPARK-40045: Move the post-Scan Filters to the far right") {
val t1 = s"${catalogAndNamespace}table"
withTable(t1) {
spark.udf.register("udfStrLen", (str: String) => str.length)
Contributor

nit: wrap the test with withUserDefinedFunction to unregister the function at the end.

Contributor Author

> nit: wrap the test with withUserDefinedFunction to unregister the function at the end.

@cloud-fan Thanks. Updated
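A rough sketch of the suggested wrapping, assuming the suite mixes in SQLTestUtils (the boolean marks udfStrLen as a temporary function so the helper drops it when the block exits):

withUserDefinedFunction("udfStrLen" -> true) {
  spark.udf.register("udfStrLen", (str: String) => str.length)
  // ... create the table, run the query, and check the Filter condition here ...
}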

val filtersAfter = find(filterAfter.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
  .head.asInstanceOf[FilterExec]
  .condition.toString
  .split("AND")
Contributor

nit: let's call splitConjunctivePredicates and check toString of the resulting predicates.

Contributor Author

> nit: let's call splitConjunctivePredicates and check toString of the resulting predicates.

@cloud-fan Thanks. Updated
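A sketch of the suggested assertion, assuming the test class mixes in PredicateHelper so splitConjunctivePredicates is available:

val filterExec = find(filterAfter.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
  .head.asInstanceOf[FilterExec]
val conjuncts = splitConjunctivePredicates(filterExec.condition).map(_.toString)
// e.g. the UDF-based predicate should now be the last conjunct
assert(conjuncts.last.contains("udfStrLen"))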

  .condition.toString
  .split("AND")
assert(filtersAfter.length == 5
  && filtersAfter(3).trim.startsWith("(udfStrLen(data")
Contributor

Wait, shouldn't this udf be on the far right?

Contributor Author

@caican00 caican00 Aug 22, 2022

> Wait, shouldn't this udf be on the far right?

@cloud-fan In the following SQL:

SELECT id, data FROM testcat.ns1.ns2.table
where udfStrLen(data) = 1
and trim(data) = 'a'
and id = 2

The udfStrLen and trim functions are untranslatable, so they end up to the right of id = 2. Before this optimization, id = 2 was on the far right.

== Physical Plan ==
*(1) Project [id#24L, data#25]
+- *(1) Filter (((isnotnull(id#24L) AND (id#24L = 2)) AND (udfStrLen(data#25) = 1)) AND (trim(data#25, None) = a))
   +- BatchScan[id#24L, data#25] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []

s"""
|SELECT id, data FROM $t1
|where udfStrLen(data) = 1
|and trim(data) = 'a'
Contributor

Can we remove trim, to make the test easier to understand?

Contributor Author

> Can we remove trim, to make the test easier to understand?

OK, I have updated it. @cloud-fan
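Presumably the updated test query, with trim removed, looks roughly like this (an assumption, not copied from the final patch):

s"""
  |SELECT id, data FROM $t1
  |where udfStrLen(data) = 1
  |and id = 2
  |""".stripMargin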

@@ -253,7 +254,8 @@ class InMemoryTable(
}

class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
- with SupportsPushDownRequiredColumns {
+ with SupportsPushDownRequiredColumns with SupportsPushDownFilters
+ with SupportsPushDownV2Filters {
Contributor

I think this SupportsPushDownV2Filters should be implemented in InMemoryV2FilterScanBuilder

@@ -94,7 +94,7 @@ object PushDownUtils extends PredicateHelper {
val postScanFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
}
- (Right(r.pushedPredicates), (untranslatableExprs ++ postScanFilters).toSeq)
+ (Right(r.pushedPredicates), (postScanFilters ++ untranslatableExprs).toSeq)
Contributor

Same comment as above.

@shardulm94
Contributor

@caican00 Do you think this PR is ready for another round of review? In our organization, we have seen a number of users impacted by this after migration to DSv2, so it would be nice to get this merged.

@mridulm
Contributor

mridulm commented Jan 26, 2023

QQ: Why is this PR targeting 3.3 and not master?

@shardulm94
Contributor

Hey @caican00! Haven't seen an update from your side in the last few months. Are you still interested in contributing this patch to Spark?

@huaxingao
Contributor

@caican00 Do you want to finish this? I think you can just remove implementing SupportsPushDownV2Filters here, then it's ready to be merged.

@huaxingao
Contributor

@caican00 If you don't have time for this any more, is it OK with you if I take this over and finish it up? We have quite a few customers using DS V2, and it would be nice if this fix could be merged. Thanks!

@huaxingao
Contributor

@caican00 I have opened a new PR #39892. I don't have your GitHub account email to add you as a co-author. You can add yourself as a co-author to get the commit credit.

@huaxingao
Contributor

I will close this PR for now @caican00

@huaxingao huaxingao closed this Feb 6, 2023
dongjoon-hyun pushed a commit that referenced this pull request Feb 8, 2023
All the credit of this PR goes to caican00. Here is the original [PR](#37479)

### What changes were proposed in this pull request?
put untranslated filters to the right side of the translated filters.

### Why are the changes needed?
Normally the translated filters (postScanFilters) are simple filters that can be evaluated faster, while the untranslated filters are complicated filters that take more time to evaluate, so we want to evaluate the postScanFilters filters first.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new UT

Closes #39892 from huaxingao/filter_order.

Authored-by: huaxingao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Feb 8, 2023
(cherry picked from commit fe67269)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023