[SPARK-46442][SQL] DS V2 supports push down PERCENTILE_CONT and PERCENTILE_DISC #44397
Conversation
case DecimalType.Fixed(p, s) =>
  (rs: ResultSet, row: InternalRow, pos: Int) =>
    val decimal =
      nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
The original code throws an exception.
Caused by: org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max precision 38. SQLSTATE: 22003
at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
The inferred DecimalType here has precision 38 and scale 38. In fact, the decimal returned from JDBC is BigDecimal(7, 3).
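A minimal, Spark-free sketch of that mismatch (the value 1234.567 is just an illustrative BigDecimal(7, 3), not data from this PR): rescaling it to the inferred scale of 38 inflates its precision to 42, which is exactly what the error above reports.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object DecimalMismatchSketch {
  def main(args: Array[String]): Unit = {
    val fromJdbc = new JBigDecimal("1234.567")               // precision 7, scale 3, like the H2 result
    val forcedToScale38 = fromJdbc.setScale(38, RoundingMode.HALF_UP)
    // 4 integer digits + 38 fraction digits = 42 > Spark's max decimal precision of 38,
    // so Decimal(d, 38, 38) inside makeGetter hits DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION.
    println(forcedToScale38.precision())                     // prints 42
  }
}
```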
da47b85 to 1c985b3
…ialect

### What changes were proposed in this pull request?
This PR fixes a bug by letting the JDBC dialect decide the decimal precision and scale.

**How to reproduce the bug?**
#44397 proposed DS V2 push down of `PERCENTILE_CONT` and `PERCENTILE_DISC`. The bug fires when pushing down the SQL below to H2 over JDBC.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

**The root cause**
`getQueryOutputSchema` is used to get the output schema of a query by calling `JdbcUtils.getSchema`. The query for the H2 database is shown below.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`
We get the following five variables from `ResultSetMetaData`:
```
columnName = "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SALARY NULLS FIRST)"
dataType = 2
typeName = "NUMERIC"
fieldSize = 100000
fieldScale = 50000
```
Then we build the catalyst schema with `JdbcUtils.getCatalystType`, which actually calls `DecimalType.bounded(precision, scale)`. `DecimalType.bounded(100000, 50000)` returns `DecimalType(38, 38)`. Finally, `makeGetter` throws an exception.
```
Caused by: org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max precision 38. SQLSTATE: 22003
at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
```

### Why are the changes needed?
This PR fixes the bug that `JdbcUtils` can't get the correct decimal type.

### Does this PR introduce _any_ user-facing change?
'Yes'. It fixes a bug.

### How was this patch tested?
Manual tests in #44397

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44398 from beliefer/SPARK-46443.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ialect (same commit message as above; cherry picked from commit a921da8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
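As a rough, hypothetical illustration of "let the dialect decide the decimal type" (this is not the actual SPARK-46443 patch; the dialect name and the `DecimalType.SYSTEM_DEFAULT` fallback are assumptions), a JDBC dialect can override `getCatalystType` so an out-of-range NUMERIC no longer collapses to `DecimalType(38, 38)`:

```scala
import java.sql.Types

import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{DataType, DecimalType, MetadataBuilder}

// Illustrative dialect only; the real change lives in the H2 dialect and may differ in detail.
case object PercentileFriendlyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    sqlType match {
      // H2 reports PERCENTILE_CONT results as NUMERIC(100000, 50000); map anything wider
      // than Spark can represent to a bounded decimal instead of DecimalType(38, 38).
      case Types.NUMERIC if size > DecimalType.MAX_PRECISION =>
        Some(DecimalType.SYSTEM_DEFAULT) // DecimalType(38, 18)
      case _ => None // fall back to JdbcUtils' default type mapping
    }
  }
}
```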
1c985b3 to 87561b7
ping @cloud-fan cc @huaxingao
ping @cloud-fan
public GeneralAggregateFunc(String name, boolean isDistinct, Expression[] children) {
  this.name = name;
  this.isDistinct = isDistinct;
  this.children = children;
  this.orderingWithinGroups = null;
empty array is a better default
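A made-up Scala analogue of that suggestion (not the Java `GeneralAggregateFunc` from this PR): an empty-array default lets callers build the WITHIN GROUP clause without any null checks.

```scala
// Hypothetical class for illustration only.
class AggregateFuncSketch(
    val name: String,
    val orderingWithinGroups: Array[String] = Array.empty) {
  // No null check needed because the default is an empty array rather than null.
  def toSql: String =
    if (orderingWithinGroups.isEmpty) s"$name(...)"
    else s"$name(...) WITHIN GROUP (ORDER BY ${orderingWithinGroups.mkString(", ")})"
}
```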
String funcName, boolean isDistinct, String[] inputs, String[] orderingWithinGroups) {
  assert(isDistinct == false);
  String withinGroup =
    joinArrayToString(orderingWithinGroups, ", ", "WITHIN GROUP (ORDER BY ", ")");
how do we translate ASC/DESC?
Please refer to visitSortOrder.
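A simplified sketch of what that rendering amounts to (`renderSortOrder` and `sortKeySql` are made-up names for illustration; the real logic sits in the V2 SQL builder's visitSortOrder):

```scala
import org.apache.spark.sql.connector.expressions.{NullOrdering, SortDirection, SortOrder}

object SortOrderRenderingSketch {
  // Render the direction and null ordering of a DS V2 SortOrder, which is how
  // "SALARY" becomes "SALARY ASC NULLS FIRST" in the pushed-down SQL shown in this PR.
  def renderSortOrder(sortKeySql: String, order: SortOrder): String = {
    val direction = order.direction() match {
      case SortDirection.ASCENDING  => "ASC"
      case SortDirection.DESCENDING => "DESC"
    }
    val nullOrdering = order.nullOrdering() match {
      case NullOrdering.NULLS_FIRST => "NULLS FIRST"
      case NullOrdering.NULLS_LAST  => "NULLS LAST"
    }
    s"$sortKeySql $direction $nullOrdering"
  }
}
```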
The GA failure is unrelated.
@@ -51,11 +53,21 @@ public final class GeneralAggregateFunc extends ExpressionWithToString implement
  private final String name;
  private final boolean isDistinct;
  private final Expression[] children;
  private final Expression[] orderingWithinGroups;
shall we use SortOrder[]?
@@ -64,6 +76,8 @@ public GeneralAggregateFunc(String name, boolean isDistinct, Expression[] childr
  @Override
  public Expression[] children() { return children; }

  public Expression[] orderingWithinGroups() { return orderingWithinGroups; }
ditto
@@ -42,7 +42,7 @@ private[sql] object H2Dialect extends JdbcDialect {
  private val distinctUnsupportedAggregateFunctions =
    Set("COVAR_POP", "COVAR_SAMP", "CORR", "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY",
-     "MODE")
+     "MODE", "PERCENTILE_CONT", "PERCENTILE_DISC")
why do we need this change? H2 dialect deals with these two functions in visitInverseDistributionFunction
Because the H2 dialect overrides visitInverseDistributionFunction and checks the function name with isSupportedFunction.
override def visitInverseDistributionFunction(
funcName: String,
isDistinct: Boolean,
inputs: Array[String],
orderingWithinGroups: Array[String]): String = {
if (isSupportedFunction(funcName)) {
super.visitInverseDistributionFunction(
dialectFunctionName(funcName), isDistinct, inputs, orderingWithinGroups)
} else {
throw new UnsupportedOperationException(
s"${this.getClass.getSimpleName} does not support " +
s"inverse distribution function: $funcName")
}
}
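A paraphrased sketch (not the verbatim H2Dialect source) of why the two names must be registered: the supported-function check consults a set built from distinctUnsupportedAggregateFunctions, so without the new entries the override above would throw for every PERCENTILE_CONT/PERCENTILE_DISC pushdown.

```scala
// Simplified: the real H2 dialect also registers scalar functions in its supported set.
object H2SupportedFunctionsSketch {
  private val distinctUnsupportedAggregateFunctions =
    Set("COVAR_POP", "COVAR_SAMP", "CORR", "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY",
      "MODE", "PERCENTILE_CONT", "PERCENTILE_DISC")

  private val supportedAggregateFunctions =
    Set("MAX", "MIN", "SUM", "COUNT", "AVG") ++ distinctUnsupportedAggregateFunctions

  def isSupportedFunction(funcName: String): Boolean =
    supportedAggregateFunctions.contains(funcName)

  def main(args: Array[String]): Unit = {
    println(isSupportedFunction("PERCENTILE_CONT")) // true only because of the new entries
    println(isSupportedFunction("PERCENTILE_RANK")) // false -> UnsupportedOperationException path
  }
}
```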
thanks, merging to master!
@cloud-fan Thank you!
What changes were proposed in this pull request?
This PR translates the aggregate functions `PERCENTILE_CONT` and `PERCENTILE_DISC` for pushdown.
It adds `Expression[] orderingWithinGroups` to `GeneralAggregateFunc`, so that the DS V2 pushdown framework can compile the `WITHIN GROUP (ORDER BY ...)` clause easily.
It also splits `visitInverseDistributionFunction` from `visitAggregateFunction`, so that the DS V2 pushdown framework can generate the `WITHIN GROUP (ORDER BY ...)` syntax easily.
It also fixes a bug where `JdbcUtils` can't handle the precision and scale of decimals returned from JDBC.
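For illustration, a hedged usage sketch (the catalog name `h2`, the in-memory JDBC URL, and the `test.employee` table are made up for the example; `spark` is an existing SparkSession such as the spark-shell one):

```scala
// Hypothetical setup: register an H2 database as a DS V2 JDBC catalog named "h2".
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
spark.conf.set("spark.sql.catalog.h2.pushDownAggregate", "true")

val df = spark.sql(
  """SELECT dept, percentile_cont(0.5) WITHIN GROUP (ORDER BY salary)
    |FROM h2.test.employee
    |GROUP BY dept""".stripMargin)

// With this change the whole aggregate can be compiled to H2 SQL such as:
//   SELECT "DEPT", PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST)
//   FROM "test"."employee" GROUP BY "DEPT"
df.explain(true) // look for PushedAggregates in the JDBC scan node
```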
Why are the changes needed?
DS V2 supports push down of `PERCENTILE_CONT` and `PERCENTILE_DISC`.
Does this PR introduce any user-facing change?
'No'.
New feature.
How was this patch tested?
New test cases.
Was this patch authored or co-authored using generative AI tooling?
'No'.