-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Improve](variant) Add streaming compaction writer for NestedGroup #61383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| #include <mutex> | ||
| #include <optional> | ||
| #include <ostream> | ||
| #include <ranges> | ||
| #include <set> | ||
| #include <stack> | ||
| #include <string> | ||
|
|
@@ -379,6 +380,11 @@ void get_column_by_type(const DataTypePtr& data_type, const std::string& name, T | |
| column.add_sub_column(child); | ||
| return; | ||
| } | ||
| if (data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { | ||
| column.set_variant_max_subcolumns_count(assert_cast<const DataTypeVariant*>(data_type.get()) | ||
| ->variant_max_subcolumns_count()); | ||
| return; | ||
| } | ||
| // size is not fixed when type is string or json | ||
| if (is_string_type(data_type->get_primitive_type()) || | ||
| data_type->get_primitive_type() == TYPE_JSONB) { | ||
|
|
@@ -806,7 +812,9 @@ Status VariantCompactionUtil::aggregate_path_to_stats( | |
| if (!column->is_variant_type() || column->unique_id() < 0) { | ||
| continue; | ||
| } | ||
|
|
||
| if (!should_check_variant_path_stats(*column)) { | ||
| continue; | ||
| } | ||
| for (const auto& segment : segment_cache.get_segments()) { | ||
| std::shared_ptr<ColumnReader> column_reader; | ||
| OlapReaderStatistics stats; | ||
|
|
@@ -848,6 +856,10 @@ Status VariantCompactionUtil::aggregate_variant_extended_info( | |
| if (!column->is_variant_type()) { | ||
| continue; | ||
| } | ||
| if (column->variant_enable_nested_group()) { | ||
| (*uid_to_variant_extended_info)[column->unique_id()].has_nested_group = true; | ||
| continue; | ||
| } | ||
| for (const auto& segment : segment_cache.get_segments()) { | ||
| std::shared_ptr<ColumnReader> column_reader; | ||
| OlapReaderStatistics stats; | ||
|
|
@@ -889,11 +901,6 @@ Status VariantCompactionUtil::aggregate_variant_extended_info( | |
| // 4. extract nested paths | ||
| auto& nested_paths = (*uid_to_variant_extended_info)[column->unique_id()].nested_paths; | ||
| variant_column_reader->get_nested_paths(&nested_paths); | ||
|
|
||
| // 5. check if has nested group from stats | ||
| if (source_stats->has_nested_group) { | ||
| (*uid_to_variant_extended_info)[column->unique_id()].has_nested_group = true; | ||
| } | ||
| } | ||
| } | ||
| return Status::OK(); | ||
|
|
@@ -936,6 +943,13 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr | |
| if (output->tablet_schema()->num_variant_columns() == 0) { | ||
| return Status::OK(); | ||
| } | ||
| for (const auto& rowset : intputs) { | ||
| for (const auto& column : rowset->tablet_schema()->columns()) { | ||
| if (column->is_variant_type() && !should_check_variant_path_stats(*column)) { | ||
| return Status::OK(); | ||
| } | ||
| } | ||
| } | ||
| // check no extended schema in input rowsets | ||
| for (const auto& rowset : intputs) { | ||
| for (const auto& column : rowset->tablet_schema()->columns()) { | ||
|
|
@@ -969,6 +983,16 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr | |
| return Status::OK(); | ||
| } | ||
| } | ||
| for (const auto& column : output->tablet_schema()->columns()) { | ||
| if (column->is_variant_type() && !should_check_variant_path_stats(*column)) { | ||
| return Status::OK(); | ||
| } | ||
| } | ||
| for (const auto& column : output->tablet_schema()->columns()) { | ||
| if (!column->is_variant_type()) { | ||
| continue; | ||
| } | ||
| } | ||
|
Comment on lines
+991
to
+995
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Bug: Dead code / empty loop body] This loop iterates over columns, skips non-variant types via `continue`, and then has an empty body for variant columns, so it has no effect. If the intent was to further filter variant columns (e.g., skip columns matching some condition before proceeding to the stats aggregation below), the filtering logic is missing. If the intent was just the early-return in the loop above (lines 986-990), then this loop should be removed entirely.
// This loop does nothing — remove it or add the intended logic
for (const auto& column : output->tablet_schema()->columns()) {
if (!column->is_variant_type()) {
continue;
}
// <-- empty body for variant columns
} |
||
| std::unordered_map<int32_t, PathToNoneNullValues> original_uid_to_path_stats; | ||
| for (const auto& rs : intputs) { | ||
| RETURN_IF_ERROR(aggregate_path_to_stats(rs, &original_uid_to_path_stats)); | ||
|
|
@@ -1188,18 +1212,14 @@ void VariantCompactionUtil::get_compaction_subcolumns_from_data_types( | |
| Status VariantCompactionUtil::get_extended_compaction_schema( | ||
| const std::vector<RowsetSharedPtr>& rowsets, TabletSchemaSPtr& target) { | ||
| std::unordered_map<int32_t, VariantExtendedInfo> uid_to_variant_extended_info; | ||
| // collect path stats from all rowsets and segments | ||
| for (const auto& rs : rowsets) { | ||
| RETURN_IF_ERROR(aggregate_variant_extended_info(rs, &uid_to_variant_extended_info)); | ||
| } | ||
|
|
||
| // If any variant column has nested group, skip extended schema and use normal compaction. | ||
| // Nested groups require special handling that is not yet supported in extended schema compaction. | ||
| for (const auto& [uid, info] : uid_to_variant_extended_info) { | ||
| if (info.has_nested_group) { | ||
| LOG(INFO) << "Variant column uid=" << uid | ||
| << " has nested group, skip extended schema compaction"; | ||
| return Status::OK(); | ||
| const bool has_extendable_variant = | ||
| std::ranges::any_of(target->columns(), [](const TabletColumnPtr& column) { | ||
| return column->is_variant_type() && should_check_variant_path_stats(*column); | ||
| }); | ||
| if (has_extendable_variant) { | ||
| // collect path stats from all rowsets and segments | ||
| for (const auto& rs : rowsets) { | ||
| RETURN_IF_ERROR(aggregate_variant_extended_info(rs, &uid_to_variant_extended_info)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1216,6 +1236,22 @@ Status VariantCompactionUtil::get_extended_compaction_schema( | |
| } | ||
| VLOG_DEBUG << "column " << column->name() << " unique id " << column->unique_id(); | ||
|
|
||
| const auto info_it = uid_to_variant_extended_info.find(column->unique_id()); | ||
| const VariantExtendedInfo empty_extended_info; | ||
| const VariantExtendedInfo& extended_info = info_it == uid_to_variant_extended_info.end() | ||
| ? empty_extended_info | ||
| : info_it->second; | ||
| if (!should_check_variant_path_stats(*column)) { | ||
| VLOG_DEBUG << "skip extended schema compaction for variant uid=" << column->unique_id() | ||
| << " because the column disables variant path stats"; | ||
| continue; | ||
| } | ||
| if (extended_info.has_nested_group) { | ||
| LOG(INFO) << "Variant column uid=" << column->unique_id() | ||
| << " has nested group, keep original column in compaction schema"; | ||
| continue; | ||
| } | ||
|
|
||
| if (column->variant_enable_doc_mode()) { | ||
| const int bucket_num = std::max(1, column->variant_doc_hash_shard_count()); | ||
| for (int b = 0; b < bucket_num; ++b) { | ||
|
|
@@ -1228,34 +1264,30 @@ Status VariantCompactionUtil::get_extended_compaction_schema( | |
| } | ||
|
|
||
| // 1. append typed columns | ||
| RETURN_IF_ERROR(get_compaction_typed_columns( | ||
| target, uid_to_variant_extended_info[column->unique_id()].typed_paths, column, | ||
| output_schema, uid_to_paths_set_info[column->unique_id()])); | ||
| RETURN_IF_ERROR(get_compaction_typed_columns(target, extended_info.typed_paths, column, | ||
| output_schema, | ||
| uid_to_paths_set_info[column->unique_id()])); | ||
| // 2. append nested columns | ||
| RETURN_IF_ERROR(get_compaction_nested_columns( | ||
| uid_to_variant_extended_info[column->unique_id()].nested_paths, | ||
| uid_to_variant_extended_info[column->unique_id()].path_to_data_types, column, | ||
| output_schema, uid_to_paths_set_info[column->unique_id()])); | ||
| extended_info.nested_paths, extended_info.path_to_data_types, column, output_schema, | ||
| uid_to_paths_set_info[column->unique_id()])); | ||
|
|
||
| // 3. get the subpaths | ||
| get_subpaths(column->variant_max_subcolumns_count(), | ||
| uid_to_variant_extended_info[column->unique_id()].path_to_none_null_values, | ||
| get_subpaths(column->variant_max_subcolumns_count(), extended_info.path_to_none_null_values, | ||
| uid_to_paths_set_info[column->unique_id()]); | ||
|
|
||
| // 4. append subcolumns | ||
| if (column->variant_max_subcolumns_count() > 0 || !column->get_sub_columns().empty()) { | ||
| get_compaction_subcolumns_from_subpaths( | ||
| uid_to_paths_set_info[column->unique_id()], column, target, | ||
| uid_to_variant_extended_info[column->unique_id()].path_to_data_types, | ||
| uid_to_variant_extended_info[column->unique_id()].sparse_paths, output_schema); | ||
| extended_info.path_to_data_types, extended_info.sparse_paths, output_schema); | ||
| } | ||
| // variant_max_subcolumns_count == 0 and no typed paths materialized | ||
| // it means that all subcolumns are materialized, may be from old data | ||
| else { | ||
| get_compaction_subcolumns_from_data_types( | ||
| uid_to_paths_set_info[column->unique_id()], column, target, | ||
| uid_to_variant_extended_info[column->unique_id()].path_to_data_types, | ||
| output_schema); | ||
| extended_info.path_to_data_types, output_schema); | ||
| } | ||
|
|
||
| // append sparse column(s) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,165 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <parallel_hashmap/phmap.h> | ||
|
|
||
| #include <cstddef> | ||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <string_view> | ||
| #include <unordered_map> | ||
| #include <vector> | ||
|
|
||
| #include "common/status.h" | ||
| #include "core/column/column.h" | ||
| #include "core/column/column_variant.h" | ||
| #include "storage/segment/variant/nested_group_path.h" | ||
| #include "util/json/path_in_data.h" | ||
|
|
||
| namespace doris { | ||
| struct JsonbValue; | ||
| } // namespace doris | ||
|
|
||
| namespace doris::segment_v2 { | ||
|
|
||
| /** | ||
| * NestedGroup is a storage-layer structure used to persist array<object> | ||
| * with shared offsets to preserve per-element field associations. | ||
| * | ||
| * This is intentionally independent from ColumnVariant's in-memory nested structures. | ||
| * | ||
| * Groups form a tree: each group holds the scalar `children` subcolumns found under its | ||
| * array path plus any deeper array<object> groups in `nested_groups`. | ||
| * | ||
| * NOTE(review): `current_flat_size`, `is_disabled`, `expected_type` and `ensure_offsets()` | ||
| * are declared here without comments; their exact semantics are defined by the | ||
| * implementation file, which is not visible from this header -- confirm there before | ||
| * relying on them. | ||
| */ | ||
| struct NestedGroup { | ||
| // Full array path for top-level group (e.g. "voltage.list"), | ||
| // and relative path for nested groups within another NestedGroup (e.g. "cells"). | ||
| PathInData path; | ||
|
|
||
| // Offsets per parent row (or per parent element for nested groups). | ||
| MutableColumnPtr offsets; | ||
|
|
||
| // Scalar (or flattened object) children under this array path, keyed by child path. | ||
| phmap::flat_hash_map<PathInData, ColumnVariant::Subcolumn, PathInData::Hash> children; | ||
| // Sparse row positions for each child subcolumn value in flattened element space. | ||
| // When present for a child path, children[path] stores only non-missing values and | ||
| // child_rowids[path][i] is the logical row index for children[path][i]. | ||
| phmap::flat_hash_map<PathInData, std::vector<uint32_t>, PathInData::Hash> child_rowids; | ||
|
|
||
| // Nested array<object> groups under this array path, keyed by path. | ||
| phmap::flat_hash_map<PathInData, std::shared_ptr<NestedGroup>, PathInData::Hash> nested_groups; | ||
|
|
||
| size_t current_flat_size = 0; | ||
| bool is_disabled = false; | ||
|
|
||
| enum struct StructureType { UNKNOWN, SCALAR, ARRAY, OBJECT }; | ||
| StructureType expected_type = StructureType::UNKNOWN; | ||
|
|
||
| void ensure_offsets(); | ||
| }; | ||
|
|
||
| using NestedGroupsMap = | ||
| phmap::flat_hash_map<PathInData, std::shared_ptr<NestedGroup>, PathInData::Hash>; | ||
|
|
||
| // NestedGroup marker/path constants are defined in nested_group_path.h | ||
|
|
||
| /** | ||
| * Build NestedGroup(s) from JSONB columns at storage finalize stage. | ||
| * The builder scans JSONB values and only expands array<object>. | ||
| * | ||
| * Public surface: set_max_depth() configures the depth limit (0 = unlimited), | ||
| * build_from_jsonb() fills a caller-provided NestedGroupsMap, and | ||
| * collect_conflict_paths() reports paths that have ARRAY<OBJECT> vs non-array | ||
| * structural conflicts. | ||
| * | ||
| * The builder carries per-instance state (_path_shape_states, _conflict_paths, | ||
| * _appended_path_cache). NOTE(review): whether that state is reset between | ||
| * build_from_jsonb() calls is decided in the implementation file, which is not visible | ||
| * from this header -- confirm before reusing one builder across columns. | ||
| */ | ||
| class NestedGroupBuilder { | ||
| public: | ||
| NestedGroupBuilder() = default; | ||
|
|
||
| // Build NestedGroups from a JSONB column. base_path is the path of this JSONB column | ||
| // in ColumnVariant (empty for root JSONB). | ||
| Status build_from_jsonb(const ColumnPtr& jsonb_column, const PathInData& base_path, | ||
| NestedGroupsMap& nested_groups, size_t num_rows); | ||
|
|
||
| // Convenience overload for root JSONB: forwards with an empty PathInData as base_path. | ||
| Status build_from_jsonb(const ColumnPtr& jsonb_column, NestedGroupsMap& nested_groups, | ||
| size_t num_rows) { | ||
| return build_from_jsonb(jsonb_column, PathInData {}, nested_groups, num_rows); | ||
| } | ||
|
|
||
| // Collect paths that have ARRAY<OBJECT> vs non-array structural conflicts. | ||
| // Returned paths are de-duplicated and sorted. | ||
| void collect_conflict_paths(std::vector<std::string>* out_paths) const; | ||
|
|
||
| void set_max_depth(size_t max_depth) { _max_depth = max_depth; } | ||
|
|
||
| private: | ||
| using AppendedPathCache = | ||
| phmap::flat_hash_map<std::string, PathInData, phmap::priv::StringHashEqT<char>::Hash, | ||
| phmap::priv::StringHashEqT<char>::Eq>; | ||
|
|
||
| enum class PathShape { ARRAY_OBJECT, NON_ARRAY }; | ||
|
|
||
| struct PathShapeState { | ||
| bool has_array_object = false; | ||
| bool has_non_array = false; | ||
| }; | ||
|
|
||
| const PathInData& _normalize_group_path(const PathInData& path) const; | ||
| void _record_path_shape(const PathInData& path, PathShape shape); | ||
| PathInData _append_path_cached(const PathInData& base, std::string_view suffix); | ||
|
|
||
| Status _process_jsonb_value(const doris::JsonbValue* value, const PathInData& current_path, | ||
| NestedGroupsMap& nested_groups, size_t row_idx, size_t depth); | ||
|
|
||
| Status _process_object_as_paths(const doris::JsonbValue* obj_value, | ||
| const PathInData& current_prefix, NestedGroup& group, | ||
| size_t element_flat_idx, size_t depth, | ||
| const PathInData& group_absolute_path); | ||
|
|
||
| Status _process_array_of_objects(const doris::JsonbValue* arr_value, NestedGroup& group, | ||
| size_t parent_row_idx, size_t depth, | ||
| const PathInData& group_absolute_path); | ||
| Status _finalize_group(NestedGroup& group); | ||
|
|
||
| // Process nested object field by recursively flattening into dotted paths. | ||
| Status _process_object_field(const doris::JsonbValue* obj_value, const PathInData& next_prefix, | ||
| NestedGroup& group, size_t element_flat_idx, size_t depth, | ||
| const PathInData& group_absolute_path); | ||
|
|
||
| // Process nested array<object> field within a NestedGroup. | ||
| Status _process_nested_array_field(const doris::JsonbValue* arr_value, | ||
| const PathInData& next_prefix, NestedGroup& group, | ||
| size_t element_flat_idx, size_t depth, | ||
| const PathInData& group_absolute_path); | ||
|
|
||
| // Process scalar field and insert into subcolumn. | ||
| Status _process_scalar_field(const doris::JsonbValue* value, const PathInData& next_prefix, | ||
| NestedGroup& group, size_t element_flat_idx); | ||
|
|
||
| // Return true if this array can be treated as array<object> (nulls allowed). | ||
| bool _is_array_of_objects(const doris::JsonbValue* arr_value) const; | ||
|
|
||
| // Convert a JsonbValue to a scalar Field (or NULL Field). Container types are not supported. | ||
| Status _jsonb_to_field(const doris::JsonbValue* value, Field& out) const; | ||
|
|
||
| // Conflict policy placeholder. Returns true if the current value should be discarded. | ||
| bool _handle_conflict(NestedGroup& group, bool is_array_object) const; | ||
|
|
||
| private: | ||
| size_t _max_depth = 0; // 0 = unlimited | ||
| phmap::flat_hash_map<PathInData, PathShapeState, PathInData::Hash> _path_shape_states; | ||
| phmap::flat_hash_set<std::string> _conflict_paths; | ||
| phmap::flat_hash_map<PathInData, AppendedPathCache, PathInData::Hash> _appended_path_cache; | ||
| }; | ||
|
|
||
| } // namespace doris::segment_v2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Correctness concern: early-return skips all variant columns]
This loop returns `OK` as soon as it finds any variant column where `should_check_variant_path_stats()` returns false (i.e., any column with nested groups enabled). This means if the schema has two variant columns — one with nested groups and one without — the path stats check for the non-nested-group variant is skipped entirely.
Was the intent to skip only the nested-group variant columns from path stats checking, while still checking the others? If so, this should collect which variant UIDs to skip rather than returning early for all columns.
Similarly, the equivalent check at lines 943-951 in the same function has the same early-return-for-all semantics from input rowset columns.