
[SPARK-26736][SQL] if filter condition And has non-determined sub function it does not do partition prunning #24118

Closed
wants to merge 10 commits

Conversation

zhaorongsheng
Contributor

@zhaorongsheng zhaorongsheng commented Mar 17, 2019

What changes were proposed in this pull request?

If an And filter condition includes a non-deterministic sub-expression, partition pruning does not work. This patch extracts the deterministic sub-expressions from the And so that partition pruning can still be applied.

Example:
A partitioned table definition:
create table test(id int) partitioned by (dt string);
The following SQL does not trigger partition pruning:
select * from test where dt='20190101' and rand() < 0.5;

This PR will fix this problem.
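The extraction this PR aims for can be modeled outside Spark with a tiny stand-in for the expression tree. The `Expr`/`Leaf`/`And` names below are illustrative, not Spark's actual classes; the real helper that flattens conjunctions is `splitConjunctivePredicates` in Catalyst's `PredicateHelper`:

```scala
// Minimal model of the idea: split a conjunctive filter into its
// conjuncts and keep only the deterministic ones for partition pruning.
// Expr, Leaf, and And are illustrative stand-ins, not Spark classes.
sealed trait Expr { def deterministic: Boolean }
case class Leaf(sql: String, deterministic: Boolean) extends Expr
case class And(left: Expr, right: Expr) extends Expr {
  val deterministic: Boolean = left.deterministic && right.deterministic
}

// Flatten nested And nodes into a list of conjuncts, mirroring the
// shape of Spark's splitConjunctivePredicates.
def splitConjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => Seq(other)
}

// dt = '20190101' AND rand() < 0.5
val cond = And(Leaf("dt = '20190101'", deterministic = true),
               Leaf("rand() < 0.5", deterministic = false))

// Only the deterministic conjunct is usable as a partition filter;
// the rand() < 0.5 part must still run as a regular row filter.
val prunable = splitConjuncts(cond).filter(_.deterministic)
```

In the example query above, `dt = '20190101'` survives the split and can prune partitions, while `rand() < 0.5` is kept out of the pruning predicate.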

How was this patch tested?

It is tested in PruningSuite by adding the test case Partition pruning - with filter containing non-determined condition.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@maropu
Member

maropu commented Mar 17, 2019

Could you describe more (e.g., an example query to reproduce the issue you described) in the PR description?

@zhaorongsheng
Contributor Author

zhaorongsheng commented Mar 17, 2019

@maropu The description was updated. Please review it, thanks~

@@ -63,7 +63,7 @@ object PhysicalOperation extends PredicateHelper {
val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
(Some(substitutedFields), filters, other, collectAliases(substitutedFields))

-case Filter(condition, child) if condition.deterministic =>
+case Filter(condition, child) if condition.deterministic || condition.isInstanceOf[And] =>
Member

I think this pattern should not return non-deterministic exprs, but this current change does so, right?
Can we modify the code to extract the deterministic exprs in lines 67-69?

Contributor Author

Yes, you are right. I will do it.

Contributor Author

It has been updated. Please review it, thanks~~

@maropu
Member

maropu commented Mar 18, 2019

ok to test

@SparkQA

SparkQA commented Mar 18, 2019

Test build #103613 has finished for PR 24118 at commit b5d0c67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Mar 19, 2019

Could you make the PR title/description more precise before reviews? I think this PR does not target only the rand() function...

@zhaorongsheng zhaorongsheng changed the title [SPARK-26736][SQL] if filter condition has rand() function it does not do partition prunning [SPARK-26736][SQL] if filter condition has non-determined function it does not do partition prunning Mar 19, 2019
@zhaorongsheng zhaorongsheng changed the title [SPARK-26736][SQL] if filter condition has non-determined function it does not do partition prunning [SPARK-26736][SQL] if filter condition And has non-determined function it does not do partition prunning Mar 19, 2019
@zhaorongsheng
Contributor Author

OK, the title has been updated~

@zhaorongsheng zhaorongsheng changed the title [SPARK-26736][SQL] if filter condition And has non-determined function it does not do partition prunning [SPARK-26736][SQL] if filter condition And having non-determined sub function it does not do partition prunning Mar 19, 2019
@zhaorongsheng zhaorongsheng changed the title [SPARK-26736][SQL] if filter condition And having non-determined sub function it does not do partition prunning [SPARK-26736][SQL] if filter condition And has non-determined sub function it does not do partition prunning Mar 19, 2019
val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
val substitutedCondition = substitute(aliases)(condition)
(fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
case filter: Filter if filter.condition.deterministic || filter.condition.isInstanceOf[And] =>
Member

PhysicalOperation is used in many places, so have you checked that this change has no side-effect on the other behaviours?

Contributor Author

Yes, I have checked this change and I think that it has no side-effect for the other behaviors.

@maropu
Member

maropu commented Mar 19, 2019

It seems you forgot to update the title in the JIRA?

@zhaorongsheng
Contributor Author

It has been updated.

Some(condition)
} else {
val andCondition = condition.asInstanceOf[And]
if (andCondition.left.deterministic) {
Contributor

Do we intend to handle only at the top level ? What happens when the non-deterministic predicate is nested like :

where ((c1 = 10 and rand() < 1) and (c2 = 20))

in this case, in my understanding of the code, we will ignore (c1 = 10) for pruning purposes ? cc @maropu

Edited: Another example :

where c1 = 10 and c2 = 20 and c3 = 30 and rand() < 1 and c4 = 40

in this case, we would only consider c4 = 40 for pruning, no ?

Member

yea, we need a more general solution for that..
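One shape such a general solution could take, sketched with the same illustrative stand-in types rather than Spark's real `Expression`/`And` classes: recurse into each side of an And and keep whichever deterministic parts survive, so nested cases like the examples above are no longer dropped.

```scala
// Sketch of a recursive extraction that keeps all deterministic
// conjuncts even when they are nested, e.g.
//   ((c1 = 10 AND rand() < 1) AND c2 = 20)  =>  c1 = 10 AND c2 = 20
// Expr, Leaf, and And are illustrative stand-ins, not Spark classes.
sealed trait Expr { def deterministic: Boolean }
case class Leaf(sql: String, deterministic: Boolean) extends Expr
case class And(left: Expr, right: Expr) extends Expr {
  val deterministic: Boolean = left.deterministic && right.deterministic
}

def extractDeterministic(e: Expr): Option[Expr] = e match {
  // Fully deterministic subtree: keep it whole.
  case _ if e.deterministic => Some(e)
  // Mixed And: recurse and re-join whatever each side yields.
  case And(l, r) =>
    (extractDeterministic(l), extractDeterministic(r)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (a, b)             => a.orElse(b)
    }
  // Non-deterministic leaf (or other node): nothing usable for pruning.
  case _ => None
}
```

On the nested example, this recovers both `c1 = 10` and `c2 = 20` for pruning, instead of ignoring the inner And entirely.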

Contributor Author

@dilipbiswal @maropu It has been updated. Please review it, thanks~~

@SparkQA

SparkQA commented Mar 22, 2019

Test build #103830 has finished for PR 24118 at commit 39fac02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

(fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
} else {
(None, Nil, filter, Map.empty)
}
Member

Could you add tests somewhere to check if this addition could extract deterministic conditions you expect?

Contributor Author

I have added the test case named Partition pruning - with filter containing non-determined condition in sub And-expr in PruningSuite.

Member

@maropu maropu Mar 23, 2019

Yea, but it seems they are end-to-end tests, so I think we need more fine-grained tests for collectProjectsAndFilters.

val substitutedCondition = substitute(aliases)(condition)
(fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
case filter @ Filter(condition, child)
if condition.deterministic || condition.isInstanceOf[And] =>
Contributor

is the isInstanceOf check required anymore?

@@ -91,6 +97,27 @@ object PhysicalOperation extends PredicateHelper {
.map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
}
}

private def getDeterminedExpression(expr: Expression): Option[Expression] = {
Contributor

getDeterministicExpression or extractDeterministicExpression? What do you think @maropu?

Member

yea, I like the name including Deterministic

@@ -91,6 +97,27 @@ object PhysicalOperation extends PredicateHelper {
.map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
}
}

private def getDeterminedExpression(expr: Expression): Option[Expression] = {
Contributor

Also add some comments about this function at the top, with small snippets of input and output.

@dilipbiswal
Contributor

We should also update the comments here to reflect the changes?

@zhaorongsheng
Contributor Author

zhaorongsheng commented Mar 26, 2019

I have updated the code and added test cases. Please review it, thanks~

@SparkQA

SparkQA commented Mar 26, 2019

Test build #103976 has finished for PR 24118 at commit 9093df4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* col = 1 and rand() < 1
* (col1 = 1 and rand() < 1) and col2 = 1
* col1 = 1 or rand() < 1
* (col1 = 1 and rand() < 1) or (col2 = 1 and rand() < 1)
Member

IMO we don't need to handle this case (col1 = 1 and rand() < 1) or (col2 = 1 and rand() < 1) in this PR because DNF forms should be handled in another normalization logic (e.g., SPARK-6624). So, I think it's ok to handle CNF forms only here. In fact, I think we should keep the same semantics with PushDownPredicate. cc: @gatorsmile @cloud-fan

Contributor

+1 to be consistent with PushDownPredicate
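The Or cases in the list above can't contribute a partition filter from one side alone: `(a OR b)` only implies a deterministic predicate when both branches do, since a row failing the deterministic branch might still pass the non-deterministic one. A minimal sketch with illustrative `Expr`/`Leaf`/`Or` stand-ins (not Spark's classes, and not what the PR or PushDownPredicate actually does):

```scala
// For a disjunction, a pruning-safe weakening exists only when BOTH
// sides yield a deterministic part: (a OR b) implies (detA OR detB)
// only if a implies detA and b implies detB.
sealed trait Expr { def deterministic: Boolean }
case class Leaf(sql: String, deterministic: Boolean) extends Expr
case class Or(left: Expr, right: Expr) extends Expr {
  val deterministic: Boolean = left.deterministic && right.deterministic
}

def prunableFromOr(e: Expr): Option[Expr] = e match {
  // Fully deterministic subtree: usable as-is.
  case _ if e.deterministic => Some(e)
  // Or: both branches must yield something, otherwise nothing is safe.
  case Or(l, r) =>
    for { a <- prunableFromOr(l); b <- prunableFromOr(r) } yield Or(a, b)
  // Non-deterministic leaf: every row (hence every partition) may match.
  case _ => None
}
```

So `col1 = 1 or rand() < 1` yields no prunable predicate at all, which is consistent with handling only conjunctive (CNF-style) forms here and leaving disjunctions to a separate normalization pass.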

@SparkQA

SparkQA commented Mar 31, 2019

Test build #104131 has finished for PR 24118 at commit be633be.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Apr 1, 2019

retest this please

@SparkQA

SparkQA commented Apr 1, 2019

Test build #104154 has finished for PR 24118 at commit be633be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhaorongsheng
Contributor Author

@maropu Is there any progress about this PR?

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 1, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 1, 2020
@github-actions github-actions bot closed this Jan 2, 2020
@boneanxs

Are there any problems with this PR? We also encountered this problem, and we found that Hive can handle it well.

@cloud-fan
Contributor

this should have been fixed by a58d91b

@maropu
Member

maropu commented Jan 15, 2020

It seems this PR intended to target Hive tables? I tried the example query on the current master;

// hive table
scala> sql("""create table test(id int) partitioned by (dt string)""")
scala> sql("""select * from test where dt='20190101' and rand() < 0.5""").explain(true)

== Physical Plan ==
*(1) Filter ((isnotnull(dt#19) AND (dt#19 = 20190101)) AND (rand(6515336563966543616) < 0.5))
+- Scan hive default.test [id#18, dt#19], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#18], [dt#19], Statistics(sizeInBytes=8.0 EiB)


// datasource table
sql("""create table test(id int, dt string) using parquet partitioned by (dt)""")
sql("""select * from test where dt='20190101' and rand() < 0.5""").explain(true)

== Physical Plan ==
*(1) Filter (rand(1519810875701056142) < 0.5)
+- *(1) ColumnarToRow
   +- FileScan parquet default.test[id#30,dt#31] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/test], PartitionFilters: [isnotnull(dt#31), (dt#31 = 20190101)], PushedFilters: [], ReadSchema: struct<id:int>

@cloud-fan
Contributor

Seems we need to update HiveTableScans to use ScanOperation as well.

@maropu
Member

maropu commented Jan 15, 2020

Ah, right. I'll make a pr for that later.

maropu added a commit that referenced this pull request Jan 15, 2020
…ions in Hive tables

### What changes were proposed in this pull request?

This PR intends to improve partition pruning for nondeterministic expressions in Hive tables:

Before this PR:
```
scala> sql("""create table test(id int) partitioned by (dt string)""")
scala> sql("""select * from test where dt='20190101' and rand() < 0.5""").explain()

== Physical Plan ==
*(1) Filter ((isnotnull(dt#19) AND (dt#19 = 20190101)) AND (rand(6515336563966543616) < 0.5))
+- Scan hive default.test [id#18, dt#19], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#18], [dt#19], Statistics(sizeInBytes=8.0 EiB)
```
After this PR:
```
== Physical Plan ==
*(1) Filter (rand(-9163956883277176328) < 0.5)
+- Scan hive default.test [id#0, dt#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0], [dt#1], Statistics(sizeInBytes=8.0 EiB), [isnotnull(dt#1), (dt#1 = 20190101)]
```
This PR is the rework of #24118.

### Why are the changes needed?

For better performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit tests added.

Closes #27219 from maropu/SPARK-26736.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>