Skip to content

Commit

Permalink
[SPARK-36359][SQL] Coalesce drop all expressions after the first non …
Browse files Browse the repository at this point in the history
…nullable expression

### What changes were proposed in this pull request?

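`Coalesce` now drops all expressions after the first non-nullable expression, because `Coalesce` returns its first non-null argument and anything after a non-nullable argument can never be reached. For example: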
```scala
sql("create table t1(a string, b string) using parquet")
sql("select a, Coalesce(count(b), 0) from t1 group by a").explain(true)
```

Before this PR:
```
== Optimized Logical Plan ==
Aggregate [a#0], [a#0, coalesce(count(b#1), 0) AS coalesce(count(b), 0)#3L]
+- Relation default.t1[a#0,b#1] parquet
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [a#0], [a#0, count(b#1) AS coalesce(count(b), 0)#3L]
+- Relation default.t1[a#0,b#1] parquet
```

### Why are the changes needed?

Improves query performance by removing `Coalesce` arguments that can never be evaluated.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test.

Closes #33590 from wangyum/SPARK-36359.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
  • Loading branch information
wangyum committed Aug 6, 2021
1 parent 6e72951 commit 4624e59
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreePattern.{NULL_CHECK, TreePattern}
import org.apache.spark.sql.catalyst.trees.TreePattern.{COALESCE, NULL_CHECK, TreePattern}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -55,6 +55,8 @@ case class Coalesce(children: Seq[Expression]) extends ComplexTypeMergingExpress
// Coalesce is foldable only if every one of its children is foldable
// (`forall` is vacuously true for an empty child list).
override def foldable: Boolean = children.forall(_.foldable)

// Tags this node with the COALESCE tree pattern so that pattern-based pruning
// predicates such as `containsAnyPattern(..., COALESCE)` can detect trees that
// contain a Coalesce expression.
final override val nodePatterns: Seq[TreePattern] = Seq(COALESCE)

override def checkInputDataTypes(): TypeCheckResult = {
if (children.length < 1) {
TypeCheckResult.TypeCheckFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,10 +752,10 @@ object NullPropagation extends Rule[LogicalPlan] {
}

def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
t => t.containsAnyPattern(NULL_CHECK, NULL_LITERAL, COUNT)
t => t.containsAnyPattern(NULL_CHECK, NULL_LITERAL, COUNT, COALESCE)
|| t.containsAllPatterns(WINDOW_EXPRESSION, CAST, LITERAL), ruleId) {
case q: LogicalPlan => q.transformExpressionsUpWithPruning(
t => t.containsAnyPattern(NULL_CHECK, NULL_LITERAL, COUNT)
t => t.containsAnyPattern(NULL_CHECK, NULL_LITERAL, COUNT, COALESCE)
|| t.containsAllPatterns(WINDOW_EXPRESSION, CAST, LITERAL), ruleId) {
case e @ WindowExpression(Cast(Literal(0L, _), _, _, _), _) =>
Cast(Literal(0L), e.dataType, Option(conf.sessionLocalTimeZone))
Expand All @@ -781,7 +781,12 @@ object NullPropagation extends Rule[LogicalPlan] {
} else if (newChildren.length == 1) {
newChildren.head
} else {
Coalesce(newChildren)
val nonNullableIndex = newChildren.indexWhere(e => !e.nullable)
if (nonNullableIndex > -1) {
Coalesce(newChildren.take(nonNullableIndex + 1))
} else {
Coalesce(newChildren)
}
}

// If the value expression is NULL then transform the In expression to null literal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object TreePattern extends Enumeration {
val BOOL_AGG: Value = Value
val CASE_WHEN: Value = Value
val CAST: Value = Value
val COALESCE: Value = Value
val CONCAT: Value = Value
val COUNT: Value = Value
val COUNT_IF: Value = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,25 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper
}
}
}

// Verifies that the optimizer truncates Coalesce's argument list right after the
// first non-nullable argument.
test("SPARK-36359: Coalesce drop all expressions after the first non nullable expression") {
// Fixture columns: 'a and 'c are non-nullable, 'b and 'd are nullable.
val testRelation = LocalRelation(
'a.int.withNullability(false),
'b.int.withNullability(true),
'c.int.withNullability(false),
'd.int.withNullability(true))

// First argument 'a is non-nullable: the expected plan selects 'a directly,
// with no Coalesce left.
comparePlans(
Optimize.execute(testRelation.select(Coalesce(Seq('a, 'b, 'c, 'd)).as("out")).analyze),
testRelation.select('a.as("out")).analyze)
// Both arguments are non-nullable: only the first one ('a) is expected to remain.
comparePlans(
Optimize.execute(testRelation.select(Coalesce(Seq('a, 'c)).as("out")).analyze),
testRelation.select('a.as("out")).analyze)
// Nullable 'b comes first, then non-nullable 'c: the trailing 'd is expected to be dropped.
comparePlans(
Optimize.execute(testRelation.select(Coalesce(Seq('b, 'c, 'd)).as("out")).analyze),
testRelation.select(Coalesce(Seq('b, 'c)).as("out")).analyze)
// All arguments are nullable: the Coalesce is expected to stay unchanged.
comparePlans(
Optimize.execute(testRelation.select(Coalesce(Seq('b, 'd)).as("out")).analyze),
testRelation.select(Coalesce(Seq('b, 'd)).as("out")).analyze)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
val df = sql("select ifnull(id, 'x'), nullif(id, 'x'), nvl(id, 'x'), nvl2(id, 'x', 'y') " +
"from range(2)")
checkKeywordsExistsInExplain(df,
"Project [coalesce(cast(id#xL as string), x) AS ifnull(id, x)#x, " +
"id#xL AS nullif(id, x)#xL, coalesce(cast(id#xL as string), x) AS nvl(id, x)#x, " +
"Project [cast(id#xL as string) AS ifnull(id, x)#x, " +
"id#xL AS nullif(id, x)#xL, cast(id#xL as string) AS nvl(id, x)#x, " +
"x AS nvl2(id, x, y)#x]")
}

Expand Down

0 comments on commit 4624e59

Please sign in to comment.