[SPARK-25860][SQL] Replace Literal(null, _) with FalseLiteral whenever possible #22857
Test build #98127 has finished for PR 22857 at commit
@dbtsai @gatorsmile @cloud-fan could you guys, please, take a look?
case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
case p: LogicalPlan => p transformExpressions {
case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
case CaseWhen(branches, elseValue) =>
Nit,
case cw @ CaseWhen(branches, _) =>
..
..
cw.copy(branches = newBranches)
LGTM. @cloud-fan and @gatorsmile, this is the PR I mentioned to you earlier this year at the SF Spark Summit, which can simplify some of our queries. Also adding @dongjoon-hyun and @viirya. Thanks.
val expectedPlan = func(testRelation, expectedExpr).analyze
comparePlans(optimizedPlan, expectedPlan)
}

remove extra line.
}
val newElseValue = cw.elseValue.map(replaceNullWithFalse)
CaseWhen(newBranches, newElseValue)
case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) =>
Nit: in other places, we use `trueValue` and `falseValue`.
Yep, I shortened this to stay in one line below. I can either rename `pred` to `p` or split line 783 into multiple.
* Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
* an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
*/
private def replaceNullWithFalse(e: Expression): Expression = e match {
IsNull(Literal(null, _)) => IsNull(FalseLiteral)
Will this be a problem for this change?
We only do the replacements when 1) within `Join` or `Filter` such as `Filter(If(cond, FalseLiteral, Literal(null, _)))`, or 2) `If(Literal(null, _), trueValue, falseValue)`.
Also, that's the reason why we don't use `transformExpressionsDown`. We will stop the replacement as soon as we hit an expression that is not `CaseWhen`, `If`, `And`, `Or` or `Literal(null, _)`. Therefore, `If(IsNull(Literal(null, _)))` won't be transformed.
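A minimal sketch of the stop-early traversal described above, using a toy expression tree rather than Catalyst's classes. The names `StopEarlySketch`, `NullLit`, `FalseLit` and `Attr` are hypothetical, and the toy `If` is assumed to be boolean-typed. It only illustrates that the recursion descends through `If`, `And` and `Or` and leaves anything else (such as `IsNull`) untouched.

```scala
// Toy model, not Spark code: all names here are illustrative assumptions.
object StopEarlySketch {
  sealed trait Expr
  case object NullLit extends Expr   // stands in for Literal(null, _)
  case object FalseLit extends Expr  // stands in for FalseLiteral
  final case class Attr(name: String) extends Expr
  final case class IsNull(child: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  final case class If(pred: Expr, trueValue: Expr, falseValue: Expr) extends Expr

  // Recurse only through If/And/Or; any other node ends the descent,
  // so a null literal nested under IsNull is never rewritten.
  def replaceNullWithFalse(e: Expr): Expr = e match {
    case NullLit => FalseLit
    case And(l, r) => And(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case Or(l, r) => Or(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case If(p, t, f) =>
      If(replaceNullWithFalse(p), replaceNullWithFalse(t), replaceNullWithFalse(f))
    case other => other
  }
}
```

With a blanket top-down transform, `IsNull(NullLit)` would become `IsNull(FalseLit)` and change meaning; here it is returned unchanged.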
testJoin(originalCond, expectedCond = FalseLiteral)
}

test("successful replacement of null literals in filter and join conditions (13)") {
Thank you for pinging me, @dbtsai. And thank you for your contribution, @aokolnychyi.
I also clearly feel the benefit of this optimizer. It's worth reviewing thoroughly.
BTW, the test case names are very unclear to me. They only look like positive cases (1)~(13) and negative cases (1)~(3). Can we have more elaborate and specific names? It will help the readability of these test cases and shorten the review process.
- successful replacement of null literals in filter and join conditions (1)
...
- successful replacement of null literals in filter and join conditions (13)
- inability to replace null literals in filter and join conditions (1)
...
- inability to replace null literals in filter and join conditions (3)
* As a result, many unnecessary computations can be removed in the query optimization phase.
*
* Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
* conditions in [[CaseWhen]].
The examples are good, but we have to be clearer about the scope of this optimizer.
For now, this PR touches not only predicates in WHERE, but also some expressions in SELECT.
Also, the behavior with aggregation like HAVING is unclear. Could you enumerate the targets a little more clearly in this documentation, @aokolnychyi?
@@ -83,6 +83,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
ReplaceNullWithFalse,
nit. Although this batch will be repeated, logically, `ReplaceNullWithFalse` would do better to use the result of `SimplifyBinaryComparison`. How about postponing this until after `SimplifyBinaryComparison`? In other words, switch `ReplaceNullWithFalse` and `SimplifyBinaryComparison`?
* an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
*/
private def replaceNullWithFalse(e: Expression): Expression = e match {
case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>
how about `cw.dataType == BooleanType || cw.dataType == NullType`?
actually just `cw.dataType == BooleanType`. If an expression is `NullType`, it should be replaced by null literal already.
this applies to `If` as well.
CaseWhen(newBranches, newElseValue)
case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) =>
If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal))
case And(left, right) =>
we need to be careful here. `null && false` is `false`, `null || true` is `true`. Please take a look at #22702
Could you elaborate a bit more on `null && false`?
I had in mind `AND(true, null)` and `OR(false, null)`, which are tricky. For example, if we use `AND(true, null)` in SELECT, we will get `null`. However, if we use it inside `Filter` or predicate of `If`, it will be semantically equivalent to `false` (e.g., `If$eval`). Therefore, the proposed rule has a limited scope. I explored the source code & comments in `And`/`Or` to come up with an edge case that wouldn’t work. I could not find such a case. To me, it seems safe because the rule is applied only to expressions that evaluate to `false` if the underlying expression is `null` (i.e., conditions in `Filter`/`Join`, predicates in `If`, conditions in `CaseWhen`).
Please, let me know if you have a particular case to test.
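The argument above can be checked exhaustively on a small model. This is a sketch under the assumption that SQL booleans are modelled as `Option[Boolean]`, with `None` standing for `null`; `ThreeValuedSketch`, `passes` and `nullToFalse` are hypothetical names, not Spark APIs. It compares the original and rewritten operands only by whether a predicate position would accept them.

```scala
// Toy model of SQL three-valued logic; None plays the role of NULL.
object ThreeValuedSketch {
  type B3 = Option[Boolean]

  def and(a: B3, b: B3): B3 = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true)) => Some(true)
    case _ => None
  }

  def or(a: B3, b: B3): B3 = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false)) => Some(false)
    case _ => None
  }

  // A Filter/Join condition or an If/CaseWhen predicate only acts on "true";
  // null and false are treated the same way.
  def passes(v: B3): Boolean = v.contains(true)

  // The rewrite under discussion, applied to a single operand.
  def nullToFalse(v: B3): B3 = v.orElse(Some(false))

  private val all: Seq[B3] = Seq(Some(true), Some(false), None)

  // Check every operand pair: the predicate outcome must not change.
  val andIsSafe: Boolean = all.forall { a =>
    all.forall(b => passes(and(a, b)) == passes(and(nullToFalse(a), nullToFalse(b))))
  }
  val orIsSafe: Boolean = all.forall { a =>
    all.forall(b => passes(or(a, b)) == passes(or(nullToFalse(a), nullToFalse(b))))
  }

  // Outside a predicate position the values themselves differ,
  // which is why the rule cannot be applied to arbitrary SELECT expressions.
  val differsAsValue: Boolean = and(Some(true), None) != and(Some(true), Some(false))
}
```

In this model both checks hold, while `differsAsValue` shows that `AND(true, null)` and `AND(true, false)` are distinguishable once the result is projected rather than tested.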
I don't have a particular case, this is just to double check that these corner cases are considered. I think we are fine now :)
test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {

def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {
this assumes we run `ConvertToLocalRelation`, let's use `withSQLConf` to make sure this rule is on.
Do we actually have a way to enable/disable `ConvertToLocalRelation`?
yea we have. Take a look at `TestHive`, and we did something similar before
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
I see, thanks.
So you mean using `withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {...}` to ensure that `ConvertToLocalRelation` is not excluded?
0eac890 to 4c35955
* Recursively replaces `Literal(null, _)` with `FalseLiteral`.
*
* Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
* an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
Can we make it more general? I think the expected expression is:
- It's `NullIntolerant`. If any child is null, it will be null.
- it has a null child.
so I would write something like
def replaceNullWithFalse(e: Expression): Expression = e match {
case _ if alwaysNull(e) => FalseLiteral
case And...
case Or...
case _ => e
}
def alwaysNull(e: Expression): Boolean = e match {
case Literal(null, _) => true
case n: NullIntolerant => n.children.exists(alwaysNull)
case _ => false
}
I like your snippet because it is clean. We also considered a similar approach.
- Unfortunately, it does not handle nested `If`/`CaseWhen` expressions as they are not `NullIntolerant`. For example, cases like `If(If(a > 1, FalseLiteral, Literal(null, _)), 1, 2)` will not be optimized if we remove branches for `If`/`CaseWhen`.
- If we just add one more branch to handle all `NullIntolerant` expressions, I am not sure it will give a lot of benefits as those expressions are transformed into `Literal(null, _)` by `NullPropagation` and we operate in the same batch.
- As @gatorsmile said, we should be really careful. Generalization might be tricky. For example, `Not` is `NullIntolerant`. `Not(null)` is transformed into `null` by `NullPropagation`. We need to ensure that we do not replace `null` inside `Not` and do not convert `Not(null)` into `Not(FalseLiteral)`.

Therefore, the intention was to keep things simple to be safe.
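To make the `Not` concern concrete, here is a small sketch on the same kind of toy model (`Option[Boolean]` with `None` as `null`). `NotCounterExample`, `not` and `passes` are hypothetical names introduced only for illustration.

```scala
// Toy model: None stands for SQL NULL. Not Spark code.
object NotCounterExample {
  type B3 = Option[Boolean]

  // NOT is null-intolerant: a null input gives a null output.
  def not(v: B3): B3 = v.map(!_)

  // A predicate position accepts a row only when the value is exactly true.
  def passes(v: B3): Boolean = v.contains(true)

  val original: B3 = not(None)          // Not(null)  evaluates to null
  val rewritten: B3 = not(Some(false))  // Not(false) evaluates to true
}
```

In a filter, `original` rejects the row while `rewritten` accepts it, so replacing the `null` under `Not` would change query results. That is the case the stop-early recursion avoids.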
Test build #98240 has finished for PR 22857 at commit
val newElseValue = cw.elseValue.map(replaceNullWithFalse)
CaseWhen(newBranches, newElseValue)
case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType =>
If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal))
When `i.dataType != BooleanType`, we still can do `replaceNullWithFalse(pred)`, don't we?
This case is handled in `apply` and tested in "replace null in predicates of If", "replace null in predicates of If inside another If"
Let me know if I got you correctly here
The general rule for `LogicalPlan` at `apply` looks at the `predicate` of `If`, whether or not its `dataType` is `BooleanType`.
But in `replaceNullWithFalse`, the rule for `If` only works if its `dataType` is `BooleanType`. "replace null in predicates of If inside another If" is such a case. The `If` inside another `If` is of `BooleanType`. If the inside `If` is not of `BooleanType`, this rule doesn't work. And I think it should be ok to replace the null with false when it is not boolean type.
ah, I see. `replaceNullWithFalse` should only work in boolean type cases. Then I think we are fine with it.
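A rough sketch of the split discussed in this thread, again on a toy tree and not Catalyst's classes: the entry point rewrites the predicate of any `If`, while the recursive helper only descends into boolean-typed `If` nodes and only replaces boolean-typed null literals. `TypeGuardSketch`, `Lit` and `rewritePredicate` are hypothetical names, and deriving the toy `If`'s type from its true branch is a simplifying assumption.

```scala
// Toy model with explicit data types; all names are illustrative assumptions.
object TypeGuardSketch {
  sealed trait DataType
  case object BooleanType extends DataType
  case object IntegerType extends DataType

  sealed trait Expr { def dataType: DataType }
  // value == null models a NULL literal of the given type.
  final case class Lit(value: Any, dataType: DataType) extends Expr
  final case class If(pred: Expr, trueValue: Expr, falseValue: Expr) extends Expr {
    // Simplification: the result type is taken from the true branch.
    def dataType: DataType = trueValue.dataType
  }

  // Only boolean-typed expressions are rewritten; everything else is left alone.
  def replaceNullWithFalse(e: Expr): Expr = e match {
    case i @ If(p, t, f) if i.dataType == BooleanType =>
      If(replaceNullWithFalse(p), replaceNullWithFalse(t), replaceNullWithFalse(f))
    case Lit(null, BooleanType) => Lit(false, BooleanType)
    case other => other
  }

  // Entry point for an If of any type: only its predicate is in predicate position.
  def rewritePredicate(i: If): If = i.copy(pred = replaceNullWithFalse(i.pred))
}
```

For an integer-valued `If` with a null predicate, `rewritePredicate` turns the predicate into `false` but keeps the integer `null` branch, because that branch is a projected value rather than a condition.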
* an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
*/
private def replaceNullWithFalse(e: Expression): Expression = e match {
case cw: CaseWhen if cw.dataType == BooleanType =>
When `cw.dataType != BooleanType`, we can still do `replaceNullWithFalse(cond)`, don't we?
This case is also covered and tested in "replace null in conditions of CaseWhen", "replace null in conditions of CaseWhen inside another CaseWhen".
Test build #98239 has finished for PR 22857 at commit
Please be really careful with null handling. It could easily introduce correctness bugs like the ones we recently fixed.
@@ -2585,4 +2585,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {

checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
}

test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
it's weird to put optimizer end-to-end test in `DataFrameSuite`. Can we create a `ReplaceNullWithFalseEndToEndSuite`?
val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
checkAnswer(q5, Row(1) :: Row(1) :: Nil)
q5.queryExecution.executedPlan.foreach { p =>
assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
This test can pass without the optimization. The `ConvertToLocalRelation` rule will eliminate the `Project`.
Can we use a table as input data? e.g.
withTable("t1", "t2") {
Seq((1, true), (2, false)).toDF("l", "b").write.saveAsTable("t1")
Seq(2, 3).toDF("l").write.saveAsTable("t2")
val df1 = spark.table("t1")
val df2 = spark.table("t2")
...
}
You are right, this can pass if `ConvertToLocalRelation` is enabled. When I tested this check, I did not take into account that `SharedSparkSession` disables `ConvertToLocalRelation`. So, the check worked correctly but only because `ConvertToLocalRelation` was disabled in `SharedSparkSession`. Let’s switch to tables. Thanks!
LGTM except the end-to-end test
Test build #98317 has finished for PR 22857 at commit
Thanks all for reviewing! The latest change looks good to me too. Merged into master.
And(replaceNullWithFalse(left), replaceNullWithFalse(right))
case Or(left, right) =>
Or(replaceNullWithFalse(left), replaceNullWithFalse(right))
case Literal(null, _) => FalseLiteral
Here, for safety, we should check the data types.
*
* As a result, many unnecessary computations can be removed in the query optimization phase.
*/
object ReplaceNullWithFalse extends Rule[LogicalPlan] {
Let us move it to a new file. The file is growing too big.
@@ -31,14 +31,14 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Uuid
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
BTW, please do not remove these in a huge feature PR.
Yea, also it's unrelated import cleanup. It should be discouraged because it might make backporting / reverting potentially difficult, and sometimes those changes make readers confused.
…r possible

## What changes were proposed in this pull request?

This PR proposes a new optimization rule that replaces `Literal(null, _)` with `FalseLiteral` in conditions in `Join` and `Filter`, predicates in `If`, conditions in `CaseWhen`.

The idea is that some expressions evaluate to `false` if the underlying expression is `null` (as an example see `GeneratePredicate$create` or `doGenCode` and `eval` methods in `If` and `CaseWhen`). Therefore, we can replace `Literal(null, _)` with `FalseLiteral`, which can lead to more optimizations later on.

Let’s consider a few examples.

```
val df = spark.range(1, 100).select($"id".as("l"), ($"id" > 50).as("b"))
df.createOrReplaceTempView("t")
df.createOrReplaceTempView("p")
```

**Case 1**

```
spark.sql("SELECT * FROM t WHERE if(l > 10, false, NULL)").explain(true)

// without the new rule
…
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter if ((id#0L > 10)) false else null
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter if ((id#0L > 10)) false else null
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
…
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 2**

```
spark.sql("SELECT * FROM t WHERE CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN false ELSE null END
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN false ELSE null END
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 3**

```
spark.sql("SELECT * FROM t JOIN p ON IF(t.l > p.l, null, false)").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Join Inner, if ((l#2L > l#37L)) null else false
:- Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- Range (1, 100, step=1, splits=Some(12))
+- Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, if ((l#2L > l#37L)) null else false
:- *(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- *(1) Range (1, 100, step=1, splits=12)
+- BroadcastExchange IdentityBroadcastMode
   +- *(2) Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
      +- *(2) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3, l#37L, s#38]
```

## How was this patch tested?

This PR comes with a set of dedicated tests.

Closes apache#22857 from aokolnychyi/spark-25860.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
…higher-order functions: ArrayExists, ArrayFilter, MapFilter

## What changes were proposed in this pull request?

Extend the `ReplaceNullWithFalse` optimizer rule introduced in SPARK-25860 (apache#22857) to also support optimizing predicates in higher-order functions of `ArrayExists`, `ArrayFilter`, `MapFilter`.

Also rename the rule to `ReplaceNullWithFalseInPredicate` to better reflect its intent.

Example:

```sql
select filter(a, e -> if(e is null, null, true)) as b from (
  select array(null, 1, null, 3) as a)
```

The optimized logical plan:

**Before**:

```
== Optimized Logical Plan ==
Project [filter([null,1,null,3], lambdafunction(if (isnull(lambda e#13)) null else true, lambda e#13, false)) AS b#9]
+- OneRowRelation
```

**After**:

```
== Optimized Logical Plan ==
Project [filter([null,1,null,3], lambdafunction(if (isnull(lambda e#13)) false else true, lambda e#13, false)) AS b#9]
+- OneRowRelation
```

## How was this patch tested?

Added new unit test cases to the `ReplaceNullWithFalseInPredicateSuite` (renamed from `ReplaceNullWithFalseSuite`).

Closes apache#23079 from rednaxelafx/catalyst-master.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…icate

## What changes were proposed in this pull request?

Based on apache#22857 and apache#23079, this PR did a few updates

- Limit the data types of NULL to Boolean.
- Limit the input data type of replaceNullWithFalse to Boolean; throw an exception in the testing mode.
- Create a new file for the rule ReplaceNullWithFalseInPredicate
- Update the description of this rule.

## How was this patch tested?

Added a test case

Closes apache#23139 from gatorsmile/followupSpark-25860.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
What changes were proposed in this pull request?
This PR proposes a new optimization rule that replaces `Literal(null, _)` with `FalseLiteral` in conditions in `Join` and `Filter`, predicates in `If`, conditions in `CaseWhen`.
The idea is that some expressions evaluate to `false` if the underlying expression is `null` (as an example see `GeneratePredicate$create` or `doGenCode` and `eval` methods in `If` and `CaseWhen`). Therefore, we can replace `Literal(null, _)` with `FalseLiteral`, which can lead to more optimizations later on.
Let’s consider a few examples.
Case 1
Case 2
Case 3
How was this patch tested?
This PR comes with a set of dedicated tests.