diff --git a/src/paimon/format/parquet/column_index_filter.cpp b/src/paimon/format/parquet/column_index_filter.cpp index cf638cf6..c2f9ab61 100644 --- a/src/paimon/format/parquet/column_index_filter.cpp +++ b/src/paimon/format/parquet/column_index_filter.cpp @@ -96,6 +96,10 @@ Result ColumnIndexFilter::VisitLeafPredicate( const auto& literals = leaf_predicate->Literals(); FieldType field_type = leaf_predicate->GetFieldType(); + if (function_type != Function::Type::IS_NULL && function_type != Function::Type::IS_NOT_NULL && + literals.empty()) { + return RowRanges::CreateSingle(row_group_row_count); + } std::vector matching_pages; switch (function_type) { @@ -106,37 +110,22 @@ Result ColumnIndexFilter::VisitLeafPredicate( matching_pages = FilterPagesByIsNotNull(column_index_ptr); break; case Function::Type::EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::NOT_EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_THAN: - if (!literals.empty()) { - matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_THAN: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::IN: matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type); diff --git a/src/paimon/format/parquet/column_index_filter_test.cpp b/src/paimon/format/parquet/column_index_filter_test.cpp index 8249f635..321d6b4e 100644 --- a/src/paimon/format/parquet/column_index_filter_test.cpp +++ b/src/paimon/format/parquet/column_index_filter_test.cpp @@ -26,6 +26,9 @@ #include "arrow/c/abi.h" #include "arrow/c/bridge.h" #include "gtest/gtest.h" +#include "paimon/common/predicate/equal.h" +#include "paimon/common/predicate/in.h" +#include "paimon/common/predicate/leaf_predicate_impl.h" #include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/defs.h" @@ -480,4 +483,29 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) { EXPECT_EQ(row_group_row_count_, ranges.RowCount()); } +/// When literals is empty for comparison predicates (EQUAL, NOT_EQUAL, LESS_THAN, +/// LESS_OR_EQUAL, GREATER_THAN, GREATER_OR_EQUAL), the filter should return all +/// rows (conservative fallback) rather than returning empty ranges. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsAllRows) { + // Construct a LeafPredicate with EQUAL function but empty literals vector. + // This simulates the edge case where literals are unexpectedly empty. + auto pred = std::make_shared(paimon::Equal::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // With empty literals, the filter cannot evaluate the comparison, + // so it should conservatively return all rows. + EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + +/// Empty literals for IN predicate — the early guard in VisitLeafPredicate treats +/// all non-IS_NULL/IS_NOT_NULL predicates with empty literals conservatively, +/// returning all rows rather than risking incorrect filtering. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsAllRows) { + auto pred = std::make_shared(paimon::In::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // Empty literals → conservative fallback → all rows. + EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index bfabb9f8..98c5a9f3 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -26,23 +26,13 @@ #include "fmt/format.h" #include "paimon/format/parquet/column_index_filter.h" #include "paimon/format/parquet/page_filtered_row_group_reader.h" +#include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/macros.h" #include "parquet/arrow/reader.h" #include "parquet/file_reader.h" #include "parquet/metadata.h" #include "parquet/page_index.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) { \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace paimon::parquet { namespace { diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 4594717f..fa311eb3 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -249,6 +249,8 @@ Result> PageFilteredRowGroupReader::Re // Pre-buffering failed, fall back to row-group level PreBuffer ::arrow::io::IOContext io_ctx(pool); parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + parquet_reader->WhenBuffered(rg_vec, col_vec).status()); } } else { PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index c37570ac..8da49efd 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -48,17 +48,6 @@ #include "parquet/arrow/reader.h" #include "parquet/properties.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) { \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace arrow { class MemoryPool; } // namespace arrow @@ -159,18 +148,6 @@ Status ParquetFileBatchReader::SetReadSchema( } } - // Build column name to index map for page-level filtering. - // For leaf columns, indices[0] is the correct leaf column index in Parquet. - // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, - // but predicate pushdown only targets leaf columns with simple types, so indices[0] - // is always the correct single leaf index for predicate evaluation. - std::map column_name_to_index; - for (const auto& [name, indices] : field_index_map) { - if (!indices.empty()) { - column_name_to_index[name] = indices[0]; - } - } - std::vector row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups()); if (predicate) { PAIMON_ASSIGN_OR_RAISE(row_groups, @@ -188,6 +165,18 @@ Status ParquetFileBatchReader::SetReadSchema( OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); if (enable_page_index_filter) { + // Build column name to index map for page-level filtering. + // For leaf columns, indices[0] is the correct leaf column index in Parquet. + // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, + // but predicate pushdown only targets leaf columns with simple types, so indices[0] + // is always the correct single leaf index for predicate evaluation. + std::map column_name_to_index; + for (const auto& [name, indices] : field_index_map) { + if (!indices.empty()) { + column_name_to_index[name] = indices[0]; + } + } + PAIMON_ASSIGN_OR_RAISE( auto page_filter_result, FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); diff --git a/src/paimon/format/parquet/parquet_format_defs.h b/src/paimon/format/parquet/parquet_format_defs.h index b979b578..b646ca6c 100644 --- a/src/paimon/format/parquet/parquet_format_defs.h +++ b/src/paimon/format/parquet/parquet_format_defs.h @@ -19,6 +19,17 @@ #include #include +// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a +// Status. Used as the trailing catch clauses of a try block in every public +// method that calls into the parquet C++ API, so the read layer never throws. +#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ + catch (const std::exception& e) { \ + return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ + } \ + catch (...) { \ + return Status::UnknownError((context), ": unknown error"); \ + } + namespace paimon::parquet { // write diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 288fa48f..401e4957 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -43,6 +43,9 @@ class RowRanges { /// Creates a RowRanges from a list of ranges. explicit RowRanges(const std::vector& ranges) : ranges_(ranges) {} + /// Creates a RowRanges from a list of ranges, taking ownership of the vector. + explicit RowRanges(std::vector&& ranges) : ranges_(std::move(ranges)) {} + /// Creates a RowRanges with a single range [0, row_count - 1]. static RowRanges CreateSingle(int64_t row_count) { if (row_count <= 0) {