Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper" #39416

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -685,54 +685,3 @@ message ExecutorPeakMetricsDistributions {
repeated double quantiles = 1;
repeated ExecutorMetrics executor_metrics = 2;
}

message StateOperatorProgress {
string operator_name = 1;
int64 num_rows_total = 2;
int64 num_rows_updated = 3;
int64 all_updates_time_ms = 4;
int64 num_rows_removed = 5;
int64 all_removals_time_ms = 6;
int64 commit_time_ms = 7;
int64 memory_used_bytes = 8;
int64 num_rows_dropped_by_watermark = 9;
int64 num_shuffle_partitions = 10;
int64 num_state_store_instances = 11;
map<string, int64> custom_metrics = 12;
}

message SourceProgress {
string description = 1;
string start_offset = 2;
string end_offset = 3;
string latest_offset = 4;
int64 num_input_rows = 5;
double input_rows_per_second = 6;
double processed_rows_per_second = 7;
map<string, string> metrics = 8;
}

message SinkProgress {
string description = 1;
int64 num_output_rows = 2;
map<string, string> metrics = 3;
}

message StreamingQueryProgress {
string id = 1;
string run_id = 2;
string name = 3;
string timestamp = 4;
int64 batch_id = 5;
int64 batch_duration = 6;
map<string, int64> duration_ms = 7;
map<string, string> event_time = 8;
repeated StateOperatorProgress state_operators = 9;
repeated SourceProgress sources = 10;
SinkProgress sink = 11;
map<string, string> observed_metrics = 12;
}

message StreamingQueryProgressWrapper {
StreamingQueryProgress progress = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Evolving
class StateOperatorProgress private[spark](
class StateOperatorProgress private[sql](
val operatorName: String,
val numRowsTotal: Long,
val numRowsUpdated: Long,
Expand Down Expand Up @@ -125,7 +125,7 @@ class StateOperatorProgress private[spark](
* @since 2.1.0
*/
@Evolving
class StreamingQueryProgress private[spark](
class StreamingQueryProgress private[sql](
val id: UUID,
val runId: UUID,
val name: String,
Expand Down Expand Up @@ -190,7 +190,7 @@ class StreamingQueryProgress private[spark](
* @since 2.1.0
*/
@Evolving
class SourceProgress protected[spark](
class SourceProgress protected[sql](
val description: String,
val startOffset: String,
val endOffset: String,
Expand Down Expand Up @@ -236,7 +236,7 @@ class SourceProgress protected[spark](
* @since 2.1.0
*/
@Evolving
class SinkProgress protected[spark](
class SinkProgress protected[sql](
val description: String,
val numOutputRows: Long,
val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
}
}

private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
@JsonIgnore @KVIndex
private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.