[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper #39642

Closed
LuciferYang wants to merge 19 commits into apache:master from LuciferYang:SPARK-41677-2

@LuciferYang LuciferYang commented Jan 18, 2023

What changes were proposed in this pull request?

Add Protobuf serializer for StreamingQueryProgressWrapper

Why are the changes needed?

Support fast and compact serialization/deserialization for StreamingQueryProgressWrapper over RocksDB.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Added a new UT
  • Manually tested the sql module with LIVE_UI_LOCAL_STORE_DIR set; all tests passed:
build/mvn clean install -DskipTests -pl sql/core -am
export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
build/mvn clean install -pl sql/core

There are 4 existing test suites that use StreamingQueryProgressWrapper:

  • StateStoreCoordinatorSuite
  • StreamingQueryStatusListenerWithDiskStoreSuite
  • UISeleniumSuite
  • UISeleniumWithRocksDBBackendSuite

@LuciferYang LuciferYang marked this pull request as draft January 18, 2023 12:17
@LuciferYang LuciferYang changed the title [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper [WIP][SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper Jan 18, 2023
val durationMs: ju.Map[String, JLong],
// If `eventTime` is no longer read-only, need change
// `StreamingQueryProgressSerializer#deserialize` to return mutable Java Map
val eventTime: ju.Map[String, String],
Contributor Author

There are 6 ju.Map fields. durationMs and observedMetrics already use mutable Java Maps; the remaining 4 should be read-only at present, but for safety we can also return mutable Java Maps directly from the deserialize method.

Contributor Author

For safety, all 6 ju.Map fields are assigned mutable Maps as of 8e89213.
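
The read-only versus mutable concern in this thread can be illustrated with a small sketch. It assumes the map handed back by a generated protobuf getter is unmodifiable; `Collections.unmodifiableMap` stands in for that getter here, and `toMutableJMap` is a hypothetical helper name, not code from this PR.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}

object MutableMapSketch {
  // Hypothetical helper: copy a (possibly unmodifiable) Java map into a fresh,
  // mutable java.util.HashMap so that later writes cannot fail.
  def toMutableJMap[K, V](source: JMap[K, V]): JMap[K, V] = new JHashMap[K, V](source)

  def main(args: Array[String]): Unit = {
    val backing = new JHashMap[String, String]()
    backing.put("watermark", "2023-01-18T12:17:00.000Z")

    // Stand-in for the read-only view a deserializer might otherwise return.
    val readOnly: JMap[String, String] = Collections.unmodifiableMap(backing)

    // Writing to the read-only view throws UnsupportedOperationException.
    val rejected =
      try { readOnly.put("max", "ignored"); false }
      catch { case _: UnsupportedOperationException => true }
    println(s"read-only view rejected the write: $rejected")

    // Writing to the copy is safe and leaves the original untouched.
    val mutable = toMutableJMap(readOnly)
    mutable.put("max", "2023-01-18T12:18:00.000Z")
    println(s"copy has ${mutable.size()} entries, original has ${readOnly.size()}")
  }
}
```

Copying costs one extra allocation per map, which is the trade-off behind "for safety" above: callers never need to know whether a given field is treated as read-only.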

@LuciferYang LuciferYang marked this pull request as ready for review January 20, 2023 02:07
@LuciferYang LuciferYang changed the title [WIP][SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper Jan 20, 2023
@LuciferYang LuciferYang marked this pull request as draft January 20, 2023 05:02
@LuciferYang
Contributor Author

Will refactor after #39666 is merged.

@LuciferYang
Contributor Author

rebased

@LuciferYang LuciferYang marked this pull request as ready for review January 22, 2023 03:53
@LuciferYang
Contributor Author

cc @gengliangwang

Does this one have a chance of making it into Spark 3.4.0, or should it wait for the next version?

int64 num_rows_dropped_by_watermark = 9;
int64 num_shuffle_partitions = 10;
int64 num_state_store_instances = 11;
map<string, int64> custom_metrics = 12;
Member

I am concerned about the nullability of all these maps. Shall we check/test all of them?

Contributor Author

a904a27: check/test all maps

699ebd1: add the setJMapField function to Utils
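
A null-safe setter of the kind discussed here might look like the sketch below. The real signature of `setJMapField` is not shown in this thread, so `setJMapFieldSketch` is an assumed shape, and `FakeBuilder` is a hypothetical stand-in for a generated protobuf builder whose `putAll*` method rejects null.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object NullSafeMapSketch {
  // Hypothetical stand-in for a generated builder with a map field.
  final class FakeBuilder {
    val customMetrics = new JHashMap[String, java.lang.Long]()

    def putAllCustomMetrics(m: JMap[String, java.lang.Long]): FakeBuilder = {
      if (m == null) throw new NullPointerException("map is null")
      customMetrics.putAll(m)
      this
    }
  }

  // Assumed shape of the helper: call the setter only for non-null, non-empty input.
  def setJMapFieldSketch[K, V](input: JMap[K, V], putAll: JMap[K, V] => Any): Unit = {
    if (input != null && !input.isEmpty) putAll(input)
  }

  def main(args: Array[String]): Unit = {
    val builder = new FakeBuilder

    // A null map is skipped, so the field simply stays empty.
    setJMapFieldSketch[String, java.lang.Long](null, builder.putAllCustomMetrics)
    println(s"after null input: ${builder.customMetrics}")

    val metrics = new JHashMap[String, java.lang.Long]()
    metrics.put("loadedMapCacheHitCount", java.lang.Long.valueOf(1L))
    setJMapFieldSketch[String, java.lang.Long](metrics, builder.putAllCustomMetrics)
    println(s"after real input: ${builder.customMetrics}")
  }
}
```

With a guard like this, a null map on the way in cannot be told apart from an empty one on the way out, which is why a round-trip test would expect an empty map rather than null.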

}

test("StreamingQueryProgressWrapper") {
val normalInput = {
Contributor Author

Two objects are created manually because many fields can be null.

assert(progress.batchId == resultProcess.batchId)
assert(progress.batchDuration == resultProcess.batchDuration)
if (hasNullValue) {
assert(resultProcess.durationMs.isEmpty)
Contributor Author

A null map input comes back as an empty map.

@github-actions github-actions bot added the BUILD label Jan 23, 2023
@github-actions github-actions bot removed the BUILD label Jan 23, 2023
Member

use setJMapField?

Contributor Author

Using setJMapField would require defining an additional function or passing an anonymous function, because this is not a simple putAll scenario.

Is there a better way to write this? @gengliangwang

)
}

private def putAllObservedMetrics(
Contributor Author

Changed to define a class-level private function.

builder: StoreTypes.StreamingQueryProgress.Builder,
observedMetrics: JMap[String, Row]): Unit = {
observedMetrics.forEach {
case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
Member

nit: Let's add one line comment to mention why we choose to encode the row object with json.
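
Why the observed-metrics field is not a plain putAll case can be sketched as follows: each value has to be turned into a string before it is stored. Everything here is illustrative only; `Metric`, `encodeMetric`, and `putAllEncoded` are hypothetical names, and the hand-written encoder merely stands in for the `mapper.writeValueAsString` call in the diff.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object EncodeOnPutSketch {
  // Hypothetical stand-in for a Row-like value.
  final case class Metric(name: String, value: Long)

  // Hypothetical encoder; the diff uses a JSON mapper at this point instead.
  def encodeMetric(m: Metric): String = s"""{"name":"${m.name}","value":${m.value}}"""

  // Null-safe, per-entry copy: every value is encoded before `put` is called,
  // which is the step a bulk putAll cannot perform.
  def putAllEncoded[V](
      input: JMap[String, V],
      encode: V => String,
      put: (String, String) => Any): Unit = {
    if (input != null) {
      val it = input.entrySet().iterator()
      while (it.hasNext) {
        val entry = it.next()
        put(entry.getKey, encode(entry.getValue))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val observed = new JHashMap[String, Metric]()
    observed.put("event1", Metric("count", 7L))

    // Stand-in for the string-valued map field of a builder.
    val target = new JHashMap[String, String]()
    putAllEncoded[Metric](observed, encodeMetric, (k, v) => target.put(k, v))
    println(target)
  }
}
```

Keeping the loop in one private function, as the author chose to do, avoids threading an encoder through a generic helper that every other map field would not need.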

Member

@gengliangwang gengliangwang left a comment

LGTM. Thanks for working on this during the holiday!

@LuciferYang
Contributor Author

done. Thank you for reviewing the code during the holiday :) @gengliangwang

@LuciferYang
Contributor Author

GA passed

gengliangwang pushed a commit that referenced this pull request Jan 25, 2023
…ngQueryProgressWrapper`

### What changes were proposed in this pull request?
Add Protobuf serializer for `StreamingQueryProgressWrapper`

### Why are the changes needed?
Support fast and compact serialization/deserialization for `StreamingQueryProgressWrapper` over RocksDB.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Added a new UT
- Manually tested the sql module with `LIVE_UI_LOCAL_STORE_DIR` set; all tests passed:

```
build/mvn clean install -DskipTests -pl sql/core -am
export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
build/mvn clean install -pl sql/core
```

There are 4 existing test suites that use `StreamingQueryProgressWrapper`:
 - StateStoreCoordinatorSuite
 - StreamingQueryStatusListenerWithDiskStoreSuite
 - UISeleniumSuite
 - UISeleniumWithRocksDBBackendSuite

Closes #39642 from LuciferYang/SPARK-41677-2.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 7b93415)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
@gengliangwang
Member

Thanks, merged to master & branch-3.4

@LuciferYang
Contributor Author

Thanks @gengliangwang

TarunParimi pushed a commit to TarunParimi/spark that referenced this pull request Jun 5, 2023
…ngQueryProgressWrapper`
