[SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState #17361
if (outputMode != InternalOutputModes.Update) {
throwError("flatMapGroupsWithState in update mode is not supported with " +
// mapGroupsWithState and flatMapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming =>
Refactored this to put all checks related to map/flatMapGroupsWithState under a single case statement. This way it's easier to reason about whether all possible combinations of operator + output mode + aggregation have been covered.
It also consolidates all the "valid combinations" of mode + aggregations, on which additional checks can be made (e.g. presence of a watermark when timeoutConf = EventTimeTimeout).
This refactoring passes all existing tests in UnsupportedOperationsSuite.
Wow, this is getting complicated...
/**
 * Timeout based on processing time. The duration of timeout can be set for each group in
 * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`.
 */
public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
Nit: I'd consider removing the `Timeout` suffix here, as it's kind of redundant.
It's just that if someone does `import KeyedStateTimeout._`, the code boils down to
flatMapGroupsWithState(Update, ProcessingTime) { ... }
with no reference to a timeout.
Fine either way.
I'd probably still remove it.
}
if (watermarkAttributes.isEmpty) {
throwError(
"Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
I think we are hyphenating event-time?
Are we? I didn't know there was a policy. I am fine with hyphenating.
I want it to be consistent, and the docs hyphenate.
Aah, I get it. The Apache docs do hyphenate it.
if (watermarkAttributes.isEmpty) {
throwError(
"Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
"without watermark. Use '[Dataset/DataFrame].withWatermark()' to " +
I would consider making this an affirmative statement: "You must define a watermark on a DataFrame in order to use event-time-based timeouts".
Okay.
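For illustration, the affirmative wording could slot into the watermark check roughly as follows. This is a minimal, self-contained sketch: `TimeoutConf`, `WatermarkCheck`, and the `Either` return type are hypothetical stand-ins rather than Spark's analyzer code, and the message text is only one possible phrasing.

```scala
// Hypothetical stand-ins for the timeout configuration; not Spark's actual classes.
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

object WatermarkCheck {
  // Assumed wording, following the "affirmative statement" suggestion above.
  val MissingWatermarkMessage: String =
    "You must define a watermark on the DataFrame/Dataset in order to use " +
      "event-time-based timeouts in [map|flatMap]GroupsWithState. " +
      "Use '[Dataset/DataFrame].withWatermark()' to define one."

  /** Returns Left(message) only when event-time timeout is requested without a watermark. */
  def check(timeoutConf: TimeoutConf, watermarkAttributes: Seq[String]): Either[String, Unit] =
    timeoutConf match {
      case EventTimeTimeout if watermarkAttributes.isEmpty => Left(MissingWatermarkMessage)
      case _ => Right(())
    }
}
```

Returning an `Either` instead of throwing is purely a convenience for the sketch; the real check calls `throwError`.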
.withWatermark("eventTime", "10 seconds")
.as[(String, Long)]
.groupByKey[String]((x: (String, Long)) => x._1)
.flatMapGroupsWithState[Long, (String, Int)](Update, EventTimeTimeout)(stateFunc)
These types are just here for testing? (i.e. we didn't break inference right?)
I was debugging and left them there, thinking they would help the readability of the tests. I can remove them.
As long as they aren't required, it's okay.
 * Timeout based on event time. The event time timestamp for timeout can be set for each
 * group in `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutTimestamp()`.
 * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
 * When the watermark advances beyond the set timestamp of a group, then the group times out.
And no data has arrived for that group.
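To make the combined condition concrete (watermark past the group's timestamp, and no new data for the group), here is a minimal plain-Scala model. `GroupEntry`, `EventTimeTimeoutModel`, and `timedOutKeys` are hypothetical names invented for this sketch, and the strict `>` comparison is an assumption based on the "advances beyond" wording; none of this is Spark's actual implementation.

```scala
// Hypothetical model of per-group state; not Spark's KeyedState.
final case class GroupEntry(value: Long, timeoutTimestampMs: Option[Long])

object EventTimeTimeoutModel {
  /**
   * Keys that would time out in a batch under the described rule:
   * a timeout timestamp is set, the watermark has advanced beyond it,
   * and the key received no new data in this batch.
   */
  def timedOutKeys(
      state: Map[String, GroupEntry],
      keysWithNewData: Set[String],
      watermarkMs: Long): Set[String] =
    state.iterator.collect {
      case (key, GroupEntry(_, Some(ts)))
          if watermarkMs > ts && !keysWithNewData.contains(key) => key
    }.toSet
}
```

In this model, a key whose timestamp has been passed but which also has new data in the batch is not reported as timed out; it would simply be processed with its new data.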
 * @param isTimeoutEnabled Whether timeout is enabled. This will be used to check whether the user
 * is allowed to configure timeouts.
 * @param timeoutConf Type of timeout configured. Based on this, different operations will
 * be supported.
nit: indent is inconsistent
LGTM
@tdas Just FYI, I'm getting a lint-java error:
yuhao@yuhao-devbox:~/workspace/github/hhbyyh/spark$ ./dev/lint-java
Is it just me? Maybe we should suppress the style error.
What changes were proposed in this pull request?
Adding event-time-based timeout. The user sets the timeout timestamp directly using KeyedState.setTimeoutTimestamp. A key times out when the watermark crosses its timeout timestamp.
How was this patch tested?
Unit tests