diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 087c3573fbdbf..1f422e5a59cf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -47,6 +47,18 @@ private object H2Dialect extends JdbcDialect { assert(f.inputs().length == 1) val distinct = if (f.isDistinct) "DISTINCT " else "" Some(s"STDDEV_SAMP($distinct${f.inputs().head})") + case f: GeneralAggregateFunc if f.name() == "COVAR_POP" => + assert(f.inputs().length == 2) + val distinct = if (f.isDistinct) "DISTINCT " else "" + Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})") + case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" => + assert(f.inputs().length == 2) + val distinct = if (f.isDistinct) "DISTINCT " else "" + Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})") + case f: GeneralAggregateFunc if f.name() == "CORR" => + assert(f.inputs().length == 2) + val distinct = if (f.isDistinct) "DISTINCT " else "" + Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})") case _ => None } ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 1c77f7a426da4..72dde8fa13222 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -749,11 +749,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" + " FROM h2.test.employee where dept > 0 group by DePt") checkFiltersRemoved(df) - checkAggregateRemoved(df, false) + checkAggregateRemoved(df) df.queryExecution.optimizedPlan.collect { case _: DataSourceV2ScanRelation => val expected_plan_fragment = - "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]" + "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " + + "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " + + "PushedGroupByColumns: [DEPT]" checkKeywordsExistsInExplain(df, expected_plan_fragment) } checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null))) @@ -763,11 +765,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 0" + " group by DePt") checkFiltersRemoved(df) - checkAggregateRemoved(df, false) + checkAggregateRemoved(df) df.queryExecution.optimizedPlan.collect { case _: DataSourceV2ScanRelation => val expected_plan_fragment = - "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]" + "PushedAggregates: [CORR(BONUS, BONUS)], " + + "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " + + "PushedGroupByColumns: [DEPT]" checkKeywordsExistsInExplain(df, expected_plan_fragment) } checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))