[SPARK-47035][SS][CONNECT] Protocol for Client-Side Listener #45091

Closed
wants to merge 14 commits into from
@@ -248,6 +248,9 @@ message WriteStreamOperationStartResult {
  // An optional query name.
  string name = 2;

  // Optional query started event if there is any listener registered on the client side.
  optional string query_started_event_json = 3;

  // TODO: How do we indicate errors?
  // TODO: Consider adding status, last progress etc here.
}
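Since `query_started_event_json` is declared `optional`, a client has to check for presence before parsing it. A minimal sketch of that check follows, assuming a plain dataclass `StartResult` as a hypothetical stand-in for the generated message class; with the generated protobuf class the presence check would typically be `HasField("query_started_event_json")`. The JSON payload shown is illustrative only.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StartResult:
    """Hypothetical stand-in for WriteStreamOperationStartResult."""
    name: str = ""
    # None models "field not set", i.e. no client-side listener was registered.
    query_started_event_json: Optional[str] = None


def started_event(result: StartResult) -> Optional[Dict[str, Any]]:
    """Return the parsed started event, or None when the field is absent."""
    if result.query_started_event_json is None:
        return None
    return json.loads(result.query_started_event_json)
```

Returning `None` instead of raising keeps the no-listener case on the normal path, which matches the "if there is any listener registered" wording in the field comment.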
@@ -421,6 +424,40 @@ message StreamingQueryManagerCommandResult {
}
}

// The protocol for client-side StreamingQueryListener.
// This command will only be set when either the first listener is added to the client, or the last
// listener is removed from the client.
// The add_listener_bus_listener command will only be set true in the first case.
// The remove_listener_bus_listener command will only be set true in the second case.
message StreamingQueryListenerBusCommand {
  oneof command {
    bool add_listener_bus_listener = 1;
    bool remove_listener_bus_listener = 2;
  }
}
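The first-listener / last-listener rule in the comment above can be sketched on the client side as follows. This is an illustration under stated assumptions, not the PR's implementation: `ListenerRegistry` is a hypothetical class, and the returned dicts merely stand in for a `StreamingQueryListenerBusCommand` with one `oneof` branch set.

```python
from typing import List, Optional


class ListenerRegistry:
    """Hypothetical client-side registry of streaming query listeners."""

    def __init__(self) -> None:
        self._listeners: List[object] = []

    def add(self, listener: object) -> Optional[dict]:
        """Register a listener; return a command only for the first one."""
        self._listeners.append(listener)
        if len(self._listeners) == 1:
            return {"add_listener_bus_listener": True}
        return None

    def remove(self, listener: object) -> Optional[dict]:
        """Unregister a listener; return a command only when none remain."""
        if listener not in self._listeners:
            return None
        self._listeners.remove(listener)
        if not self._listeners:
            return {"remove_listener_bus_listener": True}
        return None
```

Adding a second listener or removing one of several produces no command, so the server is contacted only on the two transitions the comment describes.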

// The enum used for client side streaming query listener event
// There is no QueryStartedEvent defined here,
// it is added as a field in WriteStreamOperationStartResult
enum StreamingQueryEventType {
  QUERY_PROGRESS_UNSPECIFIED = 0;
  QUERY_PROGRESS_EVENT = 1;
  QUERY_TERMINATED_EVENT = 2;
  QUERY_IDLE_EVENT = 3;
}

// The protocol for the returned events in the long-running response channel.
message StreamingQueryListenerEvent {
  // (Required) The json serialized event, all StreamingQueryListener events have a json method
  string event_json = 1;
Contributor

Is this guaranteed to be JSON?

Contributor Author

Yes, all streaming query events have a json method.

On the client there are fromJson methods to reconstruct the event:

def fromJson(cls, j: Dict[str, Any]) -> "QueryStartedEvent":
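To illustrate the round trip described in this reply, here is a small sketch of an event class with a `json` method and a `fromJson` classmethod. `StartedEventSketch` is a hypothetical stand-in, and its field names (`id`, `runId`, `name`, `timestamp`) are assumptions made for the example rather than something taken from this diff.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StartedEventSketch:
    """Hypothetical stand-in for a client-side started event."""
    id: str
    runId: str
    name: Optional[str]
    timestamp: str

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "StartedEventSketch":
        # Rebuild the event from an already-parsed JSON object.
        return cls(id=j["id"], runId=j["runId"], name=j.get("name"), timestamp=j["timestamp"])

    def json(self) -> str:
        # Serialize back to the string form carried in the proto field.
        return json.dumps(
            {"id": self.id, "runId": self.runId, "name": self.name, "timestamp": self.timestamp}
        )
```

Usage would be along the lines of `StartedEventSketch.fromJson(json.loads(event_json))`, with the string coming from the proto field.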

  // (Required) Query event type used by client to decide how to deserialize the event_json
  StreamingQueryEventType event_type = 2;
}

message StreamingQueryListenerEventsResult {
  repeated StreamingQueryListenerEvent events = 1;
}
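A client receiving a `StreamingQueryListenerEventsResult` would use `event_type` to decide how to handle each `event_json`. The sketch below shows one way to do that dispatch. It assumes events arrive as `(event_json, event_type)` tuples and that handlers are plain callables keyed by type; `dispatch_events` and the `Handler` alias are hypothetical names, and only the enum values come from the diff.

```python
import json
from enum import IntEnum
from typing import Any, Callable, Dict, Iterable, Tuple


class StreamingQueryEventType(IntEnum):
    # Values mirror the enum in the diff.
    QUERY_PROGRESS_UNSPECIFIED = 0
    QUERY_PROGRESS_EVENT = 1
    QUERY_TERMINATED_EVENT = 2
    QUERY_IDLE_EVENT = 3


Handler = Callable[[Dict[str, Any]], None]


def dispatch_events(
    events: Iterable[Tuple[str, int]],
    handlers: Dict[StreamingQueryEventType, Handler],
) -> int:
    """Parse each event and call the handler for its type; return the call count."""
    dispatched = 0
    for event_json, event_type in events:
        # Raises ValueError if the server sends a value outside the enum.
        kind = StreamingQueryEventType(event_type)
        handler = handlers.get(kind)
        if handler is None:
            continue  # no handler registered for this event type
        handler(json.loads(event_json))
        dispatched += 1
    return dispatched
```

Event types without a registered handler are skipped silently here; a real client might prefer to log them.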

// Command to get the output of 'SparkContext.resources'
message GetResourcesCommand { }
