Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener #45444

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ message ExecutePlanResponse {
// Response for commands on the streaming query manager.
StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11;

// Response for commands on the client side streaming query listener.
StreamingQueryListenerEventsResult streaming_query_listener_events_result = 16;

// Response type informing if the stream is complete in reattachable execution.
ResultComplete result_complete = 14;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message Command {
GetResourcesCommand get_resources_command = 8;
StreamingQueryManagerCommand streaming_query_manager_command = 9;
CommonInlineUserDefinedTableFunction register_table_function = 10;
StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11;

// This field is used to mark extensions to the protocol. When plugins generate arbitrary
// Commands they can add them here. During the planning the correct resolution is done.
Expand Down Expand Up @@ -456,6 +457,7 @@ message StreamingQueryListenerEvent {

message StreamingQueryListenerEventsResult {
repeated StreamingQueryListenerEvent events = 1;
optional bool listener_bus_listener_added = 2;
}

// Command to get the output of 'SparkContext.resources'
Expand Down
204 changes: 102 additions & 102 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions python/pyspark/sql/connect/proto/base_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
STREAMING_QUERY_COMMAND_RESULT_FIELD_NUMBER: builtins.int
GET_RESOURCES_COMMAND_RESULT_FIELD_NUMBER: builtins.int
STREAMING_QUERY_MANAGER_COMMAND_RESULT_FIELD_NUMBER: builtins.int
STREAMING_QUERY_LISTENER_EVENTS_RESULT_FIELD_NUMBER: builtins.int
RESULT_COMPLETE_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
METRICS_FIELD_NUMBER: builtins.int
Expand Down Expand Up @@ -1456,6 +1457,11 @@ class ExecutePlanResponse(google.protobuf.message.Message):
) -> pyspark.sql.connect.proto.commands_pb2.StreamingQueryManagerCommandResult:
"""Response for commands on the streaming query manager."""
@property
def streaming_query_listener_events_result(
self,
) -> pyspark.sql.connect.proto.commands_pb2.StreamingQueryListenerEventsResult:
"""Response for commands on the client side streaming query listener."""
@property
def result_complete(self) -> global___ExecutePlanResponse.ResultComplete:
"""Response type informing if the stream is complete in reattachable execution."""
@property
Expand Down Expand Up @@ -1493,6 +1499,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
| None = ...,
streaming_query_manager_command_result: pyspark.sql.connect.proto.commands_pb2.StreamingQueryManagerCommandResult
| None = ...,
streaming_query_listener_events_result: pyspark.sql.connect.proto.commands_pb2.StreamingQueryListenerEventsResult
| None = ...,
result_complete: global___ExecutePlanResponse.ResultComplete | None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
metrics: global___ExecutePlanResponse.Metrics | None = ...,
Expand Down Expand Up @@ -1521,6 +1529,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
b"sql_command_result",
"streaming_query_command_result",
b"streaming_query_command_result",
"streaming_query_listener_events_result",
b"streaming_query_listener_events_result",
"streaming_query_manager_command_result",
b"streaming_query_manager_command_result",
"write_stream_operation_start_result",
Expand Down Expand Up @@ -1558,6 +1568,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
b"sql_command_result",
"streaming_query_command_result",
b"streaming_query_command_result",
"streaming_query_listener_events_result",
b"streaming_query_listener_events_result",
"streaming_query_manager_command_result",
b"streaming_query_manager_command_result",
"write_stream_operation_start_result",
Expand All @@ -1574,6 +1586,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
"streaming_query_command_result",
"get_resources_command_result",
"streaming_query_manager_command_result",
"streaming_query_listener_events_result",
"result_complete",
"extension",
]
Expand Down
180 changes: 90 additions & 90 deletions python/pyspark/sql/connect/proto/commands_pb2.py

Large diffs are not rendered by default.

40 changes: 39 additions & 1 deletion python/pyspark/sql/connect/proto/commands_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Command(google.protobuf.message.Message):
GET_RESOURCES_COMMAND_FIELD_NUMBER: builtins.int
STREAMING_QUERY_MANAGER_COMMAND_FIELD_NUMBER: builtins.int
REGISTER_TABLE_FUNCTION_FIELD_NUMBER: builtins.int
STREAMING_QUERY_LISTENER_BUS_COMMAND_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
@property
def register_function(
Expand All @@ -124,6 +125,8 @@ class Command(google.protobuf.message.Message):
self,
) -> pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedTableFunction: ...
@property
def streaming_query_listener_bus_command(self) -> global___StreamingQueryListenerBusCommand: ...
@property
def extension(self) -> google.protobuf.any_pb2.Any:
"""This field is used to mark extensions to the protocol. When plugins generate arbitrary
Commands they can add them here. During the planning the correct resolution is done.
Expand All @@ -143,6 +146,8 @@ class Command(google.protobuf.message.Message):
streaming_query_manager_command: global___StreamingQueryManagerCommand | None = ...,
register_table_function: pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedTableFunction
| None = ...,
streaming_query_listener_bus_command: global___StreamingQueryListenerBusCommand
| None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(
Expand All @@ -164,6 +169,8 @@ class Command(google.protobuf.message.Message):
b"sql_command",
"streaming_query_command",
b"streaming_query_command",
"streaming_query_listener_bus_command",
b"streaming_query_listener_bus_command",
"streaming_query_manager_command",
b"streaming_query_manager_command",
"write_operation",
Expand Down Expand Up @@ -193,6 +200,8 @@ class Command(google.protobuf.message.Message):
b"sql_command",
"streaming_query_command",
b"streaming_query_command",
"streaming_query_listener_bus_command",
b"streaming_query_listener_bus_command",
"streaming_query_manager_command",
b"streaming_query_manager_command",
"write_operation",
Expand All @@ -217,6 +226,7 @@ class Command(google.protobuf.message.Message):
"get_resources_command",
"streaming_query_manager_command",
"register_table_function",
"streaming_query_listener_bus_command",
"extension",
]
| None
Expand Down Expand Up @@ -1938,18 +1948,46 @@ class StreamingQueryListenerEventsResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

EVENTS_FIELD_NUMBER: builtins.int
LISTENER_BUS_LISTENER_ADDED_FIELD_NUMBER: builtins.int
@property
def events(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
global___StreamingQueryListenerEvent
]: ...
listener_bus_listener_added: builtins.bool
def __init__(
self,
*,
events: collections.abc.Iterable[global___StreamingQueryListenerEvent] | None = ...,
listener_bus_listener_added: builtins.bool | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_listener_bus_listener_added",
b"_listener_bus_listener_added",
"listener_bus_listener_added",
b"listener_bus_listener_added",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_listener_bus_listener_added",
b"_listener_bus_listener_added",
"events",
b"events",
"listener_bus_listener_added",
b"listener_bus_listener_added",
],
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["events", b"events"]) -> None: ...
def WhichOneof(
self,
oneof_group: typing_extensions.Literal[
"_listener_bus_listener_added", b"_listener_bus_listener_added"
],
) -> typing_extensions.Literal["listener_bus_listener_added"] | None: ...

global___StreamingQueryListenerEventsResult = StreamingQueryListenerEventsResult

Expand Down