From e3f46f052ca52b64ca6d8b06bb23b0c51555bfc6 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 27 May 2026 11:13:25 +0800 Subject: [PATCH 1/2] [feature](be) Build table filters from conjuncts ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: TableReader stored TableReadOptions::conjuncts but did not convert them into _table_filters before ColumnMapper created FileScanRequest. This adds conservative single-slot conjunct extraction during open_reader(), prepares/opens the generated filter contexts, and covers the localized Parquet row filtering path in TableReader tests. ### Release note None ### Check List (For Author) - Test: Manual test - Ran git diff --check. ./run-be-ut.sh --run '--filter=TableReaderTest.*' could not run because JAVA_HOME points to JDK 11 and JDK_17 is not set. build-support/check-format.sh could not run because clang-format is not installed. - Behavior changed: Yes. TableReader now localizes single-slot table conjuncts into FileScanRequest local filters during reader open. - Does this need documentation: No --- be/src/format/reader/table_reader.cpp | 54 +++++++++- be/src/format/reader/table_reader.h | 5 + be/test/format/reader/table_reader_test.cpp | 106 ++++++++++++++++++++ 3 files changed, 163 insertions(+), 2 deletions(-) diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index 13f093228e6e70..f650a75b1ffcee 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -20,15 +20,54 @@ #include #include +#include #include #include "common/status.h" +#include "core/assert_cast.h" +#include "exprs/vslot_ref.h" #include "format/new_parquet/parquet_reader.h" #include "format/reader/column_mapper.h" #include "format/table/deletion_vector_reader.h" #include "io/io_common.h" namespace doris::reader { +namespace { + +void collect_table_slot_ids(const VExprSPtr& expr, std::set* slot_ids) { + if (expr == nullptr) { + return; + } + if (expr->is_slot_ref()) { + const auto* slot_ref = assert_cast(expr.get()); + slot_ids->insert(slot_ref->slot_id()); + } + for (const auto& child : expr->children()) { + collect_table_slot_ids(child, slot_ids); + } +} + +void build_table_filters_from_conjunct(const VExprSPtr& conjunct, + std::map* table_filters) { + if (conjunct == nullptr) { + return; + } + std::set slot_ids; + collect_table_slot_ids(conjunct, &slot_ids); + if (slot_ids.size() == 1) { + (*table_filters)[*slot_ids.begin()].conjunct = VExprContext::create_shared(conjunct); + return; + } + if (conjunct->node_type() == TExprNodeType::COMPOUND_PRED && + conjunct->op() == TExprOpcode::COMPOUND_AND) { + for (const auto& child : conjunct->children()) { + build_table_filters_from_conjunct(child, table_filters); + } + return; + } +} + +} // namespace std::shared_ptr create_system_properties( const TFileScanRangeParams* scan_params) { @@ -59,8 +98,19 @@ Status TableReader::init(TableReadOptions options) { TableColumnMapperOptions mapper_options; mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID; _data_reader.column_mapper = TableColumnMapper(mapper_options); - // TODO: - // _table_filters = build_table_filters_from_conjuncts(options.conjuncts); + _conjuncts = std::move(options.conjuncts); + return Status::OK(); +} + +Status TableReader::_build_table_filters_from_conjuncts() { + _table_filters.clear(); + build_table_filters_from_conjunct(_conjuncts.root(), &_table_filters); + RowDescriptor row_desc; + for (auto& [_, table_filter] : _table_filters) { + DORIS_CHECK(table_filter.conjunct != nullptr); + RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state)); + } return Status::OK(); } diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index 53791747faf67f..e2a342310b209f 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -219,6 +219,7 @@ class TableReader { RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns, _partition_values, file_schema)); DORIS_CHECK(_data_reader.column_mapper.mappings().size() == _projected_columns.size()); + RETURN_IF_ERROR(_build_table_filters_from_conjuncts()); auto file_request = std::make_unique(); RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request( @@ -242,12 +243,15 @@ class TableReader { return Status::OK(); } + Status _build_table_filters_from_conjuncts(); + // 关闭当前具体 reader。 // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。 virtual Status close_current_reader() { RETURN_IF_ERROR(_data_reader.reader->close()); _data_reader.reader.reset(); _data_reader.column_mapper.clear(); + _table_filters.clear(); _data_reader.block_schema.clear(); _data_reader.scan_schema.clear(); _data_reader.block_template.clear(); @@ -314,6 +318,7 @@ class TableReader { // partition key -> value std::map _partition_values; std::map _table_filters; + VExprContext _conjuncts {nullptr}; std::unique_ptr _profile; // Parsed from DELETION_VECTOR in Iceberg and Paimon DeleteRows* _delete_rows; diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp index 84c5700fc4c1ac..e5898070b09ec3 100644 --- a/be/test/format/reader/table_reader_test.cpp +++ b/be/test/format/reader/table_reader_test.cpp @@ -33,12 +33,49 @@ #include "core/column/column_vector.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "format/reader/expr/slot_ref.h" #include "gen_cpp/PlanNodes_types.h" #include "runtime/runtime_state.h" namespace doris::reader { namespace { +class TableInt32GreaterThanExpr final : public VExpr { +public: + TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value) + : VExpr(std::make_shared(), false), + _column_id(column_id), + _value(value) { + add_child(TableSlotRef::create_shared(slot_id, column_id, -1, + std::make_shared(), "id")); + set_node_type(TExprNodeType::BINARY_PRED); + _opcode = TExprOpcode::GT; + } + + Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + const auto& input = + assert_cast(*block->get_by_position(_column_id).column); + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; row < count; ++row) { + const size_t input_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = input.get_element(input_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + + const std::string& expr_name() const override { return _expr_name; } + +private: + const int _column_id; + const int32_t _value; + const std::string _expr_name = "TableInt32GreaterThanExpr"; +}; + std::shared_ptr finish_array(arrow::ArrayBuilder* builder) { std::shared_ptr array; EXPECT_TRUE(builder->Finish(&array).ok()); @@ -162,6 +199,75 @@ TEST(TableReaderTest, ReopenSplitAfterClose) { std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_conjunct_filter_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 3, "three"); + + std::vector projected_columns; + projected_columns.push_back({.id = 0, .name = "id", .type = std::make_shared()}); + projected_columns.push_back( + {.id = 1, .name = "value", .type = std::make_shared()}); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(std::make_shared( + 0, 0, 2)), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + // open_reader() should convert the table-level conjunct on projected column id 0 into + // _table_filters before ColumnMapper creates the FileScanRequest. ParquetReader then receives + // that localized conjunct and filters rows through its predicate-column path. + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + ASSERT_EQ(id_column.size(), 1); + EXPECT_EQ(id_column.get_element(0), 3); + + ASSERT_TRUE(reader.close().ok()); + + TableReader filtered_reader; + ASSERT_TRUE(filtered_reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(std::make_shared( + 0, 0, 4)), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok()); + + block = build_table_block(projected_columns); + eos = false; + ASSERT_TRUE(filtered_reader.get_block(&block, &eos).ok()); + EXPECT_TRUE(eos); + EXPECT_EQ(block.get_by_position(0).column->size(), 0); + + ASSERT_TRUE(filtered_reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_schema_mismatch_test"; From e9560b93821c8525316d84679caca887cdd3a9eb Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 27 May 2026 15:19:48 +0800 Subject: [PATCH 2/2] [fix](be) Rewrite table filter conjunct slots ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: TableReader builds table filters from table-level conjuncts, but the conjunct slot refs use table block positions. Passing them directly to ParquetReader is incorrect because ParquetReader evaluates filters on a file-local block with different column positions. This rewrites localized conjunct slot refs to file-local positions and opens those localized filter expressions after FileScanRequest creation. ### Release note None ### Check List (For Author) - Test: Unit Test - git diff --check - build-support/check-format.sh --files be/src/format/reader/column_mapper.cpp be/src/format/reader/table_reader.cpp be/src/format/reader/table_reader.h be/test/format/reader/table_reader_test.cpp (blocked: clang-format not found) - ./run-be-ut.sh --run '--filter=TableReaderTest.*' (blocked: JAVA_HOME points to JDK 11 and JDK_17 is not set) - Behavior changed: No - Does this need documentation: No --- be/src/exprs/vslot_ref.h | 4 +- be/src/format/reader/column_mapper.cpp | 74 +++++++++-- be/src/format/reader/table_reader.cpp | 15 ++- be/src/format/reader/table_reader.h | 3 + be/test/format/reader/table_reader_test.cpp | 132 +++++++++++++++----- 5 files changed, 184 insertions(+), 44 deletions(-) diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h index 6e7197f4cf6876..8cb26f9bcfd296 100644 --- a/be/src/exprs/vslot_ref.h +++ b/be/src/exprs/vslot_ref.h @@ -75,7 +75,9 @@ class VSlotRef : public VExpr { protected: VSlotRef(int slot_id, int column_id, int column_uniq_id) - : _slot_id(slot_id), _column_id(column_id), _column_uniq_id(column_uniq_id) {} + : _slot_id(slot_id), _column_id(column_id), _column_uniq_id(column_uniq_id) { + set_node_type(TExprNodeType::SLOT_REF); + } private: int _slot_id; diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index b2453dbbfaf61c..5790517f7bb71f 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -21,6 +21,7 @@ #include #include "common/status.h" +#include "core/assert_cast.h" #include "format/reader/expr/cast.h" #include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" @@ -28,6 +29,35 @@ namespace doris::reader { +static VExprSPtr rewrite_table_expr_to_file_expr( + const VExprSPtr& expr, const std::map& table_column_to_file_position) { + if (expr == nullptr) { + return nullptr; + } + if (expr->is_slot_ref()) { + const auto* slot_ref = assert_cast(expr.get()); + const auto position_it = table_column_to_file_position.find(slot_ref->slot_id()); + if (position_it != table_column_to_file_position.end()) { + return TableSlotRef::create_shared(slot_ref->slot_id(), + cast_set(position_it->second), -1, + slot_ref->data_type(), slot_ref->expr_name()); + } + return expr; + } + + // VExpr currently does not provide a generic deep-clone API for arbitrary expression types. + // Keep all slot-localization mutation inside ColumnMapper and rebuild it for every split + // before the localized expression is prepared/opened by TableReader. + VExprSPtrs rewritten_children; + rewritten_children.reserve(expr->children().size()); + for (const auto& child : expr->children()) { + rewritten_children.push_back( + rewrite_table_expr_to_file_expr(child, table_column_to_file_position)); + } + expr->set_children(std::move(rewritten_children)); + return expr; +} + static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_updated_sequence_number"; @@ -56,6 +86,21 @@ static void rebuild_projection(ColumnMapping* mapping, size_t block_position) { mapping->projection = VExprContext::create_shared(expr); } +static std::map build_file_position_map( + const std::vector& mappings, const FileScanRequest& file_request) { + std::map table_column_to_file_position; + for (const auto& mapping : mappings) { + if (!mapping.file_column_id.has_value()) { + continue; + } + const auto position_it = file_request.column_positions.find(*mapping.file_column_id); + if (position_it != file_request.column_positions.end()) { + table_column_to_file_position.emplace(mapping.table_column_id, position_it->second); + } + } + return table_column_to_file_position; +} + Status TableColumnMapper::create_mapping(const std::vector& projected_columns, const std::map& partition_values, const std::vector& file_schema) { @@ -102,7 +147,8 @@ Status TableColumnMapper::create_mapping(const std::vector& project Status TableColumnMapper::create_scan_request(const std::map& table_filters, const std::vector& projected_columns, FileScanRequest* file_request) { - // 真实实现会把 table projection/filter 转换成 file-local projection/filter。 + // FileReader evaluates expressions against a file-local block. This mapper owns the + // table-column to file-column conversion, so it also owns the file-local block positions. file_request->predicate_columns.clear(); file_request->non_predicate_columns.clear(); file_request->column_positions.clear(); @@ -141,15 +187,29 @@ Status TableColumnMapper::localize_filters(const std::map& if (!it.second.can_be_localized()) { // TODO: Rewrite table filter to reader_expression_map // file_request->reader_expression_map.emplace_back(mapping->table_column_id, it.second.conjunct); - } else { - FileLocalFilter local_filter; - local_filter.file_column_id = *mapping->file_column_id; - local_filter.conjunct = it.second.conjunct; - local_filter.predicates = it.second.predicates; - file_request->local_filters.push_back(std::move(local_filter)); + continue; } add_scan_column(file_request, *mapping->file_column_id, &file_request->predicate_columns); } + + // Build the complete table-slot to file-block position map after all predicate columns have + // been assigned. This keeps expression localization independent from filter iteration order. + const auto table_column_to_file_position = build_file_position_map(_mappings, *file_request); + for (const auto& it : table_filters) { + const auto* mapping = _find_mapping(it.first); + if (mapping == nullptr || !mapping->file_column_id.has_value() || + !it.second.can_be_localized()) { + continue; + } + FileLocalFilter local_filter; + local_filter.file_column_id = *mapping->file_column_id; + if (it.second.conjunct != nullptr) { + local_filter.conjunct = VExprContext::create_shared(rewrite_table_expr_to_file_expr( + it.second.conjunct->root(), table_column_to_file_position)); + } + local_filter.predicates = it.second.predicates; + file_request->local_filters.push_back(std::move(local_filter)); + } return Status::OK(); } diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index f650a75b1ffcee..f6cfa21600ea61 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -97,6 +97,7 @@ Status TableReader::init(TableReadOptions options) { _profile = std::move(options.profile); TableColumnMapperOptions mapper_options; mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID; + mapper_options.allow_missing_columns = options.allow_missing_columns; _data_reader.column_mapper = TableColumnMapper(mapper_options); _conjuncts = std::move(options.conjuncts); return Status::OK(); @@ -105,11 +106,17 @@ Status TableReader::init(TableReadOptions options) { Status TableReader::_build_table_filters_from_conjuncts() { _table_filters.clear(); build_table_filters_from_conjunct(_conjuncts.root(), &_table_filters); + return Status::OK(); +} + +Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) { RowDescriptor row_desc; - for (auto& [_, table_filter] : _table_filters) { - DORIS_CHECK(table_filter.conjunct != nullptr); - RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, row_desc)); - RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state)); + for (const auto& local_filter : file_request.local_filters) { + if (local_filter.conjunct == nullptr) { + continue; + } + RETURN_IF_ERROR(local_filter.conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(local_filter.conjunct->open(_runtime_state)); } return Status::OK(); } diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index e2a342310b209f..4f28c4e1aaa9f8 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -107,6 +107,7 @@ struct TableReadOptions { std::shared_ptr io_ctx; RuntimeState* runtime_state; RuntimeProfile* scanner_profile; + const bool allow_missing_columns = true; std::unique_ptr profile; }; @@ -224,6 +225,7 @@ class TableReader { auto file_request = std::make_unique(); RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request( _table_filters, _projected_columns, file_request.get())); + RETURN_IF_ERROR(_open_local_filter_exprs(*file_request)); _data_reader.scan_schema.clear(); _data_reader.block_template.clear(); _data_reader.scan_schema.resize(file_request->column_positions.size()); @@ -244,6 +246,7 @@ class TableReader { } Status _build_table_filters_from_conjuncts(); + Status _open_local_filter_exprs(const FileScanRequest& file_request); // 关闭当前具体 reader。 // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。 diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp index e5898070b09ec3..dc2e26f35ea222 100644 --- a/be/test/format/reader/table_reader_test.cpp +++ b/be/test/format/reader/table_reader_test.cpp @@ -44,9 +44,7 @@ namespace { class TableInt32GreaterThanExpr final : public VExpr { public: TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value) - : VExpr(std::make_shared(), false), - _column_id(column_id), - _value(value) { + : VExpr(std::make_shared(), false), _value(value) { add_child(TableSlotRef::create_shared(slot_id, column_id, -1, std::make_shared(), "id")); set_node_type(TExprNodeType::BINARY_PRED); @@ -55,8 +53,10 @@ class TableInt32GreaterThanExpr final : public VExpr { Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, size_t count, ColumnPtr& result_column) const override { + const auto* slot_ref = assert_cast(get_child(0).get()); const auto& input = - assert_cast(*block->get_by_position(_column_id).column); + assert_cast( + *block->get_by_position(slot_ref->column_id()).column); auto result = ColumnUInt8::create(); auto& result_data = result->get_data(); result_data.resize(count); @@ -71,7 +71,6 @@ class TableInt32GreaterThanExpr final : public VExpr { const std::string& expr_name() const override { return _expr_name; } private: - const int _column_id; const int32_t _value; const std::string _expr_name = "TableInt32GreaterThanExpr"; }; @@ -134,6 +133,14 @@ SplitReadOptions build_split_options(const std::string& file_path) { return options; } +TableColumn make_table_column(ColumnId id, const std::string& name, const DataTypePtr& type) { + TableColumn column; + column.id = id; + column.name = name; + column.type = type; + return column; +} + TEST(TableReaderTest, ReopenSplitAfterClose) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test"; std::filesystem::remove_all(test_dir); @@ -149,21 +156,23 @@ TEST(TableReaderTest, ReopenSplitAfterClose) { write_parquet_file(file_paths[2], 3, "three"); std::vector projected_columns; - projected_columns.push_back({.id = 0, .name = "id", .type = std::make_shared()}); - projected_columns.push_back( - {.id = 1, .name = "value", .type = std::make_shared()}); + projected_columns.push_back(make_table_column(1, "value", std::make_shared())); + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); RuntimeState state {TQueryOptions(), TQueryGlobals()}; TableReader reader; ASSERT_TRUE(reader .init({ .projected_columns = projected_columns, - .conjuncts = VExprContext(nullptr), + .conjuncts = VExprContext( + std::make_shared(0, 0, 0)), .format = FileFormat::PARQUET, .scan_params = nullptr, .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok()); @@ -171,6 +180,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) { // init() once, then repeat prepare_split() -> get_block() -> close(). // This verifies TableReader::close() fully releases the previous low-level reader and task // state, so a later prepare_split() can open and read a new split on the same TableReader. + // The table-level conjunct is also rebuilt for each split. The projection order puts value + // before id, so the pushed conjunct has to be rewritten to the ParquetReader file-local block + // position every time a new split is opened. std::vector ids; std::vector values; for (const auto& file_path : file_paths) { @@ -182,9 +194,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) { ASSERT_TRUE(reader.get_block(&block, &eos).ok()); ASSERT_FALSE(eos); - const auto& id_column = assert_cast(*block.get_by_position(0).column); const auto& value_column = - assert_cast(*block.get_by_position(1).column); + assert_cast(*block.get_by_position(0).column); + const auto& id_column = assert_cast(*block.get_by_position(1).column); ASSERT_EQ(id_column.size(), 1); ASSERT_EQ(value_column.size(), 1); ids.push_back(id_column.get_element(0)); @@ -209,9 +221,8 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { write_parquet_file(file_path, 3, "three"); std::vector projected_columns; - projected_columns.push_back({.id = 0, .name = "id", .type = std::make_shared()}); - projected_columns.push_back( - {.id = 1, .name = "value", .type = std::make_shared()}); + projected_columns.push_back(make_table_column(1, "value", std::make_shared())); + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); RuntimeState state {TQueryOptions(), TQueryGlobals()}; TableReader reader; @@ -225,19 +236,23 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok()); ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); // open_reader() should convert the table-level conjunct on projected column id 0 into - // _table_filters before ColumnMapper creates the FileScanRequest. ParquetReader then receives - // that localized conjunct and filters rows through its predicate-column path. + // _table_filters before ColumnMapper creates the FileScanRequest. ColumnMapper then rewrites + // the conjunct's slot ref from table column id 0 to the file-local block position used by + // ParquetReader. The projection order intentionally puts value before id, so the id filter + // column is not at position 0 in the file block. Block block = build_table_block(projected_columns); bool eos = false; ASSERT_TRUE(reader.get_block(&block, &eos).ok()); ASSERT_FALSE(eos); - const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& id_column = assert_cast(*block.get_by_position(1).column); ASSERT_EQ(id_column.size(), 1); EXPECT_EQ(id_column.get_element(0), 3); @@ -254,6 +269,8 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok()); ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok()); @@ -262,13 +279,13 @@ TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { eos = false; ASSERT_TRUE(filtered_reader.get_block(&block, &eos).ok()); EXPECT_TRUE(eos); - EXPECT_EQ(block.get_by_position(0).column->size(), 0); + EXPECT_EQ(block.get_by_position(1).column->size(), 0); ASSERT_TRUE(filtered_reader.close().ok()); std::filesystem::remove_all(test_dir); } -TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { +TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_schema_mismatch_test"; std::filesystem::remove_all(test_dir); @@ -279,7 +296,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { std::vector projected_columns; projected_columns.push_back( - {.id = 99, .name = "missing_value", .type = std::make_shared()}); + make_table_column(99, "missing_value", std::make_shared())); RuntimeState state {TQueryOptions(), TQueryGlobals()}; TableReader reader; @@ -292,14 +309,59 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok()); ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); // The table projection asks for field id 99, but the ParquetReader exposes only file-local - // fields 0 and 1. get_block() opens the split lazily, so this is where TableReader must reject - // the mismatch between TableReadOptions::projected_columns and the Parquet file schema. + // fields 0 and 1. Missing columns are allowed by the current mapper options, so TableReader + // should still use the Parquet row count and fill a default column in table schema. + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + EXPECT_EQ(block.get_by_position(0).column->size(), 1); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColumnsDisallowed) { + const auto test_dir = std::filesystem::temp_directory_path() / + "doris_table_reader_schema_mismatch_reject_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 1, "one"); + + std::vector projected_columns; + projected_columns.push_back( + make_table_column(99, "missing_value", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = false, + .profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + // With allow_missing_columns disabled, the same missing projected column should fail while + // opening the split instead of being materialized as a default column. Block block = build_table_block(projected_columns); bool eos = false; const auto status = reader.get_block(&block, &eos); @@ -310,7 +372,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { std::filesystem::remove_all(test_dir); } -TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMismatch) { +TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_same_name_diff_id_test"; std::filesystem::remove_all(test_dir); @@ -320,8 +382,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism write_parquet_file(file_path, 1, "one"); std::vector projected_columns; - projected_columns.push_back( - {.id = 99, .name = "id", .type = std::make_shared()}); + projected_columns.push_back(make_table_column(99, "id", std::make_shared())); RuntimeState state {TQueryOptions(), TQueryGlobals()}; TableReader reader; @@ -334,19 +395,24 @@ TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok()); ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); // The table column has the same name as the Parquet field, but a different field id. - // TableReader configures ColumnMapper in BY_FIELD_ID mode, so the name match must not hide - // the id mismatch. + // ColumnMapper should still resolve it by name and build a SlotRef projection from the file + // column into the requested table column. Block block = build_table_block(projected_columns); bool eos = false; - const auto status = reader.get_block(&block, &eos); - ASSERT_FALSE(status.ok()); - EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos); + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + + const auto& id_column = assert_cast(*block.get_by_position(0).column); + ASSERT_EQ(id_column.size(), 1); + EXPECT_EQ(id_column.get_element(0), 1); ASSERT_TRUE(reader.close().ok()); std::filesystem::remove_all(test_dir); @@ -363,9 +429,9 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat std::vector projected_columns; projected_columns.push_back( - {.id = 0, .name = "table_id", .type = std::make_shared()}); + make_table_column(0, "table_id", std::make_shared())); projected_columns.push_back( - {.id = 1, .name = "table_value", .type = std::make_shared()}); + make_table_column(1, "table_value", std::make_shared())); RuntimeState state {TQueryOptions(), TQueryGlobals()}; TableReader reader; @@ -378,6 +444,8 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat .io_ctx = nullptr, .runtime_state = &state, .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, }) .ok());