-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. #45376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a60c2f1 to
8b3d169
Compare
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
Outdated
Show resolved
Hide resolved
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
Outdated
Show resolved
Hide resolved
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala
Outdated
Show resolved
Hide resolved
8b3d169 to
5789ff2
Compare
|
@sahnib - seems like there are still conflicts on the base branch ? |
5789ff2 to
214142e
Compare
214142e to
e21eb92
Compare
ec376f2 to
7c2a950
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
c442920 to
770f086
Compare
|
@HeartSaVioR PTAL, thanks. |
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass.
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
…ator. This change allows the user to redefine the event time column in generated output of transformWithState, which is used in watermark expressions for operators following transformWithState
f39d941 to
3082693
Compare
…markColumn logical node.
3082693 to
5d02835
Compare
| Seq( | ||
| ResolveWithCTE, | ||
| ExtractDistributedSequenceID) ++ | ||
| Seq(ResolveUpdateEventTimeWatermarkColumn) ++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR I have added the ResolveUpdateEventTimeWatermarkColumn after all resolution rules. At this place, we are guaranteed (IIUC) to have resolved the query plan and should be able to extract watermark delay from child nodes of UpdateEventTimeWatermarkColumn. Let me know if you think otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not guaranteed for all resolutions to happen in one loop. fixedPoint means having iterations of application of the set of rules. That said, we shouldn't still assume that child is resolved, and only make the rule to be effective when child is resolved. I'll left a comment.
...ain/scala/org/apache/spark/sql/catalyst/analysis/ResolveUpdateEventTimeWatermarkColumn.scala
Outdated
Show resolved
Hide resolved
...ain/scala/org/apache/spark/sql/catalyst/analysis/ResolveUpdateEventTimeWatermarkColumn.scala
Outdated
Show resolved
Hide resolved
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we have an issue where we have to fix the issue and update the test case right now or defer both. I'm OK with latter, but let's file a blocker JIRA ticket if we want to do the latter.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
Show resolved
Hide resolved
HeartSaVioR
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
Let's be sure to either 1) introduce a method to users which gives a watermark value before advancing (late events) or 2) construct a story for users to set the event time timestamp properly without watermark value. |
|
Let's handle that later - I assume we will have a JIRA ticket. Thanks! Merging to master. |
@HeartSaVioR Created the JIRA https://issues.apache.org/jira/browse/SPARK-48199 for follow up item. |
…ansformWithStateInPandas API ### What changes were proposed in this pull request? This PR adds support to define event time column in the output dataset of `TransformWithStateInPandas` operator. The new event time column will be used to evaluate watermark expressions in downstream operators. ### Why are the changes needed? This change is to couple with the scala implementation of chaining of operators. PR in Scala: #45376 ### Does this PR introduce _any_ user-facing change? Yes. User can now specify a event time column as: ``` df.groupBy("id") .transformWithStateInPandas( statefulProcessor=stateful_processor, outputStructType=output_schema, outputMode="Update", timeMode=timeMode, eventTimeColumnName="outputTimestamp" ) ``` ### How was this patch tested? Integration tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48124 from jingz-db/python-chaining-op. Lead-authored-by: jingz-db <jing.zhan@databricks.com> Co-authored-by: Jing Zhan <135738831+jingz-db@users.noreply.github.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR adds support to define event time column in the output dataset of
TransformWithStateoperator. The new event time column will be used to evaluate watermark expressions in downstream operators.Why are the changes needed?
This change is required to support chaining of stateful operators after
transformWithState. Event time column is required to evaluate watermark expressions in downstream stateful operators.Does this PR introduce any user-facing change?
Yes. Adds a new version of transformWithState API which allows redefining the event time column.
How was this patch tested?
Added unit test cases.
Was this patch authored or co-authored using generative AI tooling?
No