From 308e2c21e18c19e61a1d2a39d0381b51e3de9557 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 29 May 2026 09:03:24 +0800 Subject: [PATCH] [feature](be) Support aggregate pushdown in new table reader ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Add aggregate pushdown support for the new table reader/file reader path so count and min/max can be served from parquet metadata without changing old vparquet reader, generic reader, or file scanner. ### Release note Support count and min/max aggregate pushdown in the new table reader parquet path. ### Check List (For Author) - Test: Unit Test - Added TableReaderTest coverage for count, min/max, casted min/max, and Iceberg delete fallback. - Not run locally: run-be-ut.sh requires JDK-17, but this machine has JDK-11; cmake/ninja are also unavailable. - Behavior changed: Yes (new table reader can push down count and min/max aggregates when eligible) - Does this need documentation: No --- be/src/format/new_parquet/parquet_reader.cpp | 108 ++- be/src/format/new_parquet/parquet_reader.h | 3 + be/src/format/reader/column_mapper.cpp | 66 +- be/src/format/reader/column_mapper.h | 6 +- be/src/format/reader/file_reader.h | 43 +- be/src/format/reader/table_reader.cpp | 22 +- be/src/format/reader/table_reader.h | 170 ++++- be/src/format/table/iceberg_reader_v2.cpp | 50 +- be/src/format/table/iceberg_reader_v2.h | 11 +- .../new_parquet/parquet_reader_test.cpp | 31 +- be/test/format/reader/expr/cast_test.cpp | 193 +++++- be/test/format/reader/table_reader_test.cpp | 654 +++++++++++++++++- 12 files changed, 1208 insertions(+), 149 deletions(-) diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 5e4107d727d749..a38a0046fac0fd 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -323,35 +323,27 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* 
file_block Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_block, SelectionVector* selection, uint16_t* selected_rows) { - // Expression filters may reference several predicate columns. Execute them only after all + // Conjuncts may reference several predicate columns. Execute them only after all referenced // predicate columns in the file-local block have been materialized. - for (const auto& expression_filter : _request->expression_filters) { - if (expression_filter.conjunct == nullptr) { - if (expression_filter.delete_conjunct == nullptr) { - continue; - } - } else { - if (*selected_rows == 0) { - break; - } - IColumn::Filter filter(static_cast(batch_rows), 1); - bool can_filter_all = false; - RETURN_IF_ERROR(expression_filter.conjunct->execute_filter( - file_block, filter.data(), static_cast(batch_rows), false, - &can_filter_all)); - *selected_rows = - can_filter_all ? 0 - : _apply_filter_to_selection(filter, selection, *selected_rows); - } + for (const auto& conjunct : _request->conjuncts) { if (*selected_rows == 0) { break; } - if (expression_filter.delete_conjunct == nullptr) { - continue; + IColumn::Filter filter(static_cast(batch_rows), 1); + bool can_filter_all = false; + RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(), + static_cast(batch_rows), false, + &can_filter_all)); + *selected_rows = + can_filter_all ? 
0 : _apply_filter_to_selection(filter, selection, *selected_rows); + } + for (const auto& delete_conjunct : _request->delete_conjuncts) { + if (*selected_rows == 0) { + break; } int result_column_id = -1; - RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute( - expression_filter.delete_conjunct.get(), file_block, &result_column_id)); + RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block, + &result_column_id)); DORIS_CHECK(result_column_id >= 0 && result_column_id < static_cast(file_block->columns())); const auto& delete_filter = assert_cast( @@ -744,6 +736,76 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { } } +Status ParquetReader::get_aggregate_result(const reader::FileAggregateRequest& request, + reader::FileAggregateResult* result) { + DORIS_CHECK(result != nullptr); + if (_state == nullptr || _state->metadata == nullptr || _state->schema == nullptr) { + return Status::Uninitialized("ParquetReader is not open"); + } + result->count = 0; + result->columns.clear(); + if (request.agg_type != TPushAggOp::type::COUNT && + request.agg_type != TPushAggOp::type::MINMAX) { + return Status::NotSupported("Unsupported parquet aggregate pushdown type {}", + request.agg_type); + } + + // Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected. 
+ for (const auto row_group_idx : _state->selected_row_groups) { + auto row_group_metadata = _state->metadata->RowGroup(row_group_idx); + DORIS_CHECK(row_group_metadata != nullptr); + result->count += row_group_metadata->num_rows(); + } + if (request.agg_type == TPushAggOp::type::COUNT) { + return Status::OK(); + } + + result->columns.resize(request.columns.size()); + for (size_t request_column_idx = 0; request_column_idx < request.columns.size(); + ++request_column_idx) { + const auto file_column_id = request.columns[request_column_idx].file_column_id; + if (file_column_id < 0 || + file_column_id >= static_cast(_state->file_schema.size())) { + return Status::InvalidArgument("Invalid parquet aggregate column id {}", + file_column_id); + } + const auto& column_schema = _state->file_schema[file_column_id]; + DORIS_CHECK(column_schema != nullptr); + // TODO: Support min/max pushdown for complex column by traversing down to the leaf column readers. This requires supporting complex column statistics in parquet file reader, which is currently not implemented in parquet-cpp. 
+ if (column_schema->leaf_column_id < 0) { + return Status::NotSupported( + "Parquet aggregate pushdown only supports primitive column {}", + column_schema->name); + } + + auto& aggregate_column = result->columns[request_column_idx]; + for (const auto row_group_idx : _state->selected_row_groups) { + auto row_group_metadata = _state->metadata->RowGroup(row_group_idx); + DORIS_CHECK(row_group_metadata != nullptr); + auto column_chunk = row_group_metadata->ColumnChunk(column_schema->leaf_column_id); + DORIS_CHECK(column_chunk != nullptr); + const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics( + *column_schema, column_chunk->statistics()); + if (!statistics.has_min_max) { + return Status::NotSupported("Missing parquet min/max statistics for column {}", + column_schema->name); + } + if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) { + aggregate_column.min_value = statistics.min_value; + aggregate_column.has_min = true; + } + if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) { + aggregate_column.max_value = statistics.max_value; + aggregate_column.has_max = true; + } + } + if (!aggregate_column.has_min || !aggregate_column.has_max) { + return Status::NotSupported("No parquet row group selected for min/max pushdown"); + } + } + return Status::OK(); +} + Status ParquetReader::close() { if (_state != nullptr) { if (_state->file_reader != nullptr) { diff --git a/be/src/format/new_parquet/parquet_reader.h b/be/src/format/new_parquet/parquet_reader.h index 14a891c75e1dcf..85d766f88820ce 100644 --- a/be/src/format/new_parquet/parquet_reader.h +++ b/be/src/format/new_parquet/parquet_reader.h @@ -69,6 +69,9 @@ class ParquetReader : public reader::FileReader { // 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。 Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status get_aggregate_result(const reader::FileAggregateRequest& request, + 
reader::FileAggregateResult* result) override; + Status close() override; protected: diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index e8e7442a8d798e..c6114b20df31cb 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -43,6 +43,13 @@ struct FileSlotRewriteInfo { std::string file_column_name; }; +static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref, + const FileSlotRewriteInfo& rewrite_info) { + return TableSlotRef::create_shared(slot_ref.slot_id(), + cast_set(rewrite_info.block_position), -1, + rewrite_info.file_type, rewrite_info.file_column_name); +} + static VExprSPtr rewrite_table_expr_to_file_expr( const VExprSPtr& expr, const std::map& table_column_to_file_slot) { @@ -54,9 +61,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr( const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); if (rewrite_it != table_column_to_file_slot.end()) { const auto& rewrite_info = rewrite_it->second; - auto file_slot = TableSlotRef::create_shared( - slot_ref->slot_id(), cast_set(rewrite_info.block_position), -1, - rewrite_info.file_type, rewrite_info.file_column_name); + auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info); if (rewrite_info.file_type->equals(*rewrite_info.table_type)) { return file_slot; } @@ -66,6 +71,27 @@ static VExprSPtr rewrite_table_expr_to_file_expr( } return expr; } + // rewrite_table_expr_to_file_expr localizes the expression tree in-place because VExpr does + // not provide a generic deep-clone API. A previous split may already have inserted Cast(slot) + // for the same table-level conjunct. Keep that rewrite idempotent: rewrite the cast child + // from table slot to the current split's file slot, and drop the cast when the current split + // no longer needs it. 
+ if (dynamic_cast(expr.get()) != nullptr && expr->get_num_children() == 1) { + const auto& child = expr->children()[0]; + if (child->is_slot_ref()) { + const auto* slot_ref = assert_cast(child.get()); + const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); + if (rewrite_it != table_column_to_file_slot.end() && + expr->data_type()->equals(*rewrite_it->second.table_type)) { + auto rewritten_child = create_file_slot_ref(*slot_ref, rewrite_it->second); + if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) { + return rewritten_child; + } + expr->set_children({std::move(rewritten_child)}); + return expr; + } + } + } // VExpr currently does not provide a generic deep-clone API for arbitrary expression types. // Keep all slot-localization mutation inside ColumnMapper and rebuild it for every split @@ -85,13 +111,28 @@ static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update static void add_scan_column(FileScanRequest* file_request, ColumnId file_column_id, std::vector* scan_columns) { + if (scan_columns == &file_request->non_predicate_columns && + std::find(file_request->predicate_columns.begin(), file_request->predicate_columns.end(), + file_column_id) != file_request->predicate_columns.end()) { + return; + } // column_positions is the global read-column index for this scan request, so it also // deduplicates predicate_columns and non_predicate_columns across all filter/projection paths. 
- if (file_request->column_positions.count(file_column_id) == 0) { + const bool newly_added = file_request->column_positions.count(file_column_id) == 0; + if (newly_added) { file_request->column_positions.emplace(file_column_id, file_request->column_positions.size()); + } + if (std::find(scan_columns->begin(), scan_columns->end(), file_column_id) == + scan_columns->end()) { scan_columns->push_back(file_column_id); } + if (scan_columns == &file_request->predicate_columns) { + file_request->non_predicate_columns.erase( + std::remove(file_request->non_predicate_columns.begin(), + file_request->non_predicate_columns.end(), file_column_id), + file_request->non_predicate_columns.end()); + } } static void rebuild_projection(ColumnMapping* mapping, size_t block_position) { @@ -293,7 +334,8 @@ Status TableColumnMapper::create_scan_request(const std::vector& ta file_request->non_predicate_columns.clear(); file_request->column_positions.clear(); file_request->complex_projections.clear(); - file_request->expression_filters.clear(); + file_request->conjuncts.clear(); + file_request->delete_conjuncts.clear(); file_request->column_predicate_filters.clear(); file_request->reader_expression_map.clear(); // 1. 
Build referenced non-predicate columns @@ -379,19 +421,9 @@ Status TableColumnMapper::localize_filters(const std::vector& table continue; } if (table_filter.conjunct != nullptr) { - FileExpressionFilter expression_filter; - expression_filter.conjunct = + file_request->conjuncts.push_back( VExprContext::create_shared(rewrite_table_expr_to_file_expr( - table_filter.conjunct->root(), table_column_to_file_slot)); - expression_filter.file_column_ids.reserve(table_filter.slot_ids.size()); - for (const auto table_column_id : table_filter.slot_ids) { - const auto* mapping = _find_mapping(table_column_id); - if (mapping == nullptr || !mapping->file_column_id.has_value()) { - continue; - } - expression_filter.file_column_ids.push_back(*mapping->file_column_id); - } - file_request->expression_filters.push_back(std::move(expression_filter)); + table_filter.conjunct->root(), table_column_to_file_slot))); } } for (const auto& [table_column_id, predicates] : table_column_predicates) { diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h index 75b53f68d2d09e..e1839652a4799a 100644 --- a/be/src/format/reader/column_mapper.h +++ b/be/src/format/reader/column_mapper.h @@ -106,7 +106,7 @@ class TableColumnMapper { // 把 table-level scan 请求转换成 file-local scan 请求。 // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的 - // projected_file_columns、expression_filters、column_predicate_filters 和 + // projected_file_columns、conjuncts、delete_conjuncts、column_predicate_filters 和 // reader_expression_map。 virtual Status create_scan_request(const std::vector& table_filters, const TableColumnPredicates& table_column_predicates, @@ -149,7 +149,9 @@ class TableColumnMapper { } bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const { - return table_type == file_type; + DORIS_CHECK(table_type != nullptr); + DORIS_CHECK(file_type != nullptr); + return table_type->equals(*file_type); } TableColumnMapperOptions _options; 
diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h index 28de8f068b0f6c..7e6d18acedc2d4 100644 --- a/be/src/format/reader/file_reader.h +++ b/be/src/format/reader/file_reader.h @@ -27,7 +27,9 @@ #include "common/status.h" #include "core/data_type/data_type.h" +#include "core/field.h" #include "exprs/vexpr_fwd.h" +#include "gen_cpp/PlanNodes_types.h" #include "io/file_factory.h" #include "io/fs/file_reader_writer_fwd.h" @@ -75,15 +77,6 @@ struct FieldProjection { std::vector children; }; -// File-local expression filter. It may reference multiple predicate_columns, so FileReader should -// evaluate it after all referenced predicate columns have been materialized in the file-local block. -struct FileExpressionFilter { - VExprContextSPtr conjunct; - // DeletePredicate - VExprContextSPtr delete_conjunct; - std::vector file_column_ids; -}; - // File-local single-column predicates for file-layer pruning, such as min/max, page index, // dictionary and bloom filter. Predicates must all belong to file_column_id. struct FileColumnPredicateFilter { @@ -108,12 +101,37 @@ struct FileScanRequest { std::vector non_predicate_columns; std::map column_positions; // file_column_id -> file-local block position std::map complex_projections; - std::vector expression_filters; + // Complex conjuncts converted to file-local predicates from table-level predicates. + VExprContextSPtrs conjuncts; + // Delete predicates converted to file-local predicates. + VExprContextSPtrs delete_conjuncts; + // Only simple predicates that can be directly evaluated on a single column, such as `a > 1`. Currently used for zone-map filtering. std::vector column_predicate_filters; // fallback path if filters cannot be localized to file-local predicates. The expression can reference projected_file_columns and partition columns. 
std::vector> reader_expression_map; }; +struct FileAggregateRequest { + struct Column { + ColumnId file_column_id = -1; + }; + + TPushAggOp::type agg_type = TPushAggOp::type::NONE; + std::vector columns; +}; + +struct FileAggregateResult { + struct Column { + bool has_min = false; + bool has_max = false; + Field min_value; + Field max_value; + }; + + int64_t count = 0; + std::vector columns; +}; + // 文件物理读取层通用接口。 // 该接口只描述 file-local schema、file-local scan request 和 file-local block。 // TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。 @@ -188,6 +206,11 @@ class FileReader { return Status::OK(); } + virtual Status get_aggregate_result(const FileAggregateRequest& request, + FileAggregateResult* result) { + return Status::NotSupported("FileReader does not support aggregate pushdown"); + } + // 关闭当前物理文件 reader 并释放文件层状态。 // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。 virtual Status close() { diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index 8289d637d78b14..2c92b9ca1a1d0c 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -153,6 +153,7 @@ Status TableReader::init(TableReadOptions options) { _io_ctx = options.io_ctx; _runtime_state = options.runtime_state; _scanner_profile = options.scanner_profile; + _push_down_agg_type = options.push_down_agg_type; _projected_columns = std::move(options.projected_columns); _system_properties = create_system_properties(_scan_params); _profile = std::move(options.profile); @@ -173,19 +174,13 @@ Status TableReader::_build_table_filters_from_conjuncts() { Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) { RowDescriptor row_desc; - for (const auto& expression_filter : file_request.expression_filters) { - if (expression_filter.conjunct == nullptr) { - if (expression_filter.delete_conjunct == nullptr) { - continue; - } - } else { - RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, 
row_desc)); - RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state)); - } - if (expression_filter.delete_conjunct != nullptr) { - RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state, row_desc)); - RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state)); - } + for (const auto& conjunct : file_request.conjuncts) { + RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(conjunct->open(_runtime_state)); + } + for (const auto& delete_conjunct : file_request.delete_conjuncts) { + RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(delete_conjunct->open(_runtime_state)); } return Status::OK(); } @@ -235,6 +230,7 @@ Status TableReader::prepare_split(const SplitReadOptions& options) { _current_task = std::make_unique(); _current_task->data_file = create_file_description(options.current_range); _delete_rows = nullptr; + _aggregate_pushdown_tried = false; return _parse_delete_predicates(options); } diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index de7626dfb2418d..83e0ec44fc80fe 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -36,6 +36,7 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_struct.h" +#include "core/field.h" #include "exprs/vexpr_context.h" #include "exprs/vexpr_fwd.h" #include "format/new_parquet/column_reader.h" @@ -43,6 +44,7 @@ #include "format/reader/expr/delete_predicate.h" #include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" +#include "gen_cpp/PlanNodes_types.h" #include "runtime/descriptors.h" namespace doris { @@ -66,8 +68,8 @@ struct TableColumn { bool is_partition_key = false; }; -// table-level filter。 -// TableColumnMapper 负责把它转换成 FileExpressionFilter 或 reader_expression_map。 +// All complex predicates on table/global schema, which cannot be directly 
localized to file +// schema. They will be evaluated at table level and may depend on multiple columns. struct TableFilter { // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。 VExprContextSPtr conjunct; @@ -108,21 +110,29 @@ struct ReadProfile { }; struct TableReadOptions { + // Columns that need to be read from file and output by the table reader. They are all in table/global + // schema semantics. const std::vector projected_columns; + // Simple predicates for a single column, which are parsed by the scan operator. const TableColumnPredicates column_predicates; - // All conjuncts from scan operator + // All complex conjuncts from scan operator const VExprContext conjuncts; + // File format of the underlying data files, needed for reader initialization and reader-level + // filter pushdown. const FileFormat format; TFileScanRangeParams* scan_params; std::shared_ptr io_ctx; RuntimeState* runtime_state; RuntimeProfile* scanner_profile; const bool allow_missing_columns = true; + // Push-down aggregate type. + const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; std::unique_ptr profile; }; struct SplitReadOptions { + // Split-level information for reader initialization, which may include file path, partition values, delete file info, etc. The content is table format specific and opaque to table reader base class; it's the responsibility of the concrete table reader implementation to parse necessary information for reader initialization and filter pushdown. std::map partition_values; ShardedKVCache* cache; TFileRangeDesc current_range; @@ -175,6 +185,18 @@ class TableReader { } } + // Materialize a reduced row set for upper aggregate operators when aggregate + // pushdown can be applied. This is not the final aggregate result: COUNT emits + // `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows containing + // file-level min/max values for the upper MIN/MAX. 
+ if (!_aggregate_pushdown_tried) { + bool pushed_down = false; + RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down)); + if (pushed_down) { + return Status::OK(); + } + } + bool current_eof = false; _data_reader.block_template.clear_column_data(); size_t current_rows = 0; @@ -329,10 +351,8 @@ class TableReader { std::make_shared(), parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME)); - FileExpressionFilter delete_filter; - delete_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); - delete_filter.file_column_ids.push_back(row_position_column_id); - request->expression_filters.push_back(std::move(delete_filter)); + request->delete_conjuncts.push_back( + VExprContext::create_shared(std::move(delete_predicate))); return Status::OK(); } @@ -343,7 +363,6 @@ class TableReader { _data_reader.reader.reset(); _data_reader.column_mapper.clear(); _table_filters.clear(); - _table_column_predicates.clear(); _data_reader.file_schema.clear(); _data_reader.block_schema.clear(); _data_reader.block_template.clear(); @@ -368,6 +387,62 @@ class TableReader { // Materialize virtual columns in table block, such as _row_id and _last_updated_sequence_number in Iceberg. This is called after finalize_chunk, so the virtual column can be referenced in finalize_expr. 
virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); } + Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(pushed_down != nullptr); + *pushed_down = false; + block->clear_column_data(_projected_columns.size()); + _aggregate_pushdown_tried = true; + if (!_supports_aggregate_pushdown(_push_down_agg_type)) { + return Status::OK(); + } + + FileAggregateRequest file_request; + _build_file_aggregate_request(_push_down_agg_type, &file_request); + FileAggregateResult file_result; + const auto status = _data_reader.reader->get_aggregate_result(file_request, &file_result); + if (status.is()) { + return Status::OK(); + } + RETURN_IF_ERROR(status); + RETURN_IF_ERROR( + _materialize_aggregate_pushdown_rows(_push_down_agg_type, file_result, block)); + *pushed_down = true; + RETURN_IF_ERROR(close_current_reader()); + return Status::OK(); + } + + virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const { + // Only COUNT and MIN/MAX can be pushed down. + if (agg_type != TPushAggOp::type::COUNT && agg_type != TPushAggOp::type::MINMAX) { + return false; + } + // Only support aggregate pushdown when there is no delete, filter, or column predicate, so + // the reduced rows consumed by the upper aggregate remain semantically equivalent to a + // normal scan. + if (_delete_rows != nullptr && !_delete_rows->empty()) { + return false; + } + if (!_table_filters.empty() || !_table_column_predicates.empty()) { + return false; + } + if (agg_type == TPushAggOp::type::COUNT) { + return true; + } + // For MIN/MAX, only support direct file-to-table column mappings. The two emitted rows + // must be enough for the upper MIN/MAX aggregate without evaluating projections, default + // expressions or virtual columns. 
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) { + if (!mapping.file_column_id.has_value() || mapping.has_complex_projection || + mapping.virtual_column_type != TableVirtualColumnType::INVALID || + mapping.default_expr != nullptr || mapping.file_type == nullptr || + mapping.table_type == nullptr) { + return false; + } + } + return true; + } + Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block, const size_t rows, ColumnPtr* column) { if (mapping.projection != nullptr) { @@ -411,6 +486,82 @@ class TableReader { return Status::OK(); } + void _build_file_aggregate_request(TPushAggOp::type agg_type, + FileAggregateRequest* request) const { + DORIS_CHECK(request != nullptr); + DORIS_CHECK(_supports_aggregate_pushdown(agg_type)); + request->agg_type = agg_type; + request->columns.clear(); + if (agg_type == TPushAggOp::type::COUNT) { + return; + } + request->columns.reserve(_data_reader.column_mapper.mappings().size()); + for (const auto& mapping : _data_reader.column_mapper.mappings()) { + DORIS_CHECK(mapping.file_column_id.has_value()); + request->columns.push_back({*mapping.file_column_id}); + } + } + + Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type, + const FileAggregateResult& file_result, + Block* block) { + if (agg_type == TPushAggOp::type::COUNT) { + // COUNT pushdown is not a final count value. It emits `count` default rows so the + // upper COUNT(*) aggregate can count them and produce the final result, including + // zero rows when count is 0. + for (size_t column_idx = 0; column_idx < block->columns(); ++column_idx) { + block->replace_by_position(column_idx, + block->get_by_position(column_idx) + .type->create_column_const_with_default_value( + cast_set(file_result.count))); + } + return Status::OK(); + } + // MIN/MAX pushdown emits two rows, min first and max second, for each projected column. 
+ // The upper MIN/MAX aggregate consumes those two rows to produce the final aggregate value. + DORIS_CHECK(file_result.columns.size() == _data_reader.column_mapper.mappings().size()); + DORIS_CHECK(block->columns() == _data_reader.column_mapper.mappings().size()); + Block file_block; + file_block.reserve(_data_reader.block_schema.size()); + for (const auto& field : _data_reader.block_schema) { + file_block.insert({field.type->create_column(), field.type, field.name}); + } + for (size_t column_idx = 0; column_idx < file_result.columns.size(); ++column_idx) { + const auto& result_column = file_result.columns[column_idx]; + if (!result_column.has_min || !result_column.has_max) { + return Status::NotSupported("Missing min/max aggregate result for column {}", + _projected_columns[column_idx].name); + } + const auto& mapping = _data_reader.column_mapper.mappings()[column_idx]; + DORIS_CHECK(mapping.file_column_id.has_value()); + bool found_file_column = false; + for (size_t block_position = 0; block_position < _data_reader.block_schema.size(); + ++block_position) { + if (_data_reader.block_schema[block_position].id == *mapping.file_column_id) { + found_file_column = true; + auto column = + file_block.get_by_position(block_position).column->assume_mutable(); + if (column->empty()) { + column->insert(result_column.min_value); + column->insert(result_column.max_value); + file_block.replace_by_position(block_position, std::move(column)); + } + break; + } + } + DORIS_CHECK(found_file_column); + } + for (size_t column_idx = 0; column_idx < _data_reader.column_mapper.mappings().size(); + ++column_idx) { + ColumnPtr table_column; + RETURN_IF_ERROR( + _materialize_mapping_column(_data_reader.column_mapper.mappings()[column_idx], + &file_block, 2, &table_column)); + block->replace_by_position(column_idx, std::move(table_column)); + } + return Status::OK(); + } + struct DataReader { std::unique_ptr reader; TableColumnMapper column_mapper; @@ -426,6 +577,7 @@ class TableReader { 
std::shared_ptr _system_properties; // partition key -> value std::map _partition_values; + // Predicates built from scan conjuncts before file-level localization. std::vector _table_filters; TableColumnPredicates _table_column_predicates; VExprContext _conjuncts {nullptr}; @@ -437,6 +589,8 @@ class TableReader { RuntimeState* _runtime_state; RuntimeProfile* _scanner_profile; FileFormat _format; + TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE; + bool _aggregate_pushdown_tried = false; private: static const SchemaField* _find_schema_field(const std::vector& schema, diff --git a/be/src/format/table/iceberg_reader_v2.cpp b/be/src/format/table/iceberg_reader_v2.cpp index ed6649fce2c9ed..f9587361dcf89d 100644 --- a/be/src/format/table/iceberg_reader_v2.cpp +++ b/be/src/format/table/iceberg_reader_v2.cpp @@ -115,6 +115,13 @@ Status IcebergTableReader::customize_file_scan_request(reader::FileScanRequest* return Status::OK(); } +bool IcebergTableReader::_supports_aggregate_pushdown(TPushAggOp::type agg_type) const { + if (!TableReader::_supports_aggregate_pushdown(agg_type)) { + return false; + } + return _equality_delete_filters.empty(); +} + Status IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, bool* has_delete_file) { @@ -184,7 +191,6 @@ Status IcebergTableReader::_init_delete_predicates(const TTableFormatFileDesc& t equality_delete_files.push_back(delete_file); } } - // `_delete_rows != nullptr` means DeleteVector is parsed if (_delete_rows != nullptr) { _position_delete_rows_storage = *_delete_rows; @@ -250,14 +256,13 @@ std::unique_ptr IcebergTableReader::_delete_file_descriptio return file_description; } -const reader::SchemaField* IcebergTableReader::_find_delete_field( - const std::vector& schema, const std::string& name) { - for (const auto& field : schema) { - if (field.name == name) { - return &field; - } +std::string IcebergTableReader::_data_file_path() const { + if (_iceberg_params != 
nullptr && _iceberg_params->__isset.original_file_path) { + return _iceberg_params->original_file_path; } - return nullptr; + DORIS_CHECK(_current_task != nullptr); + DORIS_CHECK(_current_task->data_file != nullptr); + return _current_task->data_file->path; } Status IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest* request) { @@ -273,8 +278,6 @@ Status IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe for (const auto& filter : _equality_delete_filters) { auto delete_predicate = std::make_shared(filter.delete_block, filter.field_ids); - reader::FileExpressionFilter expression_filter; - expression_filter.delete_conjunct = VExprContext::create_shared(delete_predicate); DCHECK_EQ(filter.field_ids.size(), filter.key_types.size()); for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) { const int field_id = filter.field_ids[idx]; @@ -301,22 +304,13 @@ Status IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe cast_expr->add_child(std::move(slot)); delete_predicate->add_child(std::move(cast_expr)); } - expression_filter.file_column_ids.push_back(field_it->id); } - request->expression_filters.push_back(std::move(expression_filter)); + request->delete_conjuncts.push_back( + VExprContext::create_shared(std::move(delete_predicate))); } return Status::OK(); } -std::string IcebergTableReader::_data_file_path() const { - if (_iceberg_params != nullptr && _iceberg_params->__isset.original_file_path) { - return _iceberg_params->original_file_path; - } - DORIS_CHECK(_current_task != nullptr); - DORIS_CHECK(_current_task->data_file != nullptr); - return _current_task->data_file->path; -} - Status IcebergTableReader::_read_parquet_position_delete_file( const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params, IcebergDeleteFileIOContext* delete_io_ctx, PositionDeleteRowsCollector* collector) { @@ -344,8 +338,15 @@ Status IcebergTableReader::_read_parquet_position_delete_file( 
std::vector schema; RETURN_IF_ERROR(reader.get_schema(&schema)); - const auto* file_path_field = _find_delete_field(schema, ICEBERG_FILE_PATH); - const auto* pos_field = _find_delete_field(schema, ICEBERG_ROW_POS); + reader::SchemaField* file_path_field = nullptr; + reader::SchemaField* pos_field = nullptr; + for (auto& field : schema) { + if (field.name == ICEBERG_FILE_PATH) { + file_path_field = &field; + } else if (field.name == ICEBERG_ROW_POS) { + pos_field = &field; + } + } if (file_path_field == nullptr || pos_field == nullptr) { return Status::InternalError("Position delete parquet file is missing required columns"); } @@ -381,9 +382,8 @@ Status IcebergTableReader::_init_position_delete_rows( TFileScanRangeParams delete_scan_params = _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params; reader::DeleteRows position_delete_rows; - const auto data_file_path = _data_file_path(); IcebergDeleteFileIOContext delete_io_ctx(_runtime_state); - PositionDeleteRowsCollector collector(data_file_path, &position_delete_rows); + PositionDeleteRowsCollector collector(_data_file_path(), &position_delete_rows); for (const auto& delete_file : delete_files) { RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, delete_scan_params, &delete_io_ctx, &collector)); diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h index 497a989289a14d..a543ae0797dec4 100644 --- a/be/src/format/table/iceberg_reader_v2.h +++ b/be/src/format/table/iceberg_reader_v2.h @@ -53,6 +53,8 @@ class IcebergTableReader : public reader::TableReader { Status customize_file_scan_request(reader::FileScanRequest* file_request) override; + bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const override; + Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, bool* has_delete_file) override; @@ -93,18 +95,17 @@ class IcebergTableReader : public reader::TableReader { static std::unique_ptr 
_delete_file_description( const TFileRangeDesc& range); - static const reader::SchemaField* _find_delete_field( - const std::vector& schema, const std::string& name); + std::string _data_file_path() const; + // Append row position column to file scan request for position delete handling. Status _append_row_position_output_column(reader::FileScanRequest* request); - + // Append equality delete predicates to file scan request based on the delete files in iceberg + // params. DeleteVector and position delete files use the common DeleteRows path in TableReader. Status _append_equality_delete_predicates(reader::FileScanRequest* request); Status _init_equality_delete_predicates( const std::vector& delete_files); - std::string _data_file_path() const; - // Read equality/position delete files. Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params, diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 0be12c271293cc..59f0bc38577499 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -461,9 +461,7 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { auto request = std::make_unique(); request->predicate_columns = {0}; request->non_predicate_columns = {1}; - reader::FileExpressionFilter expression_filter; - expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); - request->expression_filters.push_back(std::move(expression_filter)); + request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2)); reader::FileColumnPredicateFilter column_filter; column_filter.file_column_id = 0; column_filter.predicates.push_back(create_comparison_predicate( @@ -508,9 +506,7 @@ TEST_F(NewParquetReaderTest, ReadMultiPredicateColumnsBeforeExpressionFilter) { auto request = std::make_unique(); request->predicate_columns = {0, 1}; 
request->non_predicate_columns = {}; - reader::FileExpressionFilter expression_filter; - expression_filter.conjunct = create_int32_sum_greater_than_conjunct(0, 1, 7); - request->expression_filters.push_back(std::move(expression_filter)); + request->conjuncts.push_back(create_int32_sum_greater_than_conjunct(0, 1, 7)); ASSERT_TRUE(reader->open(request).ok()); size_t rows = 0; @@ -543,9 +539,7 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) { auto request = std::make_unique(); request->predicate_columns = {0}; request->non_predicate_columns = {1}; - reader::FileExpressionFilter expression_filter; - expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); - request->expression_filters.push_back(std::move(expression_filter)); + request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2)); reader::FileColumnPredicateFilter column_filter; column_filter.file_column_id = 0; column_filter.predicates.push_back(create_comparison_predicate( @@ -635,9 +629,7 @@ TEST_F(NewParquetReaderTest, RowPositionReaderKeepsPositionsAfterSelection) { {0, 0}, {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2}, }; - reader::FileExpressionFilter expression_filter; - expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); - request->expression_filters.push_back(std::move(expression_filter)); + request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2)); ASSERT_TRUE(reader->open(request).ok()); size_t rows = 0; @@ -679,11 +671,7 @@ TEST_F(NewParquetReaderTest, DeletePredicateFiltersRowPositions) { {0, 0}, {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2}, }; - reader::FileExpressionFilter delete_filter; - delete_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); - delete_filter.file_column_ids.push_back( - parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID); - request->expression_filters.push_back(std::move(delete_filter)); + 
request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate))); ASSERT_TRUE(reader->open(request).ok()); size_t rows = 0; @@ -725,13 +713,8 @@ TEST_F(NewParquetReaderTest, QueryPredicateAndDeletePredicateFilterRowPositions) {0, 0}, {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2}, }; - reader::FileExpressionFilter expression_filter; - expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); - expression_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); - expression_filter.file_column_ids.push_back(0); - expression_filter.file_column_ids.push_back( - parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID); - request->expression_filters.push_back(std::move(expression_filter)); + request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2)); + request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate))); ASSERT_TRUE(reader->open(request).ok()); size_t rows = 0; diff --git a/be/test/format/reader/expr/cast_test.cpp b/be/test/format/reader/expr/cast_test.cpp index a236d327a1f2c4..93858dbf53ef85 100644 --- a/be/test/format/reader/expr/cast_test.cpp +++ b/be/test/format/reader/expr/cast_test.cpp @@ -238,6 +238,26 @@ TEST_F(CastTest, ColumnMapperBuildsCastProjectionForTypeMismatch) { mapping.projection->close(); } +TEST_F(CastTest, ColumnMapperTreatsEquivalentTypesAsTrivial) { + reader::TableColumnMapper mapper; + reader::TableColumn table_column; + table_column.id = 7; + table_column.name = "value"; + table_column.type = std::make_shared(); + std::vector projected_columns {table_column}; + + reader::SchemaField file_field; + file_field.id = 0; + file_field.name = "value"; + file_field.type = std::make_shared(); + std::vector file_schema {file_field}; + + auto status = mapper.create_mapping(projected_columns, {}, file_schema); + ASSERT_TRUE(status.ok()) << status; + ASSERT_EQ(mapper.mappings().size(), 1); + 
EXPECT_TRUE(mapper.mappings()[0].is_trivial); +} + TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { reader::TableColumnMapper mapper; reader::TableColumn table_column; @@ -264,9 +284,9 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { reader::FileScanRequest file_request; ASSERT_TRUE( mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); - ASSERT_EQ(file_request.expression_filters.size(), 1); + ASSERT_EQ(file_request.conjuncts.size(), 1); ASSERT_EQ(file_request.predicate_columns, std::vector({0})); - const auto& localized_expr = file_request.expression_filters[0].conjunct->root(); + const auto& localized_expr = file_request.conjuncts[0]->root(); ASSERT_EQ(localized_expr->get_num_children(), 1); const auto& localized_child = localized_expr->children()[0]; ASSERT_NE(dynamic_cast(localized_child.get()), nullptr); @@ -279,7 +299,7 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { Block block; block.insert(ColumnHelper::create_column_with_name({11, 22})); - auto* conjunct = file_request.expression_filters[0].conjunct.get(); + auto* conjunct = file_request.conjuncts[0].get(); status = conjunct->prepare(&state, RowDescriptor()); ASSERT_TRUE(status.ok()) << status; status = conjunct->open(&state); @@ -293,7 +313,172 @@ TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) { EXPECT_EQ(filter[0], 0); EXPECT_EQ(filter[1], 1); - file_request.expression_filters[0].conjunct->close(); + file_request.conjuncts[0]->close(); +} + +TEST_F(CastTest, ColumnMapperDoesNotNestCastFilterAcrossScanRequests) { + reader::TableColumnMapper mapper; + reader::TableColumn table_column; + table_column.id = 7; + table_column.name = "value"; + table_column.type = std::make_shared(); + std::vector projected_columns {table_column}; + + reader::SchemaField file_field; + file_field.id = 0; + file_field.name = "value"; + file_field.type = std::make_shared(); + std::vector file_schema {file_field}; + + auto status = 
mapper.create_mapping(projected_columns, {}, file_schema); + ASSERT_TRUE(status.ok()) << status; + + auto predicate = std::make_shared(15); + predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value")); + reader::TableFilter table_filter; + table_filter.conjunct = VExprContext::create_shared(predicate); + table_filter.slot_ids = {7}; + + reader::FileScanRequest first_request; + ASSERT_TRUE( + mapper.create_scan_request({table_filter}, {}, projected_columns, &first_request).ok()); + reader::FileScanRequest second_request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, projected_columns, &second_request) + .ok()); + + ASSERT_EQ(second_request.conjuncts.size(), 1); + const auto& localized_expr = second_request.conjuncts[0]->root(); + ASSERT_EQ(localized_expr->get_num_children(), 1); + const auto& localized_child = localized_expr->children()[0]; + ASSERT_NE(dynamic_cast(localized_child.get()), nullptr); + ASSERT_EQ(localized_child->get_num_children(), 1); + const auto* localized_slot = + assert_cast(localized_child->children()[0].get()); + EXPECT_EQ(localized_slot->column_id(), 0); +} + +TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) { + reader::TableColumn table_column; + table_column.id = 7; + table_column.name = "value"; + table_column.type = std::make_shared(); + std::vector projected_columns {table_column}; + + auto predicate = std::make_shared(15); + predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value")); + reader::TableFilter table_filter; + table_filter.conjunct = VExprContext::create_shared(predicate); + table_filter.slot_ids = {7}; + + reader::SchemaField int_file_field; + int_file_field.id = 0; + int_file_field.name = "value"; + int_file_field.type = std::make_shared(); + + reader::TableColumnMapper int_mapper; + ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {}, {int_file_field}).ok()); + reader::FileScanRequest int_request; + 
ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {}, projected_columns, &int_request) + .ok()); + + const auto& int_localized_expr = int_request.conjuncts[0]->root(); + ASSERT_EQ(int_localized_expr->get_num_children(), 1); + ASSERT_NE(dynamic_cast(int_localized_expr->children()[0].get()), nullptr); + + reader::SchemaField bigint_file_field; + bigint_file_field.id = 0; + bigint_file_field.name = "value"; + bigint_file_field.type = std::make_shared(); + + reader::TableColumnMapper bigint_mapper; + ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok()); + reader::FileScanRequest bigint_request; + ASSERT_TRUE(bigint_mapper + .create_scan_request({table_filter}, {}, projected_columns, &bigint_request) + .ok()); + + const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root(); + ASSERT_EQ(bigint_localized_expr->get_num_children(), 1); + const auto& bigint_localized_child = bigint_localized_expr->children()[0]; + const auto* localized_slot = assert_cast(bigint_localized_child.get()); + EXPECT_EQ(localized_slot->column_id(), 0); + EXPECT_TRUE(localized_slot->data_type()->equals(*bigint_file_field.type)); + + Block block; + block.insert(ColumnHelper::create_column_with_name({11, 22})); + auto* conjunct = bigint_request.conjuncts[0].get(); + auto status = conjunct->prepare(&state, RowDescriptor()); + ASSERT_TRUE(status.ok()) << status; + status = conjunct->open(&state); + ASSERT_TRUE(status.ok()) << status; + IColumn::Filter filter(block.rows(), 1); + bool can_filter_all = false; + status = conjunct->execute_filter(&block, filter.data(), block.rows(), false, &can_filter_all); + ASSERT_TRUE(status.ok()) << status; + EXPECT_FALSE(can_filter_all); + ASSERT_EQ(filter.size(), 2); + EXPECT_EQ(filter[0], 0); + EXPECT_EQ(filter[1], 1); + conjunct->close(); +} + +TEST_F(CastTest, ColumnMapperKeepsTableSlotIdWhenFileBlockPositionChanges) { + reader::TableColumn table_column; + table_column.id = 7; + table_column.name = "value"; 
+ table_column.type = std::make_shared(); + std::vector projected_columns {table_column}; + + reader::SchemaField file_field; + file_field.id = 10; + file_field.name = "value"; + file_field.type = std::make_shared(); + + reader::TableColumnMapper mapper; + ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, {file_field}).ok()); + + auto predicate = std::make_shared(15); + predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value")); + reader::TableFilter table_filter; + table_filter.conjunct = VExprContext::create_shared(predicate); + table_filter.slot_ids = {7}; + + reader::FileScanRequest first_request; + ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &first_request).ok()); + ASSERT_EQ(first_request.conjuncts.size(), 1); + const auto* first_slot = assert_cast( + first_request.conjuncts[0]->root()->children()[0].get()); + EXPECT_EQ(first_slot->slot_id(), 7); + EXPECT_EQ(first_slot->column_id(), 0); + + reader::FileScanRequest second_request; + second_request.column_positions.emplace(9, 0); + second_request.column_positions.emplace(10, 1); + second_request.non_predicate_columns.push_back(9); + ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &second_request).ok()); + ASSERT_EQ(second_request.conjuncts.size(), 1); + const auto* second_slot = assert_cast( + second_request.conjuncts[0]->root()->children()[0].get()); + EXPECT_EQ(second_slot->slot_id(), 7); + EXPECT_EQ(second_slot->column_id(), 1); + + Block block; + block.insert(ColumnHelper::create_column_with_name({100, 100})); + block.insert(ColumnHelper::create_column_with_name({11, 22})); + auto* conjunct = second_request.conjuncts[0].get(); + auto status = conjunct->prepare(&state, RowDescriptor()); + ASSERT_TRUE(status.ok()) << status; + status = conjunct->open(&state); + ASSERT_TRUE(status.ok()) << status; + IColumn::Filter filter(block.rows(), 1); + bool can_filter_all = false; + status = conjunct->execute_filter(&block, filter.data(), block.rows(), false, 
&can_filter_all); + ASSERT_TRUE(status.ok()) << status; + EXPECT_FALSE(can_filter_all); + ASSERT_EQ(filter.size(), 2); + EXPECT_EQ(filter[0], 0); + EXPECT_EQ(filter[1], 1); + conjunct->close(); } } // namespace doris diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp index 8a72937002dba4..1bb6eaf26be6fb 100644 --- a/be/test/format/reader/table_reader_test.cpp +++ b/be/test/format/reader/table_reader_test.cpp @@ -268,14 +268,38 @@ void write_parquet_file(const std::string& file_path, int32_t id, const std::str builder.build())); } +void write_iceberg_equality_delete_parquet_file(const std::string& file_path, int32_t field_id, + int32_t value) { + const auto metadata = + arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)}); + auto schema = arrow::schema({ + arrow::field("id", arrow::int32(), false)->WithMetadata(metadata), + }); + auto table = arrow::Table::Make(schema, {build_int32_array({value})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, + builder.build())); +} + void write_int_pair_parquet_file(const std::string& file_path, const std::vector& ids, const std::vector& scores, const std::vector& values, int64_t row_group_size = -1) { + const auto id_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"0"}); + const auto score_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"1"}); + const auto value_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"2"}); auto schema = arrow::schema({ - arrow::field("id", 
arrow::int32(), false), - arrow::field("score", arrow::int32(), false), - arrow::field("value", arrow::utf8(), false), + arrow::field("id", arrow::int32(), false)->WithMetadata(id_metadata), + arrow::field("score", arrow::int32(), false)->WithMetadata(score_metadata), + arrow::field("value", arrow::utf8(), false)->WithMetadata(value_metadata), }); auto table = arrow::Table::Make(schema, {build_int32_array(ids), build_int32_array(scores), build_string_array(values)}); @@ -398,6 +422,16 @@ TIcebergDeleteFileDesc make_iceberg_position_delete_file(const std::string& path return delete_file; } +TIcebergDeleteFileDesc make_iceberg_equality_delete_file(const std::string& path, + const std::vector& field_ids) { + TIcebergDeleteFileDesc delete_file; + delete_file.__set_content(2); + delete_file.__set_path(path); + delete_file.__set_field_ids(field_ids); + delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET); + return delete_file; +} + TFileScanRangeParams make_local_parquet_scan_params() { TFileScanRangeParams scan_params; scan_params.__set_file_type(TFileType::FILE_LOCAL); @@ -557,6 +591,268 @@ TEST(TableReaderTest, ReopenSplitAfterClose) { std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, PushDownCountFromNewParquetReader) { + const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_count_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, + {"one", "two", "three", "four", "five"}, 2); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = 
FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 5); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownMinMaxFromNewParquetReader) { + const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_minmax_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20}, + {"three", "one", "five", "two"}, 2); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + projected_columns.push_back(make_table_column(1, "score", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::MINMAX, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = 
assert_cast(*block.get_by_position(0).column); + const auto& score_column = assert_cast(*block.get_by_position(1).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 5); + EXPECT_EQ(score_column.get_element(0), 10); + EXPECT_EQ(score_column.get_element(1), 50); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownMinMaxCastsFileValueToTableType) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_minmax_cast_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20}, + {"three", "one", "five", "two"}, 2); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::MINMAX, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 5); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownCountFallsBackWithTableConjunct) { + const auto test_dir = + 
std::filesystem::temp_directory_path() / "doris_table_reader_count_conjunct_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext( + std::make_shared(0, 0, 2)), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 1); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownCountFallsBackWithColumnPredicate) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_count_predicate_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}, 1); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + TableColumnPredicates column_predicates; + 
column_predicates[0].push_back(create_comparison_predicate( + 0, "id", std::make_shared(), Field::create_field(2), false)); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = std::move(column_predicates), + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 1); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownMinMaxFallsBackWithoutDirectFileMapping) { + const auto test_dir = std::filesystem::temp_directory_path() / + "doris_table_reader_minmax_missing_mapping_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 1, "one"); + + std::vector projected_columns; + projected_columns.push_back( + make_table_column(99, "missing_id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = 
TPushAggOp::type::MINMAX, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 1); + EXPECT_EQ(block.get_by_position(0).column->get_int(0), 0); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_conjunct_filter_test"; @@ -644,7 +940,61 @@ TEST(TableReaderTest, OpenReaderBuildsColumnPredicateFilters) { write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", "two", "three"}, 1); std::vector projected_columns; - projected_columns.push_back(make_table_column(2, "value", std::make_shared())); + projected_columns.push_back(make_table_column(2, "value", std::make_shared())); + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + TableColumnPredicates column_predicates; + column_predicates[0].push_back(create_comparison_predicate( + 0, "id", std::make_shared(), Field::create_field(2), false)); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = std::move(column_predicates), + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + + const auto& value_column = 
assert_cast(*block.get_by_position(0).column); + const auto& id_column = assert_cast(*block.get_by_position(1).column); + ASSERT_EQ(id_column.size(), 1); + ASSERT_EQ(value_column.size(), 1); + EXPECT_EQ(id_column.get_element(0), 3); + EXPECT_EQ(value_column.get_data_at(0).to_string(), "three"); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, ColumnPredicateSurvivesReopenSplit) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_predicate_reopen_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const std::vector file_paths = { + (test_dir / "split_1.parquet").string(), + (test_dir / "split_2.parquet").string(), + }; + write_int_pair_parquet_file(file_paths[0], {1, 3}, {10, 30}, {"one", "three"}, 1); + write_int_pair_parquet_file(file_paths[1], {2, 4}, {20, 40}, {"two", "four"}, 1); + + std::vector projected_columns; projected_columns.push_back(make_table_column(0, "id", std::make_shared())); TableColumnPredicates column_predicates; @@ -667,21 +1017,22 @@ TEST(TableReaderTest, OpenReaderBuildsColumnPredicateFilters) { }) .ok()); - ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + std::vector ids; + for (const auto& file_path : file_paths) { + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); - Block block = build_table_block(projected_columns); - bool eos = false; - ASSERT_TRUE(reader.get_block(&block, &eos).ok()); - ASSERT_FALSE(eos); + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + ASSERT_EQ(id_column.size(), 1); + ids.push_back(id_column.get_element(0)); - const auto& value_column = assert_cast(*block.get_by_position(0).column); - const auto& id_column = assert_cast(*block.get_by_position(1).column); - 
ASSERT_EQ(id_column.size(), 1); - ASSERT_EQ(value_column.size(), 1); - EXPECT_EQ(id_column.get_element(0), 3); - EXPECT_EQ(value_column.get_data_at(0).to_string(), "three"); + ASSERT_TRUE(reader.close().ok()); + } - ASSERT_TRUE(reader.close().ok()); + EXPECT_EQ(ids, std::vector({3, 4})); std::filesystem::remove_all(test_dir); } @@ -763,6 +1114,51 @@ TEST(TableReaderTest, CreateScanRequestDeduplicatesSharedPredicateColumns) { } } +TEST(TableReaderTest, CreateScanRequestPromotesProjectedColumnToPredicateColumn) { + const auto int_type = std::make_shared(); + const std::vector projected_columns = { + make_table_column(0, "id", int_type), + make_table_column(1, "score", int_type), + }; + const std::vector file_schema = { + {.id = 0, + .name = "id", + .type = int_type, + .children = {}, + .file_path = {0}, + .field_id_path = {0}, + .name_path = {"id"}, + .column_type = DATA_COLUMN}, + {.id = 1, + .name = "score", + .type = int_type, + .children = {}, + .file_path = {1}, + .field_id_path = {1}, + .name_path = {"score"}, + .column_type = DATA_COLUMN}, + }; + + TableColumnMapper mapper; + ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); + + TableFilter table_filter { + .conjunct = VExprContext::create_shared( + std::make_shared(0, 0, 1)), + .slot_ids = {0}, + }; + + FileScanRequest file_request; + ASSERT_TRUE( + mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); + + EXPECT_EQ(file_request.predicate_columns, std::vector({0})); + EXPECT_EQ(file_request.non_predicate_columns, std::vector({1})); + ASSERT_EQ(file_request.column_positions.size(), 2); + EXPECT_EQ(file_request.column_positions.at(0), 1); + EXPECT_EQ(file_request.column_positions.at(1), 0); +} + TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_multi_conjunct_test"; @@ -1194,6 +1590,7 @@ TEST(TableReaderTest, 
IcebergTableReaderAppliesDeletionVectorFile) { .runtime_state = &state, .scanner_profile = &profile, .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, .profile = make_table_read_profile(&profile), }) .ok()); @@ -1210,6 +1607,226 @@ TEST(TableReaderTest, IcebergTableReaderAppliesDeletionVectorFile) { std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithDeletes) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_delete_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto dv_path = (test_dir / "delete-vector.bin").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + 
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 2); + EXPECT_EQ(id_column.get_element(1), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithPositionDelete) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_position_delete_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_position_delete_parquet_file(delete_file_path, {file_path}, {1}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile 
= &profile, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_position_delete_file(delete_file_path)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergPositionDeleteFallsBackToSplitPath) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_path_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_position_delete_parquet_file(delete_file_path, {file_path}, {1}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + 
.projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + TTableFormatFileDesc table_format_params; + TIcebergFileDesc iceberg_params; + iceberg_params.__set_format_version(2); + iceberg_params.__set_delete_files({make_iceberg_position_delete_file(delete_file_path)}); + table_format_params.__set_iceberg_params(iceberg_params); + split_options.current_range.__set_table_format_params(table_format_params); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({1, 3})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithEqualityDelete) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_equality_delete_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "equality-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_iceberg_equality_delete_parquet_file(delete_file_path, 0, 2); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = 
make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_equality_delete_file(delete_file_path, {0})})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_file_test"; @@ -1332,8 +1949,9 @@ TEST(TableReaderTest, RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColum EXPECT_EQ(request.non_predicate_columns, std::vector({0})); ASSERT_TRUE(request.column_positions.contains(row_position_column_id)); EXPECT_EQ(request.column_positions.at(row_position_column_id), 1); - ASSERT_EQ(request.expression_filters.size(), 1); - EXPECT_NE(request.expression_filters[0].delete_conjunct, nullptr); + ASSERT_TRUE(request.conjuncts.empty()); + ASSERT_EQ(request.delete_conjuncts.size(), 1); + 
EXPECT_NE(request.delete_conjuncts[0], nullptr); } TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) {