From 4c0c35818c0917482f6bd22e54da1637b12e7f2b Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 10 Jun 2022 18:18:27 +0800 Subject: [PATCH 1/4] [SPARK-38761][SQL][FOLLOWUP] DS V2 supports push down misc non-aggregate functions --- .../util/V2ExpressionSQLBuilder.java | 4 +++ .../catalyst/util/V2ExpressionBuilder.scala | 28 +++++++++++++++++++ .../org/apache/spark/sql/jdbc/H2Dialect.scala | 4 +-- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 18 ++++++++++++ 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java index 7e489399cfecc..6853dc69fb0c4 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java @@ -97,6 +97,10 @@ public String build(Expression expr) { return visitUnaryArithmetic(name, inputToSQL(e.children()[0])); case "ABS": case "COALESCE": + case "GREATEST": + case "LEAST": + case "IF": + case "RAND": case "LN": case "EXP": case "POWER": diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 81d0b7dfeb4bc..1fa715996888a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -99,6 +99,34 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } else { None } + case Greatest(children) => + val childrenExpressions = children.flatMap(generateExpression(_)) + if (children.length == childrenExpressions.length) { + Some(new GeneralScalarExpression("GREATEST", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case 
Least(children) => + val childrenExpressions = children.flatMap(generateExpression(_)) + if (children.length == childrenExpressions.length) { + Some(new GeneralScalarExpression("LEAST", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case iff: If => + val childrenExpressions = iff.children.flatMap(generateExpression(_)) + if (iff.children.length == childrenExpressions.length) { + Some(new GeneralScalarExpression("IF", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case Rand(child, hideSeed) => + if (hideSeed) { + Some(new GeneralScalarExpression("RAND", Array.empty[V2Expression])) + } else { + generateExpression(child) + .map(v => new GeneralScalarExpression("RAND", Array[V2Expression](v))) + } case Log(child) => generateExpression(child) .map(v => new GeneralScalarExpression("LN", Array[V2Expression](v))) case Exp(child) => generateExpression(child) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index fad463be3df3d..5f9a707fce498 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -35,8 +35,8 @@ private[sql] object H2Dialect extends JdbcDialect { url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2") private val supportedFunctions = - Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL", - "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM") + Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LN", "EXP", "POWER", "SQRT", "FLOOR", + "CEIL", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM") override def isSupportedFunction(funcName: String): Boolean = supportedFunctions.contains(funcName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index a607356681354..3f28ddc49da2d 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -425,6 +425,24 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkFiltersRemoved(df10) checkPushedInfo(df10, "PushedFilters: [ID IS NOT NULL, ID > 1], ") checkAnswer(df10, Row("mary", 2)) + + val df11 = sql( + """ + |SELECT * FROM h2.test.employee + |WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1 + |""".stripMargin) + checkFiltersRemoved(df11) + checkPushedInfo(df11, "PushedFilters: " + + "[(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]") + checkAnswer(df11, Row(2, "david", 10000, 1300, true)) + + val df12 = sql(""" + |SELECT * FROM h2.test.employee + |WHERE IF(SALARY > 10000, BONUS, BONUS + 200) > 1200 + |""".stripMargin) + checkFiltersRemoved(df12, false) + checkPushedInfo(df12, "PushedFilters: []") + checkAnswer(df12, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true))) } test("scan with filter push-down with ansi mode") { From 0ff8cb5eea840cd1aafcb672472b3e7f67791722 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 15 Jun 2022 10:44:26 +0800 Subject: [PATCH 2/4] Update code --- .../expressions/GeneralScalarExpression.java | 24 +++++++++++++++++++ .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 9 +++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index 4edc3521d841d..33d6bcc74259e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -148,6 +148,30 @@ *
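    <li>Since version: 3.3.0</li> *
   </ul> *
  </li> + *
  <li>Name: <code>GREATEST</code> + *
   <ul> + *
    <li>SQL semantic: <code>GREATEST(expr, ...)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>LEAST</code> + *
   <ul> + *
    <li>SQL semantic: <code>LEAST(expr, ...)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>IF</code> + *
   <ul> + *
    <li>SQL semantic: <code>IF(expr1, expr2, expr3)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>RAND</code> + *
   <ul> + *
    <li>SQL semantic: <code>RAND([seed])</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> *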
  • Name: SUBSTRING *
      *
    • SQL semantic: SUBSTRING(str, pos[, len])
    • diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 3f28ddc49da2d..a528c2f1e1b94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -436,10 +436,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "[(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]") checkAnswer(df11, Row(2, "david", 10000, 1300, true)) - val df12 = sql(""" - |SELECT * FROM h2.test.employee - |WHERE IF(SALARY > 10000, BONUS, BONUS + 200) > 1200 - |""".stripMargin) + val df12 = sql( + """ + |SELECT * FROM h2.test.employee + |WHERE IF(SALARY > 10000, BONUS, BONUS + 200) > 1200 + |""".stripMargin) checkFiltersRemoved(df12, false) checkPushedInfo(df12, "PushedFilters: []") checkAnswer(df12, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true))) From 92a5fb8d8ecd27b8df62b3f6be007b76f89c2f7d Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 15 Jun 2022 11:35:36 +0800 Subject: [PATCH 3/4] Update code --- .../expressions/GeneralScalarExpression.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index 33d6bcc74259e..f3af868737531 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -106,6 +106,30 @@ *
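    <li>Since version: 3.3.0</li> *
   </ul> *
  </li> + *
  <li>Name: <code>GREATEST</code> + *
   <ul> + *
    <li>SQL semantic: <code>GREATEST(expr, ...)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>LEAST</code> + *
   <ul> + *
    <li>SQL semantic: <code>LEAST(expr, ...)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>IF</code> + *
   <ul> + *
    <li>SQL semantic: <code>IF(expr1, expr2, expr3)</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> + *
  <li>Name: <code>RAND</code> + *
   <ul> + *
    <li>SQL semantic: <code>RAND([seed])</code></li> + *
    <li>Since version: 3.4.0</li> + *
   </ul> + *
  </li> *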
  • Name: LN *
      *
    • SQL semantic: LN(expr)
    • @@ -148,30 +172,6 @@ *
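    <li>Since version: 3.3.0</li> *
   </ul> *
  </li> - *
  <li>Name: <code>GREATEST</code> - *
   <ul> - *
    <li>SQL semantic: <code>GREATEST(expr, ...)</code></li> - *
    <li>Since version: 3.4.0</li> - *
   </ul> - *
  </li> - *
  <li>Name: <code>LEAST</code> - *
   <ul> - *
    <li>SQL semantic: <code>LEAST(expr, ...)</code></li> - *
    <li>Since version: 3.4.0</li> - *
   </ul> - *
  </li> - *
  <li>Name: <code>IF</code> - *
   <ul> - *
    <li>SQL semantic: <code>IF(expr1, expr2, expr3)</code></li> - *
    <li>Since version: 3.4.0</li> - *
   </ul> - *
  </li> - *
  <li>Name: <code>RAND</code> - *
   <ul> - *
    <li>SQL semantic: <code>RAND([seed])</code></li> - *
    <li>Since version: 3.4.0</li> - *
   </ul> - *
  </li> *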
  • Name: SUBSTRING *
      *
    • SQL semantic: SUBSTRING(str, pos[, len])
    • From 8319ee15f1048f689c95aba7bcabd6df1513f0e9 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 21 Jun 2022 18:33:42 +0800 Subject: [PATCH 4/4] Update code --- .../expressions/GeneralScalarExpression.java | 6 ------ .../sql/connector/util/V2ExpressionSQLBuilder.java | 1 - .../sql/catalyst/util/V2ExpressionBuilder.scala | 14 +++++++------- .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 9 +++++---- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index f3af868737531..1bafd3c9659b2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -118,12 +118,6 @@ *
    <li>Since version: 3.4.0</li> *
   </ul> *
  </li> - *
  <li>Name: <code>IF</code> - *
   <ul> - *
    <li>SQL semantic: <code>IF(expr1, expr2, expr3)</code></li> - *
    <li>Since version: 3.4.0</li> - *
   </ul> - *
  </li> *
  • Name: RAND *
      *
    • SQL semantic: RAND([seed])
    • diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java index 6853dc69fb0c4..6db72feb2cb31 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java @@ -99,7 +99,6 @@ public String build(Expression expr) { case "COALESCE": case "GREATEST": case "LEAST": - case "IF": case "RAND": case "LN": case "EXP": diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 1fa715996888a..5af6a68b8aae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -113,13 +113,6 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } else { None } - case iff: If => - val childrenExpressions = iff.children.flatMap(generateExpression(_)) - if (iff.children.length == childrenExpressions.length) { - Some(new GeneralScalarExpression("IF", childrenExpressions.toArray[V2Expression])) - } else { - None - } case Rand(child, hideSeed) => if (hideSeed) { Some(new GeneralScalarExpression("RAND", Array.empty[V2Expression])) @@ -223,6 +216,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } else { None } + case iff: If => + val childrenExpressions = iff.children.flatMap(generateExpression(_)) + if (iff.children.length == childrenExpressions.length) { + Some(new GeneralScalarExpression("CASE_WHEN", childrenExpressions.toArray[V2Expression])) + } else { + None + } case substring: Substring => val children = if (substring.len == Literal(Integer.MAX_VALUE)) { Seq(substring.str, substring.pos) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index a528c2f1e1b94..76aaf366dcc22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -439,11 +439,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df12 = sql( """ |SELECT * FROM h2.test.employee - |WHERE IF(SALARY > 10000, BONUS, BONUS + 200) > 1200 + |WHERE IF(SALARY > 10000, SALARY, LEAST(SALARY, 1000)) > 1200 |""".stripMargin) - checkFiltersRemoved(df12, false) - checkPushedInfo(df12, "PushedFilters: []") - checkAnswer(df12, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true))) + checkFiltersRemoved(df12) + checkPushedInfo(df12, "PushedFilters: " + + "[(CASE WHEN SALARY > 10000.00 THEN SALARY ELSE LEAST(SALARY, 1000.00) END) > 1200.00]") + checkAnswer(df12, Seq(Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true))) } test("scan with filter push-down with ansi mode") {