[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper

### What changes were proposed in this pull request?
Add Protobuf serializer for `StreamingQueryProgressWrapper`

### Why are the changes needed?
Support fast and compact serialization/deserialization for `StreamingQueryProgressWrapper` over RocksDB.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a new unit test.
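For illustration, a minimal round-trip check of the kind such a test performs might look like the sketch below. It is not the test added by this PR; it exercises the `StreamingQueryProgressSerializer` introduced in the diff, uses illustrative values, and assumes it lives in the `org.apache.spark.status.protobuf.sql` package since the serializer object and the `StreamingQueryProgress` constructor are package-private.

```scala
// Hypothetical round-trip sketch (not the UT added by this PR).
package org.apache.spark.status.protobuf.sql

import java.util.{HashMap => JHashMap, UUID}

import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress}

object StreamingQueryProgressRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Build a small progress object; field names follow the constructor used in
    // StreamingQueryProgressSerializer.deserialize in this PR.
    val input = new StreamingQueryProgress(
      id = UUID.randomUUID(),
      runId = UUID.randomUUID(),
      name = "query-1",
      timestamp = "2023-01-04T00:00:00.000Z",
      batchId = 2L,
      batchDuration = 100L,
      durationMs = new JHashMap[String, java.lang.Long](),
      eventTime = new JHashMap[String, String](),
      stateOperators = Array.empty[StateOperatorProgress],
      sources = Array.empty[SourceProgress],
      sink = new SinkProgress("sink", 10L, new JHashMap[String, String]()),
      observedMetrics = new JHashMap[String, Row]())

    // Convert to the generated protobuf message and back, then compare key fields.
    val proto = StreamingQueryProgressSerializer.serialize(input)
    val restored = StreamingQueryProgressSerializer.deserialize(proto)
    assert(restored.id == input.id)
    assert(restored.batchId == input.batchId)
    assert(restored.sink.description == input.sink.description)
  }
}
```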

Closes #39357 from LuciferYang/SPARK-41677.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
LuciferYang authored and gengliangwang committed Jan 4, 2023
1 parent 492356d commit 915e9c6
Showing 10 changed files with 537 additions and 6 deletions.
@@ -685,3 +685,54 @@ message ExecutorPeakMetricsDistributions {
repeated double quantiles = 1;
repeated ExecutorMetrics executor_metrics = 2;
}

message StateOperatorProgress {
string operator_name = 1;
int64 num_rows_total = 2;
int64 num_rows_updated = 3;
int64 all_updates_time_ms = 4;
int64 num_rows_removed = 5;
int64 all_removals_time_ms = 6;
int64 commit_time_ms = 7;
int64 memory_used_bytes = 8;
int64 num_rows_dropped_by_watermark = 9;
int64 num_shuffle_partitions = 10;
int64 num_state_store_instances = 11;
map<string, int64> custom_metrics = 12;
}

message SourceProgress {
string description = 1;
string start_offset = 2;
string end_offset = 3;
string latest_offset = 4;
int64 num_input_rows = 5;
double input_rows_per_second = 6;
double processed_rows_per_second = 7;
map<string, string> metrics = 8;
}

message SinkProgress {
string description = 1;
int64 num_output_rows = 2;
map<string, string> metrics = 3;
}

message StreamingQueryProgress {
string id = 1;
string run_id = 2;
string name = 3;
string timestamp = 4;
int64 batch_id = 5;
int64 batch_duration = 6;
map<string, int64> duration_ms = 7;
map<string, string> event_time = 8;
repeated StateOperatorProgress state_operators = 9;
repeated SourceProgress sources = 10;
SinkProgress sink = 11;
map<string, string> observed_metrics = 12;
}

message StreamingQueryProgressWrapper {
StreamingQueryProgress progress = 1;
}
@@ -18,3 +18,4 @@
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
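These entries are plain `java.util.ServiceLoader` registrations, so the new `StreamingQueryProgressWrapperSerializer` is discovered at runtime alongside the existing SQL serializers. A minimal discovery sketch, assuming the service interface this META-INF/services file is named after is Spark's internal `ProtobufSerDe` (the file path itself is not shown in this excerpt):

```scala
import java.util.ServiceLoader

object ListRegisteredSerDes {
  def main(args: Array[String]): Unit = {
    // The service interface name is assumed from the registration file's purpose and
    // is resolved reflectively so the sketch does not depend on the trait's signature.
    val serviceInterface = Class.forName("org.apache.spark.status.protobuf.ProtobufSerDe")
    val it = ServiceLoader.load(serviceInterface).iterator()
    while (it.hasNext) {
      // Prints e.g. org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
      println(it.next().getClass.getName)
    }
  }
}
```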
@@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress private[spark](
val operatorName: String,
val numRowsTotal: Long,
val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[sql](
* @since 2.1.0
*/
@Evolving
-class StreamingQueryProgress private[sql](
+class StreamingQueryProgress private[spark](
val id: UUID,
val runId: UUID,
val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[sql](
* @since 2.1.0
*/
@Evolving
-class SourceProgress protected[sql](
+class SourceProgress protected[spark](
val description: String,
val startOffset: String,
val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[sql](
* @since 2.1.0
*/
@Evolving
-class SinkProgress protected[sql](
+class SinkProgress protected[spark](
val description: String,
val numOutputRows: Long,
val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
}
}

-private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
@JsonIgnore @KVIndex
private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import org.apache.spark.sql.streaming.SinkProgress
import org.apache.spark.status.protobuf.StoreTypes

private[protobuf] object SinkProgressSerializer {

def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
val builder = StoreTypes.SinkProgress.newBuilder()
builder.setDescription(sink.description)
builder.setNumOutputRows(sink.numOutputRows)
sink.metrics.forEach {
case (k, v) => builder.putMetrics(k, v)
}
builder.build()
}

def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
new SinkProgress(
description = sink.getDescription,
numOutputRows = sink.getNumOutputRows,
metrics = sink.getMetricsMap
)
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import java.util.{List => JList}

import org.apache.spark.sql.streaming.SourceProgress
import org.apache.spark.status.protobuf.StoreTypes

private[protobuf] object SourceProgressSerializer {

def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
val builder = StoreTypes.SourceProgress.newBuilder()
builder.setDescription(source.description)
builder.setStartOffset(source.startOffset)
builder.setEndOffset(source.endOffset)
builder.setLatestOffset(source.latestOffset)
builder.setNumInputRows(source.numInputRows)
builder.setInputRowsPerSecond(source.inputRowsPerSecond)
builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
source.metrics.forEach {
case (k, v) => builder.putMetrics(k, v)
}
builder.build()
}

def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
val size = sourceList.size()
val result = new Array[SourceProgress](size)
var i = 0
while (i < size) {
result(i) = deserialize(sourceList.get(i))
i += 1
}
result
}

private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
new SourceProgress(
description = source.getDescription,
startOffset = source.getStartOffset,
endOffset = source.getEndOffset,
latestOffset = source.getLatestOffset,
numInputRows = source.getNumInputRows,
inputRowsPerSecond = source.getInputRowsPerSecond,
processedRowsPerSecond = source.getProcessedRowsPerSecond,
metrics = source.getMetricsMap
)
}
}
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import java.util.{List => JList}

import org.apache.spark.sql.streaming.StateOperatorProgress
import org.apache.spark.status.protobuf.StoreTypes

object StateOperatorProgressSerializer {

def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
val builder = StoreTypes.StateOperatorProgress.newBuilder()
builder.setOperatorName(stateOperator.operatorName)
builder.setNumRowsTotal(stateOperator.numRowsTotal)
builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
builder.setCommitTimeMs(stateOperator.commitTimeMs)
builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
stateOperator.customMetrics.forEach {
case (k, v) => builder.putCustomMetrics(k, v)
}
builder.build()
}

def deserializeToArray(
stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
val size = stateOperatorList.size()
val result = new Array[StateOperatorProgress](size)
var i = 0
while (i < size) {
result(i) = deserialize(stateOperatorList.get(i))
i += 1
}
result
}

private def deserialize(
stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
new StateOperatorProgress(
operatorName = stateOperator.getOperatorName,
numRowsTotal = stateOperator.getNumRowsTotal,
numRowsUpdated = stateOperator.getNumRowsUpdated,
allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
numRowsRemoved = stateOperator.getNumRowsRemoved,
allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
commitTimeMs = stateOperator.getCommitTimeMs,
memoryUsedBytes = stateOperator.getMemoryUsedBytes,
numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
numShufflePartitions = stateOperator.getNumShufflePartitions,
numStateStoreInstances = stateOperator.getNumStateStoreInstances,
customMetrics = stateOperator.getCustomMetricsMap
)
}
}
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import java.util.{HashMap => JHashMap, Map => JMap, UUID}

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.status.protobuf.StoreTypes

private[protobuf] object StreamingQueryProgressSerializer {

private val mapper: JsonMapper = JsonMapper.builder()
.addModule(DefaultScalaModule)
.build()

def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
val builder = StoreTypes.StreamingQueryProgress.newBuilder()
builder.setId(process.id.toString)
builder.setRunId(process.runId.toString)
builder.setName(process.name)
builder.setTimestamp(process.timestamp)
builder.setBatchId(process.batchId)
builder.setBatchDuration(process.batchDuration)
process.durationMs.forEach {
case (k, v) => builder.putDurationMs(k, v)
}
process.eventTime.forEach {
case (k, v) => builder.putEventTime(k, v)
}
process.stateOperators.foreach(
s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s)))
process.sources.foreach(
s => builder.addSources(SourceProgressSerializer.serialize(s))
)
builder.setSink(SinkProgressSerializer.serialize(process.sink))
process.observedMetrics.forEach {
case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
}
builder.build()
}

def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
new StreamingQueryProgress(
id = UUID.fromString(process.getId),
runId = UUID.fromString(process.getRunId),
name = process.getName,
timestamp = process.getTimestamp,
batchId = process.getBatchId,
batchDuration = process.getBatchDuration,
durationMs = process.getDurationMsMap,
eventTime = process.getEventTimeMap,
stateOperators =
StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList),
sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList),
sink = SinkProgressSerializer.deserialize(process.getSink),
observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap)
)
}

private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
val observedMetrics = new JHashMap[String, Row](input.size())
val classType = classOf[GenericRowWithSchema]
input.forEach {
case (k, v) =>
observedMetrics.put(k, mapper.readValue(v, classType))
}
observedMetrics
}
}
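One detail worth noting: `observedMetrics` maps metric names to `Row` values, which have no protobuf message of their own (the proto field is `map<string, string> observed_metrics`), so the serializer stores each `Row` as a Jackson JSON string and rebuilds a `GenericRowWithSchema` on read. A standalone sketch of that JSON round trip, mirroring the mapper configuration above (the schema and values are illustrative):

```scala
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ObservedMetricsJsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Same mapper setup as StreamingQueryProgressSerializer above.
    val mapper = JsonMapper.builder().addModule(DefaultScalaModule).build()

    val schema = StructType(Seq(StructField("count", LongType)))
    val row = new GenericRowWithSchema(Array[Any](42L), schema)

    // Row -> JSON string: what serialize() stores in the observed_metrics map ...
    val json = mapper.writeValueAsString(row)
    // ... and JSON string -> Row: what convertToObservedMetrics() does on read.
    val restored = mapper.readValue(json, classOf[GenericRowWithSchema])
    println(restored)
  }
}
```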
