Skip to content

Commit

Permalink
[SPARK-38997][SPARK-39037][SQL][FOLLOWUP] PushableColumnWithoutNested…
Browse files Browse the repository at this point in the history
…Column` need be translated to predicate too

### What changes were proposed in this pull request?
#35768 assume the expression in `And`, `Or` and `Not` must be predicate.
#36370 and #36325 supported push down expressions in `GROUP BY` and `ORDER BY`. But the children of `And`, `Or` and `Not` can be `FieldReference.column(name)`.
`FieldReference.column(name)` is not a predicate, so the assert may fail.

### Why are the changes needed?
This PR fix the bug for `PushableColumnWithoutNestedColumn`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Let the push-down framework more correctly.

### How was this patch tested?
New tests

Closes #36776 from beliefer/SPARK-38997_SPARK-39037_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 125555c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
beliefer authored and cloud-fan committed Jun 9, 2022
1 parent 4e5ada9 commit ea0571e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ class V2ExpressionBuilder(
case Literal(true, BooleanType) => Some(new AlwaysTrue())
case Literal(false, BooleanType) => Some(new AlwaysFalse())
case Literal(value, dataType) => Some(LiteralValue(value, dataType))
case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
case col @ pushableColumn(name) =>
val ref = if (nestedPredicatePushdownEnabled) {
FieldReference(name)
} else {
FieldReference.column(name)
}
if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
Some(new V2Predicate("=", Array(FieldReference(name), LiteralValue(true, BooleanType))))
Some(new V2Predicate("=", Array(ref, LiteralValue(true, BooleanType))))
} else {
Some(FieldReference(name))
Some(ref)
}
case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
Some(FieldReference.column(name))
case in @ InSet(child, hset) =>
generateExpression(child).map { v =>
val children =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,48 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|[DEPT, CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df5, Seq(Row(1, 0, 10000), Row(1, 9000, 9000), Row(2, 0, 22000), Row(6, 0, 12000)))

val df6 = sql(
"""
|SELECT CASE WHEN SALARY > 8000 AND is_manager <> false THEN SALARY ELSE 0 END as key,
| SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
checkAggregateRemoved(df6)
checkPushedInfo(df6,
"""
|PushedAggregates: [SUM(SALARY)],
|PushedFilters: [],
|PushedGroupByExpressions:
|[CASE WHEN (SALARY > 8000.00) AND (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df6, Seq(Row(0, 21000), Row(10000, 20000), Row(12000, 12000)))

val df7 = sql(
"""
|SELECT CASE WHEN SALARY > 8000 OR is_manager <> false THEN SALARY ELSE 0 END as key,
| SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
checkAggregateRemoved(df7)
checkPushedInfo(df7,
"""
|PushedAggregates: [SUM(SALARY)],
|PushedFilters: [],
|PushedGroupByExpressions:
|[CASE WHEN (SALARY > 8000.00) OR (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df7, Seq(Row(10000, 20000), Row(12000, 24000), Row(9000, 9000)))

val df8 = sql(
"""
|SELECT CASE WHEN NOT(is_manager <> false) THEN SALARY ELSE 0 END as key,
| SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
checkAggregateRemoved(df8)
checkPushedInfo(df8,
"""
|PushedAggregates: [SUM(SALARY)],
|PushedFilters: [],
|PushedGroupByExpressions:
|[CASE WHEN NOT (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df8, Seq(Row(0, 32000), Row(12000, 12000), Row(9000, 9000)))
}

test("scan with aggregate push-down: DISTINCT SUM with group by") {
Expand Down

0 comments on commit ea0571e

Please sign in to comment.