Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ message ExecutePlanResponse {
// Special case for executing SQL commands.
SqlCommandResult sql_command_result = 5;

// Response for a streaming query.
WriteStreamOperationStartResult write_stream_operation_start_result = 8;

// Response for commands on a streaming query.
StreamingQueryCommandResult streaming_query_command_result = 9;

// Support arbitrary result objects.
google.protobuf.Any extension = 999;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ message Command {
CreateDataFrameViewCommand create_dataframe_view = 3;
WriteOperationV2 write_operation_v2 = 4;
SqlCommand sql_command = 5;
WriteStreamOperationStart write_stream_operation_start = 6;
StreamingQueryCommand streaming_query_command = 7;

// This field is used to mark extensions to the protocol. When plugins generate arbitrary
// Commands they can add them here. During the planning the correct resolution is done.
Expand Down Expand Up @@ -174,3 +176,125 @@ message WriteOperationV2 {
// (Optional) A condition for overwrite saving mode
Expression overwrite_condition = 8;
}

// Starts write stream operation as streaming query. Query ID and Run ID of the streaming
// query are returned.
message WriteStreamOperationStart {

// (Required) The output of the `input` streaming relation will be written.
Relation input = 1;

// The following fields directly map to API for DataStreamWriter().
// Consult API documentation unless explicitly documented here.

string format = 2;
map<string, string> options = 3;
repeated string partitioning_column_names = 4;

oneof trigger {
string processing_time_interval = 5;
bool available_now = 6;
bool once = 7;
string continuous_checkpoint_interval = 8;
}

string output_mode = 9;
string query_name = 10;

// The destination is optional. When set, it can be a path or a table name.
oneof sink_destination {
string path = 11;
string table_name = 12;
}
}

message WriteStreamOperationStartResult {

// (Required) Query instance. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;

// An optional query name.
string name = 2;

// TODO: How do we indicate errors?
// TODO: Consider adding status, last progress etc here.
}

// A tuple that uniquely identifies an instance of streaming query run. It consists of `id` that
// persists across the streaming runs and `run_id` that changes between each run of the
// streaming query that resumes from the checkpoint.
message StreamingQueryInstanceId {

Comment on lines +226 to +227
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: @grundprinzip @amaliujia : Added StreamingQueryInstanceId that uniquely identifies an instance of streaming query run. Added full description of the fields (ported from Scala doc).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

// (Required) The unique id of this query that persists across restarts from checkpoint data.
// That is, this id is generated when a query is started for the first time, and
// will be the same every time it is restarted from checkpoint data.
string id = 1;

// (Required) The unique id of this run of the query. That is, every start/restart of a query
// will generate a unique run_id. Therefore, every time a query is restarted from
// checkpoint, it will have the same `id` but different `run_id`s.
string run_id = 2;
}

// Commands for a streaming query.
message StreamingQueryCommand {

// (Required) Query instance. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;

oneof command {
// See documentation for the corresponding API method in StreamingQuery.

// status() API.
bool status = 2;
// lastProgress() API.
bool last_progress = 3;
// recentProgress() API.
bool recent_progress = 4;
// stop() API. Stops the query.
bool stop = 5;
// processAllAvailable() API. Waits till all the available data is processed
bool process_all_available = 6;
// explain() API. Returns logical and physical plans.
ExplainCommand explain = 7;

// TODO(SPARK-42960) Add more commands: await_termination(), exception() etc.
}

message ExplainCommand {
// TODO: Consider reusing Explain from AnalyzePlanRequest message.
// We can not do this right now since it base.proto imports this file.
bool extended = 1;
}
}

// Response for commands on a streaming query.
message StreamingQueryCommandResult {
// (Required) Query instance id. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;

oneof result_type {
StatusResult status = 2;
RecentProgressResult recent_progress = 3;
ExplainResult explain = 4;
}

message StatusResult {
// See documentation for these Scala 'StreamingQueryStatus' struct
string status_message = 1;
bool is_data_available = 2;
bool is_trigger_active = 3;
bool is_active = 4;
}

message RecentProgressResult {
// Progress reports as an array of json strings.
repeated string recent_progress_json = 5;
}

message ExplainResult {
// Logical and physical plans as string
string result = 1;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message Relation {
Parse parse = 30;
GroupMap group_map = 31;
CoGroupMap co_group_map = 32;
WithWatermark with_watermark = 33;

// NA functions
NAFill fill_na = 90;
Expand Down Expand Up @@ -120,6 +121,9 @@ message Read {
DataSource data_source = 2;
}

// (Optional) Indicates if this is a streaming read.
bool is_streaming = 3;

message NamedTable {
// (Required) Unparsed identifier for the table.
string unparsed_identifier = 1;
Expand Down Expand Up @@ -726,6 +730,18 @@ message WithColumns {
repeated Expression.Alias aliases = 2;
}

message WithWatermark {

// (Required) The input relation
Relation input = 1;

// (Required) Name of the column containing event time.
string event_time = 2;

// (Required)
string delay_threshold = 3;
}

// Specify a hint over a relation. Hint should have a name and optional parameters.
message Hint {
// (Required) The input relation.
Expand Down
Loading