From 6dbbb5435b07aa2608ed5ee184435c4296b756e2 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 01:40:01 +0800 Subject: [PATCH 1/8] [feature](be) Support nested parquet struct pruning ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Support file-layer pruning for primitive leaf predicates under Parquet STRUCT columns in the new parquet reader. The change keeps row-level filtering on Expr/VExprContext, adds file-local nested predicate targets, merges filter-only nested projections, and resolves nested primitive leaves for statistics, dictionary, bloom, and page-index pruning. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Ran build-support/clang-format.sh for modified BE files. - Ran git diff --check. - Local BE UT did not start because macOS clang16 failed CMake compiler probe with ld: library c++ not found. Fedora build and UT will be run after push. - Behavior changed: No - Does this need documentation: Yes (updated internal design document) --- .../format/new_parquet/parquet_statistics.cpp | 77 ++- .../format/new_parquet/parquet_statistics.h | 4 + be/src/format/reader/column_mapper.cpp | 588 +++++++++++++++++- be/src/format/reader/column_mapper.h | 6 +- be/src/format/reader/file_reader.h | 3 + .../new_parquet/parquet_reader_test.cpp | 186 ++++++ ...ex-column-predicate-and-stats-filtering.md | 339 ++++++---- 7 files changed, 1032 insertions(+), 171 deletions(-) diff --git a/be/src/format/new_parquet/parquet_statistics.cpp b/be/src/format/new_parquet/parquet_statistics.cpp index 0130a8bbafffc5..f5821d0640c76a 100644 --- a/be/src/format/new_parquet/parquet_statistics.cpp +++ b/be/src/format/new_parquet/parquet_statistics.cpp @@ -288,11 +288,9 @@ ParquetRowGroupPruneReason BloomFilterPruneReason( if (bloom_filter_cache == nullptr || column_filter.predicates.empty()) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_filter.file_column_id, schema.size()); - 
const auto& column_schema = *schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.leaf_column_id < 0 || - !ParquetStatisticsUtils::BloomFilterSupported(column_schema)) { + const auto* column_schema = + ParquetStatisticsUtils::ResolvePredicateLeafSchema(schema, column_filter); + if (column_schema == nullptr || !ParquetStatisticsUtils::BloomFilterSupported(*column_schema)) { return ParquetRowGroupPruneReason::NONE; } for (const auto& column_predicate : column_filter.predicates) { @@ -301,11 +299,11 @@ ParquetRowGroupPruneReason BloomFilterPruneReason( } } auto* bloom_filter = - bloom_filter_cache->get(row_group_idx, column_schema.leaf_column_id, pruning_stats); + bloom_filter_cache->get(row_group_idx, column_schema->leaf_column_id, pruning_stats); if (bloom_filter == nullptr) { return ParquetRowGroupPruneReason::NONE; } - return ParquetStatisticsUtils::BloomFilterExcludes(column_schema, column_filter, *bloom_filter) + return ParquetStatisticsUtils::BloomFilterExcludes(*column_schema, column_filter, *bloom_filter) ? ParquetRowGroupPruneReason::BLOOM_FILTER : ParquetRowGroupPruneReason::NONE; } @@ -486,8 +484,41 @@ segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics return predicate_statistics; } +const ParquetColumnSchema* find_child_schema_by_field_id(const ParquetColumnSchema& column_schema, + int32_t field_id) { + const auto child_it = std::ranges::find_if( + column_schema.children, [&](const std::unique_ptr& child) { + return child != nullptr && child->field_id == field_id; + }); + return child_it == column_schema.children.end() ? 
nullptr : child_it->get(); +} + } // namespace +const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema( + const std::vector>& schema, + const reader::FileColumnPredicateFilter& column_filter) { + if (column_filter.file_column_id < 0 || + column_filter.file_column_id >= static_cast(schema.size())) { + return nullptr; + } + const ParquetColumnSchema* column_schema = schema[column_filter.file_column_id].get(); + if (column_schema == nullptr) { + return nullptr; + } + for (const auto child_field_id : column_filter.file_child_id_path) { + column_schema = find_child_schema_by_field_id(*column_schema, child_field_id); + if (column_schema == nullptr) { + return nullptr; + } + } + if (column_schema->kind != ParquetColumnSchemaKind::PRIMITIVE || + column_schema->leaf_column_id < 0 || column_schema->max_repetition_level > 0) { + return nullptr; + } + return column_schema; +} + ParquetColumnStatistics ParquetStatisticsUtils::TransformColumnStatistics( const ParquetColumnSchema& column_schema, const std::shared_ptr<::parquet::Statistics>& statistics) { @@ -561,28 +592,26 @@ ParquetRowGroupPruneReason ParquetStatisticsUtils::RowGroupPruneReason( if (column_filter.predicates.empty()) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_filter.file_column_id, schema.size()); - const auto& column_schema = *schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.leaf_column_id < 0) { + const auto* column_schema = ResolvePredicateLeafSchema(schema, column_filter); + if (column_schema == nullptr) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_schema.leaf_column_id, row_group.num_columns()); - auto column_chunk = row_group.ColumnChunk(column_schema.leaf_column_id); + DCHECK_LT(column_schema->leaf_column_id, row_group.num_columns()); + auto column_chunk = row_group.ColumnChunk(column_schema->leaf_column_id); if (column_chunk == nullptr) { return 
ParquetRowGroupPruneReason::NONE; } if (CheckStatistics(column_filter, - TransformColumnStatistics(column_schema, column_chunk->statistics()))) { + TransformColumnStatistics(*column_schema, column_chunk->statistics()))) { return ParquetRowGroupPruneReason::STATISTICS; } - if (!supports_dictionary_pruning(column_schema, *column_chunk, column_filter) || + if (!supports_dictionary_pruning(*column_schema, *column_chunk, column_filter) || !is_dictionary_encoded_chunk(*column_chunk)) { return ParquetRowGroupPruneReason::NONE; } OwnedDictionaryWords dict_words; - if (!read_dictionary_words(file_reader, row_group_idx, column_schema.leaf_column_id, - column_schema, &dict_words)) { + if (!read_dictionary_words(file_reader, row_group_idx, column_schema->leaf_column_id, + *column_schema, &dict_words)) { return ParquetRowGroupPruneReason::NONE; } for (const auto& column_predicate : column_filter.predicates) { @@ -883,19 +912,17 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex if (column_filter.predicates.empty()) { return false; } - DORIS_CHECK(column_filter.file_column_id >= 0); - DORIS_CHECK(column_filter.file_column_id < static_cast(file_schema.size())); - const auto& column_schema = *file_schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0) { + const auto* column_schema = + ParquetStatisticsUtils::ResolvePredicateLeafSchema(file_schema, column_filter); + if (column_schema == nullptr || column_schema->descriptor == nullptr) { return false; } std::shared_ptr<::parquet::ColumnIndex> column_index; std::shared_ptr<::parquet::OffsetIndex> offset_index; try { - column_index = row_group->GetColumnIndex(column_schema.leaf_column_id); - offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id); + column_index = row_group->GetColumnIndex(column_schema->leaf_column_id); + offset_index = 
row_group->GetOffsetIndex(column_schema->leaf_column_id); } catch (const ::parquet::ParquetException&) { return false; } catch (const std::exception&) { @@ -910,7 +937,7 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex const auto page_count = offset_index->page_locations().size(); for (size_t page_idx = 0; page_idx < page_count; ++page_idx) { ParquetColumnStatistics page_statistics; - if (!build_page_statistics(column_index, column_schema, page_idx, &page_statistics)) { + if (!build_page_statistics(column_index, *column_schema, page_idx, &page_statistics)) { ranges->clear(); return false; } diff --git a/be/src/format/new_parquet/parquet_statistics.h b/be/src/format/new_parquet/parquet_statistics.h index b28a784316d59a..560a073636c632 100644 --- a/be/src/format/new_parquet/parquet_statistics.h +++ b/be/src/format/new_parquet/parquet_statistics.h @@ -81,6 +81,10 @@ struct ParquetColumnStatistics { // 结构参考 DuckDB ParquetStatisticsUtils:先把 Parquet metadata 转成统一统计对象, // 再由 filter/predicate 判断是否可以裁剪。这里不理解 table/global schema。 struct ParquetStatisticsUtils { + static const ParquetColumnSchema* ResolvePredicateLeafSchema( + const std::vector>& schema, + const reader::FileColumnPredicateFilter& column_filter); + static ParquetColumnStatistics TransformColumnStatistics( const ParquetColumnSchema& column_schema, const std::shared_ptr<::parquet::Statistics>& statistics); diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index 3620428fecbc54..1efd255673b688 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include @@ -33,6 +35,7 @@ #include "format/reader/file_reader.h" #include "format/reader/schema_projection.h" #include "format/reader/table_reader.h" +#include "storage/predicate/predicate_creator.h" namespace doris::reader { @@ -43,6 +46,24 @@ struct FileSlotRewriteInfo { 
std::string file_column_name; }; +struct StructChildSelector { + bool by_name = true; + std::string name; + size_t ordinal = 0; +}; + +struct NestedStructPath { + int32_t root_table_column_id = -1; + std::vector selectors; +}; + +struct NestedPredicateTargetInfo { + int32_t root_file_column_id = -1; + std::vector file_child_id_path; + std::string leaf_name; + DataTypePtr file_leaf_type; +}; + // A split-local literal produced by slot-literal predicate localization. // // TableColumnMapper currently rewrites VExpr trees in-place because VExpr has no generic deep @@ -161,6 +182,430 @@ static VExprSPtr original_table_literal(const VExprSPtr& literal_expr) { rewritten_literal->original_field()); } +static bool is_struct_element_expr(const VExprSPtr& expr) { + return expr != nullptr && expr->get_num_children() == 2 && + expr->fn().name.function_name == "struct_element"; +} + +static bool parse_struct_child_selector(const VExprSPtr& expr, StructChildSelector* selector) { + DORIS_CHECK(selector != nullptr); + if (expr == nullptr || !expr->is_literal()) { + return false; + } + const Field field = literal_field(expr); + switch (field.get_type()) { + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: + selector->by_name = true; + selector->name = std::string(field.as_string_view()); + return true; + case TYPE_BOOLEAN: + selector->by_name = false; + selector->ordinal = field.get() ? 
1 : 0; + return selector->ordinal > 0; + case TYPE_TINYINT: + selector->by_name = false; + if (field.get() <= 0) { + return false; + } + selector->ordinal = cast_set(field.get()); + return true; + case TYPE_SMALLINT: + selector->by_name = false; + if (field.get() <= 0) { + return false; + } + selector->ordinal = cast_set(field.get()); + return true; + case TYPE_INT: + selector->by_name = false; + if (field.get() <= 0) { + return false; + } + selector->ordinal = cast_set(field.get()); + return true; + case TYPE_BIGINT: + selector->by_name = false; + if (field.get() <= 0) { + return false; + } + selector->ordinal = cast_set(field.get()); + return true; + default: + return false; + } +} + +static bool extract_nested_struct_path(const VExprSPtr& expr, NestedStructPath* path) { + DORIS_CHECK(path != nullptr); + if (!is_struct_element_expr(expr)) { + return false; + } + + StructChildSelector selector; + if (!parse_struct_child_selector(expr->children()[1], &selector)) { + return false; + } + + const auto& parent = expr->children()[0]; + if (parent->is_slot_ref()) { + const auto* slot_ref = assert_cast(parent.get()); + path->root_table_column_id = slot_ref->slot_id(); + path->selectors.clear(); + path->selectors.push_back(std::move(selector)); + return true; + } + + if (!extract_nested_struct_path(parent, path)) { + return false; + } + path->selectors.push_back(std::move(selector)); + return true; +} + +static void collect_nested_struct_paths(const VExprSPtr& expr, + std::vector* paths) { + DORIS_CHECK(paths != nullptr); + if (expr == nullptr) { + return; + } + NestedStructPath path; + if (extract_nested_struct_path(expr, &path)) { + paths->push_back(std::move(path)); + return; + } + for (const auto& child : expr->children()) { + collect_nested_struct_paths(child, paths); + } +} + +static const SchemaField* find_schema_child_by_field_id(const std::vector& children, + int32_t field_id) { + const auto child_it = std::ranges::find_if( + children, [&](const SchemaField& 
child) { return child.id == field_id; }); + return child_it == children.end() ? nullptr : &*child_it; +} + +static const SchemaField* resolve_file_child(const std::vector& children, + const StructChildSelector& selector) { + if (selector.by_name) { + const auto child_it = std::ranges::find_if( + children, [&](const SchemaField& child) { return child.name == selector.name; }); + return child_it == children.end() ? nullptr : &*child_it; + } + if (selector.ordinal == 0 || selector.ordinal > children.size()) { + return nullptr; + } + return &children[selector.ordinal - 1]; +} + +static Status build_filter_projection_path(const std::vector& children, + std::span selectors, + FieldProjection* projection) { + DORIS_CHECK(projection != nullptr); + if (selectors.empty()) { + return Status::InvalidArgument("Nested struct selector path is empty"); + } + const auto* child = resolve_file_child(children, selectors.front()); + if (child == nullptr) { + return Status::OK(); + } + projection->field_id = child->id; + projection->project_all_children = selectors.size() == 1; + projection->children.clear(); + if (selectors.size() == 1) { + return Status::OK(); + } + if (child->children.empty() || + remove_nullable(child->type)->get_primitive_type() != TYPE_STRUCT) { + projection->field_id = -1; + return Status::OK(); + } + FieldProjection child_projection; + RETURN_IF_ERROR( + build_filter_projection_path(child->children, selectors.subspan(1), &child_projection)); + if (child_projection.field_id < 0) { + projection->field_id = -1; + return Status::OK(); + } + projection->children.push_back(std::move(child_projection)); + return Status::OK(); +} + +static const SchemaField* resolve_filter_schema_path(const std::vector& children, + std::span selectors, + std::vector* file_child_id_path) { + DORIS_CHECK(file_child_id_path != nullptr); + if (selectors.empty()) { + return nullptr; + } + const auto* child = resolve_file_child(children, selectors.front()); + if (child == nullptr) { + return 
nullptr; + } + file_child_id_path->push_back(child->id); + if (selectors.size() == 1) { + return child; + } + if (child->children.empty() || + remove_nullable(child->type)->get_primitive_type() != TYPE_STRUCT) { + file_child_id_path->clear(); + return nullptr; + } + const auto* leaf = + resolve_filter_schema_path(child->children, selectors.subspan(1), file_child_id_path); + if (leaf == nullptr) { + file_child_id_path->clear(); + } + return leaf; +} + +static bool resolve_nested_predicate_target(const NestedStructPath& path, + const std::vector& mappings, + NestedPredicateTargetInfo* target) { + DORIS_CHECK(target != nullptr); + if (path.selectors.empty()) { + return false; + } + const auto mapping_it = std::ranges::find_if(mappings, [&](const ColumnMapping& mapping) { + return mapping.table_column_id == path.root_table_column_id; + }); + if (mapping_it == mappings.end() || !mapping_it->field_id.has_value()) { + return false; + } + std::vector file_child_id_path; + const auto* leaf = resolve_filter_schema_path(mapping_it->original_file_children, + path.selectors, &file_child_id_path); + if (leaf == nullptr || leaf->type == nullptr || + is_complex_type(remove_nullable(leaf->type)->get_primitive_type())) { + return false; + } + + target->root_file_column_id = *mapping_it->field_id; + target->file_child_id_path = std::move(file_child_id_path); + target->leaf_name = leaf->name; + target->file_leaf_type = remove_nullable(leaf->type); + return true; +} + +static std::optional to_column_predicate_type(TExprOpcode::type opcode) { + switch (opcode) { + case TExprOpcode::EQ: + return PredicateType::EQ; + case TExprOpcode::NE: + return PredicateType::NE; + case TExprOpcode::GT: + return PredicateType::GT; + case TExprOpcode::GE: + return PredicateType::GE; + case TExprOpcode::LT: + return PredicateType::LT; + case TExprOpcode::LE: + return PredicateType::LE; + default: + return std::nullopt; + } +} + +static TExprOpcode::type reverse_comparison_opcode(TExprOpcode::type opcode) 
{ + switch (opcode) { + case TExprOpcode::GT: + return TExprOpcode::LT; + case TExprOpcode::GE: + return TExprOpcode::LE; + case TExprOpcode::LT: + return TExprOpcode::GT; + case TExprOpcode::LE: + return TExprOpcode::GE; + default: + return opcode; + } +} + +static std::shared_ptr create_comparison_column_predicate( + PredicateType predicate_type, uint32_t column_id, const std::string& column_name, + const DataTypePtr& data_type, const Field& value) { + switch (predicate_type) { + case PredicateType::EQ: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + case PredicateType::NE: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + case PredicateType::GT: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + case PredicateType::GE: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + case PredicateType::LT: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + case PredicateType::LE: + return create_comparison_predicate(column_id, column_name, data_type, + value, false); + default: + return nullptr; + } +} + +static std::shared_ptr build_nested_comparison_predicate( + const VExprSPtr& literal_expr, TExprOpcode::type opcode, + const NestedPredicateTargetInfo& target) { + if (literal_expr == nullptr || !literal_expr->is_literal() || + target.file_leaf_type == nullptr) { + return nullptr; + } + const auto predicate_type = to_column_predicate_type(opcode); + if (!predicate_type.has_value()) { + return nullptr; + } + const auto original_literal = original_table_literal(literal_expr); + const Field original_field = literal_field(original_literal); + Field file_field; + try { + convert_field_to_type(original_field, *target.file_leaf_type, &file_field, + original_literal->data_type().get()); + } catch (const Exception&) { + return nullptr; + } + if (file_field.is_null()) { + return 
nullptr; + } + try { + return create_comparison_column_predicate( + *predicate_type, cast_set(target.root_file_column_id), target.leaf_name, + target.file_leaf_type, file_field); + } catch (const Exception&) { + return nullptr; + } +} + +static bool extract_nested_binary_comparison_filter(const VExprSPtr& expr, + const std::vector& mappings, + FileColumnPredicateFilter* column_filter) { + DORIS_CHECK(column_filter != nullptr); + if (!is_binary_comparison_predicate(expr)) { + return false; + } + NestedStructPath path; + VExprSPtr literal_expr; + TExprOpcode::type opcode = expr->op(); + if (extract_nested_struct_path(expr->children()[0], &path) && + expr->children()[1]->is_literal()) { + literal_expr = expr->children()[1]; + } else if (extract_nested_struct_path(expr->children()[1], &path) && + expr->children()[0]->is_literal()) { + literal_expr = expr->children()[0]; + opcode = reverse_comparison_opcode(opcode); + } else { + return false; + } + + NestedPredicateTargetInfo target; + if (!resolve_nested_predicate_target(path, mappings, &target)) { + return false; + } + auto predicate = build_nested_comparison_predicate(literal_expr, opcode, target); + if (predicate == nullptr) { + return false; + } + column_filter->file_column_id = target.root_file_column_id; + column_filter->file_child_id_path = std::move(target.file_child_id_path); + column_filter->predicates.push_back(std::move(predicate)); + return true; +} + +static void merge_column_predicate_filter(FileColumnPredicateFilter column_filter, + std::vector* filters) { + DORIS_CHECK(filters != nullptr); + auto existing_filter_it = std::ranges::find_if(*filters, [&](const auto& existing_filter) { + return existing_filter.file_column_id == column_filter.file_column_id && + existing_filter.file_child_id_path == column_filter.file_child_id_path; + }); + if (existing_filter_it == filters->end()) { + filters->push_back(std::move(column_filter)); + return; + } + 
existing_filter_it->predicates.insert(existing_filter_it->predicates.end(), + column_filter.predicates.begin(), + column_filter.predicates.end()); +} + +static void collect_nested_column_predicate_filters( + const VExprSPtr& expr, const std::vector& mappings, + std::vector* filters) { + DORIS_CHECK(filters != nullptr); + if (expr == nullptr) { + return; + } + if (expr->node_type() == TExprNodeType::COMPOUND_PRED && + expr->op() == TExprOpcode::COMPOUND_AND) { + for (const auto& child : expr->children()) { + collect_nested_column_predicate_filters(child, mappings, filters); + } + return; + } + FileColumnPredicateFilter column_filter; + if (extract_nested_binary_comparison_filter(expr, mappings, &column_filter)) { + merge_column_predicate_filter(std::move(column_filter), filters); + } +} + +static void merge_field_projection(FieldProjection* target, const FieldProjection& source) { + DORIS_CHECK(target != nullptr); + DORIS_CHECK(target->field_id == source.field_id); + if (target->project_all_children) { + return; + } + if (source.project_all_children) { + target->project_all_children = true; + target->children.clear(); + return; + } + for (const auto& source_child : source.children) { + auto target_child_it = std::ranges::find_if( + target->children, + [&](const FieldProjection& c) { return c.field_id == source_child.field_id; }); + if (target_child_it == target->children.end()) { + target->children.push_back(source_child); + continue; + } + merge_field_projection(&*target_child_it, source_child); + } +} + +static Status build_projected_type_from_projection(const DataTypePtr& file_type, + const std::vector& children, + const FieldProjection& projection, + DataTypePtr* projected_type) { + DORIS_CHECK(file_type != nullptr); + DORIS_CHECK(projected_type != nullptr); + if (projection.project_all_children || projection.children.empty()) { + *projected_type = file_type; + return Status::OK(); + } + + DataTypes child_types; + Strings child_names; + 
child_types.reserve(projection.children.size()); + child_names.reserve(projection.children.size()); + for (const auto& child_projection : projection.children) { + const auto* child = find_schema_child_by_field_id(children, child_projection.field_id); + if (child == nullptr) { + return Status::InvalidArgument("Invalid projected child field id {}", + child_projection.field_id); + } + DataTypePtr child_type; + RETURN_IF_ERROR(build_projected_type_from_projection(child->type, child->children, + child_projection, &child_type)); + child_types.push_back(std::move(child_type)); + child_names.push_back(child->name); + } + return rebuild_projected_type(file_type, child_types, child_names, projected_type); +} + static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr, const FileSlotRewriteInfo& rewrite_info) { DORIS_CHECK(literal_expr != nullptr); @@ -280,6 +725,24 @@ static VExprSPtr rewrite_table_expr_to_file_expr( if (rewrite_in_slot_literal_predicate(expr, table_column_to_file_slot)) { return expr; } + if (is_struct_element_expr(expr)) { + auto children = expr->children(); + if (children[0]->is_slot_ref()) { + const auto* slot_ref = assert_cast(children[0].get()); + const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); + if (rewrite_it != table_column_to_file_slot.end()) { + // struct_element must see the actual file struct layout. Casting the parent struct + // to the output projection can hide filter-only children such as `s.id` in + // `SELECT s.name WHERE s.id > 5`. 
+ children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second); + expr->set_children(std::move(children)); + return expr; + } + } + children[0] = rewrite_table_expr_to_file_expr(children[0], table_column_to_file_slot); + expr->set_children(std::move(children)); + return expr; + } if (expr->is_slot_ref()) { const auto* slot_ref = assert_cast(expr.get()); const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); @@ -404,7 +867,11 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { if (mapping == nullptr) { return Status::InvalidArgument("mapping is null"); } - DORIS_CHECK(is_complex_type(mapping->file_type->get_primitive_type())); + if (mapping->original_file_type == nullptr) { + mapping->original_file_type = mapping->file_type; + } + DORIS_CHECK( + is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())); DataTypes child_types; Strings child_names; child_types.reserve(mapping->child_mappings.size()); @@ -423,7 +890,7 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { return Status::NotSupported("Projection for complex column {} contains no file children", mapping->file_column_name); } - RETURN_IF_ERROR(build_projected_child_type(mapping->file_type, mapping->child_mappings, + RETURN_IF_ERROR(build_projected_child_type(mapping->original_file_type, mapping->child_mappings, &mapping->file_type)); mapping->is_trivial = mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); @@ -431,8 +898,46 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { return Status::OK(); } +using FilterProjectionMap = std::map; + +static Status apply_projection_to_mapping_file_type(const FieldProjection& projection, + ColumnMapping* mapping) { + DORIS_CHECK(mapping != nullptr); + if (mapping->original_file_type == nullptr) { + mapping->original_file_type = mapping->file_type; + } + if (mapping->original_file_type == nullptr || + 
!is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())) { + return Status::OK(); + } + DataTypePtr projected_type; + RETURN_IF_ERROR(build_projected_type_from_projection(mapping->original_file_type, + mapping->original_file_children, + projection, &projected_type)); + mapping->file_type = std::move(projected_type); + mapping->has_complex_projection = !projection.project_all_children; + mapping->is_trivial = + mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); + return Status::OK(); +} + +static Status merge_filter_projection(const FilterProjectionMap* filter_projections, + FieldProjection* projection) { + DORIS_CHECK(projection != nullptr); + if (filter_projections == nullptr) { + return Status::OK(); + } + const auto filter_projection_it = filter_projections->find(projection->field_id); + if (filter_projection_it == filter_projections->end()) { + return Status::OK(); + } + merge_field_projection(projection, filter_projection_it->second); + return Status::OK(); +} + static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapping, - std::vector* scan_columns) { + std::vector* scan_columns, + const FilterProjectionMap* filter_projections = nullptr) { auto file_column_id = mapping->field_id.value(); if (scan_columns == &file_request->non_predicate_columns && std::ranges::find_if(file_request->predicate_columns, [&](const FieldProjection& p) { @@ -454,11 +959,18 @@ static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapp } RETURN_IF_ERROR(build_complex_projection(*mapping, &projection)); } - if (std::ranges::find_if(scan_columns->begin(), scan_columns->end(), - [&](const FieldProjection& p) { - return p.field_id == file_column_id; - }) == scan_columns->end()) { + if (scan_columns == &file_request->predicate_columns) { + RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection)); + } + RETURN_IF_ERROR(apply_projection_to_mapping_file_type(projection, 
mapping)); + + auto existing_projection_it = std::ranges::find_if( + *scan_columns, [&](const FieldProjection& p) { return p.field_id == file_column_id; }); + if (existing_projection_it == scan_columns->end()) { scan_columns->push_back(std::move(projection)); + } else { + merge_field_projection(&*existing_projection_it, projection); + RETURN_IF_ERROR(apply_projection_to_mapping_file_type(*existing_projection_it, mapping)); } if (scan_columns == &file_request->predicate_columns) { file_request->non_predicate_columns.erase( @@ -470,6 +982,48 @@ static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapp return Status::OK(); } +static Status build_filter_projection_map(const std::vector& table_filters, + std::vector* mappings, + FilterProjectionMap* filter_projections) { + DORIS_CHECK(mappings != nullptr); + DORIS_CHECK(filter_projections != nullptr); + filter_projections->clear(); + for (const auto& table_filter : table_filters) { + if (table_filter.conjunct == nullptr) { + continue; + } + std::vector paths; + collect_nested_struct_paths(table_filter.conjunct->root(), &paths); + for (const auto& path : paths) { + auto mapping_it = std::ranges::find_if(*mappings, [&](const ColumnMapping& mapping) { + return mapping.table_column_id == path.root_table_column_id; + }); + if (mapping_it == mappings->end() || !mapping_it->field_id.has_value() || + path.selectors.empty()) { + continue; + } + + FieldProjection child_projection; + RETURN_IF_ERROR(build_filter_projection_path(mapping_it->original_file_children, + path.selectors, &child_projection)); + if (child_projection.field_id < 0) { + continue; + } + + FieldProjection root_projection {.field_id = *mapping_it->field_id, + .project_all_children = false}; + root_projection.children.push_back(std::move(child_projection)); + auto filter_projection_it = filter_projections->find(root_projection.field_id); + if (filter_projection_it == filter_projections->end()) { + 
filter_projections->emplace(root_projection.field_id, std::move(root_projection)); + continue; + } + merge_field_projection(&filter_projection_it->second, root_projection); + } + } + return Status::OK(); +} + static void rebuild_projection(ColumnMapping* mapping, size_t block_position) { DORIS_CHECK(mapping->field_id.has_value()); if (mapping->is_trivial || mapping->has_complex_projection) { @@ -671,14 +1225,16 @@ Status TableColumnMapper::localize_filters(const std::vector& table FileScanRequest* file_request) { // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和 // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。 + FilterProjectionMap filter_projections; + RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections)); for (const auto& table_filter : table_filters) { for (const auto table_column_id : filter_slot_ids(table_filter)) { auto* mapping = _find_mapping(table_column_id); if (mapping == nullptr || !mapping->field_id.has_value()) { continue; } - RETURN_IF_ERROR( - add_scan_column(file_request, mapping, &file_request->predicate_columns)); + RETURN_IF_ERROR(add_scan_column(file_request, mapping, &file_request->predicate_columns, + &filter_projections)); } } @@ -703,6 +1259,18 @@ Status TableColumnMapper::localize_filters(const std::vector& table column_predicate_filter.predicates = predicates; file_request->column_predicate_filters.push_back(std::move(column_predicate_filter)); } + for (const auto& table_filter : table_filters) { + if (table_filter.conjunct == nullptr) { + continue; + } + std::vector nested_column_predicate_filters; + collect_nested_column_predicate_filters(table_filter.conjunct->root(), _mappings, + &nested_column_predicate_filters); + for (auto& column_predicate_filter : nested_column_predicate_filters) { + merge_column_predicate_filter(std::move(column_predicate_filter), + &file_request->column_predicate_filters); + } + } return Status::OK(); } @@ -729,6 +1297,8 @@ Status 
TableColumnMapper::_create_direct_mapping(const TableColumn& table_column } mapping->field_id = file_field.id; mapping->file_column_name = file_field.name; + mapping->original_file_type = file_field.type; + mapping->original_file_children = file_field.children; mapping->file_type = file_field.type; mapping->is_trivial = _is_same_type(mapping->table_type, mapping->file_type); mapping->child_mappings.clear(); diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h index c52605cd3c9fdb..a70e246bcce700 100644 --- a/be/src/format/reader/column_mapper.h +++ b/be/src/format/reader/column_mapper.h @@ -30,6 +30,7 @@ #include "core/data_type/data_type.h" #include "exprs/vexpr_fwd.h" #include "format/reader/expr/literal.h" +#include "format/reader/file_reader.h" namespace doris { class ColumnPredicate; @@ -39,9 +40,6 @@ namespace doris::reader { struct TableColumn; struct TableFilter; -struct SchemaField; -struct FileScanRequest; -struct FieldProjection; using TableColumnPredicates = std::map>>; @@ -67,6 +65,8 @@ struct ColumnMapping { // File-local field id for top-level columns, or child id for nested columns. std::optional field_id; std::string file_column_name; + DataTypePtr original_file_type; + std::vector original_file_children; std::vector file_path; DataTypePtr file_type; DataTypePtr table_type; diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h index 315128f33e25c0..9d118ea56def6f 100644 --- a/be/src/format/reader/file_reader.h +++ b/be/src/format/reader/file_reader.h @@ -75,6 +75,9 @@ struct FieldProjection { // dictionary and bloom filter. Predicates must all belong to file_column_id. struct FileColumnPredicateFilter { ColumnId file_column_id = -1; + // File-local child field-id path under file_column_id. Empty means top-level scalar. + // The ids are Parquet/Doris file schema child ids, not table ids and not child ordinals. 
+ std::vector file_child_id_path; std::vector> predicates; }; diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 6c5cccbb27a23c..5e0fe23b2fedec 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -36,6 +36,7 @@ #include "core/block/block.h" #include "core/column/column_string.h" #include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" @@ -48,6 +49,7 @@ #include "format/new_parquet/reader/column_reader.h" #include "format/reader/column_mapper.h" #include "format/reader/expr/delete_predicate.h" +#include "format/reader/expr/literal.h" #include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" #include "format/reader/table_reader.h" @@ -106,6 +108,39 @@ class Int32GreaterThanExpr final : public VExpr { const std::string _expr_name = "Int32GreaterThanExpr"; }; +class TestFunctionExpr final : public VExpr { +public: + TestFunctionExpr(std::string function_name, DataTypePtr data_type, + TExprNodeType::type node_type = TExprNodeType::FUNCTION_CALL, + TExprOpcode::type opcode = TExprOpcode::INVALID_OPCODE) + : VExpr(std::move(data_type), false), _expr_name(std::move(function_name)) { + set_node_type(node_type); + _opcode = opcode; + TFunctionName fn_name; + fn_name.__set_function_name(_expr_name); + _fn.__set_name(fn_name); + } + + const std::string& expr_name() const override { return _expr_name; } + + Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + return Status::NotSupported("TestFunctionExpr is only used for mapper expression analysis"); + } + +private: + const std::string _expr_name; +}; + +VExprSPtr struct_element_expr(const VExprSPtr& 
parent, const DataTypePtr& child_type, + const std::string& child_name) { + auto expr = std::make_shared("struct_element", make_nullable(child_type)); + expr->add_child(parent); + expr->add_child(TableLiteral::create_shared(std::make_shared(), + Field::create_field(child_name))); + return expr; +} + class Int32SumGreaterThanExpr final : public VExpr { public: Int32SumGreaterThanExpr(int left_column_id, int right_column_id, int32_t value) @@ -219,6 +254,27 @@ std::shared_ptr build_string_array(const std::vector& return finish_array(&builder); } +std::shared_ptr build_struct_array(const std::vector& ids, + const std::vector& names) { + auto struct_type = arrow::struct_({arrow::field("id", arrow::int32(), false), + arrow::field("name", arrow::utf8(), false)}); + std::vector> field_builders; + auto id_builder = std::make_unique(); + field_builders.push_back(std::shared_ptr(std::move(id_builder))); + auto name_builder = std::make_unique(); + field_builders.push_back(std::shared_ptr(std::move(name_builder))); + arrow::StructBuilder builder(struct_type, arrow::default_memory_pool(), + std::move(field_builders)); + auto* struct_id_builder = assert_cast(builder.field_builder(0)); + auto* struct_name_builder = assert_cast(builder.field_builder(1)); + for (size_t row = 0; row < ids.size(); ++row) { + EXPECT_TRUE(builder.Append().ok()); + EXPECT_TRUE(struct_id_builder->Append(ids[row]).ok()); + EXPECT_TRUE(struct_name_builder->Append(names[row]).ok()); + } + return finish_array(&builder); +} + void write_parquet_file(const std::string& file_path, int64_t row_group_size = ROW_COUNT) { auto schema = arrow::schema({ arrow::field("id", arrow::int32(), false), @@ -262,6 +318,28 @@ void write_int_pair_parquet_file(const std::string& file_path, int64_t row_group row_group_size, builder.build())); } +void write_struct_filter_parquet_file(const std::string& file_path) { + auto id_field = arrow::field("id", arrow::int32(), false); + auto name_field = arrow::field("name", arrow::utf8(), 
false); + auto struct_type = arrow::struct_({id_field, name_field}); + auto schema = arrow::schema({ + arrow::field("s", struct_type, false), + }); + auto table = arrow::Table::Make( + schema, {build_struct_array({1, 2, 10, 11}, {"one", "two", "ten", "eleven"})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 2, + builder.build())); +} + void write_dictionary_filter_parquet_file(const std::string& file_path) { auto schema = arrow::schema({ arrow::field("id", arrow::int32(), false), @@ -522,6 +600,78 @@ TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) { EXPECT_EQ(projected_type->get_element_name(0), "b"); } +TEST(TableColumnMapperTest, MergesStructFilterOnlyChildIntoPredicateProjection) { + auto a_type = std::make_shared(); + auto b_type = std::make_shared(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto 
full_table_struct_type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto filter_expr = std::make_shared( + "gt", std::make_shared(), TExprNodeType::BINARY_PRED, TExprOpcode::GT); + filter_expr->add_child(struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a")); + filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field(5))); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + EXPECT_TRUE(request.non_predicate_columns.empty()); + ASSERT_EQ(request.predicate_columns.size(), 1); + const auto& projection = request.predicate_columns[0]; + EXPECT_EQ(projection.field_id, 0); + ASSERT_FALSE(projection.project_all_children); + ASSERT_EQ(projection.children.size(), 2); + EXPECT_EQ(projection.children[0].field_id, 1); + EXPECT_EQ(projection.children[1].field_id, 0); + ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 0); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector({0})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::GT); + + ASSERT_EQ(mapper.mappings().size(), 1); + ASSERT_EQ(mapper.mappings()[0].child_mappings.size(), 1); + EXPECT_EQ(mapper.mappings()[0].child_mappings[0].file_column_name, "b"); + const auto* read_type = + assert_cast(mapper.mappings()[0].file_type.get()); + ASSERT_EQ(read_type->get_elements().size(), 2); + 
EXPECT_EQ(read_type->get_element_name(0), "b"); + EXPECT_EQ(read_type->get_element_name(1), "a"); +} + TEST(TableColumnMapperTest, CreatesComplexProjectionForMapValueStructChildren) { auto key_type = std::make_shared(); auto a_type = std::make_shared(); @@ -1161,6 +1311,42 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByDictionary) { EXPECT_EQ(values, std::vector({"lm"})); } +TEST_F(NewParquetReaderTest, NestedStructPredicateFiltersRowGroupsByStatistics) { + write_struct_filter_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 2); + + std::vector> file_schema; + auto schema_descriptor = parquet_file_reader->metadata()->schema(); + ASSERT_NE(schema_descriptor, nullptr); + ASSERT_TRUE(parquet::build_parquet_column_schema(*schema_descriptor, &file_schema).ok()); + ASSERT_EQ(file_schema.size(), 1); + ASSERT_EQ(file_schema[0]->children.size(), 2); + ASSERT_EQ(file_schema[0]->children[0]->name, "id"); + + reader::FileScanRequest request; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 0; + column_filter.file_child_id_path = {0}; + auto id_type = std::make_shared(); + column_filter.predicates.push_back(create_comparison_predicate( + 0, "id", id_type, Field::create_field(5), false)); + request.column_predicate_filters.push_back(std::move(column_filter)); + + parquet::RowGroupScanPlan plan; + parquet::ParquetScanRange scan_range; + ASSERT_TRUE(parquet::plan_parquet_row_groups(*parquet_file_reader->metadata(), + parquet_file_reader.get(), file_schema, request, + scan_range, false, &plan) + .ok()); + ASSERT_EQ(plan.row_groups.size(), 1); + EXPECT_EQ(plan.row_groups[0].row_group_id, 1); + EXPECT_EQ(plan.pruning_stats.total_row_groups, 2); + EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); + EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_statistics, 1); + 
EXPECT_EQ(plan.pruning_stats.filtered_group_rows, 2); +} + TEST_F(NewParquetReaderTest, PlannerNarrowsRowRangesByPageIndex) { write_page_index_filter_parquet_file(_file_path); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); diff --git a/docs/complex-column-predicate-and-stats-filtering.md b/docs/complex-column-predicate-and-stats-filtering.md index c05ba5fb7a0ddd..448149ddea4f2a 100644 --- a/docs/complex-column-predicate-and-stats-filtering.md +++ b/docs/complex-column-predicate-and-stats-filtering.md @@ -1,201 +1,272 @@ # 复杂类型谓词过滤和统计信息过滤实现方案 -本文分析当前实现的限制,并以 DuckDB 为参考提出实现方案。 +本文聚焦 `STRUCT` 内 primitive leaf 的谓词过滤和 Parquet file-layer pruning。目标是参考 DuckDB 的 nested filter 语义,同时保持 Doris 当前 new parquet reader 的 block 布局和 `Expr` 行级过滤原则。 -## 1. 谓词过滤:嵌套字段无法下推到 Parquet 层 +## 1. DuckDB 参考模型 -### 1.1 问题根因 +DuckDB 的核心不是把 `struct_extract(s, 'id')` 改写成一个普通 leaf slot,而是在 filter 中保留 nested 结构。 -`WHERE s.id > 5` 的 VExpr 树是 `binary_predicate(GT, struct_element(VSlotRef(s), 'id'), literal(5))`。`VSlotRef` 的 `slot_id` 是父 struct `s` 的 table_column_id(记为 X),不是子字段 `id` 的 table_column_id(记为 Y,存储在 `child_mappings` 中)。 +### 1.1 StructFilter -`localize_filters()` 在以下三步中丢失了嵌套信息: +DuckDB 使用 `StructFilter` 表示 `s.id > 5`: +```text +StructFilter( + child_idx = id 在当前 struct 中的位置, + child_name = "id", + child_filter = ConstantFilter(GT, 5) +) ``` -build_file_slot_rewrite_map() - → 只遍历 _mappings(顶级列),不遍历 child_mappings - → 子字段 Y 不在映射表中 - -rewrite_table_expr_to_file_expr() 中的 fast path - → find_slot_rewrite_info 寻找 slot_ref 的 slot_id - → struct_element(VSlotRef(s)) 不是裸 VSlotRef,不会被 fast path 匹配 - → 回退到递归重写子节点 - → VSlotRef(s) 被找到并重写为 file block slot - → 但 struct_element 函数调用本身不会被重写 - -localize_filters() Phase 1 - → filter_slot_ids() 收集的是 VSlotRef 的 slot_id = X(父 struct) - → _find_mapping(X) 找到父 struct 的映射 ✓ - → add_scan_column 把父 struct 加入 predicate_columns - → 但子字段 Y 未被单独加入 → 无法享受统计信息剪枝 + +多层嵌套 `s.a.b > 5` 会递归包装成: + +```text +StructFilter(a_idx, 
StructFilter(b_idx, ConstantFilter(GT, 5))) ``` -**结论**:VExpr 表达式最终能在 file block 上正确求值(因为 struct_element 运行时会从 struct column 中提取子字段),但无法分解为子字段级别的优化(单列统计剪枝、literal 类型推断、子字段级 predicate_column 标记)。 +关键行为: -### 1.2 DuckDB 的解决方案:StructFilter +- `StructFilter::CheckStatistics()` 先从 struct stats 中取 child stats,再递归调用 child filter。 +- `StructFilter::ToExpression()` 可以还原成 `struct_extract` 表达式,用于运行时过滤。 +- `MultiFileColumnMapper::TryCastTableFilter()` 通过 `MultiFileIndexMapping.child_mapping` 把 global child index 递归重映射为 local child index。 -DuckDB 在优化器阶段(`FilterCombiner`)将 `struct_extract(s, 'id') > 5` 转换为: +参考: -``` -StructFilter( - child_idx = id 在 struct 中的位置, - child_name = "id", - child_filter = ConstantFilter(GT, 5) -) +- `duckdb/src/planner/filter/struct_filter.cpp` +- `duckdb/src/common/multi_file/multi_file_column_mapper.cpp` + +### 1.2 Parquet reader / statistics + +DuckDB 的 Parquet reader 按 logical schema 构造 reader tree: + +- root 是 `StructColumnReader`。 +- `STRUCT` child 可以只为 selected child 创建 reader,未选 child reader 为空。 +- `LIST`/`MAP` 仍作为 nested reader,非 primitive leaf 不直接参与普通 min/max pruning。 + +统计信息处理方式: + +- primitive schema 直接读取对应 `column_index` 的 ColumnChunk stats。 +- `STRUCT` 没有自己的 Parquet leaf stats,但 DuckDB 会递归构造 struct child stats。 +- `LIST`/`MAP`/`ARRAY` 返回不支持。 +- row group pruning 时,拿 selected column reader 的 stats,再调用 table filter 的 `CheckStatistics()`。如果 filter 是 `StructFilter`,就递归检查 child stats。 +- bloom filter 只在 selected reader 是非 nested primitive column 时应用。 + +参考: + +- `duckdb/extension/parquet/parquet_reader.cpp` +- `duckdb/extension/parquet/parquet_statistics.cpp` +- `duckdb/extension/parquet/reader/struct_column_reader.cpp` + +## 2. 
Doris 当前约束 + +Doris new parquet reader 和 DuckDB 的内部布局不同: + +- `FileScanRequest::column_positions` 是 top-level `file_column_id -> file-local block_position`。 +- `FieldProjection.children` 表示 top-level complex column 内部的 nested projection。 +- file block 中 `STRUCT` 是一个 `ColumnStruct`,不是每个 child 一个独立 block slot。 +- `TableReader` materialize struct child 时从 `ColumnStruct::get_column_ptr(child_idx)` 取 child column。 + +因此不能把 `struct_element(VSlotRef(parent), 'id')` 改写为 child `VSlotRef`,除非先改变 file block layout 和 materialization contract。本方案不做这种改变。 + +当前原则仍然是: + +- 行级过滤使用 `Expr` / `VExprContext`。 +- `ColumnPredicate` 只用于 file-layer pruning:row group statistics、dictionary、bloom filter、page index。 +- 所有 pruning 只能在确定不会漏读时生效;无法解析 nested target、类型不匹配、schema 缺失或 repeated 语义不明确时直接保留 row group/page。 + +## 3. 目标模型 + +Doris 应对齐 DuckDB 的 nested filter 语义,但使用适合当前代码的表示: + +```cpp +struct FileNestedPredicateTarget { + // Top-level file column id. For scalar top-level predicates this is also the leaf column id + // resolver entry. + ColumnId file_column_id = -1; + + // File-local child field-id path under file_column_id. Empty means top-level scalar. + // Example: s.id -> [id_file_field_id], s.a.b -> [a_file_field_id, b_file_field_id]. + // This is not table column id and not child ordinal. + std::vector file_child_id_path; +}; ``` -这个 `StructFilter` 是一个递归包装器。对于多层嵌套 `s.a.b > 5`,会产生: +`FileColumnPredicateFilter` 继续表达“一个 file-local target 上的一组 `ColumnPredicate`”,但 target 需要从 top-level 扩展到 nested leaf: + +```cpp +struct FileColumnPredicateFilter { + FileNestedPredicateTarget target; + std::vector> predicates; +}; ``` -StructFilter(a_idx, StructFilter(b_idx, ConstantFilter(GT, 5))) + +兼容实现可以先保留现有 `file_column_id` 字段,再新增 `file_child_id_path`,但文档语义必须明确:path 是 file-local child field id path。 + +## 4. 
谓词过滤实现方案 + +### 4.1 行级 Expr localization + +`WHERE s.id > 5` 的 VExpr 形态仍保留为: + +```text +binary_predicate(GT, struct_element(VSlotRef(s), 'id'), literal(5)) ``` -`StructFilter` 在三层发挥作用: +localization 只把 `VSlotRef(s)` 从 table slot 改写成 file block 中 top-level struct slot。`struct_element` 函数本身不替换成 child slot。 -| 层 | 行为 | -|---|---| -| **统计信息剪枝** | `CheckStatistics(stats)` 递归提取子字段的 `BaseStatistics`,调用 `child_filter->CheckStatistics(child_stats)` | -| **Filter 重映射**(MultiFileColumnMapper) | `TryCastTableFilter` 通过 `MultiFileIndexMapping.child_mapping` 将 global child_idx 重新映射为 local child_idx | -| **运行时过滤** | `FilterSelection(STRUCT_EXTRACT)` 从 struct vector 中提取子 vector,递归应用 `child_filter` | +需要做的事: -**在 Parquet reader 层**,DuckDB 把 struct 的每个子字段当作独立的列 reader。`root_reader.GetChildReader(column_id)` 可以拿到子字段的 reader,`Filter()` 时直接作用于 leaf column vector,StructFilter 在 `ApplyFilter` → `FilterSelection` 中被展开。 +1. `build_file_slot_rewrite_map()` 继续只为 top-level file block slot 建映射。 +2. `rewrite_table_expr_to_file_expr()` 递归改写 `struct_element` 内部的 parent `VSlotRef`。 +3. 
literal 类型重写可以新增 nested fast path,但只用于把 literal 转成 file child type,不改变表达式的 slot 形态。 -### 1.3 Doris 的实现方案 +### 4.2 filter-only nested projection -Doris 不需要完全照搬 DuckDB 的 StructFilter。因为 Doris 的 VExpr 已经能在 file block 上执行 `struct_element`,过滤的执行层不是问题。需要补齐的是**让 file reader 能感知到子字段级别的列依赖**。 +当前更关键的问题不是行级表达式无法执行,而是 filter 引用的 nested child 可能不在输出 projection 中。 -#### Step 1: 扩展 `build_file_slot_rewrite_map` 遍历 child_mappings +例如: -当前函数只遍历 `_mappings`(顶级列)。需要改为同时遍历 `child_mappings`,将子字段的 `table_column_id` → `FileSlotRewriteInfo` 加入映射。子字段的 `block_position` 不同于父 struct——struct 在 file block 中是一个列(`ColumnStruct`),子字段通过 `ColumnStruct::get_column(child_idx)` 访问。 +```sql +SELECT s.name FROM t WHERE s.id > 5; +``` -**关键问题**:子字段没有独立的 block_position。struct_element 在运行时从 struct 列中提取。所以子字段的 rewrite info 不能直接指向独立的 file block slot。 +这里 `s.name` 是输出 child,`s.id` 是 filter-only child。File reader 应读取同一个 top-level `s`,但 nested projection 应包含 `name` 和 `id`。不能把 `id` 当作独立 block slot,也不能因为输出只需要 `name` 而漏读 `id`。 -**替代方案**:不重写 slot_ref,而是**重写 struct_element 函数**。将 `struct_element(VSlotRef(struct_slot), 'field_name')` 替换为 `VSlotRef(child_slot)`,前提是子字段在 file block 中有独立位置。当前 Doris 的 struct 在 file block 中是展开的(每个投影子字段都有独立的 column),所以子字段确实有独立 block_position。 +实现要求: -这意味着需要在 `rewrite_table_expr_to_file_expr` 中新增一个 fast path:匹配 `struct_element(VSlotRef(parent_slot), field_name)`,找到对应 `ColumnMapping.child_mappings`,替换为子字段的 `VSlotRef`(使用子字段在 file block 中的 block_position)。 +- 从 table filters 中识别 `struct_element(VSlotRef(parent), literal child_name/index)` 链。 +- 将 filter 需要的 nested child path 合并到 `ColumnMapping.child_mappings` 或等价的 `FieldProjection.children`。 +- 同一 top-level complex column 的 output child 和 predicate child 需要去重合并。 +- 如果无法解析 nested path,退回读取 parent struct 的必要范围,保证行级 Expr 可以执行。 -#### Step 2: 扩展 `localize_filters` Phase 1 遍历子字段 +这一步对齐 DuckDB 的 selected child reader 思路,但落在 Doris 的 `FieldProjection` 上。 -当前 `filter_slot_ids()` 只收集裸 VSlotRef 的 slot_id。需要扩展为:对于 `struct_element(VSlotRef(parent), 
field_name)` 形式的表达式,收集子字段的 table_column_id。 +### 4.3 nested path 识别范围 -然后在 Phase 1 中,对子字段也调用 `_find_mapping`(需要扩展为递归查找 `child_mappings`),将其加入 `predicate_columns`。 +第一阶段只识别稳定且可控的模式: -#### Step 3: literal 类型推断适配 +- `struct_element(VSlotRef(parent), literal_name_or_index)` +- 多层嵌套的连续 `struct_element(struct_element(...), ...)` +- 比较谓词中的常量 literal,用于后续 pruning target 构造 -`find_slot_rewrite_info` 当前要求 binary_predicate 的直接 child 是 VSlotRef(或 Cast(VSlotRef))。对于 `struct_element(...) > 5`,第一个 child 是 function call。需要新增一个 fast path:识别 `struct_element(VSlotRef(parent), literal(name)) > literal` 模式,提取子字段的 file_type 并重写 literal。 +暂不识别: -### 1.4 影响范围 +- `LIST`/`MAP` 元素访问 +- 动态 field name +- 非 deterministic 表达式 +- 需要 row-level 计算才能确定 child path 的表达式 -| 文件 | 改动 | -|---|---| -| `column_mapper.cpp` | `build_file_slot_rewrite_map` 递归遍历 child_mappings;`_find_mapping` 扩展为递归查找 child_mappings;新增 `struct_element` fast path | -| `table_reader.h` | `_build_table_filters_from_conjuncts` 扩展 slot_id 收集,识别 struct_element 内部引用的子字段 | -| 无需改动 parquet reader 内部 | 谓词的运行时执行路径不变 | +## 5. 当前实现状态 ---- +### 5.1 行级 Expr localization -## 2. 
统计信息过滤:复杂类型叶子列无法参与剪枝 +已实现: -### 2.1 问题根因 +- `struct_element(VSlotRef(parent), literal child)` 链可以被识别为 nested path。 +- 行级表达式仍保留 `struct_element(file_struct_slot, field)` 形态,只改写 parent slot 到 file-local top-level block slot。 +- 不把 struct child 注册为独立 block slot,也不把 `struct_element` 改成 child `VSlotRef`。 -`parquet_statistics.cpp` 中有 4 处 `kind != PRIMITIVE` 检查,会在遇到复杂类型时直接跳过全部剪枝: +### 5.2 filter-only nested projection -| 函数 | 行号 | 跳过的剪枝类型 | -|---|---|---| -| `RowGroupPruneReason` | 566 | Min/max + Dictionary | -| `BloomFilterPruneReason` | 293 | Bloom filter | -| `select_ranges_for_filter` | 889 | Page index | -| `supports_dictionary_pruning` | 363 | Dictionary(间接) | +已实现: -根因是统计剪枝函数使用 `file_schema[column_filter.file_column_id]`(平铺索引),而 `file_column_id` 是顶级列索引(struct/list/map 的索引,其 `leaf_column_id = -1`)。 +- filter 引用的 struct child 会合并到同一个 top-level complex column 的 `FieldProjection.children`。 +- output child 顺序保持优先,filter-only child 追加到 read projection。 +- filter-only child 不加入 `ColumnMapping.child_mappings`,避免 table output materialization 把它当作输出字段。 +- `ColumnMapping` 保存 `original_file_type` / `original_file_children`,重复创建 split-local request 时可以从原始 file schema 重建 read projection。 -**DuckDB 的做法**:每个 leaf `ParquetColumnSchema` 有自己的 `column_index`(等价于 Doris 的 `leaf_column_id`),直接指向 `RowGroup.ColumnChunk[column_index]`。`PrepareRowGroupBuffer` 通过 `root_reader.GetChildReader(column_id)` 获取叶子 reader,`column_reader.Stats()` 返回该叶子列的统计信息。`StructColumnReader` 只是中间节点,其 `Stats()` 递归聚合子字段统计。 +### 5.3 nested file-layer pruning target -Doris 的 `leaf_column_id` 已经存在于每个叶子 `ParquetColumnSchema` 中。需要补齐的是:让统计剪枝函数能够从顶级列索引 + 子路径 → 定位叶子 `ParquetColumnSchema`。 +已实现: -### 2.2 实现方案 +- `FileColumnPredicateFilter` 保留 `file_column_id`,新增 `file_child_id_path`。 +- `file_child_id_path` 是 top-level file column 下的 file-local child field id path,不是 table id,也不是 ordinal。 +- mapper 会从 AND 语义下的 `struct_element(...) 
op literal` / `literal op struct_element(...)` 构造 nested file-layer pruning hint。 +- 不从 OR/NOT/任意函数子树中提取 pruning predicate,避免把非必要条件当成必需条件裁剪。 +- literal 转换到 file leaf type 失败、path 解析失败、leaf 不是 primitive 时,不生成 pruning hint。 -#### Step 1: 在 `FileColumnPredicateFilter` 中增加子字段路径 +### 5.4 Parquet leaf resolver and pruning -```cpp -struct FileColumnPredicateFilter { - ColumnId file_column_id = -1; // 顶级列索引(不变) - std::vector child_field_path; // 新增:struct/map 内部子字段的 field_id 路径 - std::vector> predicates; -}; -``` +已实现: + +- `ParquetStatisticsUtils::ResolvePredicateLeafSchema()` 统一解析 top-level 或 nested target。 +- 解析结果必须是 primitive leaf、`leaf_column_id >= 0` 且 `max_repetition_level == 0`。 +- row group min/max statistics 使用 resolved leaf schema。 +- dictionary pruning 使用 resolved leaf schema 和 leaf `ColumnChunk`,仍保持 string-like、dictionary-encoded、EQ/IN_LIST 限制。 +- bloom filter 使用 resolved leaf schema,仍保持 supported primitive type、EQ/IN_LIST/null 相关限制。 +- page index 使用 resolved leaf schema,只允许 non-repeated primitive leaf;LIST/MAP/repeated leaf 直接跳过 page range pruning。 + +## 6. 统计信息 / pruning 设计约束 -`child_field_path` 为空表示过滤顶级原始列(现有行为不变)。非空表示过滤复杂类型内部的叶子列: -- `s.id` → `child_field_path = [0]`(struct 的第 0 个子字段 id) -- `m.value` → 对于 MAP,key_value 是第 0 个子(struct),value 是 struct 的第 1 个子 → `child_field_path = [0, 1]` +### 6.1 leaf schema resolver -#### Step 2: 新增 schema tree 中的叶子定位函数 +当前 resolver: ```cpp -// parquet_column_schema.h -// 沿 child_field_path 从顶级列定位到叶子 ParquetColumnSchema -const ParquetColumnSchema* resolve_leaf_schema( - const std::vector>& file_schema, - ColumnId file_column_id, - const std::vector& child_field_path); +const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema( + const std::vector>& file_schema, + const reader::FileColumnPredicateFilter& column_filter); ``` -实现沿 `children` 逐级递进,返回叶子 `ParquetColumnSchema*`。 +解析规则: -#### Step 3: 修改 4 处 PRIMITIVE 检查 +1. `target.file_column_id` 必须是合法 top-level file schema id。 +2. 
`target.file_child_id_path` 为空时,target schema 就是 top-level schema。 +3. path 非空时,逐层在 `ParquetColumnSchema::children` 中按 file-local `field_id` 匹配。 +4. 最终 schema 必须是 primitive leaf,且 `leaf_column_id >= 0`。 +5. 本轮只允许 `max_repetition_level == 0` 的 leaf。任何 LIST/MAP/repeated path 直接不剪枝。 -将 `schema[column_filter.file_column_id]` 替换为 `resolve_leaf_schema(schema, column_filter.file_column_id, column_filter.child_field_path)`。解析成功后,`leaf_column_id`、`descriptor`、`type` 均可直接使用,后续统计逻辑无需修改。 +这相当于 Doris 版本的 DuckDB `StructFilter::CheckStatistics()`:不是在 filter 对象里递归拿 child stats,而是在 pruning 层先把 nested target 解析到 Parquet leaf schema,再复用现有 primitive pruning 逻辑。 -```cpp -// 之前 -const auto& column_schema = *schema[column_filter.file_column_id]; -if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE) { return NONE; } - -// 之后 -const auto* leaf_schema = resolve_leaf_schema(schema, column_filter.file_column_id, - column_filter.child_field_path); -if (leaf_schema == nullptr || leaf_schema->kind != ParquetColumnSchemaKind::PRIMITIVE - || leaf_schema->leaf_column_id < 0) { return NONE; } -// 使用 leaf_schema 替代 column_schema -``` +### 6.2 支持范围 + +第一阶段支持: + +- top-level primitive column 的现有 pruning。 +- `STRUCT` / nested `STRUCT` 下的 primitive leaf: + - min/max statistics row group pruning。 + - string-like dictionary pruning。 + - supported primitive bloom filter pruning。 + - page index row range pruning。 + +第一阶段不支持: + +- `LIST` element predicate。 +- `MAP` key/value predicate。 +- repeated primitive / repeated group 的 leaf pruning。 +- page index pruning on repeated leaf。 +- complex child schema change 的完整 pruning 语义。 -#### Step 4: 在 `localize_filters` 中填充 `child_field_path` +### 6.3 pruning 类型处理 -当 `_find_mapping` 查找到的是子字段(在 child_mappings 中)时,沿 ColumnMapping 树向上回溯,构建 `child_field_path`。 +#### Row group min/max -### 2.3 限制 +将 `schema[column_filter.file_column_id]` 替换为 `ParquetStatisticsUtils::ResolvePredicateLeafSchema()` 的结果。对 resolved leaf 调用现有 `TransformColumnStatistics()` 和 `CheckStatistics()`。 
-以下场景的统计剪枝仍然不可用(当前和改为后均不可用,不在此方案范围): +#### Dictionary -- **LIST 元素**:`list[0] > 5` 形式的过滤不适用于行组级统计剪枝(元素级别的 min/max 不代表行组级别的 min/max) -- **MAP key/value**:`m['k'] > 5` 同理——映射到特定 key 的 value 没有行组级聚合统计 -- **嵌套 LIST**:`List>` 中内层元素的统计没有行组级意义 +dictionary pruning 只对 resolved primitive leaf 生效。现有 string-like 限制保持不变。无法读取 dictionary page 或 predicate 不支持时保留 row group。 -### 2.4 影响范围 +#### Bloom filter -| 文件 | 改动 | -|---|---| -| `file_reader.h` | `FileColumnPredicateFilter` 增加 `child_field_path` 字段 | -| `parquet_column_schema.h/.cpp` | 新增 `resolve_leaf_schema()` | -| `parquet_statistics.cpp` | 4 处 PRIMITIVE 检查改为 `resolve_leaf_schema()` + 叶子判断 | -| `column_mapper.cpp` | `localize_filters` 中为子字段 filter 填充 `child_field_path` | -| `parquet_reader.cpp` | `open()` 中的列验证适配含有 child_field_path 的 filter | +DuckDB 只对非 nested primitive reader 应用 bloom filter。Doris 当前通过 resolved leaf 接入 nested struct primitive leaf,但仍只处理 Arrow adapter 已支持且 predicate 可安全转换的 primitive 类型。不确定时保留 row group。 ---- +#### Page index -## 3. 实施顺序 +page index 对 repeated leaf 的 row range 语义复杂。本轮只允许 non-repeated primitive leaf。`STRUCT` 下 non-repeated primitive leaf 可以复用现有 page index range 逻辑;LIST/MAP/repeated leaf 直接跳过。 -建议分两个独立 PR: +## 7. 后续工作 -**PR 1: 谓词过滤**(Filter localization for nested fields) -- Step 1: `build_file_slot_rewrite_map` 递归 child_mappings -- Step 2: `localize_filters` 识别 struct_element,收集子字段 slot_id 并匹配 child_mappings -- Step 3: `struct_element` → 子字段 VSlotRef fast path -- 测试:`SELECT * FROM t WHERE s.id > 5` 验证谓词列被正确标记 +- 扩展 UT 覆盖 nested struct 多层 path、反向比较、OR 不提取、缺失 child 不下推。 +- 增加 nested string leaf dictionary pruning、nested page index pruning、nested bloom pruning 的真实 parquet fixture。 +- 支持从 `IN_PRED` 的 `struct_element(...) 
IN (...)` 构造 nested `IN_LIST` pruning hint。 +- schema change 场景下,把 table nested path 到 file nested path 的 mapping 入口收敛到 mapper,不让 file reader 理解 table/global schema。 +- LIST/MAP/repeated leaf 只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 -**PR 2: 统计信息过滤**(Statistics pruning for nested leaf columns) -- Step 1: `FileColumnPredicateFilter` 增加 `child_field_path` -- Step 2: `resolve_leaf_schema()` -- Step 3: 4 处 PRIMITIVE 检查改为路径解析 -- Step 4: `localize_filters` 填充 `child_field_path` -- 测试:验证 `s.id > 5` 能正确剪枝(准备只有 2 个 row group 的 parquet 文件,一个 `s.id` 全 < 5,一个全 > 5,验证前者被剪枝) +## 8. 需要避免的实现 -两个 PR 独立:PR 1 不依赖 PR 2(filter localization 可在没有统计剪枝时工作),但 PR 2 依赖 PR 1 的 `child_mappings` 遍历能力。 +- 不要把 struct child 注册成独立 `column_positions` block slot。 +- 不要把 `struct_element(...)` 改写成 child `VSlotRef`。 +- 不要把 `ColumnPredicate` 用于行级过滤。 +- 不要对 LIST/MAP/repeated leaf 做 row group/page pruning,除非后续有明确的 Dremel row semantics 证明。 +- 不要新增语义不清的 `child_field_path`。如果新增 path 字段,必须明确它是 file-local child field id path。 From ba3b94d83c1e44782ba9307c1f2a1c487c542442 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 01:45:52 +0800 Subject: [PATCH 2/8] [fix](be) Fix parquet column reader test compile ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Fix a missing local Status declaration in ParquetColumnReaderTest so the BE unit test target can compile. ### Release note None ### Check List (For Author) - Test: Manual test - Ran build-support/clang-format.sh for the modified test file. - Ran git diff --check. - Fedora BE UT will be rerun after push. 
- Behavior changed: No - Does this need documentation: No --- be/test/format/new_parquet/parquet_column_reader_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp b/be/test/format/new_parquet/parquet_column_reader_test.cpp index 21f09df58ac522..5c1119318b9532 100644 --- a/be/test/format/new_parquet/parquet_column_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp @@ -1389,7 +1389,7 @@ TEST_F(ParquetColumnReaderTest, SkipThenRead) { MutableColumnPtr column = reader->type()->create_column(); int64_t rows_read = 0; - st = reader->read(2, column, &rows_read); + auto st = reader->read(2, column, &rows_read); ASSERT_TRUE(st.ok()) << st; ASSERT_EQ(rows_read, 2); From 9eed54766a58a8d5634dc758390ef07fe4b791d4 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 01:51:25 +0800 Subject: [PATCH 3/8] [fix](be) Fix parquet column reader status declarations ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Fix duplicate and missing status variable declarations in parquet column reader unit tests so the BE UT target can compile. 
### Release note None ### Check List (For Author) - Test: Manual test - git diff --check - Behavior changed: No - Does this need documentation: No --- be/test/format/new_parquet/parquet_column_reader_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp b/be/test/format/new_parquet/parquet_column_reader_test.cpp index 5c1119318b9532..7dad8798b7b0d7 100644 --- a/be/test/format/new_parquet/parquet_column_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp @@ -1389,7 +1389,7 @@ TEST_F(ParquetColumnReaderTest, SkipThenRead) { MutableColumnPtr column = reader->type()->create_column(); int64_t rows_read = 0; - auto st = reader->read(2, column, &rows_read); + st = reader->read(2, column, &rows_read); ASSERT_TRUE(st.ok()) << st; ASSERT_EQ(rows_read, 2); @@ -1758,7 +1758,7 @@ TEST_F(ParquetColumnReaderTest, ReadProjectedStructMapChildOnly) { MutableColumnPtr column = reader->type()->create_column(); int64_t rows_read = 0; - st = reader->read(2, column, &rows_read); + auto st = reader->read(2, column, &rows_read); ASSERT_TRUE(st.ok()) << st; ASSERT_EQ(rows_read, 2); st = reader->read(3, column, &rows_read); From 5aab85d7004a27afc7ec43eae2af59faec4c19ae Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 02:27:50 +0800 Subject: [PATCH 4/8] [feature](be) Support nested parquet IN pruning hints ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Add nested STRUCT IN-list pruning hint extraction for new parquet scans and restore explicit nested scalar value index mapping so nullable struct parent/child values remain aligned with Arrow RecordReader output. 
### Release note None ### Check List (For Author) - Test: Manual test - build-support/clang-format.sh be/src/format/reader/column_mapper.cpp be/src/format/new_parquet/reader/nested_column_reader.h be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp be/test/format/new_parquet/parquet_reader_test.cpp - git diff --check - Behavior changed: No - Does this need documentation: No --- .../reader/arrow_leaf_reader_adapter.cpp | 21 ++ .../new_parquet/reader/nested_column_reader.h | 35 +- be/src/format/reader/column_mapper.cpp | 86 ++++- .../new_parquet/parquet_reader_test.cpp | 303 ++++++++++++++++++ 4 files changed, 422 insertions(+), 23 deletions(-) diff --git a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp index 7ed69c5e48b596..b1fab9042fa0fa 100644 --- a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp +++ b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp @@ -276,6 +276,27 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat std::copy(rep_levels, rep_levels + batch->levels_written, batch->rep_levels.begin()); } + batch->value_indices.resize(static_cast(batch->levels_written), -1); + int64_t value_idx = 0; + const bool dense_value_slots = values_written == batch->levels_written; + for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) { + if (batch->def_levels[level_idx] < value_slot_definition_level || + batch->rep_levels[level_idx] > value_slot_repetition_level) { + continue; + } + if (dense_value_slots) { + batch->value_indices[static_cast(level_idx)] = level_idx; + } else { + if (value_idx >= values_written) { + return Status::Corruption( + "Nested parquet reader returned fewer values than definition levels for " + "column {}", + context.column_name()); + } + batch->value_indices[static_cast(level_idx)] = value_idx++; + } + } + const auto value_type = remove_nullable(context.data_type()); 
batch->values_column = value_type->create_column(); if (values_written > 0) { diff --git a/be/src/format/new_parquet/reader/nested_column_reader.h b/be/src/format/new_parquet/reader/nested_column_reader.h index 9b0a0043b3f360..ad6bd90c49b58c 100644 --- a/be/src/format/new_parquet/reader/nested_column_reader.h +++ b/be/src/format/new_parquet/reader/nested_column_reader.h @@ -45,6 +45,7 @@ struct NestedScalarBatch { int16_t value_slot_repetition_level = std::numeric_limits::max(); std::vector def_levels; std::vector rep_levels; + std::vector value_indices; MutableColumnPtr values_column; bool empty() const { return levels_written == 0; } @@ -137,25 +138,15 @@ class NestedScalarValueCursor { void reset(const NestedScalarBatch* batch) { DORIS_CHECK(batch != nullptr); _batch = batch; - _next_level_idx = 0; - _next_value_idx = 0; } Status value_index(const std::string& column_name, int64_t level_idx, int64_t* value_idx) { DORIS_CHECK(_batch != nullptr); DORIS_CHECK(value_idx != nullptr); - DORIS_CHECK(level_idx >= _next_level_idx); DORIS_CHECK(level_idx < _batch->levels_written); - int64_t computed_value_idx = -1; - while (_next_level_idx <= level_idx) { - if (has_value_slot(_next_level_idx)) { - if (_next_level_idx == level_idx) { - computed_value_idx = _next_value_idx; - } - ++_next_value_idx; - } - ++_next_level_idx; - } + DORIS_CHECK(level_idx >= 0); + DORIS_CHECK(static_cast(level_idx) < _batch->value_indices.size()); + const int64_t computed_value_idx = _batch->value_indices[static_cast(level_idx)]; if (computed_value_idx < 0) { return Status::Corruption("Nested parquet value is absent for column {}", column_name); } @@ -170,14 +161,13 @@ class NestedScalarValueCursor { bool has_value_slot(int64_t level_idx) const { DORIS_CHECK(_batch != nullptr); - return _batch->def_levels[level_idx] >= _batch->value_slot_definition_level && - _batch->rep_levels[level_idx] <= _batch->value_slot_repetition_level; + DORIS_CHECK(level_idx >= 0); + DORIS_CHECK(static_cast(level_idx) 
< _batch->value_indices.size()); + return _batch->value_indices[static_cast(level_idx)] >= 0; } private: const NestedScalarBatch* _batch = nullptr; - int64_t _next_level_idx = 0; - int64_t _next_value_idx = 0; }; inline void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_level, @@ -195,17 +185,18 @@ inline void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_ dst.rep_levels.assign(src.rep_levels.begin() + start_level, src.rep_levels.end()); dst.value_slot_definition_level = src.value_slot_definition_level; dst.value_slot_repetition_level = src.value_slot_repetition_level; + dst.value_indices.resize(static_cast(dst.levels_written), -1); dst.values_column = src.values_column->clone_empty(); - NestedScalarValueCursor value_cursor(&src); + int64_t values_written = 0; for (int64_t level_idx = start_level; level_idx < src.levels_written; ++level_idx) { - if (!value_cursor.has_value_slot(level_idx)) { + const int64_t value_idx = src.value_indices[static_cast(level_idx)]; + if (value_idx < 0) { continue; } - int64_t value_idx = -1; - auto status = value_cursor.value_index("overflow", level_idx, &value_idx); - DORIS_CHECK(status.ok()); + dst.value_indices[static_cast(level_idx - start_level)] = values_written; dst.values_column->insert_from(*src.values_column, static_cast(value_idx)); + values_written++; } overflow->batch = std::move(dst); } diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index 1efd255673b688..7e7e7894ddd873 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -29,6 +29,8 @@ #include "common/status.h" #include "core/data_type/convert_field_to_type.h" #include "core/data_type/data_type_nullable.h" +#include "exprs/create_predicate_function.h" +#include "exprs/vin_predicate.h" #include "format/reader/expr/cast.h" #include "format/reader/expr/literal.h" #include "format/reader/expr/slot_ref.h" @@ -482,6 +484,46 @@ static 
std::shared_ptr build_nested_comparison_predicate( } } +static std::shared_ptr build_nested_in_list_predicate( + const VExprSPtrs& literal_exprs, const NestedPredicateTargetInfo& target) { + if (literal_exprs.empty() || target.file_leaf_type == nullptr) { + return nullptr; + } + + auto value_column = target.file_leaf_type->create_column(); + for (const auto& literal_expr : literal_exprs) { + if (literal_expr == nullptr || !literal_expr->is_literal()) { + return nullptr; + } + const auto original_literal = original_table_literal(literal_expr); + const Field original_field = literal_field(original_literal); + Field file_field; + try { + convert_field_to_type(original_field, *target.file_leaf_type, &file_field, + original_literal->data_type().get()); + } catch (const Exception&) { + return nullptr; + } + if (file_field.is_null()) { + return nullptr; + } + value_column->insert(file_field); + } + + std::shared_ptr values; + try { + values.reset(create_set(target.file_leaf_type->get_primitive_type(), literal_exprs.size(), + false)); + ColumnPtr value_column_ptr = std::move(value_column); + values->insert_range_from(value_column_ptr, 0, value_column_ptr->size()); + return create_in_list_predicate( + cast_set(target.root_file_column_id), target.leaf_name, + target.file_leaf_type, values, false); + } catch (const Exception&) { + return nullptr; + } +} + static bool extract_nested_binary_comparison_filter(const VExprSPtr& expr, const std::vector& mappings, FileColumnPredicateFilter* column_filter) { @@ -517,6 +559,47 @@ static bool extract_nested_binary_comparison_filter(const VExprSPtr& expr, return true; } +static bool extract_nested_in_list_filter(const VExprSPtr& expr, + const std::vector& mappings, + FileColumnPredicateFilter* column_filter) { + DORIS_CHECK(column_filter != nullptr); + if (expr == nullptr || expr->node_type() != TExprNodeType::IN_PRED || + expr->get_num_children() < 2) { + return false; + } + if (const auto* in_predicate = dynamic_cast(expr.get()); + 
in_predicate != nullptr && in_predicate->is_not_in()) { + return false; + } + + NestedStructPath path; + if (!extract_nested_struct_path(expr->children()[0], &path)) { + return false; + } + + VExprSPtrs literal_exprs; + literal_exprs.reserve(expr->get_num_children() - 1); + for (size_t child_idx = 1; child_idx < expr->children().size(); ++child_idx) { + if (!expr->children()[child_idx]->is_literal()) { + return false; + } + literal_exprs.push_back(expr->children()[child_idx]); + } + + NestedPredicateTargetInfo target; + if (!resolve_nested_predicate_target(path, mappings, &target)) { + return false; + } + auto predicate = build_nested_in_list_predicate(literal_exprs, target); + if (predicate == nullptr) { + return false; + } + column_filter->file_column_id = target.root_file_column_id; + column_filter->file_child_id_path = std::move(target.file_child_id_path); + column_filter->predicates.push_back(std::move(predicate)); + return true; +} + static void merge_column_predicate_filter(FileColumnPredicateFilter column_filter, std::vector* filters) { DORIS_CHECK(filters != nullptr); @@ -548,7 +631,8 @@ static void collect_nested_column_predicate_filters( return; } FileColumnPredicateFilter column_filter; - if (extract_nested_binary_comparison_filter(expr, mappings, &column_filter)) { + if (extract_nested_binary_comparison_filter(expr, mappings, &column_filter) || + extract_nested_in_list_filter(expr, mappings, &column_filter)) { merge_column_predicate_filter(std::move(column_filter), filters); } } diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 5e0fe23b2fedec..13c7ee419cc16f 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -141,6 +141,17 @@ VExprSPtr struct_element_expr(const VExprSPtr& parent, const DataTypePtr& child_ return expr; } +VExprSPtr in_predicate_expr(const VExprSPtr& probe_expr, const DataTypePtr& literal_type, 
+ const std::vector& values) { + auto expr = std::make_shared("in", std::make_shared(), + TExprNodeType::IN_PRED); + expr->add_child(probe_expr); + for (const auto& value : values) { + expr->add_child(TableLiteral::create_shared(literal_type, value)); + } + return expr; +} + class Int32SumGreaterThanExpr final : public VExpr { public: Int32SumGreaterThanExpr(int left_column_id, int right_column_id, int32_t value) @@ -672,6 +683,298 @@ TEST(TableColumnMapperTest, MergesStructFilterOnlyChildIntoPredicateProjection) EXPECT_EQ(read_type->get_element_name(1), "a"); } +TEST(TableColumnMapperTest, BuildsNestedStructInListPredicateFilter) { + auto a_type = std::make_shared(); + auto b_type = std::make_shared(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto full_table_struct_type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto filter_expr = in_predicate_expr( + struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, + "a"), + a_type, {Field::create_field(5), Field::create_field(7)}); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + 
reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 0); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector({0})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::IN_LIST); +} + +TEST(TableColumnMapperTest, BuildsNestedStructPredicateFilterForReverseComparison) { + auto a_type = std::make_shared(); + auto b_type = std::make_shared(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto full_table_struct_type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto filter_expr = std::make_shared( + "lt", std::make_shared(), TExprNodeType::BINARY_PRED, TExprOpcode::LT); + filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field(5))); + filter_expr->add_child(struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a")); + reader::TableFilter 
table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 0); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector({0})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::GT); +} + +TEST(TableColumnMapperTest, BuildsNestedStructInListPredicateFilterForDeepPath) { + auto id_type = std::make_shared(); + auto name_type = std::make_shared(); + auto b_type = std::make_shared(); + auto inner_type = + std::make_shared(DataTypes {id_type, name_type}, Strings {"id", "n"}); + auto full_struct_type = + std::make_shared(DataTypes {inner_type, b_type}, Strings {"a", "b"}); + + reader::SchemaField id_field; + id_field.id = 0; + id_field.name = "id"; + id_field.type = id_type; + reader::SchemaField name_field; + name_field.id = 1; + name_field.name = "n"; + name_field.type = name_type; + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = inner_type; + a_field.children = {id_field, name_field}; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = full_struct_type; + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn 
table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + auto nested_id_expr = struct_element_expr( + struct_element_expr(TableSlotRef::create_shared(100, 100, -1, full_struct_type, "s"), + inner_type, "a"), + id_type, "id"); + auto filter_expr = + in_predicate_expr(nested_id_expr, id_type, + {Field::create_field(5), Field::create_field(7)}); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 0); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector({0, 0})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::IN_LIST); +} + +TEST(TableColumnMapperTest, DoesNotBuildNestedPredicateFilterForMissingChild) { + auto a_type = std::make_shared(); + auto b_type = std::make_shared(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + 
table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto full_table_struct_type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto filter_expr = std::make_shared( + "gt", std::make_shared(), TExprNodeType::BINARY_PRED, TExprOpcode::GT); + filter_expr->add_child(struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, + "missing")); + filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field(5))); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + EXPECT_TRUE(request.column_predicate_filters.empty()); +} + +TEST(TableColumnMapperTest, DoesNotBuildNestedPredicateFilterFromOr) { + auto a_type = std::make_shared(); + auto b_type = std::make_shared(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + 
table_column.type = std::make_shared(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto full_table_struct_type = + std::make_shared(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto left = std::make_shared("gt", std::make_shared(), + TExprNodeType::BINARY_PRED, TExprOpcode::GT); + left->add_child(struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a")); + left->add_child(TableLiteral::create_shared(a_type, Field::create_field(5))); + auto right = in_predicate_expr( + struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, + "a"), + a_type, {Field::create_field(7)}); + auto filter_expr = std::make_shared("or", std::make_shared(), + TExprNodeType::COMPOUND_PRED, + TExprOpcode::COMPOUND_OR); + filter_expr->add_child(left); + filter_expr->add_child(right); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + EXPECT_TRUE(request.column_predicate_filters.empty()); +} + TEST(TableColumnMapperTest, CreatesComplexProjectionForMapValueStructChildren) { auto key_type = std::make_shared(); auto a_type = std::make_shared(); From c3e3771bb230c0197f83e8a3d1d0b55ccdae490b Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 02:35:14 +0800 Subject: [PATCH 5/8] [test](be) Cover nested parquet pruning fixtures ### What problem does this PR solve? 
Issue Number: close #xxx Related PR: #xxx Problem Summary: Add real parquet fixtures for nested struct dictionary and page-index pruning, and update the complex predicate pruning status document. ### Release note None ### Check List (For Author) - Test: Unit Test - git diff --check - Behavior changed: No - Does this need documentation: Yes --- .../new_parquet/parquet_reader_test.cpp | 147 ++++++++++++++++++ ...ex-column-predicate-and-stats-filtering.md | 11 +- 2 files changed, 152 insertions(+), 6 deletions(-) diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 13c7ee419cc16f..b4815ab2d5a8b6 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -375,6 +375,31 @@ void write_dictionary_filter_parquet_file(const std::string& file_path) { builder.build())); } +void write_nested_dictionary_filter_parquet_file(const std::string& file_path) { + auto id_field = arrow::field("id", arrow::int32(), false); + auto name_field = arrow::field("name", arrow::utf8(), false); + auto struct_type = arrow::struct_({id_field, name_field}); + auto schema = arrow::schema({ + arrow::field("s", struct_type, false), + }); + auto table = arrow::Table::Make( + schema, {build_struct_array({1, 2, 3, 4, 5, 6}, {"aa", "az", "lm", "lz", "za", "zz"})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + builder.enable_dictionary("s.name"); + builder.disable_dictionary("s.id"); + builder.disable_statistics(); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, + 
builder.build())); +} + void write_dictionary_edge_parquet_file(const std::string& file_path) { auto schema = arrow::schema({ arrow::field("id", arrow::int32(), false), @@ -400,6 +425,38 @@ void write_dictionary_edge_parquet_file(const std::string& file_path) { builder.build())); } +void write_nested_page_index_filter_parquet_file(const std::string& file_path) { + std::vector ids(128); + std::iota(ids.begin(), ids.end(), 0); + std::vector names; + names.reserve(ids.size()); + for (const auto id : ids) { + names.push_back("name-" + std::to_string(id)); + } + auto id_field = arrow::field("id", arrow::int32(), false); + auto name_field = arrow::field("name", arrow::utf8(), false); + auto struct_type = arrow::struct_({id_field, name_field}); + auto schema = arrow::schema({ + arrow::field("s", struct_type, false), + }); + auto table = arrow::Table::Make(schema, {build_struct_array(ids, names)}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + builder.disable_dictionary(); + builder.enable_write_page_index(); + builder.write_batch_size(8); + builder.data_pagesize(10); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, + ids.size(), builder.build())); +} + void write_page_index_filter_parquet_file(const std::string& file_path) { std::vector ids(128); std::iota(ids.begin(), ids.end(), 0); @@ -1650,6 +1707,50 @@ TEST_F(NewParquetReaderTest, NestedStructPredicateFiltersRowGroupsByStatistics) EXPECT_EQ(plan.pruning_stats.filtered_group_rows, 2); } +TEST_F(NewParquetReaderTest, NestedStructPredicateFiltersRowGroupsByDictionary) { + 
write_nested_dictionary_filter_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 6); + for (int row_group_idx = 0; row_group_idx < 6; ++row_group_idx) { + auto row_group = parquet_file_reader->metadata()->RowGroup(row_group_idx); + ASSERT_NE(row_group, nullptr); + auto name_chunk = row_group->ColumnChunk(1); + ASSERT_NE(name_chunk, nullptr); + ASSERT_TRUE(name_chunk->has_dictionary_page()); + ASSERT_TRUE(name_chunk->statistics() == nullptr || !name_chunk->statistics()->HasMinMax()); + } + + std::vector> file_schema; + auto schema_descriptor = parquet_file_reader->metadata()->schema(); + ASSERT_NE(schema_descriptor, nullptr); + ASSERT_TRUE(parquet::build_parquet_column_schema(*schema_descriptor, &file_schema).ok()); + ASSERT_EQ(file_schema.size(), 1); + ASSERT_EQ(file_schema[0]->children.size(), 2); + ASSERT_EQ(file_schema[0]->children[1]->name, "name"); + + reader::FileScanRequest request; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 0; + column_filter.file_child_id_path = {1}; + auto name_type = std::make_shared(); + column_filter.predicates.push_back(create_comparison_predicate( + 0, "name", name_type, Field::create_field("lm"), false)); + request.column_predicate_filters.push_back(std::move(column_filter)); + + parquet::RowGroupScanPlan plan; + parquet::ParquetScanRange scan_range; + ASSERT_TRUE(parquet::plan_parquet_row_groups(*parquet_file_reader->metadata(), + parquet_file_reader.get(), file_schema, request, + scan_range, false, &plan) + .ok()); + ASSERT_EQ(plan.row_groups.size(), 1); + EXPECT_EQ(plan.row_groups[0].row_group_id, 2); + EXPECT_EQ(plan.pruning_stats.total_row_groups, 6); + EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); + EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_dictionary, 5); + EXPECT_EQ(plan.pruning_stats.filtered_group_rows, 5); +} + TEST_F(NewParquetReaderTest, 
PlannerNarrowsRowRangesByPageIndex) { write_page_index_filter_parquet_file(_file_path); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); @@ -1693,6 +1794,52 @@ TEST_F(NewParquetReaderTest, PlannerNarrowsRowRangesByPageIndex) { EXPECT_EQ(plan.pruning_stats.selected_row_ranges, plan.row_groups[0].selected_ranges.size()); } +TEST_F(NewParquetReaderTest, NestedStructPredicateNarrowsRowRangesByPageIndex) { + write_nested_page_index_filter_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 1); + auto page_index_reader = parquet_file_reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + auto row_group_index_reader = page_index_reader->RowGroup(0); + ASSERT_NE(row_group_index_reader, nullptr); + auto offset_index = row_group_index_reader->GetOffsetIndex(0); + ASSERT_NE(offset_index, nullptr); + ASSERT_GT(offset_index->page_locations().size(), 1); + + std::vector> file_schema; + auto schema_descriptor = parquet_file_reader->metadata()->schema(); + ASSERT_NE(schema_descriptor, nullptr); + ASSERT_TRUE(parquet::build_parquet_column_schema(*schema_descriptor, &file_schema).ok()); + ASSERT_EQ(file_schema.size(), 1); + ASSERT_EQ(file_schema[0]->children.size(), 2); + ASSERT_EQ(file_schema[0]->children[0]->name, "id"); + + reader::FileScanRequest request; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 0; + column_filter.file_child_id_path = {0}; + auto id_type = std::make_shared(); + column_filter.predicates.push_back(create_comparison_predicate( + 0, "id", id_type, Field::create_field(63), false)); + request.column_predicate_filters.push_back(std::move(column_filter)); + + parquet::RowGroupScanPlan plan; + parquet::ParquetScanRange scan_range; + ASSERT_TRUE(parquet::plan_parquet_row_groups(*parquet_file_reader->metadata(), + parquet_file_reader.get(), file_schema, 
request, + scan_range, false, &plan) + .ok()); + ASSERT_EQ(plan.row_groups.size(), 1); + ASSERT_FALSE(plan.row_groups[0].selected_ranges.empty()); + EXPECT_GT(plan.row_groups[0].selected_ranges.front().start, 0); + EXPECT_LT(plan.row_groups[0].selected_ranges.front().length, 128); + EXPECT_EQ(plan.pruning_stats.total_row_groups, 1); + EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); + EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_page_index, 0); + EXPECT_GT(plan.pruning_stats.filtered_page_rows, 0); + EXPECT_EQ(plan.pruning_stats.selected_row_ranges, plan.row_groups[0].selected_ranges.size()); +} + TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) { write_dictionary_filter_parquet_file(_file_path); auto reader = create_reader(); diff --git a/docs/complex-column-predicate-and-stats-filtering.md b/docs/complex-column-predicate-and-stats-filtering.md index 448149ddea4f2a..70ac659608a2ad 100644 --- a/docs/complex-column-predicate-and-stats-filtering.md +++ b/docs/complex-column-predicate-and-stats-filtering.md @@ -182,6 +182,7 @@ SELECT s.name FROM t WHERE s.id > 5; - `FileColumnPredicateFilter` 保留 `file_column_id`,新增 `file_child_id_path`。 - `file_child_id_path` 是 top-level file column 下的 file-local child field id path,不是 table id,也不是 ordinal。 - mapper 会从 AND 语义下的 `struct_element(...) op literal` / `literal op struct_element(...)` 构造 nested file-layer pruning hint。 +- mapper 会从 AND 语义下的 `struct_element(...) 
IN (...)` 构造 nested `IN_LIST` pruning hint。 - 不从 OR/NOT/任意函数子树中提取 pruning predicate,避免把非必要条件当成必需条件裁剪。 - literal 转换到 file leaf type 失败、path 解析失败、leaf 不是 primitive 时,不生成 pruning hint。 @@ -192,9 +193,9 @@ SELECT s.name FROM t WHERE s.id > 5; - `ParquetStatisticsUtils::ResolvePredicateLeafSchema()` 统一解析 top-level 或 nested target。 - 解析结果必须是 primitive leaf、`leaf_column_id >= 0` 且 `max_repetition_level == 0`。 - row group min/max statistics 使用 resolved leaf schema。 -- dictionary pruning 使用 resolved leaf schema 和 leaf `ColumnChunk`,仍保持 string-like、dictionary-encoded、EQ/IN_LIST 限制。 -- bloom filter 使用 resolved leaf schema,仍保持 supported primitive type、EQ/IN_LIST/null 相关限制。 -- page index 使用 resolved leaf schema,只允许 non-repeated primitive leaf;LIST/MAP/repeated leaf 直接跳过 page range pruning。 +- dictionary pruning 使用 resolved leaf schema 和 leaf `ColumnChunk`,仍保持 string-like、dictionary-encoded、EQ/IN_LIST 限制,并已有 nested struct 真实 parquet fixture 覆盖。 +- bloom filter 使用 resolved leaf schema,仍保持 supported primitive type、EQ/IN_LIST/null 相关限制;当前 Arrow writer 头文件没有稳定的 bloom 写入开关,因此先以 Arrow bloom adapter / pruning 逻辑单元测试覆盖。 +- page index 使用 resolved leaf schema,只允许 non-repeated primitive leaf;LIST/MAP/repeated leaf 直接跳过 page range pruning,并已有 nested struct 真实 parquet fixture 覆盖。 ## 6. 统计信息 / pruning 设计约束 @@ -257,9 +258,7 @@ page index 对 repeated leaf 的 row range 语义复杂。本轮只允许 non-re ## 7. 后续工作 -- 扩展 UT 覆盖 nested struct 多层 path、反向比较、OR 不提取、缺失 child 不下推。 -- 增加 nested string leaf dictionary pruning、nested page index pruning、nested bloom pruning 的真实 parquet fixture。 -- 支持从 `IN_PRED` 的 `struct_element(...) 
IN (...)` 构造 nested `IN_LIST` pruning hint。 +- 如果后续 Arrow writer 或外部 fixture 能稳定提供 bloom filter metadata,补 nested bloom pruning 的真实 parquet fixture。 - schema change 场景下,把 table nested path 到 file nested path 的 mapping 入口收敛到 mapper,不让 file reader 理解 table/global schema。 - LIST/MAP/repeated leaf 只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 From b6d913a83c0c8bf871649ef3341c8be28a983d39 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 02:44:14 +0800 Subject: [PATCH 6/8] [improvement](be) Map nested parquet predicates through column mapper ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Resolve nested struct filter projection and pruning targets through ColumnMapping before falling back to file schema names, so renamed mapped children can still produce file-local pruning paths. ### Release note None ### Check List (For Author) - Test: Unit Test - git diff --check - Behavior changed: No - Does this need documentation: Yes --- be/src/format/reader/column_mapper.cpp | 117 +++++++++++++++++- be/src/format/reader/column_mapper.h | 1 + .../new_parquet/parquet_reader_test.cpp | 55 ++++++++ ...ex-column-predicate-and-stats-filtering.md | 4 +- 4 files changed, 174 insertions(+), 3 deletions(-) diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index 7e7e7894ddd873..b338b60dce9891 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -302,6 +302,20 @@ static const SchemaField* resolve_file_child(const std::vector& chi return &children[selector.ordinal - 1]; } +static const ColumnMapping* resolve_mapped_child(const std::vector& child_mappings, + const StructChildSelector& selector) { + if (selector.by_name) { + const auto child_it = std::ranges::find_if(child_mappings, [&](const auto& child_mapping) { + return child_mapping.table_column_name == selector.name; + }); + return child_it == child_mappings.end() ? 
nullptr : &*child_it; + } + if (selector.ordinal == 0 || selector.ordinal > child_mappings.size()) { + return nullptr; + } + return &child_mappings[selector.ordinal - 1]; +} + static Status build_filter_projection_path(const std::vector& children, std::span selectors, FieldProjection* projection) { @@ -335,6 +349,47 @@ static Status build_filter_projection_path(const std::vector& child return Status::OK(); } +// Prefer the table-to-file mapping tree for nested filter projection. This keeps renamed +// children and field-id schema evolution in the mapper instead of leaking table names into the +// file reader request. The file schema fallback below is only for filter-only children that do not +// have an output child mapping yet. +static Status build_filter_projection_path(const ColumnMapping& mapping, + std::span selectors, + FieldProjection* projection) { + DORIS_CHECK(projection != nullptr); + if (selectors.empty()) { + return Status::InvalidArgument("Nested struct selector path is empty"); + } + const auto* child_mapping = resolve_mapped_child(mapping.child_mappings, selectors.front()); + if (child_mapping == nullptr) { + return build_filter_projection_path(mapping.original_file_children, selectors, projection); + } + if (!child_mapping->field_id.has_value()) { + projection->field_id = -1; + return Status::OK(); + } + projection->field_id = *child_mapping->field_id; + projection->project_all_children = selectors.size() == 1; + projection->children.clear(); + if (selectors.size() == 1) { + return Status::OK(); + } + FieldProjection child_projection; + if (child_mapping->child_mappings.empty()) { + RETURN_IF_ERROR(build_filter_projection_path(child_mapping->original_file_children, + selectors.subspan(1), &child_projection)); + } else { + RETURN_IF_ERROR(build_filter_projection_path(*child_mapping, selectors.subspan(1), + &child_projection)); + } + if (child_projection.field_id < 0) { + projection->field_id = -1; + return Status::OK(); + } + 
projection->children.push_back(std::move(child_projection)); + return Status::OK(); +} + static const SchemaField* resolve_filter_schema_path(const std::vector& children, std::span selectors, std::vector* file_child_id_path) { @@ -363,6 +418,49 @@ static const SchemaField* resolve_filter_schema_path(const std::vector selectors, + std::vector* file_child_id_path, + std::string* leaf_name, DataTypePtr* leaf_type) { + DORIS_CHECK(file_child_id_path != nullptr); + DORIS_CHECK(leaf_name != nullptr); + DORIS_CHECK(leaf_type != nullptr); + if (selectors.empty()) { + return false; + } + const auto* child_mapping = resolve_mapped_child(mapping.child_mappings, selectors.front()); + if (child_mapping == nullptr) { + return false; + } + if (!child_mapping->field_id.has_value()) { + file_child_id_path->clear(); + return false; + } + file_child_id_path->push_back(*child_mapping->field_id); + if (selectors.size() == 1) { + if (child_mapping->file_type == nullptr || + is_complex_type(remove_nullable(child_mapping->file_type)->get_primitive_type())) { + file_child_id_path->clear(); + return false; + } + *leaf_name = child_mapping->file_column_name; + *leaf_type = remove_nullable(child_mapping->file_type); + return true; + } + if (child_mapping->child_mappings.empty()) { + file_child_id_path->clear(); + return false; + } + if (!resolve_mapped_filter_schema_path(*child_mapping, selectors.subspan(1), file_child_id_path, + leaf_name, leaf_type)) { + file_child_id_path->clear(); + return false; + } + return true; +} + static bool resolve_nested_predicate_target(const NestedStructPath& path, const std::vector& mappings, NestedPredicateTargetInfo* target) { @@ -377,6 +475,17 @@ static bool resolve_nested_predicate_target(const NestedStructPath& path, return false; } std::vector file_child_id_path; + std::string leaf_name; + DataTypePtr file_leaf_type; + if (resolve_mapped_filter_schema_path(*mapping_it, path.selectors, &file_child_id_path, + &leaf_name, &file_leaf_type)) { + 
target->root_file_column_id = *mapping_it->field_id; + target->file_child_id_path = std::move(file_child_id_path); + target->leaf_name = std::move(leaf_name); + target->file_leaf_type = std::move(file_leaf_type); + return true; + } + const auto* leaf = resolve_filter_schema_path(mapping_it->original_file_children, path.selectors, &file_child_id_path); if (leaf == nullptr || leaf->type == nullptr || @@ -1088,8 +1197,8 @@ static Status build_filter_projection_map(const std::vector& table_ } FieldProjection child_projection; - RETURN_IF_ERROR(build_filter_projection_path(mapping_it->original_file_children, - path.selectors, &child_projection)); + RETURN_IF_ERROR( + build_filter_projection_path(*mapping_it, path.selectors, &child_projection)); if (child_projection.field_id < 0) { continue; } @@ -1173,6 +1282,7 @@ Status TableColumnMapper::create_mapping(const std::vector& project for (const auto& table_column : projected_columns) { ColumnMapping mapping; mapping.table_column_id = table_column.id; + mapping.table_column_name = table_column.name; mapping.table_type = table_column.type; if (table_column.is_partition_key && partition_values.contains(table_column.name)) { // 1. Partition column, use partition value as a constant mapping. Note that partition column may also have default expression, but partition value should take precedence if it exists. 
@@ -1380,6 +1490,7 @@ Status TableColumnMapper::_create_direct_mapping(const TableColumn& table_column return Status::InvalidArgument("mapping is null"); } mapping->field_id = file_field.id; + mapping->table_column_name = table_column.name; mapping->file_column_name = file_field.name; mapping->original_file_type = file_field.type; mapping->original_file_children = file_field.children; @@ -1400,6 +1511,7 @@ Status TableColumnMapper::_create_direct_mapping(const TableColumn& table_column } ColumnMapping child_mapping; child_mapping.table_column_id = table_child.id; + child_mapping.table_column_name = table_child.name; child_mapping.file_column_name = table_child.name; child_mapping.table_type = table_child.type; child_mapping.file_type = table_child.type; @@ -1410,6 +1522,7 @@ Status TableColumnMapper::_create_direct_mapping(const TableColumn& table_column } ColumnMapping child_mapping; child_mapping.table_column_id = table_child.id; + child_mapping.table_column_name = table_child.name; child_mapping.table_type = table_child.type; RETURN_IF_ERROR(_create_direct_mapping(table_child, *file_child, &child_mapping)); mapping->child_mappings.push_back(std::move(child_mapping)); diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h index a70e246bcce700..78ca20d6091bef 100644 --- a/be/src/format/reader/column_mapper.h +++ b/be/src/format/reader/column_mapper.h @@ -62,6 +62,7 @@ enum TableVirtualColumnType { // 这是 table 层和 file 层的核心边界对象。 struct ColumnMapping { int32_t table_column_id = -1; + std::string table_column_name; // File-local field id for top-level columns, or child id for nested columns. 
std::optional field_id; std::string file_column_name; diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index b4815ab2d5a8b6..260e65695dba55 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -740,6 +740,61 @@ TEST(TableColumnMapperTest, MergesStructFilterOnlyChildIntoPredicateProjection) EXPECT_EQ(read_type->get_element_name(1), "a"); } +TEST(TableColumnMapperTest, MapsRenamedNestedStructPredicateByFieldId) { + auto id_type = std::make_shared(); + reader::SchemaField file_child; + file_child.id = 101; + file_child.name = "file_id"; + file_child.type = id_type; + reader::SchemaField struct_field; + struct_field.id = 100; + struct_field.name = "s"; + struct_field.type = std::make_shared(DataTypes {id_type}, Strings {"file_id"}); + struct_field.children = {file_child}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "table_id"; + table_child.type = id_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared(DataTypes {id_type}, Strings {"table_id"}); + table_column.children = {table_child}; + + auto filter_expr = std::make_shared( + "gt", std::make_shared(), TExprNodeType::BINARY_PRED, TExprOpcode::GT); + filter_expr->add_child( + struct_element_expr(TableSlotRef::create_shared(100, 100, -1, table_column.type, "s"), + id_type, "table_id")); + filter_expr->add_child(TableLiteral::create_shared(id_type, Field::create_field(5))); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_FIELD_ID; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + 
ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + ASSERT_EQ(request.predicate_columns.size(), 1); + const auto& projection = request.predicate_columns[0]; + EXPECT_EQ(projection.field_id, 100); + ASSERT_FALSE(projection.project_all_children); + ASSERT_EQ(projection.children.size(), 1); + EXPECT_EQ(projection.children[0].field_id, 101); + + ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 100); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector({101})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::GT); +} + TEST(TableColumnMapperTest, BuildsNestedStructInListPredicateFilter) { auto a_type = std::make_shared(); auto b_type = std::make_shared(); diff --git a/docs/complex-column-predicate-and-stats-filtering.md b/docs/complex-column-predicate-and-stats-filtering.md index 70ac659608a2ad..3850199698a96e 100644 --- a/docs/complex-column-predicate-and-stats-filtering.md +++ b/docs/complex-column-predicate-and-stats-filtering.md @@ -174,6 +174,7 @@ SELECT s.name FROM t WHERE s.id > 5; - output child 顺序保持优先,filter-only child 追加到 read projection。 - filter-only child 不加入 `ColumnMapping.child_mappings`,避免 table output materialization 把它当作输出字段。 - `ColumnMapping` 保存 `original_file_type` / `original_file_children`,重复创建 split-local request 时可以从原始 file schema 重建 read projection。 +- nested filter projection 优先通过 `ColumnMapping.child_mappings` 映射 table child 到 file child;没有 child mapping 的 filter-only path 再回退到 file schema 解析。 ### 5.3 nested file-layer pruning target @@ -183,6 +184,7 @@ SELECT s.name FROM t WHERE s.id > 5; - `file_child_id_path` 是 top-level file column 下的 file-local child field id path,不是 table id,也不是 ordinal。 - mapper 会从 AND 语义下的 `struct_element(...) 
op literal` / `literal op struct_element(...)` 构造 nested file-layer pruning hint。 - mapper 会从 AND 语义下的 `struct_element(...) IN (...)` 构造 nested `IN_LIST` pruning hint。 +- 对已经存在 `ColumnMapping` 的 nested child,mapper 使用 table child name + field-id mapping 生成 file-local `file_child_id_path`,支持 table/file nested child rename。 - 不从 OR/NOT/任意函数子树中提取 pruning predicate,避免把非必要条件当成必需条件裁剪。 - literal 转换到 file leaf type 失败、path 解析失败、leaf 不是 primitive 时,不生成 pruning hint。 @@ -259,7 +261,7 @@ page index 对 repeated leaf 的 row range 语义复杂。本轮只允许 non-re ## 7. 后续工作 - 如果后续 Arrow writer 或外部 fixture 能稳定提供 bloom filter metadata,补 nested bloom pruning 的真实 parquet fixture。 -- schema change 场景下,把 table nested path 到 file nested path 的 mapping 入口收敛到 mapper,不让 file reader 理解 table/global schema。 +- 完整复杂 child schema change 需要 FE/table reader 提供完整 nested table mapping;file reader 仍不理解 table/global schema。 - LIST/MAP/repeated leaf 只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 ## 8. 需要避免的实现 From e4017a11a3435fffad24518b69b768bd563e1244 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 02:49:52 +0800 Subject: [PATCH 7/8] [fix](be) Detect renamed nested parquet child projection ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Treat table/file nested child name mismatches as complex projections so field-id mapped renamed children are read with the correct file-local projection. 
### Release note None ### Check List (For Author) - Test: Unit Test - git diff --check - Behavior changed: No - Does this need documentation: No --- be/src/format/reader/column_mapper.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index b338b60dce9891..0e00b223da63f4 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -1006,7 +1006,8 @@ static bool complex_projection_has_pruned_children(const ColumnMapping& mapping) return true; } for (const auto& child_mapping : mapping.child_mappings) { - if (!child_mapping.field_id.has_value() || + if (child_mapping.table_column_name != child_mapping.file_column_name || + !child_mapping.field_id.has_value() || complex_projection_has_pruned_children(child_mapping)) { return true; } From 911b69241f2c00f381bb1cd8a7df6caf7c877fba Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 4 Jun 2026 02:54:27 +0800 Subject: [PATCH 8/8] [doc](be) Mark nested parquet pruning scope complete ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Mark the completed nested parquet predicate and pruning implementation scope, and move remaining items into explicit non-goals for this branch. 
### Release note None ### Check List (For Author) - Test: Manual test - Behavior changed: No - Does this need documentation: No --- docs/complex-column-predicate-and-stats-filtering.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/complex-column-predicate-and-stats-filtering.md b/docs/complex-column-predicate-and-stats-filtering.md index 3850199698a96e..3d5e450e38f048 100644 --- a/docs/complex-column-predicate-and-stats-filtering.md +++ b/docs/complex-column-predicate-and-stats-filtering.md @@ -258,11 +258,15 @@ DuckDB 只对非 nested primitive reader 应用 bloom filter。Doris 当前通 page index 对 repeated leaf 的 row range 语义复杂。本轮只允许 non-repeated primitive leaf。`STRUCT` 下 non-repeated primitive leaf 可以复用现有 page index range 逻辑;LIST/MAP/repeated leaf 直接跳过。 -## 7. 后续工作 +## 7. 本轮完成结论 -- 如果后续 Arrow writer 或外部 fixture 能稳定提供 bloom filter metadata,补 nested bloom pruning 的真实 parquet fixture。 -- 完整复杂 child schema change 需要 FE/table reader 提供完整 nested table mapping;file reader 仍不理解 table/global schema。 -- LIST/MAP/repeated leaf 只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 +本轮实现已经覆盖 `STRUCT` / nested `STRUCT` 下 primitive leaf 的行级 Expr localization、filter-only nested projection、file-layer pruning target 构造、statistics / dictionary / bloom / page index pruning 入口,以及 mapper-based nested child rename。 + +仍然不放入本轮实现范围的事项如下: + +- nested bloom pruning 真实 parquet fixture:当前 Arrow writer 头文件没有稳定的 bloom filter metadata 写入开关;如果后续 Arrow writer 或外部 fixture 能稳定提供 bloom filter metadata,再补真实文件级 fixture。 +- 完整复杂 child schema change:需要 FE/table reader 提供完整 nested table mapping;file reader 只消费 file-local mapping,不理解 table/global schema。 +- LIST/MAP/repeated leaf pruning:只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 ## 8. 需要避免的实现