[SPARK-47673][SS] Implementing TTL for ListState #45932
Thanks for making these changes. Reviewed the code logic and left some comments/questions. Still reviewing testcases.
Should we update the PR description?
Thanks for making these changes. Left a few comments.
LGTM, thanks for adding this functionality
@HeartSaVioR PTAL, thanks!
I haven't fully reviewed TransformWithListStateTTLSuite, as I found some oddness in the tests as well as in the logic itself.
It's great to test the case where elements in the middle are expired first, but the test doesn't seem to catch the edge case. Please correct me if I'm missing something in my explanation of the edge case.
),
StopStream
)
// add 3 more elements with a duration of a minute
Wait, if we allow the TTL to change across a query restart, it is possible that earlier elements in the list have a later expiration time.
How will this work with ListStateImplWithTTL.get()? dropWhile would not work.
E.g. some elements in the middle weren't expired at batch N but are going to expire at batch N + 1, while earlier elements somehow have a later expiration time and do not expire at batch N + 1. Calling get() in batch N + 1 will return all elements, including the ones that are going to expire in batch N + 1, because the earlier elements will terminate dropWhile.
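The scenario in this comment can be reproduced on a plain Scala collection. The sketch below is only an illustration of the concern: `Entry`, `isExpired`, and the timestamps are hypothetical, the expiry rule (`expirationMs <= nowMs`) is an assumption, and it models a single up-front `dropWhile`, which may not be how `ListStateImplWithTTL.get()` actually uses it.

```scala
object DropWhilePrefixOnly {
  // Hypothetical element model: a value plus its absolute expiration time.
  final case class Entry(value: String, expirationMs: Long)

  // Assumed expiry rule for this sketch only.
  def isExpired(e: Entry, nowMs: Long): Boolean = e.expirationMs <= nowMs

  // "old1" was written with a long TTL; "mid1" was appended later with a shorter
  // TTL (for example after a restart changed the duration), so it expires first.
  val entries: List[Entry] = List(
    Entry("old1", 500L),
    Entry("mid1", 200L),
    Entry("new1", 600L)
  )

  // One up-front dropWhile stops at the first live element,
  // so an expired element in the middle is still returned.
  def prefixOnly(nowMs: Long): List[String] =
    entries.dropWhile(isExpired(_, nowMs)).map(_.value)

  // Checking every element removes expired entries wherever they sit.
  def everyElement(nowMs: Long): List[String] =
    entries.filterNot(isExpired(_, nowMs)).map(_.value)
}
```

At `nowMs = 300`, `prefixOnly` still includes `mid1`, while `everyElement` does not.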
I'm a little confused by this comment. I think using dropWhile should be okay, since we only return one row at a time from the iterator, and getNext is called after every invocation of next().
Consider the following case (e for expired, v for valid):
[v1, v2, v3, e1, e2, e3, v4, v5, v6]
Calling dropWhile will drop the leading expired elements (of which there are none in this example) and point the iterator to v1, and we set currentRow to v1. After we return v1, we set currentRow to v2, then v3 (as dropWhile doesn't drop these elements).
Now we have returned v3, and unsafeRowValuesIterator is pointing at e1. We call dropWhile on this, causing the following three elements (e1, e2, e3) to be dropped as the underlying iterator is consumed.
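The walk-through above can be modelled with a small view iterator that skips expired entries every time it advances. This is a sketch under assumptions, not the code in ListStateImplWithTTL: `Entry`, `NonExpiredIterator`, and the expiry rule are hypothetical, and it uses an explicit skip loop in place of calling `dropWhile` on the underlying iterator.

```scala
object TtlViewSketch {
  // Hypothetical element model; not Spark's internal row format.
  final case class Entry(value: String, expirationMs: Long)

  // A logical (view) iterator: each advance also moves `underlying`,
  // skipping expired entries at the front, in the middle, or at the end.
  final class NonExpiredIterator(underlying: Iterator[Entry], nowMs: Long)
      extends Iterator[String] {

    // Next live entry, fetched lazily from the underlying iterator.
    private var lookahead: Option[Entry] = None

    private def advance(): Unit = {
      while (lookahead.isEmpty && underlying.hasNext) {
        val candidate = underlying.next()
        // Assumed expiry rule: an entry is live while expirationMs > nowMs.
        if (candidate.expirationMs > nowMs) lookahead = Some(candidate)
      }
    }

    override def hasNext: Boolean = {
      advance()
      lookahead.isDefined
    }

    override def next(): String = {
      advance()
      val entry = lookahead.getOrElse(throw new NoSuchElementException("no live entries"))
      lookahead = None
      entry.value
    }
  }

  // Convenience wrapper used to inspect the result of a full pass.
  def liveValues(entries: Seq[Entry], nowMs: Long): List[String] =
    new NonExpiredIterator(entries.iterator, nowMs).toList
}
```

With the list `[v1, v2, v3, e1, e2, e3, v4, v5, v6]`, the view yields only the six valid values, because the skip is re-applied after every returned element instead of once at the start.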
Ah, you're right. It was confusing because you're creating a new iterator based on the underlying iterator. It's a logical (view) iterator which also moves the underlying iterator.
No worries
+1
Thanks! Merging to master.
What changes were proposed in this pull request?
This PR adds support for expiring state based on TTL for ListState. With this functionality, Spark users can specify a TTL mode for the transformWithState operator and provide a ttlDuration for each value in ListState. TTL support for Map State will be added in future PRs. Once the ttlDuration has expired, the value is no longer returned by get() and is cleaned up at the end of the micro-batch.
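The semantics described here (a per-value expiration, expired values hidden from reads, physical removal at the end of the micro-batch) can be sketched with a small in-memory model. Everything in it is hypothetical: `TtlList`, its method names, and the expiry rule are assumptions for illustration and are not Spark's ListState API.

```scala
import java.time.Duration

object ListStateTtlModel {
  // Hypothetical in-memory stand-in for a TTL-enabled list state.
  final class TtlList[T](ttl: Duration) {
    // Each stored pair is (value, absolute expiration time in ms).
    private var entries: Vector[(T, Long)] = Vector.empty

    // Every appended value gets its own expiration: processing time + ttlDuration.
    def append(value: T, nowMs: Long): Unit =
      entries = entries :+ (value -> (nowMs + ttl.toMillis))

    // Expired values are hidden from reads even before they are removed.
    // Assumed rule: a value is live while its expiration is strictly after nowMs.
    def get(nowMs: Long): Iterator[T] =
      entries.iterator.collect { case (v, exp) if exp > nowMs => v }

    // Models the end-of-micro-batch cleanup; returns how many values were removed.
    def cleanup(nowMs: Long): Int = {
      val (live, expired) = entries.partition { case (_, exp) => exp > nowMs }
      entries = live
      expired.size
    }

    // Number of values still physically stored, live or expired.
    def storedCount: Int = entries.size
  }
}
```

In this model a value appended at time 0 with a one-minute TTL stops appearing in `get` once the clock reaches 60000 ms, but it stays in storage until `cleanup` runs.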
Why are the changes needed?
These changes are needed to support TTL for ListState. The PR supports specifying a TTL based on processing time.
Does this PR introduce any user-facing change?
Yes, it modifies the ListState interface to allow specifying ttlDuration.
How was this patch tested?
Added the TransformWithListStateTTLSuite, ListStateSuite, StatefulProcessorHandleSuite
Was this patch authored or co-authored using generative AI tooling?
No