[SPARK-47035][SS][CONNECT] Protocol for Client-Side Listener #45091
connector/connect/common/src/main/protobuf/spark/connect/commands.proto
// The event_type for QueryProgressEvent is 1;
// for QueryTerminatedEvent is 2; for QueryIdleEvent is 3;
// There is no QueryStartedEvent defined here,
Should we formalize this somehow?
Yes, we can use a mechanism similar to PythonEvalType:
https://github.com/databricks/runtime/blob/95f1afae584a89d20ecf2e411f8b3a267d354e46/python/pyspark/rdd.py#L141
Wouldn't that be a bit out of scope for this protocol PR?
Oh, I see what you meant. I created an enum in the proto file.
// There is no QueryStartedEvent defined here,
// it is added as a field in WriteStreamOperationStartResult
message StreamingQueryListenerEvents {
  string event_json = 1;
Is this guaranteed to be JSON?
Yes, all streaming query events have a json method:
spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
Line 127 in fd518da
class QueryStartedEvent private[sql](
On the client, there are fromJson methods to reconstruct the event:
def fromJson(cls, j: Dict[str, Any]) -> "QueryStartedEvent":
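The round trip described in this reply can be sketched roughly as follows. This is a minimal illustration, not the PySpark implementation: the class `QueryIdleEventSketch`, the helper `decode_event`, and the JSON field names are all assumptions made for the example.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class QueryIdleEventSketch:
    """Hypothetical stand-in for a client-side event class."""

    id: str
    run_id: str
    timestamp: str

    @classmethod
    def from_json(cls, j: Dict[str, Any]) -> "QueryIdleEventSketch":
        # Field names are assumed for illustration; the real layout may differ.
        return cls(id=j["id"], run_id=j["runId"], timestamp=j["timestamp"])


def decode_event(event_json: str) -> QueryIdleEventSketch:
    # The wire field is a plain string, so the client parses it into a dict
    # before handing it to the class-level constructor.
    return QueryIdleEventSketch.from_json(json.loads(event_json))


# Stand-in for the string the server would place in event_json.
payload = json.dumps(
    {"id": "q-1", "runId": "r-1", "timestamp": "2024-02-12T00:00:00.000Z"}
)
event = decode_event(payload)
```

Because `event_json` is typed as a plain string in the proto, nothing on the wire enforces that it is valid JSON; in this sketch a malformed payload would surface as a `json.JSONDecodeError` on the client.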
some nits
enum StreamingQueryEventType {
  QUERY_PROGRESS_EVENT = 0;
  QUERY_TERMINATED_EVENT = 1;
  QUERY_IDLE_EVENT = 2;
}
We need something like `QUERY_EVENT_UNSPECIFIED` as 0?
https://protobuf.dev/programming-guides/proto3/#enum
Also, should each value have the same prefix as the enum name? I'm not sure whether this is a strict rule, though. @grundprinzip @amaliujia
+1 for `QUERY_EVENT_UNSPECIFIED = 0`.
No, IIRC we don't have a strict rule requiring each value to have the same prefix as the enum name.
Thanks for the suggestion! let me update
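The reason for reserving 0 can be illustrated with a small Python model. proto3 reads an unset enum field as the numeric value 0, so whatever name sits at 0 is what a receiver sees when the sender never set the field. The two classes below are hypothetical stand-ins for the proto enum, and the numbering in the second one is an assumed layout following the suggestion above, not a copy of the merged file.

```python
from enum import IntEnum


class EventTypeWithoutDefault(IntEnum):
    # Layout from the snippet under review: 0 already means "progress".
    QUERY_PROGRESS_EVENT = 0
    QUERY_TERMINATED_EVENT = 1
    QUERY_IDLE_EVENT = 2


class EventTypeWithDefault(IntEnum):
    # Assumed layout after the review suggestion: 0 is reserved for "not set".
    QUERY_EVENT_UNSPECIFIED = 0
    QUERY_PROGRESS_EVENT = 1
    QUERY_TERMINATED_EVENT = 2
    QUERY_IDLE_EVENT = 3


# proto3 reads an unset enum field as the numeric value 0.
UNSET_WIRE_VALUE = 0

# Without a reserved zero, "never set" is indistinguishable from "progress".
ambiguous = EventTypeWithoutDefault(UNSET_WIRE_VALUE)

# With a reserved zero, the receiver can tell the field was never set.
explicit = EventTypeWithDefault(UNSET_WIRE_VALUE)
```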
LGTM once CI passes, thank you!
Thanks! Merging to master.
### What changes were proposed in this pull request?

Currently, the StreamingQueryListener for Connect runs on the server side. From the user point of view, the purpose of a listener is mainly to have a pushing mechanism that notifies them when queries start / end / make progress. Before, the server-side listener essentially loses this functionality, and we find out that internally there are needs for the client side listener. The new listener will be running on the client side, with the server continuously pushing the new listener events to the client, and the client will call corresponding callback functions for different listener event types. This is the first PR that defines the protocol of this new listener.

### Why are the changes needed?

Add client side listener which makes more sense.

### Does this PR introduce _any_ user-facing change?

Not for this one.

### How was this patch tested?

No need for this one, new tests will be added later.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45091 from WweiL/SPARK-47035-protocol.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
…ient side listener

### What changes were proposed in this pull request?

Followup of previous protocol change #45091. Add the request proto `Command` and response proto message to `ExecutePlanResponse`

### Why are the changes needed?

Continuation of client side listener for spark connect

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Will be tested in subsequent PR, the proto change itself doesn't do any harm

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45444 from WweiL/SPARK-47035-protocol-followup.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Currently, the StreamingQueryListener for Connect runs on the server side.
From the user's point of view, the purpose of a listener is mainly to provide a push mechanism that notifies them when queries start, end, or make progress. The existing server-side listener essentially loses this functionality.
The new listener runs on the client side: the server continuously pushes new listener events to the client, and the client calls the corresponding callback function for each listener event type.
This is the first PR that defines the protocol of this new listener.
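The client-side flow described above (events pushed by the server, then routed to a callback per event type) might look roughly like the sketch below. Everything in it is hypothetical: the string tags stand in for the proto enum, `RecordingListener` stands in for a user-defined listener, and a plain list stands in for the server's response stream.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

# Hypothetical event-type tags; the actual protocol uses a proto enum.
PROGRESS = "progress"
TERMINATED = "terminated"
IDLE = "idle"

Event = Dict[str, Any]


class RecordingListener:
    """Hypothetical client-side listener that records which callback fired."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Event]] = []

    def on_query_progress(self, event: Event) -> None:
        self.calls.append((PROGRESS, event))

    def on_query_terminated(self, event: Event) -> None:
        self.calls.append((TERMINATED, event))

    def on_query_idle(self, event: Event) -> None:
        self.calls.append((IDLE, event))


def dispatch(
    listener: RecordingListener,
    pushed_events: Iterable[Tuple[str, Event]],
) -> None:
    """Route each pushed (event_type, event) pair to the matching callback."""
    handlers: Dict[str, Callable[[Event], None]] = {
        PROGRESS: listener.on_query_progress,
        TERMINATED: listener.on_query_terminated,
        IDLE: listener.on_query_idle,
    }
    for event_type, event in pushed_events:
        handler = handlers.get(event_type)
        if handler is not None:  # unknown types are skipped in this sketch
            handler(event)


# Stand-in for the stream of events pushed by the server.
stream = [(PROGRESS, {"batchId": 0}), (IDLE, {}), ("unknown", {}), (TERMINATED, {})]
listener = RecordingListener()
dispatch(listener, stream)
fired = [name for name, _ in listener.calls]
```

Skipping unrecognized types is a choice made for this sketch only; a real client could equally log or raise on them.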
Why are the changes needed?
This adds a client-side listener, which makes more sense than running the listener on the server.
Does this PR introduce any user-facing change?
Not for this one.
How was this patch tested?
Not needed for this PR; new tests will be added later.
Was this patch authored or co-authored using generative AI tooling?
No