[SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits#27373
[SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits#27373tdas wants to merge 4 commits intoapache:masterfrom
Conversation
|
Test build #117479 has finished for PR 27373 at commit
|
|
Test build #117481 has finished for PR 27373 at commit
|
|
@tdas is this good to go before the code freeze? |
| } | ||
|
|
||
| test("streaming limit in complete mode") { | ||
| test("streaming limit before agg in complete mode") { |
There was a problem hiding this comment.
nit: could you add the jira id to the test name for this and the rest of tests? We recently added the following rule:
Also, you should consider writing a JIRA ID in the tests when your pull request targets to fix a specific issue. In practice, usually it is added when a JIRA type is a bug or a PR adds a couple of tests to an existing test class. See the examples below: Scala test("SPARK-12345: a short description of the test") {
| case ReturnAnswer(Limit(IntegerLiteral(limit), child)) if generatesStreamingAppends(child) => | ||
| StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil | ||
|
|
||
| case Limit(IntegerLiteral(limit), child) if generatesStreamingAppends (child) => |
There was a problem hiding this comment.
super nit: extra space between generatesStreamingAppends and (child)
There was a problem hiding this comment.
i am surprised that the style checker did not catch this
|
@srowen this is good to go |
| } | ||
|
|
||
| test("streaming limit in complete mode") { | ||
| test("streaming limit before agg in complete mode (SPARK-30658)") { |
There was a problem hiding this comment.
@tdas this is not the right style. It should be test("SPARK-12345: a short description of the test"). Could you also fix other test names?
|
Test build #117593 has finished for PR 27373 at commit
|
|
Test build #117617 has finished for PR 27373 at commit
|
|
jenkins retest this please |
|
Test build #117636 has finished for PR 27373 at commit
|
|
jenkins retest this please. |
|
the last failure was in unrelated python test. nonetheless kicking off another round of tests to be sure. |
|
Test build #117654 has finished for PR 27373 at commit
|
zsxwing
left a comment
There was a problem hiding this comment.
LGTM. There is one test missing jira id. I will fix it when merging the PR.
| false)) | ||
| } | ||
|
|
||
| test("streaming limit should not apply on limits on state subplans") { |
There was a problem hiding this comment.
nit: this is missing the jira id.
|
@tdas I merged this to master. Could you also submit a PR for branch-2.4? |
|
Thank you for merging. Should this be merged to branch 2.4? This is a slightly scary change deep in the incremental execution stuff. |
What changes were proposed in this pull request?
This PR solves two bugs related to streaming limits
Bug 1 (SPARK-30658): Limit before a streaming aggregate (i.e.
df.limit(5).groupBy().count()) in complete mode was not being planned as a stateful streaming limit. The planner rule planned a logical limit with a stateful streaming limit plan only if the query is in append mode. As a result, instead of allowing max 5 rows across batches, the planned streaming query was allowing 5 rows in every batch thus producing incorrect results.Solution: Change the planner rule to plan the logical limit with a streaming limit plan even when the query is in complete mode if the logical limit has no stateful operator before it.
Bug 2 (SPARK-30657):
LocalLimitExecdoes not consume the iterator of the child plan. So if there is a limit after a stateful operator like streaming dedup in append mode (e.g.df.dropDuplicates().limit(5)), the state changes of streaming duplicate may not be committed (most stateful ops commit state changes only after the generated iterator is fully consumed).Solution: Change the planner rule to always use a new
StreamingLocalLimitExecwhich always fully consumes the iterator. This is the safest thing to do. However, this will introduce a performance regression as consuming the iterator is extra work. To minimize this performance impact, add an additional post-planner optimization rule to replaceStreamingLocalLimitExecwithLocalLimitExecwhen there is no stateful operator before the limit that could be affected by it.Does this PR introduce any user-facing change?
No
How was this patch tested?
Updated incorrect unit tests and added new ones