
[SPARK-36463][SS] Prohibit update mode in streaming aggregation with session window #33689

Closed
wants to merge 2 commits

Conversation

@HeartSaVioR (Contributor) commented Aug 10, 2021

What changes were proposed in this pull request?

This PR proposes to prohibit update mode in streaming aggregation with session window.

UnsupportedOperationChecker will detect and prohibit this case. As a side effect, this PR also simplifies the code, since we can remove the iterator implementation that produced update-mode outputs.
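For illustration, here is a minimal sketch of the kind of query that is now rejected; the rate source, column names, and durations are placeholder assumptions, not code from this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, session_window}

val spark = SparkSession.builder().appName("session-window-update-demo").getOrCreate()

// Streaming aggregation grouped on a session window (placeholder source/columns).
val events = spark.readStream.format("rate").load()
  .selectExpr("timestamp", "CAST(value % 10 AS STRING) AS userId")

val sessionCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(session_window(col("timestamp"), "5 minutes"), col("userId"))
  .agg(count("*").as("numEvents"))

// With this PR, analysis fails here because of the update output mode;
// append (with a watermark) or complete mode has to be used instead.
sessionCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```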

This PR also cleans up test code by deduplicating queries.

Why are the changes needed?

The semantics of "update" mode for session-window-based streaming aggregation are quite unclear.

For normal streaming aggregation, Spark provides outputs that can be "upsert"ed based on the grouping key. This relies on the fact that the grouping key never changes.

This doesn't hold for session-window-based streaming aggregation, since the session range can change.

If end users apply their knowledge of streaming aggregation, they will treat the key as grouping key + session (since that is what they specify in groupBy), and it is quite likely that an existing row is not updated (overwritten), leaving the sink with multiple different rows for the same session.

If end users treat the key as just the grouping key, there is a small chance they can upsert the session correctly, though only the last updated session will be stored, so it won't work with event-time processing, where there can be multiple active sessions.
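As a concrete illustration (an invented example, not taken from the PR), suppose the sink is a key-value store upserted with the key users would naturally pick, grouping key plus session start:

```scala
// Illustrative only: upsert update-mode output keyed by (userId, sessionStart).
val sink = scala.collection.mutable.Map.empty[(String, Long), Long]

// Batch 1: the session for user "a" spans [100, 110) with 3 events.
sink.update(("a", 100L), 3L)

// Batch 2: a late event at t = 95 extends the session backward to [95, 110).
// The new output row carries a different session start, so it does not
// overwrite the old row; the sink now holds two rows for one logical session.
sink.update(("a", 95L), 4L)

assert(sink.size == 2) // the stale row ("a", 100) -> 3 is never removed
```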

Does this PR introduce any user-facing change?

No, as we haven't released this feature.

How was this patch tested?

Updated tests.

@HeartSaVioR (Contributor, Author) commented:

To respect the "update" semantics properly, we would need "retraction", which provides two different events: remove the old session and insert the updated session. This could be implemented manually with flatMapGroupsWithState, though I'm not sure it would be performant enough in practice, since it requires two operations ("delete" and "insert") against external storage.
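A rough sketch of that retraction idea, simplified to a single session per key with a fixed gap and no timeouts; the names and details below are assumptions for illustration, not code from Spark or this PR:

```scala
import org.apache.spark.sql.streaming.GroupState

case class Event(userId: String, ts: Long)
case class SessionState(start: Long, end: Long, count: Long)
// action is "delete" (retract the previously emitted session row)
// or "insert" (emit the updated session row).
case class SessionChange(action: String, userId: String, start: Long, end: Long, count: Long)

def updateSession(
    userId: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): Iterator[SessionChange] = {
  val gapMs = 10000L
  val old = state.getOption
  val zero = old.getOrElse(SessionState(Long.MaxValue, Long.MinValue, 0L))
  val merged = events.foldLeft(zero) { (s, e) =>
    SessionState(math.min(s.start, e.ts), math.max(s.end, e.ts + gapMs), s.count + 1)
  }
  state.update(merged)
  // Retract the old row (if any) and insert the updated one; the sink then has
  // to apply these as a delete followed by an insert against external storage.
  val retract = old.filterNot(_ == merged)
    .map(s => SessionChange("delete", userId, s.start, s.end, s.count))
  val insert = SessionChange("insert", userId, merged.start, merged.end, merged.count)
  (retract.toList :+ insert).iterator
}

// Wiring sketch:
// events.groupByKey(_.userId)
//   .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateSession _)
```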

"numEvents")

sessionUpdates.explain()
val sessionUpdates = sessionWindowQuery(inputData)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the changes in this suite are about deduplicating the queries. We can simply use two queries (keyed window vs. global window) regardless of output mode.
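For context, the shared helper referenced in the diff above might look roughly like this; the column names, gap duration, and watermark are guesses, not the actual test code:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, count, session_window}

// Hypothetical shape of the shared test helper; details are assumptions.
def sessionWindowQuery(input: MemoryStream[(String, Long)]): DataFrame = {
  input.toDF().toDF("sessionId", "time")
    .withColumn("eventTime", col("time").cast("timestamp"))
    .withWatermark("eventTime", "10 seconds")
    .groupBy(session_window(col("eventTime"), "5 seconds"), col("sessionId"))
    .agg(count("*").as("numEvents"))
}
```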

@SparkQA commented Aug 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46760/

@SparkQA commented Aug 10, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46760/

@viirya (Member) left a comment

I think we also need to update the documentation (or at least mention this there).

@viirya (Member) left a comment

Looks okay to me. I will take another look after the documentation is updated as well.

@github-actions bot added the DOCS label Aug 10, 2021
@SparkQA commented Aug 10, 2021

Test build #142253 has finished for PR 33689 at commit 5eabdaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46776/

@SparkQA commented Aug 10, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46776/

@SparkQA commented Aug 10, 2021

Test build #142269 has finished for PR 33689 at commit 188fe70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Aug 10, 2021

> This doesn't hold for session-window-based streaming aggregation. If you're trying to upsert the output based on the grouping key, it is quite likely that an existing row is not updated (overwritten), ending up with multiple different rows.

To be clear, I looked at the current approach for UPDATE mode. We consider a row updated if there is no existing row with the state key (session key + session start time), or if the stored value isn't the same as the current value that will be stored.
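In other words (a code paraphrase of the check described above, not the actual implementation), the decision behaves roughly like:

```scala
// Paraphrase of the described check; not the real state store code.
case class SessionRow(sessionKey: String, sessionStart: Long, value: Long)

def isUpdated(store: Map[(String, Long), SessionRow], row: SessionRow): Boolean =
  store.get((row.sessionKey, row.sessionStart)) match {
    case None           => true            // no existing row with this state key
    case Some(existing) => existing != row // the stored value differs from the new one
  }
```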

But it is also possible that the session window was extended backward, changing the session start time, so the set of "updated" rows is not actually accurate.

@HeartSaVioR (Contributor, Author) commented Aug 11, 2021

I think the semantics are meaningful only when end users can store the output correctly. That said, we should evaluate the semantics from the end users' point of view. They will decide whether to treat the key as the grouping key vs. grouping key + session. Grouping key + session start is something Spark uses internally as the state key, which end users wouldn't know about, so it has no meaning from their point of view.

If they apply their knowledge of streaming aggregation, they will treat the key as grouping key + session (since that is what they specify in groupBy), for which I already demonstrated the problem.

If they treat the key as just the grouping key, there is a chance they can upsert the session correctly, though only the last updated session will be stored, so it won't work with event-time processing, where there can be multiple active sessions.

@viirya (Member) commented Aug 11, 2021

Thanks for updating the description. LGTM.

@HeartSaVioR (Contributor, Author) commented:

Thanks again @viirya for the quick review! Merging to master/3.2.

HeartSaVioR added a commit that referenced this pull request Aug 11, 2021
[SPARK-36463][SS] Prohibit update mode in streaming aggregation with session window

### What changes were proposed in this pull request?

This PR proposes to prohibit update mode in streaming aggregation with session window.

UnsupportedOperationChecker will detect and prohibit this case. As a side effect, this PR also simplifies the code, since we can remove the iterator implementation that produced update-mode outputs.

This PR also cleans up test code by deduplicating queries.

### Why are the changes needed?

The semantics of "update" mode for session-window-based streaming aggregation are quite unclear.

For normal streaming aggregation, Spark provides outputs that can be "upsert"ed based on the grouping key. This relies on the fact that the grouping key never changes.

This doesn't hold for session-window-based streaming aggregation, since the session range can change.

If end users apply their knowledge of streaming aggregation, they will treat the key as grouping key + session (since that is what they specify in groupBy), and it is quite likely that an existing row is not updated (overwritten), leaving the sink with multiple different rows for the same session.

If end users treat the key as just the grouping key, there is a small chance they can upsert the session correctly, though only the last updated session will be stored, so it won't work with event-time processing, where there can be multiple active sessions.

### Does this PR introduce _any_ user-facing change?

No, as we haven't released this feature.

### How was this patch tested?

Updated tests.

Closes #33689 from HeartSaVioR/SPARK-36463.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit ed60aaa)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
[SPARK-36463][SS] Prohibit update mode in streaming aggregation with session window

Closes apache#33689 from HeartSaVioR/SPARK-36463.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit ed60aaa)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>