[Improve](variant) Add streaming compaction writer for NestedGroup#61383
[Improve](variant) Add streaming compaction writer for NestedGroup#61383eldenmoon wants to merge 1 commit intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
Build a NestedGroup streaming write plan from input rowset metadata and use it to initialize root, regular subcolumn and nested-group writers during compaction, so Variant data can be written chunk by chunk instead of being fully materialized first. Refactor shared subcolumn writer helpers, allow the NestedGroup provider to consume prebuilt groups or a streaming plan, and skip sparse/doc path-stat collection for nested-group variant columns. Add tests for streaming plan construction and array subcolumn writes across multiple batches.
87b5358 to
b2f9151
Compare
|
run buildall |
There was a problem hiding this comment.
Pull request overview
Adds a streaming compaction write path for VARIANT columns when NestedGroup is enabled, building a streaming plan from input rowset metadata so compaction can write chunk-by-chunk (avoiding full VARIANT materialization). Also refactors shared subcolumn-writer helpers, extends the NestedGroup provider interface to accept prebuilt groups or a streaming plan, and disables sparse/doc path-stat handling for NestedGroup VARIANT columns.
Changes:
- Introduce
VariantStreamingCompactionWriterandNestedGroupStreamingWritePlanto drive chunked compaction writes for NestedGroup-enabled VARIANT. - Refactor common VARIANT subcolumn writer logic into
variant_writer_helpersand update legacy writers to use it. - Update NestedGroup routing/provider/path utilities and add new unit tests for streaming plan construction and multi-batch array subcolumn writes.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| be/test/storage/segment/variant_column_writer_reader_test.cpp | Adds compaction streaming tests + helpers for creating rowsets/readers and validating streamed writes. |
| be/test/storage/segment/nested_group_provider_test.cpp | Updates default provider test to match the new provider API / error behavior. |
| be/test/storage/segment/nested_group_path_test.cpp | Adds prefix/overlap unit tests for NestedGroup path utilities. |
| be/test/storage/segment/nested_group_path_filter_test.cpp | Adds descendant match test coverage for path filtering. |
| be/src/storage/segment/variant/variant_writer_helpers.h | New shared helpers for converting/writing columns and preparing subcolumn writers. |
| be/src/storage/segment/variant/variant_streaming_compaction_writer.h | Declares the streaming compaction writer and its lifecycle. |
| be/src/storage/segment/variant/variant_streaming_compaction_writer.cpp | Implements streaming chunk append + root/subcolumn writes + provider integration. |
| be/src/storage/segment/variant/variant_column_writer_impl.h | Hooks streaming writer into the existing VARIANT writer implementation. |
| be/src/storage/segment/variant/variant_column_writer_impl.cpp | Refactors finalize path, adds prebuilt-groups option, and uses shared writer helpers. |
| be/src/storage/segment/variant/variant_column_reader.h | Adjusts includes / iterator surface for VARIANT reader. |
| be/src/storage/segment/variant/variant_column_reader.cpp | Skips merging sparse/doc stats when NestedGroup disables path stats; adjusts NG index-path handling. |
| be/src/storage/segment/variant/nested_group_streaming_write_plan.h | New data structures for streaming write planning. |
| be/src/storage/segment/variant/nested_group_streaming_write_plan.cpp | Builds streaming plan from input rowset segment metadata, including conflict detection. |
| be/src/storage/segment/variant/nested_group_routing_plan.h | Exposes shared conflict formatting/validation and candidate-based routing build. |
| be/src/storage/segment/variant/nested_group_routing_plan.cpp | Reuses shared prefix/overlap helpers and shared conflict validation. |
| be/src/storage/segment/variant/nested_group_provider.h | Extends provider API for prebuilt groups + streaming plan and adjusts path filter logic. |
| be/src/storage/segment/variant/nested_group_provider.cpp | Implements default provider arg validation, real nested-group path matching, and no-op builder stub. |
| be/src/storage/segment/variant/nested_group_path.h | Adds prefix/overlap helpers used across routing/planning/filtering. |
| be/src/storage/segment/variant/nested_group_builder.h | Adds NestedGroup builder interface/data structures for building NGs from JSONB. |
| be/src/storage/segment/column_writer.h | Adds test accessor to reach VariantColumnWriterImpl. |
| be/src/exec/common/variant_util.h | Adds helpers to disable path-stats/binary-column behavior when NestedGroup is enabled. |
| be/src/exec/common/variant_util.cpp | Updates compaction stats/schema logic to skip work for NestedGroup-enabled VARIANT columns. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Status read_by_rowids(const rowid_t* rowids, const size_t count, | ||
| MutableColumnPtr& dst) override; | ||
|
|
||
| ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); } | ||
|
|
||
| Status read_null_map(size_t* n, NullMap& null_map) override { | ||
| return _inner_iter->read_null_map(n, null_map); | ||
| } | ||
|
|
||
| Status init_prefetcher(const SegmentPrefetchParams& params) override; | ||
| void collect_prefetchers( | ||
| std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers, |
| for (const auto& column : output->tablet_schema()->columns()) { | ||
| if (!column->is_variant_type()) { | ||
| continue; | ||
| } | ||
| } |
|
|
||
| VariantColumnWriterImpl* impl_for_test() const { return _impl.get(); } |
|
/review |
There was a problem hiding this comment.
Code Review Summary
This PR adds a streaming compaction writer for NestedGroup variant columns, enabling chunk-by-chunk writing during compaction instead of full materialization. The architecture is clean and well-structured overall. Below are the critical checkpoint conclusions and findings.
Critical Checkpoint Analysis (Part 1.3)
Goal & Correctness: The PR accomplishes its stated goal of enabling streaming writes for NestedGroup variant columns during compaction. The VariantStreamingCompactionWriter correctly orchestrates root column, regular subcolumn, and nested group writes per chunk. An end-to-end test (test_streaming_compaction_writer_streams_regular_array_paths_across_batches) verifies the round-trip. However, there is a dead code issue in variant_util.cpp (see inline comment).
Modification Focus: The change is well-scoped to the variant/nested-group write path with appropriate refactoring of shared helpers into variant_writer_helpers.h.
Concurrency: No new concurrency is introduced. The streaming writer operates single-threaded within the compaction write path. The existing concurrency test (test_concurrent_load_external_meta_and_get_subcolumn_meta) covers the read path. No issues found.
Lifecycle Management: VariantStreamingCompactionWriter has a clear phase-based lifecycle (UNINITIALIZED -> INITIALIZED -> APPENDING -> CLOSED) with proper state checks. The raw pointer to _nested_group_provider and _tablet_column is safe because the owner (VariantColumnWriterImpl) outlives the streaming writer. No circular references.
Configuration: No new configuration items added. The streaming path is activated when variant_enable_nested_group() is true and it's a compaction write with input readers present.
Incompatible Changes: No storage format changes; the output format is identical to the legacy path.
Parallel Code Paths: The streaming writer correctly mirrors all operations from the legacy VariantColumnWriterImpl path (finish, write_data, write_ordinal_index, write_zone_map, write_inverted_index, write_bloom_filter_index). All delegation points in VariantColumnWriterImpl check for _streaming_compaction_writer != nullptr consistently.
Test Coverage: Good coverage with the new end-to-end streaming test and unit tests for the write plan construction. Negative cases for error paths are tested via phase checks. The find_in_nested_groups implementation has unit tests covering empty readers, but not the full search logic (acceptable since real NestedGroupReaders require EE wiring).
Observability: No new metrics or logging added. The streaming path reuses existing variant statistics infrastructure. For a first iteration this is acceptable, though a VLOG for streaming plan selection would help debugging.
Performance: The streaming approach avoids materializing all data at once, which is the core performance improvement. insert_range_from + finalize() per chunk in _append_input creates a temporary copy; this is acceptable for correctness but could be optimized later if chunks are already finalized. No redundant operations found in hot paths.
Other Issues: See inline comments for a dead code loop in variant_util.cpp and a minor observation about early-return semantics in check_path_stats.
| if (!column->is_variant_type()) { | ||
| continue; | ||
| } | ||
| } |
There was a problem hiding this comment.
[Bug: Dead code / empty loop body]
This loop iterates over columns, skips non-variant types via continue, but then does nothing for variant-type columns — the loop body is entirely empty. This appears to be leftover code from an incomplete refactoring.
If the intent was to further filter variant columns (e.g., skip columns matching some condition before proceeding to the stats aggregation below), the filtering logic is missing. If the intent was just the early-return in the loop above (lines 986-990), then this loop should be removed entirely.
// This loop does nothing — remove it or add the intended logic
for (const auto& column : output->tablet_schema()->columns()) {
if (!column->is_variant_type()) {
continue;
}
// <-- empty body for variant columns
}| for (const auto& column : output->tablet_schema()->columns()) { | ||
| if (column->is_variant_type() && !should_check_variant_path_stats(*column)) { | ||
| return Status::OK(); | ||
| } |
There was a problem hiding this comment.
[Correctness concern: early-return skips all variant columns]
This loop returns OK as soon as it finds any variant column where should_check_variant_path_stats() returns false (i.e., any column with nested groups enabled). This means if the schema has two variant columns — one with nested groups and one without — the path stats check for the non-nested-group variant is skipped entirely.
Was the intent to skip only the nested-group variant columns from path stats checking, while still checking the others? If so, this should collect which variant UIDs to skip rather than returning early for all columns.
Similarly, the equivalent check at lines 943-951 in the same function has the same early-return-for-all semantics from input rowset columns.
TPC-H: Total hot run time: 26818 ms |
TPC-H: Total hot run time: 27146 ms |
TPC-DS: Total hot run time: 167998 ms |
TPC-DS: Total hot run time: 167825 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
Build a NestedGroup streaming write plan from input rowset metadata and use it to initialize root, regular subcolumn and nested-group writers during compaction, so Variant data can be written chunk by chunk instead of being fully materialized first.
Refactor shared subcolumn writer helpers, allow the NestedGroup provider to consume prebuilt groups or a streaming plan, and skip sparse/doc path-stat collection for nested-group variant columns.
Add tests for streaming plan construction and array subcolumn writes across multiple batches.