[SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState #38013
@HeartSaVioR The applyInPandasWithState session window example.
Thanks for the contribution, @chaoqin-li1123! It looks like the Python linter is complaining; could you please look into this? It would also be good to explicitly document how you ran the example in the section of Thanks in advance!
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
Can one of the admins verify this patch? |
One tip, unlike Scala/Java code, we can leverage |
It seems that |
@chaoqin-li1123 Linter is still complaining. Could you take a look? You can install necessary python dependency requirements from |
+1, but I'd like a second pair of eyes from PySpark experts to make sure the code looks good to them as well.
LGTM2
Thanks! Merging to master.
)

def func(
    key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
Sorry for post-hoc reviews.
Let's change Iterable to Iterator. This can only be an iterator.
The linter forces me to annotate the type here as Iterable instead of Iterator; maybe we should investigate?
Ohh, okay NVM. Let's leave it as is for now 👍
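For readers following the Iterable/Iterator exchange, here is a minimal standard-library sketch of the distinction. The generator `emit_chunks` is a hypothetical stand-in for a function that yields its results; this does not explain why the linter preferred Iterable, which the thread leaves open.

```python
from collections.abc import Iterable, Iterator


def emit_chunks():
    # Hypothetical stand-in for a function that yields results one at a time.
    yield "first"
    yield "second"


gen = emit_chunks()
items = ["first", "second"]

# A generator object supports next(), so it is an Iterator (and therefore also an Iterable).
gen_is_iterator = isinstance(gen, Iterator)
gen_is_iterable = isinstance(gen, Iterable)

# A list can be looped over but has no __next__, so it is only an Iterable.
list_is_iterator = isinstance(items, Iterator)
list_is_iterable = isinstance(items, Iterable)
```

Annotating a yielding function with Iterable is therefore not wrong, only less specific than Iterator.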
    key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
    if state.hasTimedOut:
        count, start, end = state.get
I would extract the key like:
(session_id,) = key
or
(word,) = key
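A small sketch of the unpacking suggested above, assuming the grouping key arrives as a one-element tuple (which the `key[0]` access in the snippet under review suggests). The sample value is made up for illustration.

```python
# Assumption: the key is a one-element tuple; "hello" is a made-up sample value.
key = ("hello",)

# Tuple unpacking names the element and raises ValueError
# if the tuple does not contain exactly one item.
(word,) = key

# Equivalent index access, as used in the code under review.
same_value = key[0]
```

Compared with `key[0]`, the unpacking form documents what the key represents and fails loudly if the grouping columns ever change.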
"sessionId": [key[0]], | ||
"count": [count], | ||
"start": [start], | ||
"end": [end], |
Can we cast this and show as a timestamp in the console? Numeric timestamp values look a bit difficult to read.
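To illustrate why the numeric values are hard to read, here is a plain-Python sketch that renders epoch seconds as UTC timestamps. It assumes `start` and `end` hold whole epoch seconds, as the LONG columns suggest; the helper `to_utc_string` and the sample values are hypothetical, and this is not the cast that the Spark job itself would perform.

```python
from datetime import datetime, timezone


def to_utc_string(epoch_seconds: int) -> str:
    # Hypothetical helper: format epoch seconds as a readable UTC timestamp.
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )


# Assumption: session bounds are epoch seconds; these values are made up.
start = 0
end = 90

start_readable = to_utc_string(start)  # "1970-01-01 00:00:00"
end_readable = to_utc_string(end)      # "1970-01-01 00:01:30"
```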
)

def func(
    key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
BTW, I would name pdf_iter -> pdfs
    end = 0
    count = 0
    for pdf in pdf_iter:
        start = min(start, min(pdf["timestamp"]))
I would use the pandas API here instead of the built-in Python functions to show users that we can use pandas, e.g. int(min(start, pdf["timestamp"].min())) and int(max(end, pdf["timestamp"].max()))
Just tried a similar scenario, this causes data type errors downstream:
executor driver: net.razorvine.pickle.PickleException (expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.)
I suspect the return type does not match the SQL type provided. Would you mind sharing a reproducer?
        end = max(end, max(pdf["timestamp"]))
        count = count + len(pdf)
    if state.exists:
        old_session = state.get
I would do:
(old_count, start, old_end) = state.get
count = count + old_count
end = max(end, old_end)
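The merge proposed above can be sketched in plain Python. Here `old_state is not None` stands in for the `state.exists` check, and `merge_session` is a hypothetical helper rather than part of the example file; the (count, start, end) layout mirrors the tuple in the suggestion.

```python
from typing import Optional, Tuple

# (count, start, end), mirroring the tuple unpacked in the suggestion.
SessionState = Tuple[int, int, int]


def merge_session(
    old_state: Optional[SessionState], count: int, start: int, end: int
) -> SessionState:
    """Fold a new batch's (count, start, end) into the stored state, if any."""
    if old_state is not None:  # stand-in for state.exists
        # Reuse the stored start (the session began earlier); extend count and end.
        (old_count, start, old_end) = old_state
        count = count + old_count
        end = max(end, old_end)
    return (count, start, end)


first = merge_session(None, count=2, start=100, end=105)    # (2, 100, 105)
second = merge_session(first, count=3, start=107, end=112)  # (5, 100, 112)
```

Unpacking directly into `start` overwrites the batch-local value with the stored one, which is why no separate `old_start` variable is needed.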
sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
    func,
    session_schema,
    session_state_schema,
FYI, you can just pass a DDL-formatted string schema too, e.g. "sessionId STRING, count LONG, start LONG, end LONG", which will be shorter.

def func(
    key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
) -> Iterable[pd.DataFrame]:
Ditto. The output should preferably be Iterator in this case.

r"""
Split lines into words, group by words and use the state per key to track session of each key.

Can we add a bit more explanation? E.g., the timeout is set to 10 seconds, so each session window lasts until there is no more input for the key for 10 seconds.
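The gap rule described in this comment can be illustrated with a plain-Python sketch. It is not Spark code and ignores how and when Spark actually evaluates timeouts. The helper `split_sessions` is hypothetical, the arrival times are made up, and treating a gap of exactly 10 seconds as a timeout is an assumption.

```python
from typing import List

TIMEOUT_SECONDS = 10  # the value mentioned in the review comment


def split_sessions(
    timestamps: List[int], timeout: int = TIMEOUT_SECONDS
) -> List[List[int]]:
    """Group arrival times so that a gap of at least `timeout` starts a new session."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < timeout:
            # Input arrived before the timeout elapsed: the session continues.
            sessions[-1].append(ts)
        else:
            # No input for `timeout` seconds (or first event): open a new session.
            sessions.append([ts])
    return sessions


# Made-up arrival times (in seconds) for a single key.
sessions = split_sessions([0, 3, 8, 25, 27, 60])
# sessions == [[0, 3, 8], [25, 27], [60]]
```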
@chaoqin-li1123, I happened to nitpick some. Would you mind creating a follow-up PR that reuses the same JIRA?
No problem, I will create a new PR with the suggested change.
…thState followup ### What changes were proposed in this pull request? This is a followup of #38013, which introduced an example for applyInPandasWithState. It addresses some comments on code style. Closes #38066 from chaoqin-li1123/example_followup. Authored-by: Chaoqin Li <chaoqin.li@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
An example of applyInPandasWithState usage. This example splits lines into words, groups by word as the key, and uses the per-key state to track the session of each key.
Why are the changes needed?
To demonstrate the usage of applyInPandasWithState
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This is an example that can be run manually.
To run this on your local machine, you need to first run a Netcat server
$ nc -lk 9999
and then run the example
$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py localhost 9999