-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState #33336
[SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState #33336
Conversation
cc: @tdas |
|
||
override def right: SparkPlan = initialState | ||
|
||
override def outputPartitioning: Partitioning = child.outputPartitioning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is something im not sure of
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the output partitioning in MapGroupsExec... it should be the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its the same
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #141003 has finished for PR 33336 at commit
|
/** | ||
* Groups the input rows together and calls the function with each group and an iterator containing | ||
* all elements in the group. The result of this function is flattened before being output. This | ||
* version of the Physical operator takes a user provided initial state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
param docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve the docs as well. "This physical operator is similar to MapGroupsExec but the difference that it calls the user-defined function with .... group + iterator + user-defined initial state"
inputData.toDS() | ||
.groupByKey(x => x) | ||
.flatMapGroupsWithState( | ||
Update, GroupStateTimeout.NoTimeout, initialState)(flatMapGroupsWithStateFunc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldnt you be testing this with all timeouts?
val result = func( | ||
getKey(keyRow), | ||
valueRowIter.map(getValue), | ||
groupState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: does it have to be multiline?
var foundInitialStateForKey = false | ||
val optionalState = initialStateRowIter.map { initialStateRow => | ||
if (foundInitialStateForKey) { | ||
throw new IllegalArgumentException("The initial state provided contained " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error should be made into a common error between FlatMapGroupswithStateExec and this class. If possible.
groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap { | ||
case (keyRow, valueRowIter, initialStateRowIter) => | ||
var foundInitialStateForKey = false | ||
val optionalState = initialStateRowIter.map { initialStateRow => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add docs here explaining the logic with foundInitialStateForKey
isFlatMapGroupsWithState, timeout, hasInitialState, initialStateGroupAttrs, | ||
initialStateDataAttrs, initialStateDeserializer, initialState, child) => | ||
if (hasInitialState) { | ||
execution.MapGroupsWithInitialStateExec( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need a separate physical plan? Can you implement this using CoGroupExec
func, keyDeserializer, valueDeserializer, initialStateDeserializer, groupingAttributes, | ||
initialStateGroupAttrs, dataAttributes, initialStateDataAttrs, outputObjAttr, | ||
child, initialState) { | ||
override def outputPartitioning: Partitioning = child.outputPartitioning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am actually not sure if this is correct. Even for MapGroupsExec. This operators output rows that are completely new columns... completely different from the key-value columns as input. So the output cannot be partitioned in the same way as the input is.
Its safer to not specify this. If you dont specify, then the worst case is that an extra shuffle will be added if the next operator needs it to be partitioned in some way. If it is specified and it is wrong, then the next operator will end up incorrect assuming that things are partitioned in some way and produce incorrect results.
so you were right about this being fishy.
@@ -1284,6 +1284,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { | |||
assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 } | |||
assertCannotGetWatermark { state.getCurrentWatermarkMs() } | |||
assert(!state.hasTimedOut) | |||
if(key.contains("Timeout")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space after if
Kubernetes integration test starting |
Kubernetes integration test status success |
}.toSeq | ||
|
||
// Create group state object | ||
val groupState = GroupStateImpl.createForStreaming( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we call createForStreaming
rather than createForBatch
?
inputData.toDS() | ||
.groupByKey(x => x) | ||
.flatMapGroupsWithState( | ||
Update, EventTimeTimeout(), initialState)(flatMapGroupsWithStateFunc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need testing with all 3 timeout settings.
val func = (keyRow: Any, values: Iterator[Any], states: Iterator[Any]) => { | ||
// Check if there is only one state for every key. | ||
var foundInitialStateForKey = false | ||
val optionalState = states.map { initialState => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont override variables! there is already a initialState
function param. its confusing.
} | ||
|
||
/** | ||
* Special handling for when the child relation is a batch relation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the handling here? the doc should say plan logical flatmapGroupsWIthState for batch queries.
* state is not provided we create an instance of the MapGroupsExec | ||
*/ | ||
// scalastyle:off argcount | ||
def forBatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
objectName.forX()
generally implies creating instances of type objectName
's corresponding class. This is not non-intuitive. rather say "FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries(...)`
Test build #141089 has finished for PR 33336 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #141104 has finished for PR 33336 at commit
|
("keyOnlyInData"), ("keyInStateAndData-2") | ||
) | ||
val timeoutMode = timeout match { | ||
case "NoTimeout" => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you doing this?? Why not create seq of objects directly Seq(NoTimeout(), EventTimeTimeout(), ProcessingTimeTimeout()).foreach { timeout =>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If i do that then I need to match on that to get the key right or does timeout have a string method ?
var foundInitialStateForKey = false | ||
val optionalState = states.map { stateValue => | ||
if (foundInitialStateForKey) { | ||
foundDuplicateInitialKeyException() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't catch duplicate keys because states
is an Iterator. E.g.,
val states = Iterator(1, 1)
var foundInitialStateForKey = false
val optionalState = states.map { stateValue =>
if (foundInitialStateForKey) {
throw new RuntimeException("foo")
}
foundInitialStateForKey = true
stateValue
}.toSeq
optionalState.headOption
the above case won't fail.
You can change toSeq
to toArray
to fix this issue. Can we add a unit test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh great catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this bug exist for streaming as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no we have a test for that flatMapGroupsWithState - initial state - duplicate keys
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streaming is using Iterator.foreach
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM pending tests.
Kubernetes integration test starting |
Kubernetes integration test status success |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #141364 has finished for PR 33336 at commit
|
…pGroupsWithState ### What changes were proposed in this pull request? Adding support for accepting an initial state with flatMapGroupsWithState in batch mode. ### Why are the changes needed? SPARK-35897 added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR. ### Does this PR introduce _any_ user-facing change? Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException ### How was this patch tested? Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the tests `JavaDatasetSuite` Closes #33336 from rahulsmahadev/flatMapGroupsWithStateBatch. Authored-by: Rahul Mahadev <rahul.mahadev@databricks.com> Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com> (cherry picked from commit efcce23) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
merged to master and backported to 3.2 since this is minor change but a good improvement to new feature added in 3.2 |
Test build #141367 has finished for PR 33336 at commit
|
What changes were proposed in this pull request?
Adding support for accepting an initial state with flatMapGroupsWithState in batch mode.
Why are the changes needed?
SPARK-35897 added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR.
Does this PR introduce any user-facing change?
Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException
How was this patch tested?
Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the tests
JavaDatasetSuite