diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 7f442808523ef8..ff9d939b4d064e 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -31,6 +31,7 @@ #include "common/exception.h" #include "core/block/block.h" #include "core/data_type/data_type_nullable.h" +#include "exprs/vexpr_context.h" #include "format/new_parquet/column_reader.h" #include "format/new_parquet/parquet_column_schema.h" #include "format/new_parquet/parquet_statistics.h" @@ -193,13 +194,8 @@ void ParquetReader::_fill_schema_field(const ParquetColumnSchema& column_schema, } } -bool ParquetReader::_has_structured_filter(const reader::FileLocalFilter& local_filter) { - for (const auto& predicate : local_filter.predicates) { - if (predicate != nullptr) { - return true; - } - } - return false; +bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter& local_filter) { + return local_filter.conjunct != nullptr; } Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block, @@ -220,24 +216,27 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows", column_reader->name(), column_rows, batch_rows); } + file_block->replace_by_position(block_position, std::move(column)); for (const auto& local_filter : _request->local_filters) { if (local_filter.file_column_id != file_field_id || - !_has_structured_filter(local_filter)) { + !_has_expression_filter(local_filter)) { continue; } if (*selected_rows == 0) { break; } - for (const auto& predicate : local_filter.predicates) { - *selected_rows = predicate->evaluate(*column, selection->data(), *selected_rows); - if (*selected_rows == 0) { - break; - } - } + IColumn::Filter filter(static_cast(batch_rows), 1); + bool can_filter_all = false; + RETURN_IF_ERROR(local_filter.conjunct->execute_filter( + file_block, filter.data(), static_cast(batch_rows), false, + &can_filter_all)); + *selected_rows = + can_filter_all + ? 0 + : _apply_filter_to_selection(filter, selection, *selected_rows); break; } - file_block->replace_by_position(block_position, std::move(column)); if (*selected_rows == 0) { break; } @@ -245,18 +244,6 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block return Status::OK(); } -Status ParquetReader::_validate_supported_local_filters( - const std::vector& local_filters) { - for (const auto& local_filter : local_filters) { - if (local_filter.conjunct != nullptr) { - return Status::NotSupported( - "Parquet expression filter fallback is not implemented for field {}", - local_filter.file_column_id); - } - } - return Status::OK(); -} - IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector& selection, uint16_t selected_rows, int64_t batch_rows) { IColumn::Filter filter(static_cast(batch_rows), 0); @@ -266,6 +253,19 @@ IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector& selec return filter; } +uint16_t ParquetReader::_apply_filter_to_selection(const IColumn::Filter& filter, + SelectionVector* selection, + uint16_t selected_rows) { + uint16_t new_selected_rows = 0; + for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { + const auto row_idx = selection->get_index(selection_idx); + if (filter[row_idx] != 0) { + selection->set_index(new_selected_rows++, static_cast(row_idx)); + } + } + return new_selected_rows; +} + Status ParquetReader::_open_next_row_group(bool* has_row_group) { *has_row_group = false; while (_state->next_row_group_idx < _state->selected_row_groups.size()) { @@ -456,8 +456,6 @@ Status ParquetReader::open(std::unique_ptr& request) { local_filter.file_column_id); } } - RETURN_IF_ERROR(_validate_supported_local_filters(_request->local_filters)); - RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, _state->file_schema, *_request, &_state->selected_row_groups)); RETURN_IF_ERROR(_reset_reader_position()); diff --git a/be/src/format/new_parquet/parquet_reader.h b/be/src/format/new_parquet/parquet_reader.h index 6920f8c4d78a61..40213ebb0d68da 100644 --- a/be/src/format/new_parquet/parquet_reader.h +++ b/be/src/format/new_parquet/parquet_reader.h @@ -121,13 +121,13 @@ class ParquetReader : public reader::FileReader { void _reset_current_row_group(); void _fill_schema_field(const ParquetColumnSchema& column_schema, reader::SchemaField* field) const; - bool _has_structured_filter(const reader::FileLocalFilter& local_filter); + bool _has_expression_filter(const reader::FileLocalFilter& local_filter); Status _read_filter_columns(int64_t batch_rows, Block* file_block, SelectionVector* selection, uint16_t* selected_rows); - Status _validate_supported_local_filters( - const std::vector& local_filters); IColumn::Filter _selection_to_filter(const SelectionVector& selection, uint16_t selected_rows, int64_t batch_rows); + uint16_t _apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection, + uint16_t selected_rows); Status _open_next_row_group(bool* has_row_group); Status _read_current_row_group_batch(int64_t batch_rows, Block* file_block, size_t* rows); diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index e805243ca22952..7e28b7fce5b25a 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -33,6 +33,8 @@ #include "core/data_type/data_type_number.h" #include "core/data_type/primitive_type.h" #include "core/field.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" #include "format/new_parquet/parquet_reader.h" #include "format/reader/file_reader.h" #include "gen_cpp/Types_types.h" @@ -45,6 +47,43 @@ namespace { constexpr int64_t ROW_COUNT = 5; +class Int32GreaterThanExpr final : public VExpr { +public: + Int32GreaterThanExpr(int column_id, int32_t value) + : VExpr(std::make_shared(), false), + _column_id(column_id), + _value(value) {} + + Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + const auto& input = + assert_cast(*block->get_by_position(_column_id).column); + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; row < count; ++row) { + const size_t input_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = input.get_element(input_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + + const std::string& expr_name() const override { return _expr_name; } + +private: + const int _column_id; + const int32_t _value; + const std::string _expr_name = "Int32GreaterThanExpr"; +}; + +VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t value) { + auto ctx = VExprContext::create_shared(std::make_shared(column_id, value)); + ctx->_prepared = true; + ctx->_opened = true; + return ctx; +} + std::shared_ptr finish_array(arrow::ArrayBuilder* builder) { std::shared_ptr array; EXPECT_TRUE(builder->Finish(&array).ok()); @@ -265,6 +304,7 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { request->non_predicate_columns = {1}; reader::FileLocalFilter filter; filter.file_column_id = 0; + filter.conjunct = create_int32_greater_than_conjunct(0, 2); filter.predicates.push_back(create_comparison_predicate( 0, "id", schema[0].type, Field::create_field(2), false)); request->local_filters.push_back(std::move(filter)); @@ -306,9 +346,11 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByStatistics) { std::vector schema; ASSERT_TRUE(reader->get_schema(&schema).ok()); auto request = std::make_unique(); - request->non_predicate_columns = {0, 1}; + request->predicate_columns = {0}; + request->non_predicate_columns = {1}; reader::FileLocalFilter filter; filter.file_column_id = 0; + filter.conjunct = create_int32_greater_than_conjunct(0, 2); filter.predicates.push_back(create_comparison_predicate( 0, "id", schema[0].type, Field::create_field(2), false)); request->local_filters.push_back(std::move(filter)); diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp new file mode 100644 index 00000000000000..84c5700fc4c1ac --- /dev/null +++ b/be/test/format/reader/table_reader_test.cpp @@ -0,0 +1,303 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/reader/table_reader.h" + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_state.h" + +namespace doris::reader { +namespace { + +std::shared_ptr finish_array(arrow::ArrayBuilder* builder) { + std::shared_ptr array; + EXPECT_TRUE(builder->Finish(&array).ok()); + return array; +} + +std::shared_ptr build_int32_array(const std::vector& values) { + arrow::Int32Builder builder; + for (const auto value : values) { + EXPECT_TRUE(builder.Append(value).ok()); + } + return finish_array(&builder); +} + +std::shared_ptr build_string_array(const std::vector& values) { + arrow::StringBuilder builder; + for (const auto& value : values) { + EXPECT_TRUE(builder.Append(value).ok()); + } + return finish_array(&builder); +} + +void write_parquet_file(const std::string& file_path, int32_t id, const std::string& value) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32(), false), + arrow::field("value", arrow::utf8(), false), + }); + auto table = + arrow::Table::Make(schema, {build_int32_array({id}), build_string_array({value})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable( + *table, arrow::default_memory_pool(), out, 1, builder.build())); +} + +Block build_table_block(const std::vector& columns) { + Block block; + for (const auto& column : columns) { + block.insert({column.type->create_column(), column.type, column.name}); + } + return block; +} + +SplitReadOptions build_split_options(const std::string& file_path) { + SplitReadOptions options; + options.current_range.__set_path(file_path); + options.current_range.__set_file_size( + static_cast(std::filesystem::file_size(file_path))); + return options; +} + +TEST(TableReaderTest, ReopenSplitAfterClose) { + const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const std::vector file_paths = { + (test_dir / "split_1.parquet").string(), + (test_dir / "split_2.parquet").string(), + (test_dir / "split_3.parquet").string(), + }; + write_parquet_file(file_paths[0], 1, "one"); + write_parquet_file(file_paths[1], 2, "two"); + write_parquet_file(file_paths[2], 3, "three"); + + std::vector projected_columns; + projected_columns.push_back({.id = 0, .name = "id", .type = std::make_shared()}); + projected_columns.push_back( + {.id = 1, .name = "value", .type = std::make_shared()}); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + + // Simulate the scanner lifecycle for three different splits: + // init() once, then repeat prepare_split() -> get_block() -> close(). + // This verifies TableReader::close() fully releases the previous low-level reader and task + // state, so a later prepare_split() can open and read a new split on the same TableReader. + std::vector ids; + std::vector values; + for (const auto& file_path : file_paths) { + auto split_options = build_split_options(file_path); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& value_column = + assert_cast(*block.get_by_position(1).column); + ASSERT_EQ(id_column.size(), 1); + ASSERT_EQ(value_column.size(), 1); + ids.push_back(id_column.get_element(0)); + values.push_back(value_column.get_data_at(0).to_string()); + + ASSERT_TRUE(reader.close().ok()); + } + + EXPECT_EQ(ids, std::vector({1, 2, 3})); + EXPECT_EQ(values, std::vector({"one", "two", "three"})); + + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_schema_mismatch_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 1, "one"); + + std::vector projected_columns; + projected_columns.push_back( + {.id = 99, .name = "missing_value", .type = std::make_shared()}); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + // The table projection asks for field id 99, but the ParquetReader exposes only file-local + // fields 0 and 1. get_block() opens the split lazily, so this is where TableReader must reject + // the mismatch between TableReadOptions::projected_columns and the Parquet file schema. + Block block = build_table_block(projected_columns); + bool eos = false; + const auto status = reader.get_block(&block, &eos); + ASSERT_FALSE(status.ok()); + EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMismatch) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_same_name_diff_id_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 1, "one"); + + std::vector projected_columns; + projected_columns.push_back( + {.id = 99, .name = "id", .type = std::make_shared()}); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + // The table column has the same name as the Parquet field, but a different field id. + // TableReader configures ColumnMapper in BY_FIELD_ID mode, so the name match must not hide + // the id mismatch. + Block block = build_table_block(projected_columns); + bool eos = false; + const auto status = reader.get_block(&block, &eos); + ASSERT_FALSE(status.ok()); + EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismatch) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_mapper_expr_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_parquet_file(file_path, 7, "seven"); + + std::vector projected_columns; + projected_columns.push_back( + {.id = 0, .name = "table_id", .type = std::make_shared()}); + projected_columns.push_back( + {.id = 1, .name = "table_value", .type = std::make_shared()}); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader + .init({ + .projected_columns = projected_columns, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + }) + .ok()); + + ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); + + // The table projection is intentionally different from the Parquet schema: + // field id 0 is requested as BIGINT instead of the file INT, so ColumnMapper should build a + // Cast expression; field id 1 has a different table name but the same type, so it should build + // a SlotRef projection. Both columns should still materialize in table schema order. + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + + ASSERT_EQ(block.get_by_position(0).name, "table_id"); + ASSERT_EQ(block.get_by_position(1).name, "table_value"); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& value_column = assert_cast(*block.get_by_position(1).column); + ASSERT_EQ(id_column.size(), 1); + ASSERT_EQ(value_column.size(), 1); + EXPECT_EQ(id_column.get_element(0), 7); + EXPECT_EQ(value_column.get_data_at(0).to_string(), "seven"); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +} // namespace +} // namespace doris::reader