[SPARK-49162][SQL] Push down date_trunc function #47666
Conversation
```scala
  "VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP", "COVAR_POP", "COVAR_SAMP", "CORR",
  "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY")
private val supportedFunctions = supportedAggregateFunctions
private val supportedDatetimeFunctions = Set("DATE_TRUNC")
```
```scala
private val supportedAggregateFunctions: Set[String] = Set(
  "MAX", "MIN", "SUM", "COUNT", "AVG",
  "VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP", "COVAR_POP", "COVAR_SAMP", "CORR",
  "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY", "DATE_TRUNC")
```
This approach simplifies the code and keeps everything in one place. Let me know what you think!
IMO, DATE_TRUNC should not be in `supportedAggregateFunctions`, as it's not an aggregate function.
...integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
cc @beliefer
```scala
s"""
  SELECT DATE_TRUNC('$format', time), COUNT(*)
  | FROM $catalogName.datetime_table
  | GROUP BY 1
```
Thanks for adding new tests! Can we add tests where DATE_TRUNC is part of the predicate (`WHERE DATE_TRUNC = ....`) to see if it works when we push down predicates? I am not sure whether we do pushdowns in projections.
Added tests for that as well
Why introduce an aggregate here? We just need to test predicates.
The original query that was used to point out that the function wasn't pushed down was similar to this one, so I modeled the test case after it. Should we remove the aggregate test cases?
Yes. Let's remove them.
Done
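For reference, the predicate-only shape the reviewers asked for can be sketched as plain query construction. This is a hedged, standalone sketch: the `predicateQuery` helper is hypothetical, and the table and column names (`datetime_table`, `time`) are taken from the surrounding thread, not from the final patch.

```scala
// Hypothetical helper sketching the predicate-only test query:
// DATE_TRUNC appears in the WHERE clause, with no aggregate involved.
def predicateQuery(catalogName: String, format: String, literal: String): String =
  s"SELECT time FROM $catalogName.datetime_table " +
    s"WHERE DATE_TRUNC('$format', time) = $literal"

// Example shape of a query the integration test could issue
val q = predicateQuery("postgresql", "DAY", "timestamp'2022-05-19 00:00:00'")
```

Such a query exercises filter pushdown directly, without relying on aggregate pushdown behavior.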
```scala
private val supportedDatetimeFunctions = Set("DATE_TRUNC")
private val supportedFunctions =
  supportedAggregateFunctions ++
    supportedDatetimeFunctions
```
Shall we remove supportedDatetimeFunctions, i.e. `private val supportedFunctions = supportedAggregateFunctions ++ Set("DATE_TRUNC")`?
My thinking here is that we might push down other datetime functions in the future, and this provides a convenient grouping for them, similar to the existing supportedAggregateFunctions.
The aggregate functions are very different from the others (string functions, datetime functions, and so on), and a lot of aggregate functions are already defined. We can define supportedDatetimeFunctions in the future if needed.
Done
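The agreed-upon structure can be sketched standalone (field names mirror the PR's PostgresDialect; the aggregate list is copied from the quoted diff):

```scala
// Aggregate functions stay in their own set, as in the existing dialect code
val supportedAggregateFunctions: Set[String] =
  Set("MAX", "MIN", "SUM", "COUNT", "AVG",
    "VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP",
    "COVAR_POP", "COVAR_SAMP", "CORR",
    "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY")

// Reviewer-preferred form: inline the single datetime function for now;
// a separate supportedDatetimeFunctions set can be split out later if needed
val supportedFunctions: Set[String] =
  supportedAggregateFunctions ++ Set("DATE_TRUNC")
```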
```scala
def testAggregatePushdown(format: String, expectedResult: Set[Row]): Unit = {
  val df = sql(
    s"""
      SELECT DATE_TRUNC('$format', time), COUNT(*)
```
Is DATE_TRUNC supported by the H2 database?
https://www.h2database.com/html/functions.html#date_trunc
Looks like yes; however, the original task was adding support only for Postgres for now.
I think we should support the H2 dialect first, then the other database dialects.
Added pushdown for H2 as well
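The per-dialect opt-in discussed here can be modeled as a simple lookup. This is an illustrative sketch, not Spark's actual API: in the real code each JDBC dialect declares the functions it can compile, which is why H2 and Postgres each needed their own DATE_TRUNC entry.

```scala
// Hypothetical model: map each dialect to the functions it opts in to push down
val dialectFunctions: Map[String, Set[String]] = Map(
  "h2"         -> Set("DATE_TRUNC"),
  "postgresql" -> Set("DATE_TRUNC"))

// A function is pushed down only when the target dialect declares support for it
def canPushDown(dialect: String, fn: String): Boolean =
  dialectFunctions.getOrElse(dialect, Set.empty).contains(fn.toUpperCase)
```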
```scala
checkAnswer(df9, Seq(Row("alex")))

val df10 = sql("SELECT name FROM h2.test.datetime WHERE " +
  "DATE_TRUNC('DAY', date1) = date'2022-05-19'")
```
Shall we add more test cases for the other formats supported by Spark, such as YEAR, MM, and so on?
Let's create a separate test case.
Done
```scala
val filters = df.queryExecution.optimizedPlan.collect {
  case f: Filter => f
}
assert(filters.isEmpty)
```
Please reuse checkFilterPushed here.
Done
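What checkFilterPushed verifies can be modeled standalone: after a successful pushdown, no Filter node should remain in the optimized plan, because the predicate was compiled into the scan. The types below are illustrative stand-ins for Spark's Catalyst classes, not the real ones.

```scala
// Minimal stand-in for a query plan tree
sealed trait Plan { def children: Seq[Plan] }
case class Scan(pushedFilters: Seq[String]) extends Plan { def children = Nil }
case class Filter(condition: String, child: Plan) extends Plan { def children = Seq(child) }

// Collect all Filter nodes anywhere in the plan
def collectFilters(p: Plan): Seq[Filter] = {
  val here = p match { case f: Filter => Seq(f); case _ => Nil }
  here ++ p.children.flatMap(collectFilters)
}

// pushed = true asserts the plan is filter-free (the predicate moved into the scan)
def checkFilterPushed(plan: Plan, pushed: Boolean = true): Boolean =
  collectFilters(plan).isEmpty == pushed
```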
@beliefer Kind reminder, all comments have been addressed.
beliefer
left a comment
LGTM. cc @cloud-fan Do you have any other comments?
```diff
   }

-  private def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
+  protected def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
```
Could you rebase the repository? The modifier is already protected.
milastdbx
left a comment
Let's add more tests for date_trunc precisions.
Also, please update the description a bit. In this PR you are:
- Adding support for DATE_TRUNC in V2 optimization pushdown
- Consuming this pushdown for the Postgres & H2 connectors
```scala
  }
}

test("SPARK-49162: Push down filter date_trunc function") {
```
Reading the documentation for pgsql:
https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
and Databricks:
https://docs.databricks.com/en/sql/language-manual/functions/date_trunc.html
I see that they support microseconds while we support microsecond.
Can we test for all our supported precisions? Also, can we try different casings?
+1, let's test all the supported units.
Yes. Could we add other precisions for H2 and Postgres? @IvanK-db
Added
```scala
case _: DateAdd => generateExpressionWithName("DATE_ADD", expr, isPredicate)
case _: DateDiff => generateExpressionWithName("DATE_DIFF", expr, isPredicate)
case _: TruncDate => generateExpressionWithName("TRUNC", expr, isPredicate)
case _: TruncTimestamp => generateExpressionWithName("DATE_TRUNC", expr, isPredicate)
```
Is DATE_TRUNC a standard SQL function, or widely supported in the industry?
It's not a standard function, but it's supported by several databases: Postgres, Redshift, Snowflake, BQ.
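The translation idea in the quoted diff can be sketched standalone: a Catalyst TruncTimestamp node is rendered as a `DATE_TRUNC(...)` call in the pushed-down SQL text. The tiny expression AST below is illustrative only; Spark's real builder is V2ExpressionBuilder, and none of these case classes are Spark's.

```scala
// Toy expression tree standing in for Catalyst/V2 expressions
sealed trait Expr
final case class Col(name: String) extends Expr
final case class Lit(sql: String) extends Expr
final case class Func(name: String, args: Seq[Expr]) extends Expr
final case class Eq(left: Expr, right: Expr) extends Expr

// Render an expression tree as the SQL fragment a dialect would push down
def toSQL(e: Expr): String = e match {
  case Col(n)        => n
  case Lit(s)        => s
  case Func(f, args) => s"$f(${args.map(toSQL).mkString(", ")})"
  case Eq(l, r)      => s"(${toSQL(l)}) = (${toSQL(r)})"
}

// A TruncTimestamp-style predicate becomes a DATE_TRUNC comparison
val pred = Eq(
  Func("DATE_TRUNC", Seq(Lit("'DAY'"), Col("date1"))),
  Lit("DATE '2022-05-19'"))
```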
Force-pushed from c81f7df to 2d312a6.
```scala
val expectedPlanFragment9 =
  "PushedFilters: [(DATE_TRUNC('MicroseconD', TIME1)) = 1725560625000000]"
checkPushedInfo(df9, expectedPlanFragment9)
checkAnswer(df9, Seq(Row("adam")))
```
Please add test cases for all supported precisions.
Please add negative test cases, such as MicroSecondS.
Added all precisions. MicroSecondS seems to be a positive test case (it looks to be an alias of microsecond), so I added another negative test case instead.
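The unit handling this thread settled on can be sketched standalone. Assumptions are labeled: per the discussion above, matching is case-insensitive and plural spellings like MICROSECONDS act as aliases; the exact canonical-unit list here is an assumption for illustration, not copied from Spark's source.

```scala
// Assumed canonical precision units (illustrative, not Spark's full alias table)
val canonicalUnits = Seq("YEAR", "QUARTER", "MONTH", "WEEK", "DAY",
  "HOUR", "MINUTE", "SECOND", "MILLISECOND", "MICROSECOND")

// Case-insensitive lookup; a trailing 'S' is treated as a plural alias.
// Returns None for unsupported units, the negative case the review asked for.
def normalizeUnit(unit: String): Option[String] = {
  val u = unit.toUpperCase
  val singular = if (u.endsWith("S")) u.dropRight(1) else u
  canonicalUnits.find(c => c == u || c == singular)
}
```

A parameterized test could iterate `canonicalUnits` in mixed casings and assert the pushed-down filter appears for each.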
beliefer
left a comment
LGTM.
My biggest concern is we translate the Spark also cc @srielau
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This PR:
Why are the changes needed?
Performance improvements.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New integration test in PostgresIntegrationSuite.
Was this patch authored or co-authored using generative AI tooling?
No