-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35897][SS] Support user defined initial state with flatMapGroupsWithState in Structured Streaming #33093
[SPARK-35897][SS] Support user defined initial state with flatMapGroupsWithState in Structured Streaming #33093
Conversation
@tdas can you enable CI on this |
cc: @tdas can you enable CI on this |
add to whitelist |
Test build #140337 has finished for PR 33093 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #140342 has finished for PR 33093 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
ok to test |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #140347 has finished for PR 33093 at commit
|
@rahulsmahadev, mind keeping the PR description template (https://github.com/apache/spark/blob/master/.github/PULL_REQUEST_TEMPLATE)? |
69b3c7e
to
6c1443c
Compare
Test build #140405 has finished for PR 33093 at commit
|
Test build #140407 has finished for PR 33093 at commit
|
Kubernetes integration test starting |
Kubernetes integration test starting |
Kubernetes integration test status success |
Kubernetes integration test status success |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #140413 has finished for PR 33093 at commit
|
Test build #140415 has finished for PR 33093 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #140514 has finished for PR 33093 at commit
|
("keyInStateAndData-2", Seq[String](), "1"), | ||
("keyOnlyInData", Seq[String]("keyOnlyInData"), "1") // inc by 1 | ||
), | ||
assertNumStateRows(total = 5, updated = 5), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are not testing whether the initial group state is actually being saved or not. you could have just created th e input GroupState object with the initial state and not saved to state store, and this test will still pass. So you need to run another batch to retrieve and test the save state.
Furthermore, you need to explicitly test whether the initial state is saved to store even if you dont call GroupState.update()
. Right now in your test function, you are always calling update. So even if you incorrectly did not save the initial state store, the update will always make sure the state store is updated. So you need to test for more cases, with more keys.
Test build #140518 has finished for PR 33093 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = { | ||
test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) { | ||
// Function to maintain running count up to 2, and then remove the count | ||
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed) | ||
val stateFunc = | ||
(key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => { | ||
(key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fix this
Kubernetes integration test starting |
assert(state.exists) | ||
assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 } | ||
assertCannotGetWatermark { state.getCurrentWatermarkMs() } | ||
assert(!state.hasTimedOut) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does the last 3 tests need to be inside if (valList.isEmpty) {
?
// We need to check if not explicitly calling update will still save the init state or not | ||
if (valList.nonEmpty || state.getOption.map(_.count).getOrElse(0L) != 2L) { | ||
// this is not reached when valList is empty and the state count is 2 | ||
state.update(new RunningCount(count)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than doing this complicated logic of no updating when some magical set of conditions are met .. isnt it simpler to have
if (!key.contains("NoUpdate")) state.update(...)
and then pass a key name keyOnlyInStateButNoUpate
or keyInStateAndDataButNoUpate
??
("keyOnlyInState-1", Seq[String](), "1"), | ||
("keyOnlyInState-2", Seq[String](), "2"), | ||
("keyInStateAndData-2", Seq[String]("keyInStateAndData-2"), "3"), // inc by 1 | ||
("keyInStateAndData-1", Seq[String](), "1"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keyInStateAndData-1
is NOT in data. this is confusing!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm it is added in second batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is nit, the point naming it like this is its in both state and data of the first batch which is what we are mainly testing. hence it is confusing.
assert(state.exists) | ||
assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 } | ||
assertCannotGetWatermark { state.getCurrentWatermarkMs() } | ||
assert(!state.hasTimedOut) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same questions as above
Test build #140545 has finished for PR 33093 at commit
|
Kubernetes integration test status success |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #140538 has finished for PR 33093 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #140543 has finished for PR 33093 at commit
|
Thanks, merging to master |
Test build #140550 has finished for PR 33093 at commit
|
What changes were proposed in this pull request?
This PR aims to add support for specifying a user defined initial state for arbitrary structured streaming stateful processing using [flat]MapGroupsWithState operator.
Why are the changes needed?
Users can load previous state of their stateful processing as an initial state instead of redoing the entire processing once again.
Does this PR introduce any user-facing change?
Yes this PR introduces new API
How was this patch tested?
Through unit tests in FlatMapGroupsWithStateSuite