[SPARK-37527][SQL] Compile COVAR_POP, COVAR_SAMP and CORR in `H2Dialect`

### What changes were proposed in this pull request?
apache#35101 translates `COVAR_POP`, `COVAR_SAMP` and `CORR`, but the lower H2 version could not support them.

After apache#35013, we can now compile these three aggregate functions in `H2Dialect`.

### Why are the changes needed?
Supplements the implementation of `H2Dialect`.

### Does this PR introduce _any_ user-facing change?
'Yes'. Spark can now completely push down `COVAR_POP`, `COVAR_SAMP` and `CORR` to H2.
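
For context, a minimal sketch of how this surfaces to users, assuming an H2 catalog registered the way the JDBC V2 test suite does it; the catalog name, in-memory URL and `test.employee` table below are illustrative assumptions, not part of this change:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

// Register an H2 catalog named "h2" (assumed setup, mirroring the test suite's fixtures:
// an in-memory H2 database with a table test.employee holding dept and bonus columns).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2", classOf[JDBCTableCatalog].getName)
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb0")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()

// With this change H2Dialect compiles all three aggregates, so they show up
// under PushedAggregates in the scan instead of being evaluated by Spark.
val df = spark.sql(
  """SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus), CORR(bonus, bonus)
    |FROM h2.test.employee
    |WHERE dept > 0
    |GROUP BY dept
    |""".stripMargin)
df.explain()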

### How was this patch tested?
Existing tests updated.

Closes apache#35145 from beliefer/SPARK-37527_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
beliefer authored and chenzhx committed Mar 30, 2022
1 parent a56ca85 commit 0e50f11
Showing 2 changed files with 20 additions and 4 deletions.
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -47,6 +47,18 @@ private object H2Dialect extends JdbcDialect {
           assert(f.inputs().length == 1)
           val distinct = if (f.isDistinct) "DISTINCT " else ""
           Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})")
+        case f: GeneralAggregateFunc if f.name() == "CORR" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})")
         case _ => None
       }
     )
8 changes: 8 additions & 4 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -749,11 +749,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
       " FROM h2.test.employee where dept > 0 group by DePt")
     checkFiltersRemoved(df)
-    checkAggregateRemoved(df, false)
+    checkAggregateRemoved(df)
     df.queryExecution.optimizedPlan.collect {
       case _: DataSourceV2ScanRelation =>
         val expected_plan_fragment =
-          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
+          "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
+          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
@@ -763,11 +765,13 @@
     val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 0" +
       " group by DePt")
     checkFiltersRemoved(df)
-    checkAggregateRemoved(df, false)
+    checkAggregateRemoved(df)
     df.queryExecution.optimizedPlan.collect {
       case _: DataSourceV2ScanRelation =>
         val expected_plan_fragment =
-          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
+          "PushedAggregates: [CORR(BONUS, BONUS)], " +
+          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
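
For illustration only (not part of the diff), the single query the JDBC scan now sends to H2 for the first test above has roughly the following shape, with the filter, the aggregates and the grouping all evaluated by H2; the exact text and identifier quoting Spark generates may differ:

// Rough, hand-written sketch of the pushed-down query for the COVAR_POP/COVAR_SAMP case.
// This is why checkAggregateRemoved(df) now passes: the Aggregate node disappears from
// Spark's plan because H2 computes the aggregates itself.
val pushedQuery =
  """SELECT COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS), DEPT
    |FROM test.employee
    |WHERE DEPT IS NOT NULL AND DEPT > 0
    |GROUP BY DEPT
    |""".stripMargin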