
[SPARK-15859][SQL] Optimize the partition pruning within the disjunction #13585

Closed
Wants to merge 3 commits.

Conversation

chenghao-intel
Contributor

@chenghao-intel chenghao-intel commented Jun 10, 2016

What changes were proposed in this pull request?

A typical query may look like this (part1 is the partition key):

SELECT * FROM table WHERE (part1 = 1 AND id > 12) OR (part1 = 5 AND id < 100)

Currently, Spark ignores partition keys inside a disjunction and falls back to a full table scan, even though only 2 partitions are required here. This PR drops the non-partition predicates from the filter and tries to extract the partition-key expressions for partition pruning.

For instance:

(part1 = 1 AND id > 12) OR (part1 = 5 AND id < 100) ==> (part1 = 1) OR (part1 = 5)
(part1 = 1 AND a > 3) OR (part1 = 2 AND a < 5) ==> (part1 = 1) OR (part1 = 2)
(part1 = 1 AND a > 3) OR (a < 100) ==> None
(a > 100 AND b < 100) OR (part1 = 10) ==> None
(a > 100 AND b < 100 AND part1 = 10) OR (part1 = 2) ==> (part1 = 10) OR (part1 = 2)
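The extraction rule behind these examples can be sketched as a small standalone function. The `Pred`/`And`/`Or` classes below are illustrative toys, not Spark's actual `Expression` API:

```python
# Illustrative model of the extraction rule; Pred/And/Or are toy
# stand-ins, not Spark's Expression tree.
from dataclasses import dataclass

@dataclass(frozen=True)
class Pred:
    col: str       # e.g. Pred("part1", "=", 1) models part1 = 1
    op: str
    value: int

@dataclass(frozen=True)
class And:
    left: object
    right: object

@dataclass(frozen=True)
class Or:
    left: object
    right: object

def extract_partition_predicate(expr, part_keys):
    """Return an expression over partition keys only, or None if no
    safe pruning predicate can be extracted."""
    if isinstance(expr, Pred):
        return expr if expr.col in part_keys else None
    if isinstance(expr, And):
        # Dropping one conjunct only widens the match, so keeping the
        # partition-key side alone is safe.
        l = extract_partition_predicate(expr.left, part_keys)
        r = extract_partition_predicate(expr.right, part_keys)
        return And(l, r) if l and r else (l or r)
    if isinstance(expr, Or):
        # Both disjuncts must contribute a partition predicate;
        # otherwise the key-free disjunct could match any partition.
        l = extract_partition_predicate(expr.left, part_keys)
        r = extract_partition_predicate(expr.right, part_keys)
        return Or(l, r) if l and r else None
    return None

q = Or(And(Pred("part1", "=", 1), Pred("id", ">", 12)),
       And(Pred("part1", "=", 5), Pred("id", "<", 100)))
print(extract_partition_predicate(q, {"part1"}))
# → Or(Pred("part1", "=", 1), Pred("part1", "=", 5))
```

Note that this naive form is only safe when the tree contains nothing but AND, OR, and simple comparisons; a NOT (or a NOT-like UDF) breaks it, which is exactly the issue raised later in this thread.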

This PR only works for HiveTableScan; I will submit another PR to optimize the data source API back-end scan.

How was this patch tested?

Unit tests are included in this PR.

@SparkQA

SparkQA commented Jun 10, 2016

Test build #60263 has finished for PR 13585 at commit 08519f2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
.map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
}
}

/**
* Drop the non-partition key expression in the disjunctions, to optimize the partition pruning.
@yangw1234 yangw1234 Jun 10, 2016

"Drop the non-partition key expression in the disjunctions". Should it be "conjunctions", or am I getting the meaning wrong?

Contributor Author

I think it should be disjunction. For example:

(part=1 and a=1) or (part=2 and a=4) is a disjunction, right?

It is (part=1 conjunction a=1) disjunction (part=2 conjunction a=4), right? But the expressions that get dropped are a=1, which is in conjunction with part=1, and a=4, which is in conjunction with part=2. So I thought it should be conjunctions.

Or maybe we can phrase it in another way to avoid the confusion? ^_^

Contributor Author

Oh, OK. Originally I thought the conjunction cases were already handled in collectProjectsAndFilters before being passed into this function, so here we only handle the ANDs inside the disjunction. (You can see this in HiveTableScans in HiveStrategies.scala.)

Anyway, you convinced me. :)

@liancheng
Contributor

You probably meant "conjunction" (aka "logical and") instead of "disjunction" (aka "logical or") in the PR title and comments.

As @clockfly pointed out, the current approach isn't correct. I think a better approach to extract as many partition column predicates as possible is CNF conversion, which pulls all conjunctions up to the top level; then it's safe to do the optimization you intended in this PR.

There have been PR(s) that tried to add CNF conversion to Spark SQL. However, one problem is that CNF conversion can lead to exponential explosion with respect to expression size (i.e. the number of tree nodes in the expression tree). Thus we usually need to set an upper limit on the expression size and stop doing CNF conversion once the limit is exceeded.
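A minimal sketch of what such a capped CNF conversion could look like, using nested tuples `("and", l, r)` / `("or", l, r)` with string leaves as a stand-in for Spark's expression tree (all names here are hypothetical). CNF is represented as a list of OR-clauses, and only clauses built entirely from partition-key predicates are usable for pruning:

```python
# Hedged sketch: CNF as a list of clauses (each clause is a list of
# leaf predicates). Assumes NOT has already been pushed to the leaves.
def to_cnf(expr, max_clauses=64):
    """Return the CNF clause list, or None once it would exceed
    max_clauses (guarding against the exponential blow-up)."""
    if isinstance(expr, tuple) and expr[0] == "and":
        left, right = to_cnf(expr[1], max_clauses), to_cnf(expr[2], max_clauses)
        if left is None or right is None:
            return None
        clauses = left + right                          # AND concatenates clause lists
    elif isinstance(expr, tuple) and expr[0] == "or":
        left, right = to_cnf(expr[1], max_clauses), to_cnf(expr[2], max_clauses)
        if left is None or right is None:
            return None
        clauses = [a + b for a in left for b in right]  # distribute OR over AND
    else:
        clauses = [[expr]]                              # leaf predicate, e.g. "part1 = 1"
    return clauses if len(clauses) <= max_clauses else None

def pruning_clauses(clauses, part_keys):
    # Only clauses whose every literal mentions a partition key can
    # safely prune partitions.
    return [c for c in clauses
            if all(any(key in lit for key in part_keys) for lit in c)]

expr = ("or", ("and", "part1 = 1", "a > 3"), ("and", "part1 = 5", "b < 9"))
print(pruning_clauses(to_cnf(expr), {"part1"}))
# → [['part1 = 1', 'part1 = 5']]
```

The clause ["part1 = 1", "part1 = 5"] is exactly the pruning predicate the PR description wants to extract, while the mixed clauses are discarded.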

@yangw1234

Hi @liancheng, CNF is indeed a more systematic way to deal with this problem.

I'm not sure whether I'm right, but I think as long as we push the NOT operator down to the lowest level of the expression tree, the approach proposed by @chenghao-intel will work. Take the example above: the expression !(partition = 1 && a > 3) is transformed to (!(partition = 1)) || (!(a > 3)), and according to the second example @chenghao-intel gave in the doc, the expression should then be dropped entirely, so partition 1 will not be pruned. (But this rule does not appear in the code; maybe it is still work in progress. I don't know for sure.)

What do you think?
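Pushing NOT down to the leaves (negation normal form, via De Morgan's laws) as suggested here could be sketched like this, again on a toy tuple encoding (`"not"`/`"and"`/`"or"` with string leaves) rather than Spark's real Expression classes:

```python
# Toy sketch of NOT push-down; the tuple encoding is illustrative only.
def push_not_down(expr, negate=False):
    if isinstance(expr, tuple) and expr[0] == "not":
        return push_not_down(expr[1], not negate)        # !!x == x
    if isinstance(expr, tuple) and expr[0] in ("and", "or"):
        op = expr[0]
        if negate:
            # De Morgan: !(a and b) == !a or !b, and vice versa
            op = "or" if op == "and" else "and"
        return (op,
                push_not_down(expr[1], negate),
                push_not_down(expr[2], negate))
    return ("not", expr) if negate else expr             # leaf

print(push_not_down(("not", ("and", "partition = 1", "a > 3"))))
# → ('or', ('not', 'partition = 1'), ('not', 'a > 3'))
```

Once every NOT wraps only a leaf, an extractor can treat ("not", partition-key leaf) as non-extractable, which makes the whole disjunct yield None, matching the behavior described above.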

@chenghao-intel
Contributor Author

Thank you all for the review, but I am not going to implement CNF; the intention of this PR is to extract more partition pruning expressions, so that we scan fewer partitions during the table scan.

But I did find some bugs in this PR; I will add more unit tests soon.

HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
HiveTableScanExec(_,
  relation,
  pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
Contributor Author

For HiveTableScan, the predicate here exists just to minimize the partition scanning, so what we need to do is put in a more specific partition pruning predicate.

Sorry if something is confusing.

Contributor Author

@yangw1234 @liancheng @clockfly
pruningPredicates ++ additionalPartPredicates is the partition filter; the original filter still needs to be applied after the partitions are pruned.

Contributor

@clockfly clockfly Jun 13, 2016

Sure, we understand that additionalPartPredicates is the partition filter. But we may not be able to guarantee that BooleanSimplification pushes all NOT operators down to the leaf expressions, since BooleanSimplification is an optimizer rule, and optimizer rules can be skipped if the optimizer exceeds its maximum number of iterations.

Contributor Author

Sorry @clockfly, I am not sure what you mean; this PR is not designed to depend on the optimizer (CNF). Can you please give a more concrete example if there is a bug?

Contributor Author

Thanks @yangw1234, I will update the code to be stricter about the partition pruning filter extraction.

Contributor Author

Thanks @clockfly for pointing out the exception too. :)

glad I could help ^_^

@chenghao-intel
Contributor Author

Updated with a more meaningful function name, and added more unit tests.

@chenghao-intel
Contributor Author

cc @liancheng

@SparkQA

SparkQA commented Jun 13, 2016

Test build #60376 has finished for PR 13585 at commit 79f7acb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2)
* (part1 == 1 and a > 3) or (a < 100) => None
* (a > 100 && b < 100) or (part1 = 10) => None
* (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2)
Contributor

Please don't mix different sets of operators here (&&/and and =/==).

@liancheng
Contributor

One problem with the tests is that other optimization rules may rewrite the filter predicates before the newly added rule runs, hiding bugs in the new rule. The one @clockfly pointed out is one example.

@lianhuiwang
Contributor

lianhuiwang commented Jun 14, 2016

@liancheng @chenghao-intel I think we can optimize this like MySQL's range optimizer: http://dev.mysql.com/doc/refman/5.7/en/range-optimization.html. I have implemented it in my branch: https://github.com/lianhuiwang/spark/tree/partition_pruning. I did not combine many ranges, because it transforms the partition pruning predicates into Hive filter expressions in getPartitionsByFilter(), and Hive's metastore database can then filter the partitions. If that sounds right, I will open a new PR for it.

@liancheng
Contributor

@lianhuiwang @chenghao-intel Thanks for working on this! As you already know, we are currently trying to get Spark 2.0 RC1 out ASAP; please allow me to revisit both of your branches later. Sorry for the delay!

@lianhuiwang
Contributor

@liancheng I think you have access to revisit my branch. Thanks.

@yhuai
Contributor

yhuai commented Jul 25, 2016

@chenghao-intel Will you have time to update this PR?

def extractPartitionKeyExpression(
    predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = {
  // drop the non-partition key expressions in the conjunctions of the expression tree
  val additionalPartPredicate = predicate transformUp {
Contributor Author

This PR may have a critical bug: a user can implement a UDF that logically behaves like the NOT operator inside the partition filter expression. We probably need a whitelist of the built-in UDFs.

@yhuai @liancheng @yangw1234 @clockfly any comments on this?
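A toy illustration (made-up rows and predicates, not from the PR) of why a NOT-like operator is dangerous for this extraction: underneath a negation, dropping a non-partition conjunct no longer widens the match, so pruning can silently lose rows:

```python
# Rows modeled as (part1, a) pairs; the data is invented for illustration.
rows = [(1, 2), (1, 5), (2, 7)]

# Original predicate: !(part1 = 1 AND a > 3)
original = lambda part1, a: not (part1 == 1 and a > 3)
# Unsound pruning predicate obtained by dropping "a > 3" inside the NOT
unsound_prune = lambda part1, a: not (part1 == 1)

kept = [r for r in rows if original(*r)]
pruned = [r for r in rows if unsound_prune(*r) and original(*r)]

print(kept)    # [(1, 2), (2, 7)]
print(pruned)  # [(2, 7)] -- the matching row (1, 2) was lost
```

The row (1, 2) satisfies the original predicate but lives in partition 1, which the unsound pruning predicate skips entirely; a UDF that hides a negation would trigger the same failure mode.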

Contributor Author

I can keep updating the code if we agree on the approach; otherwise, I think we had better close this PR for now.

@gatorsmile
Member

@chenghao-intel Any update on this PR? Should we close this PR now and then revisit it later?

@chenghao-intel
Contributor Author

Oh, yes, I am closing it; I will reopen it when we have another idea.
