[SPARK-36314][SS] Update Sessionization examples to use native support of session window#33548
[SPARK-36314][SS] Update Sessionization examples to use native support of session window#33548HeartSaVioR wants to merge 3 commits intoapache:masterfrom
Conversation
… of session window
|
Target branches are master/3.2. |
|
cc. @viirya @xuanyuanking |
|
Test build #141737 has finished for PR 33548 at commit
|
|
Test build #141741 has finished for PR 33548 at commit
|
|
Kubernetes integration test starting |
|
|
||
| if __name__ == "__main__": | ||
| if len(sys.argv) != 3 and len(sys.argv) != 2: | ||
| msg = "Usage: structured_network_wordcount_windowed.py <hostname> <port> " |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Kubernetes integration test status failure |
|
Test build #141744 has finished for PR 33548 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Thanks! Merging to master/3.2. |
…t of session window ### What changes were proposed in this pull request? This PR proposes to update Sessionization examples to use native support of session window. It also adds the example for PySpark as native support of session window is available to PySpark as well. ### Why are the changes needed? We should guide the simplest way to achieve the same workload. I'll provide another example for cases we can't do with native support of session window. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. Closes #33548 from HeartSaVioR/SPARK-36314. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 1fafa8e) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
|
Thanks all for the quick reviews and merge! |
|
Hello fellow spark structured streaming sessionizers. According to the apache spark docs, output mode update is not allowed with session window. I tried running the example code in my local machine for 3.2.1 and 3.2.0 and it failed with error:
According to this example code it can be done. What am I missing? |
|
Ah we seem to miss updating the example... We allowed update mode first and disallowed due to unclear semantic. Could you please raise a PR to update the output mode to append for Scala/Java/Python examples for session window if you don't mind? I can jump in if you're not available to address this. Thanks in advance! |
|
Sure, but you see, if we do it with append mode, then that means the sessions will be returned only after they are closed. Due to this, possibility and purpose of real-time session tagging is defeated. Should we not revert the example to mapGroupsWithState method? |
|
We still have an example of session window against (flat)mapGroupsWithState. We just don't want to duplicate the code example. The main reason I proposed dropping the functionality of update mode for session window was due to the characteristic of session window. When someone uses update mode for the streaming query, it does not only mean they want to get update immediately, but also mean they are going to "upsert" the target table. Upsertion is normally performed by replacing the existing value(s) for specific key to new value(s). If we consider the out-of-order events, session window will expand arbitrarily - both start and end of session can change. (Technically saying, the old example of session window was incorrect in terms of event-time semantic.) Given Spark does not produce the old value(s), the query can't always safely upsert the result. E.g. user A has multiple sessions in the target table and some of sessions were already closed. Spark never gives them, and the query is in a risk of dropping these old-but-valid sessions. |
|
Got it. So the problem is we run a risk of taking wrong real-time decisions on the basis of immediate updates and it is very much possible the session we calculated was incorrect due to late events. So, the safer and more accurate solution would be append mode with watermarking. |
What changes were proposed in this pull request?
This PR proposes to update Sessionization examples to use native support of session window. It also adds the example for PySpark as native support of session window is available to PySpark as well.
Why are the changes needed?
We should guide the simplest way to achieve the same workload. I'll provide another example for cases we can't do with native support of session window.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually tested.