[SPARK-39453][SQL] DS V2 supports push down misc non-aggregate functions(non ANSI) #36830

Closed
wants to merge 4 commits into from
@@ -106,6 +106,24 @@
* <li>Since version: 3.3.0</li>
* </ul>
* </li>
* <li>Name: <code>GREATEST</code>
* <ul>
* <li>SQL semantic: <code>GREATEST(expr, ...)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>LEAST</code>
* <ul>
* <li>SQL semantic: <code>LEAST(expr, ...)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>RAND</code>
* <ul>
* <li>SQL semantic: <code>RAND([seed])</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>LN</code>
* <ul>
* <li>SQL semantic: <code>LN(expr)</code></li>
@@ -97,6 +97,9 @@ public String build(Expression expr) {
return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
case "ABS":
case "COALESCE":
case "GREATEST":
case "LEAST":
case "RAND":
Member

Let's file a separate JIRA if this adds support for other things.

Contributor Author

@beliefer, Jun 13, 2022

OK

case "LN":
case "EXP":
case "POWER":
@@ -99,6 +99,27 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
case Greatest(children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
if (children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("GREATEST", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case Least(children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
if (children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("LEAST", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case Rand(child, hideSeed) =>
if (hideSeed) {
Some(new GeneralScalarExpression("RAND", Array.empty[V2Expression]))
} else {
generateExpression(child)
.map(v => new GeneralScalarExpression("RAND", Array[V2Expression](v)))
}
case Log(child) => generateExpression(child)
.map(v => new GeneralScalarExpression("LN", Array[V2Expression](v)))
case Exp(child) => generateExpression(child)
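
The `Greatest` and `Least` cases above translate every child and emit a V2 expression only when none of the children was dropped. A minimal Java sketch of that all-or-nothing pattern follows. `AllOrNothing` and `translateAll` are hypothetical names used for illustration and are not part of Spark. The sketch returns early on the first failure instead of comparing lengths after a `flatMap`, but the outcome is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper (not Spark code): translate every child or give up entirely.
final class AllOrNothing {
    static <A, B> Optional<List<B>> translateAll(
            List<A> children, Function<? super A, Optional<B>> translate) {
        List<B> translated = new ArrayList<>(children.size());
        for (A child : children) {
            Optional<B> result = translate.apply(child);
            if (!result.isPresent()) {
                // One untranslatable child means the whole call cannot be pushed down.
                return Optional.empty();
            }
            translated.add(result.get());
        }
        return Optional.of(translated);
    }

    public static void main(String[] args) {
        // Toy translator: names starting with '?' stand in for unsupported expressions.
        Function<String, Optional<String>> toy = s ->
            s.startsWith("?") ? Optional.<String>empty() : Optional.of(s.toUpperCase());

        System.out.println(translateAll(Arrays.asList("bonus", "1100"), toy));
        System.out.println(translateAll(Arrays.asList("bonus", "?udf"), toy));
    }
}
```

Pushing down only part of an argument list would change the meaning of the function call, which is presumably why the diff falls back to `None` whenever a single child fails.
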
@@ -195,6 +216,13 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
case iff: If =>
val childrenExpressions = iff.children.flatMap(generateExpression(_))
if (iff.children.length == childrenExpressions.length) {
Some(new GeneralScalarExpression("CASE_WHEN", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case substring: Substring =>
val children = if (substring.len == Literal(Integer.MAX_VALUE)) {
Seq(substring.str, substring.pos)
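
The `If` case above passes its three children (predicate, true value, false value) on under the name `CASE_WHEN`, and the test in this PR expects the pushed filter to read `CASE WHEN ... THEN ... ELSE ... END`. The sketch below shows one way a flat child list could be rendered into that SQL. It assumes the children are laid out as condition/value pairs with an optional trailing else value. `CaseWhenSketch` and `renderCaseWhen` are hypothetical names, not the actual SQL builder.

```java
// Hypothetical renderer (not Spark code) for a flat CASE_WHEN child list.
final class CaseWhenSketch {
    // Assumed layout: [cond1, value1, cond2, value2, ..., optionalElse]
    static String renderCaseWhen(String[] children) {
        StringBuilder sb = new StringBuilder("CASE");
        for (int i = 0; i < children.length; i += 2) {
            if (i + 1 < children.length) {
                // A full pair: a condition followed by its value.
                sb.append(" WHEN ").append(children[i])
                  .append(" THEN ").append(children[i + 1]);
            } else {
                // A dangling last element is the ELSE branch.
                sb.append(" ELSE ").append(children[i]);
            }
        }
        return sb.append(" END").toString();
    }

    public static void main(String[] args) {
        String[] ifChildren = {"SALARY > 10000.00", "SALARY", "LEAST(SALARY, 1000.00)"};
        System.out.println(renderCaseWhen(ifChildren));
    }
}
```

Under this layout an `IF` is simply a `CASE WHEN` with a single branch plus an else, so it needs no SQL form of its own.
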
@@ -35,8 +35,8 @@ private[sql] object H2Dialect extends JdbcDialect {
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

private val supportedFunctions =
- Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL",
- "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
+ Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LN", "EXP", "POWER", "SQRT", "FLOOR",
+ "CEIL", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")

override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
@@ -425,6 +425,26 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkFiltersRemoved(df10)
checkPushedInfo(df10, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
checkAnswer(df10, Row("mary", 2))

val df11 = sql(
"""
|SELECT * FROM h2.test.employee
|WHERE GREATEST(bonus, 1100) > 1200 AND LEAST(salary, 10000) > 9000 AND RAND(1) < 1
Contributor

RAND(1) < 1 is always true, which makes it a poor filter push-down test. Can we change it to RAND(1) > 0.5?

Contributor Author

If we do that, the results will be unstable.

Contributor

The seed is given, so the result is still stable.
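
Both points in this thread can be illustrated with a small sketch: a uniform draw from [0, 1) always satisfies `< 1`, and a generator built from a fixed seed produces the same first value every time. The example uses `java.util.Random` purely as a stand-in. It makes no claim about which generator Spark or H2 uses for `RAND`, and `SeededRandomSketch` is a hypothetical name.

```java
import java.util.Random;

// Illustration only: java.util.Random stands in for a seeded RAND(seed).
final class SeededRandomSketch {
    // First draw from a generator created with the given seed.
    static double firstDraw(long seed) {
        return new Random(seed).nextDouble();
    }

    public static void main(String[] args) {
        double a = firstDraw(1L);
        double b = firstDraw(1L);

        // Same seed, same first value: the draw is reproducible.
        System.out.println("reproducible: " + (a == b));

        // nextDouble() returns a value in [0.0, 1.0), so "< 1" can never filter a row out.
        System.out.println("always below 1: " + (a < 1.0));
    }
}
```

A threshold inside the range, such as 0.5, makes the predicate selective while a fixed seed keeps the draws reproducible. Whether the rows selected stay the same across runs also depends on how the engine assigns draws to rows.
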

|""".stripMargin)
checkFiltersRemoved(df11)
checkPushedInfo(df11, "PushedFilters: " +
"[(GREATEST(BONUS, 1100.0)) > 1200.0, (LEAST(SALARY, 10000.00)) > 9000.00, RAND(1) < 1.0]")
checkAnswer(df11, Row(2, "david", 10000, 1300, true))

val df12 = sql(
"""
|SELECT * FROM h2.test.employee
|WHERE IF(SALARY > 10000, SALARY, LEAST(SALARY, 1000)) > 1200
|""".stripMargin)
checkFiltersRemoved(df12)
checkPushedInfo(df12, "PushedFilters: " +
"[(CASE WHEN SALARY > 10000.00 THEN SALARY ELSE LEAST(SALARY, 1000.00) END) > 1200.00]")
checkAnswer(df12, Seq(Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
}

test("scan with filter push-down with ansi mode") {