diff --git a/src/paimon/format/parquet/column_index_filter.cpp b/src/paimon/format/parquet/column_index_filter.cpp index cf638cf6..c2f9ab61 100644 --- a/src/paimon/format/parquet/column_index_filter.cpp +++ b/src/paimon/format/parquet/column_index_filter.cpp @@ -96,6 +96,10 @@ Result ColumnIndexFilter::VisitLeafPredicate( const auto& literals = leaf_predicate->Literals(); FieldType field_type = leaf_predicate->GetFieldType(); + if (function_type != Function::Type::IS_NULL && function_type != Function::Type::IS_NOT_NULL && + literals.empty()) { + return RowRanges::CreateSingle(row_group_row_count); + } std::vector matching_pages; switch (function_type) { @@ -106,37 +110,22 @@ Result ColumnIndexFilter::VisitLeafPredicate( matching_pages = FilterPagesByIsNotNull(column_index_ptr); break; case Function::Type::EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::NOT_EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_THAN: - if (!literals.empty()) { - matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_THAN: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::IN: matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type); diff --git a/src/paimon/format/parquet/column_index_filter_test.cpp b/src/paimon/format/parquet/column_index_filter_test.cpp index 8249f635..321d6b4e 100644 --- a/src/paimon/format/parquet/column_index_filter_test.cpp +++ b/src/paimon/format/parquet/column_index_filter_test.cpp @@ -26,6 +26,9 @@ #include "arrow/c/abi.h" #include "arrow/c/bridge.h" #include "gtest/gtest.h" +#include "paimon/common/predicate/equal.h" +#include "paimon/common/predicate/in.h" +#include "paimon/common/predicate/leaf_predicate_impl.h" #include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/defs.h" @@ -480,4 +483,29 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) { EXPECT_EQ(row_group_row_count_, ranges.RowCount()); } +/// When literals is empty for comparison predicates (EQUAL, NOT_EQUAL, LESS_THAN, +/// LESS_OR_EQUAL, GREATER_THAN, GREATER_OR_EQUAL), the filter should return all +/// rows (conservative fallback) rather than returning empty ranges. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsAllRows) { + // Construct a LeafPredicate with EQUAL function but empty literals vector. + // This simulates the edge case where literals are unexpectedly empty. + auto pred = std::make_shared(paimon::Equal::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // With empty literals, the filter cannot evaluate the comparison, + // so it should conservatively return all rows. + EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + +/// Empty literals for IN predicate — the early guard in VisitLeafPredicate treats +/// all non-IS_NULL/IS_NOT_NULL predicates with empty literals conservatively, +/// returning all rows rather than risking incorrect filtering. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsAllRows) { + auto pred = std::make_shared(paimon::In::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // Empty literals → conservative fallback → all rows. + EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index bfabb9f8..c89b7ff7 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -26,23 +26,13 @@ #include "fmt/format.h" #include "paimon/format/parquet/column_index_filter.h" #include "paimon/format/parquet/page_filtered_row_group_reader.h" +#include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/macros.h" #include "parquet/arrow/reader.h" #include "parquet/file_reader.h" #include "parquet/metadata.h" #include "parquet/page_index.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) { \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace paimon::parquet { namespace { @@ -109,15 +99,16 @@ Result> FileReaderWrapper::Create( return Status::Invalid(fmt::format( "unexpected error. row group ranges not match with num rows {}", num_rows)); } - std::vector row_groups_indices = - arrow::internal::Iota(file_reader->num_row_groups()); std::vector columns_indices = arrow::internal::Iota(file_reader->parquet_reader()->metadata()->num_columns()); auto file_reader_wrapper = std::unique_ptr(new FileReaderWrapper( std::move(file_reader), all_row_group_ranges, num_rows, pool, batch_size)); - PAIMON_RETURN_NOT_OK(file_reader_wrapper->PrepareForReadingLazy( - std::set(row_groups_indices.begin(), row_groups_indices.end()), - columns_indices)); + std::vector all_target_row_groups; + for (int32_t i = 0; i < file_reader_wrapper->GetNumberOfRowGroups(); i++) { + all_target_row_groups.emplace_back(i, false, RowRanges()); + } + PAIMON_RETURN_NOT_OK( + file_reader_wrapper->PrepareForReadingLazy(all_target_row_groups, columns_indices)); return file_reader_wrapper; } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::Create") @@ -168,39 +159,47 @@ void FileReaderWrapper::WaitForPendingPreBuffer() { } } +void FileReaderWrapper::AdvanceToNextRowGroup() { + // current_row_group_idx_ >= target_row_groups_.size() -1, avoid unsigned overflow + if (current_row_group_idx_ + 1 >= target_row_groups_.size()) { + next_row_to_read_ = num_rows_; + current_row_group_idx_ = target_row_groups_.size(); + } else { + current_row_group_idx_++; + next_row_to_read_ = + all_row_group_ranges_[target_row_groups_[current_row_group_idx_].row_group_index].first; + } +} + Status FileReaderWrapper::SeekToRow(uint64_t row_number) { try { - // Reset any in-progress page-filtered streaming current_page_filtered_reader_.reset(); filtered_global_offset_ = 0; for (uint64_t i = 0; i < target_row_groups_.size(); i++) { - if (row_number > target_row_groups_[i].first && - row_number < target_row_groups_[i].second) { + uint32_t rg_id = target_row_groups_[i].row_group_index; + uint64_t rg_start = all_row_group_ranges_[rg_id].first; + uint64_t rg_end = all_row_group_ranges_[rg_id].second; + if (row_number > rg_start && row_number < rg_end) { return Status::Invalid( fmt::format("seek to row failed. row number {} should not be in the middle of " "readable range", row_number)); } - if (target_row_groups_[i].first >= row_number) { + if (rg_start >= row_number) { current_row_group_idx_ = i; - next_row_to_read_ = target_row_groups_[i].first; + next_row_to_read_ = rg_start; - // Rebuild batch_reader_ only for non-page-filtered row groups at/after seek - // position. Page-filtered RGs need no seek-side bookkeeping: their per-RG - // reader is constructed on demand in Next() from row_group_row_ranges_ each - // time, so backward seek "just works". - std::vector target_row_group_indices; + // Rebuild batch_reader_ for non-page-filtered RGs at/after seek position. + std::vector fully_matched_indices; for (uint64_t j = i; j < target_row_groups_.size(); j++) { - if (page_filtered_indices_.count(j) == 0) { - PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, - GetRowGroupId(target_row_groups_[j])); - target_row_group_indices.push_back(row_group_id); + if (!target_row_groups_[j].is_page_filtered) { + fully_matched_indices.push_back(target_row_groups_[j].row_group_index); } } - if (!target_row_group_indices.empty()) { + if (!fully_matched_indices.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( - target_row_group_indices, target_column_indices_, &batch_reader_)); + fully_matched_indices, target_column_indices_, &batch_reader_)); } else { batch_reader_.reset(); } @@ -214,126 +213,96 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::SeekToRow") } +Result> FileReaderWrapper::NextPageFiltered() { + int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + + // Construct the per-RG streaming reader on demand. + if (!current_page_filtered_reader_) { + const auto& target_rg = target_row_groups_[current_row_group_idx_]; + auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( + file_reader_->parquet_reader(), target_rg, target_column_indices_); + bool pre_buffered = !prebuffered_ranges_.empty(); + int64_t max_chunksize = batch_size_ > 0 ? batch_size_ : std::numeric_limits::max(); + PAIMON_ASSIGN_OR_RAISE( + current_page_filtered_reader_, + PageFilteredRowGroupReader::ReadFilteredRowGroup( + file_reader_->parquet_reader(), target_rg, target_column_indices_, + page_filtered_read_schema_, pool_, file_reader_->properties().cache_options(), + pre_buffered, page_ranges, max_chunksize)); + current_filtered_row_ranges_ = target_rg.row_ranges; + current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; + filtered_global_offset_ = 0; + } + + std::shared_ptr record_batch; + PAIMON_RETURN_NOT_OK_FROM_ARROW(current_page_filtered_reader_->ReadNext(&record_batch)); + + if (record_batch) { + auto original_row = + current_filtered_row_ranges_.MapFilteredIndexToOriginalRow(filtered_global_offset_); + previous_first_row_ = original_row.has_value() ? current_filtered_rg_start_ + + static_cast(*original_row) + : current_filtered_rg_start_; + filtered_global_offset_ += record_batch->num_rows(); + return record_batch; + } + + // RG exhausted — reset and advance. + current_page_filtered_reader_.reset(); + filtered_global_offset_ = 0; + AdvanceToNextRowGroup(); + return std::shared_ptr(); +} + +Result> FileReaderWrapper::NextFullyMatched() { + if (!batch_reader_) { + return std::shared_ptr(); + } + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto record_batch, batch_reader_->Next()); + if (!record_batch) { + return std::shared_ptr(); + } + + int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + uint64_t rg_end = all_row_group_ranges_[rg_id].second; + int64_t num_rows = record_batch->num_rows(); + + previous_first_row_ = next_row_to_read_; + if (next_row_to_read_ + num_rows < rg_end) { + next_row_to_read_ += num_rows; + } else if (next_row_to_read_ + num_rows == rg_end) { + AdvanceToNextRowGroup(); + } else { + return Status::Invalid( + fmt::format("Next failed. next_row_to_read {} + num_rows {} exceeds row group end {}", + next_row_to_read_, num_rows, rg_end)); + } + return record_batch; +} + Result> FileReaderWrapper::Next() { try { if (PAIMON_UNLIKELY(!reader_initialized_)) { - PAIMON_RETURN_NOT_OK( - PrepareForReading(target_row_group_indices_, target_column_indices_)); + PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_groups_, target_column_indices_)); } - // Loop until we produce a batch or exhaust all row groups. A null from the active - // per-RG reader means that RG is done; we advance and try the next RG without - // surfacing a spurious null to the caller. while (current_row_group_idx_ < target_row_groups_.size()) { - std::shared_ptr record_batch; - bool is_page_filtered = page_filtered_indices_.count(current_row_group_idx_) > 0; - - if (is_page_filtered) { - // Construct the per-RG streaming reader on demand. Inputs are recomputed each - // time from existing wrapper fields (no per-RG meta cached on the wrapper), - // mirroring how the fully-matched path delegates to Arrow's stateless - // GetRecordBatchReader. This makes both forward and backward seeks work - // uniformly: SeekToRow only resets current_page_filtered_reader_, and the - // next Next() rebuilds from authoritative state. - if (!current_page_filtered_reader_) { - PAIMON_ASSIGN_OR_RAISE( - int32_t rg_index, - GetRowGroupId(target_row_groups_[current_row_group_idx_])); - auto range_it = row_group_row_ranges_.find(rg_index); - if (range_it == row_group_row_ranges_.end()) { - return Status::Invalid( - fmt::format("page-filtered row group {} missing row ranges in " - "row_group_row_ranges_", - rg_index)); - } - const RowRanges& row_ranges = range_it->second; - auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_index, row_ranges, - target_column_indices_); - bool pre_buffered = !prebuffered_ranges_.empty(); - // batch_size_ == 0 means "no per-batch row cap" in the wrapper's contract, - // but TableBatchReader::set_chunksize(0) would loop forever emitting empty - // batches. Translate to int64_max so the reader produces one batch per - // underlying chunk boundary instead. - int64_t max_chunksize = - batch_size_ > 0 ? batch_size_ : std::numeric_limits::max(); - PAIMON_ASSIGN_OR_RAISE(current_page_filtered_reader_, - PageFilteredRowGroupReader::ReadFilteredRowGroup( - file_reader_->parquet_reader(), rg_index, row_ranges, - target_column_indices_, page_filtered_read_schema_, - pool_, file_reader_->properties().cache_options(), - pre_buffered, page_ranges, max_chunksize)); - current_filtered_row_ranges_ = row_ranges; - current_filtered_rg_start_ = target_row_groups_[current_row_group_idx_].first; - filtered_global_offset_ = 0; - } - PAIMON_RETURN_NOT_OK_FROM_ARROW( - current_page_filtered_reader_->ReadNext(&record_batch)); - } else if (batch_reader_) { - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(record_batch, batch_reader_->Next()); - } - - if (record_batch) { - int64_t num_rows = record_batch->num_rows(); - if (is_page_filtered) { - // Map the cumulative filtered-row offset back to the original row index - // within this row group. Must be evaluated BEFORE incrementing the offset. - auto original_row = current_filtered_row_ranges_.MapFilteredIndexToOriginalRow( - filtered_global_offset_); - previous_first_row_ = - original_row.has_value() - ? current_filtered_rg_start_ + static_cast(*original_row) - : current_filtered_rg_start_; - filtered_global_offset_ += num_rows; - // Stay on this RG; the next ReadNext will either return more data or null. - } else { - previous_first_row_ = next_row_to_read_; - if (next_row_to_read_ + num_rows < - target_row_groups_[current_row_group_idx_].second) { - next_row_to_read_ += num_rows; - } else if (next_row_to_read_ + num_rows == - target_row_groups_[current_row_group_idx_].second) { - if (current_row_group_idx_ == target_row_groups_.size() - 1) { - next_row_to_read_ = num_rows_; - } else { - current_row_group_idx_++; - next_row_to_read_ = target_row_groups_[current_row_group_idx_].first; - } - } else { - return Status::Invalid(fmt::format( - "Next failed. Unexpected error, next row to read {} + num rows just " - "read {} should always be within current row group range or exactly " - "equals to current row group end {}", - next_row_to_read_, num_rows, - target_row_groups_[current_row_group_idx_].second)); - } - } - return record_batch; - } - - // Null batch: current row group is exhausted (or fully-matched RGs hit a degenerate - // EOF). Advance to the next row group and continue the loop. - if (is_page_filtered) { - current_page_filtered_reader_.reset(); - filtered_global_offset_ = 0; - if (current_row_group_idx_ == target_row_groups_.size() - 1) { - next_row_to_read_ = num_rows_; - current_row_group_idx_ = target_row_groups_.size(); - } else { - current_row_group_idx_++; - next_row_to_read_ = target_row_groups_[current_row_group_idx_].first; - } - } else { - // Fully-matched path: batch_reader_ is exhausted with no more RBs to align on - // row counts. Stop here — remaining RGs (if any) should be page-filtered and - // will be handled by re-entering the loop, but if we got here without advancing - // first, treat as terminal to avoid an infinite loop. + bool is_page_filtered = target_row_groups_[current_row_group_idx_].is_page_filtered; + PAIMON_ASSIGN_OR_RAISE(auto batch, + is_page_filtered ? NextPageFiltered() : NextFullyMatched()); + if (batch) { + return batch; + } else if (!is_page_filtered) { + // Null from fully-matched path means batch_reader_ is globally exhausted. break; } + // current_row_group_idx_ has been advanced in NextPageFiltered() or NextFullyMatched(), + // loop to try next RG. } previous_first_row_ = next_row_to_read_; - return std::shared_ptr(); // EOF + return std::shared_ptr(); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::Next") } @@ -351,149 +320,124 @@ Result>> FileReaderWrapper::GetRowGrou return row_group_ranges; } -Status FileReaderWrapper::PrepareForReadingLazy(const std::set& target_row_group_indices, - const std::vector& column_indices) { - target_row_group_indices_ = target_row_group_indices; +Status FileReaderWrapper::PrepareForReadingLazy( + const std::vector& target_row_groups, + const std::vector& column_indices) { + target_row_groups_ = target_row_groups; target_column_indices_ = column_indices; reader_initialized_ = false; return Status::OK(); } -Status FileReaderWrapper::PrepareForReading(const std::set& target_row_group_indices, - const std::vector& column_indices) { - try { - std::vector> target_row_groups; - PAIMON_ASSIGN_OR_RAISE(target_row_groups, GetRowGroupRanges(target_row_group_indices)); - - // Build position map: rg_index -> position in target_row_groups (O(1) lookup) - std::map rg_idx_to_position; - { - uint64_t pos = 0; - for (int32_t rg_idx : target_row_group_indices) { - rg_idx_to_position[rg_idx] = pos++; +Status FileReaderWrapper::BuildPageFilteredSchema(const std::vector& column_indices) { + if (page_filtered_read_schema_) { + return Status::OK(); + } + std::shared_ptr schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&schema)); + auto parquet_schema = file_reader_->parquet_reader()->metadata()->schema(); + std::vector> fields; + for (int32_t col_idx : column_indices) { + const std::string& col_name = parquet_schema->Column(col_idx)->name(); + auto field = schema->GetFieldByName(col_name); + if (!field) { + return Status::Invalid(fmt::format( + "PrepareForReading: Parquet column {} ('{}') has no matching Arrow field in " + "file schema", + col_idx, col_name)); + } + fields.push_back(field); + } + page_filtered_read_schema_ = arrow::schema(fields); + return Status::OK(); +} + +std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( + const std::vector& fully_matched_row_groups, + const std::vector& column_indices) { + std::vector<::arrow::io::ReadRange> ranges; + + // Page-filtered RGs: only matching page byte ranges. + for (const auto& trg : target_row_groups_) { + if (trg.is_page_filtered) { + auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( + file_reader_->parquet_reader(), trg, column_indices); + ranges.insert(ranges.end(), std::make_move_iterator(page_ranges.begin()), + std::make_move_iterator(page_ranges.end())); + } + } + + // Fully-matched RGs: entire column chunk ranges. + auto file_metadata = file_reader_->parquet_reader()->metadata(); + for (int32_t rg_idx : fully_matched_row_groups) { + auto rg_metadata = file_metadata->RowGroup(rg_idx); + for (int32_t col_idx : column_indices) { + auto col_chunk = rg_metadata->ColumnChunk(col_idx); + int64_t offset = col_chunk->data_page_offset(); + if (col_chunk->has_dictionary_page() && col_chunk->dictionary_page_offset() > 0 && + offset > col_chunk->dictionary_page_offset()) { + offset = col_chunk->dictionary_page_offset(); } + ranges.push_back({offset, col_chunk->total_compressed_size()}); } + } + return ranges; +} - // Separate row groups into fully matched (Arrow's standard reader) and partially - // matched (page-filtered, per-RG reader constructed on demand in Next()). - // Per-RG metadata for the page-filtered path is NOT cached on the wrapper — it's - // recomputed on demand in Next() from row_group_row_ranges_ + target_column_indices_, - // mirroring how the fully-matched path lets Arrow's FileReader own all metadata. - std::vector fully_matched_row_groups; - page_filtered_indices_.clear(); - page_filtered_read_schema_.reset(); +void FileReaderWrapper::DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges) { + const auto& cache_opts = file_reader_->properties().cache_options(); + ::arrow::io::IOContext io_ctx(pool_); + auto merged_ranges = MergeOverlappingRanges(std::move(ranges)); + try { + file_reader_->parquet_reader()->PreBufferRanges(merged_ranges, io_ctx, cache_opts); + prebuffered_ranges_ = std::move(merged_ranges); + } catch (const std::exception&) { + prebuffered_ranges_.clear(); + } +} - // Page-level byte ranges collected here only for the bulk PreBuffer call below; - // discarded once PreBuffer is dispatched. - std::vector<::arrow::io::ReadRange> page_filtered_byte_ranges; - - for (int32_t rg_idx : target_row_group_indices) { - auto range_it = row_group_row_ranges_.find(rg_idx); - if (range_it != row_group_row_ranges_.end()) { - uint64_t pos = rg_idx_to_position[rg_idx]; - page_filtered_indices_.insert(pos); - - // Build the page-filter read_schema once on first encounter — it's identical - // across all page-filtered RGs in this session. - if (!page_filtered_read_schema_) { - std::shared_ptr schema; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&schema)); - std::vector> fields; - auto parquet_schema = file_reader_->parquet_reader()->metadata()->schema(); - for (int32_t col_idx : column_indices) { - const std::string& col_name = parquet_schema->Column(col_idx)->name(); - auto field = schema->GetFieldByName(col_name); - if (!field) { - return Status::Invalid(fmt::format( - "PrepareForReading: Parquet column {} ('{}') has no matching Arrow " - "field in file schema", - col_idx, col_name)); - } - fields.push_back(field); - } - page_filtered_read_schema_ = arrow::schema(fields); - } +Status FileReaderWrapper::PrepareForReading(const std::vector& target_row_groups, + const std::vector& column_indices) { + try { + target_row_groups_ = target_row_groups; + target_column_indices_ = column_indices; + page_filtered_read_schema_.reset(); - auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_idx, range_it->second, column_indices); - page_filtered_byte_ranges.insert(page_filtered_byte_ranges.end(), - std::make_move_iterator(page_ranges.begin()), - std::make_move_iterator(page_ranges.end())); - } else { - fully_matched_row_groups.push_back(rg_idx); + // Partition into fully-matched and page-filtered row groups. + std::vector fully_matched_row_groups; + for (const auto& trg : target_row_groups_) { + if (!trg.is_page_filtered) { + fully_matched_row_groups.push_back(trg.row_group_index); } } - // Wait for any previously pre-buffered data before starting new pre-buffer. + bool has_page_filtered = fully_matched_row_groups.size() != target_row_groups_.size(); + if (has_page_filtered) { + PAIMON_RETURN_NOT_OK(BuildPageFilteredSchema(column_indices)); + } + WaitForPendingPreBuffer(); - // Create standard reader for fully matched row groups FIRST. - // GetRecordBatchReader internally calls PreBuffer, but we'll override it below - // with a single PreBuffer covering ALL row groups (page-filtered + fully-matched) - // so that async I/O for all files starts in parallel. - std::unique_ptr batch_reader; + // Create standard reader for fully-matched row groups. if (!fully_matched_row_groups.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( - fully_matched_row_groups, column_indices, &batch_reader)); + fully_matched_row_groups, column_indices, &batch_reader_)); + } else { + batch_reader_.reset(); } - // Collect all byte ranges for a single PreBufferRanges call. - // Page-filtered RGs: only matching page ranges (from ComputePageRanges). - // Fully-matched RGs: entire column chunk ranges. - // - // When there are no page-filtered RGs, skip the manual PreBufferRanges entirely: - // GetRecordBatchReader has already issued PreBuffer internally (driven by - // ArrowReaderProperties::pre_buffer=true), and a second PreBufferRanges call here - // would tear down and rebuild cached_source_, redundantly re-issuing the same IO - // on remote filesystems. The manual path is only needed to merge page-level ranges - // with column-chunk ranges into a single PreBuffer covering both kinds of RGs. - if (!page_filtered_indices_.empty()) { - std::vector<::arrow::io::ReadRange> all_ranges = std::move(page_filtered_byte_ranges); - - // Fully-matched row groups: add entire column chunk ranges - // The correct calculation follows Arrow's ColumnChunkMetaData::file_range(): - // - col_start = data_page_offset (or dictionary_page_offset if present and lower) - // - col_length = total_compressed_size (includes all pages: dictionary + data) - auto file_metadata = file_reader_->parquet_reader()->metadata(); - for (int32_t rg_idx : fully_matched_row_groups) { - auto rg_metadata = file_metadata->RowGroup(rg_idx); - for (int32_t col_idx : column_indices) { - auto col_chunk = rg_metadata->ColumnChunk(col_idx); - int64_t offset = col_chunk->data_page_offset(); - if (col_chunk->has_dictionary_page() && - col_chunk->dictionary_page_offset() > 0 && - offset > col_chunk->dictionary_page_offset()) { - offset = col_chunk->dictionary_page_offset(); - } - int64_t size = col_chunk->total_compressed_size(); - all_ranges.push_back({offset, size}); - } - } - - const auto& cache_opts = file_reader_->properties().cache_options(); - ::arrow::io::IOContext io_ctx(pool_); - // Merge overlapping ranges before calling PreBufferRanges, which rejects overlapping - // ranges. - auto merged_ranges = MergeOverlappingRanges(std::move(all_ranges)); - // PreBuffer is an optimization - if it fails (e.g., IO error during testing), - // continue without pre-buffering. Subsequent reads will fetch data on-demand. - try { - file_reader_->parquet_reader()->PreBufferRanges(merged_ranges, io_ctx, cache_opts); - // Track for cleanup on destruction - prebuffered_ranges_ = std::move(merged_ranges); - } catch (const std::exception& e) { - // Pre-buffering failed, clear ranges to indicate no pre-buffered data available. - // Reading will fall back to on-demand I/O. - prebuffered_ranges_.clear(); - } + // When page-filtered RGs exist, issue a single PreBuffer covering both kinds. + // Otherwise GetRecordBatchReader already issued PreBuffer internally. + if (has_page_filtered) { + auto all_ranges = CollectPreBufferRanges(fully_matched_row_groups, column_indices); + DispatchPreBuffer(std::move(all_ranges)); } - target_row_groups_ = target_row_groups; - target_column_indices_ = column_indices; - batch_reader_ = std::move(batch_reader); + + // Reset read state. if (target_row_groups_.empty()) { next_row_to_read_ = num_rows_; } else { - next_row_to_read_ = target_row_groups_[0].first; + next_row_to_read_ = all_row_group_ranges_[target_row_groups_[0].row_group_index].first; } previous_first_row_ = std::numeric_limits::max(); current_row_group_idx_ = 0; @@ -503,39 +447,32 @@ Status FileReaderWrapper::PrepareForReading(const std::set& target_row_ PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::PrepareForReading") } -Result> FileReaderWrapper::FilterRowGroupsByReadRanges( - const std::vector>& read_ranges, - const std::vector& src_row_groups) const { - std::set target_row_groups; - PAIMON_ASSIGN_OR_RAISE(std::set row_groups_to_read, - ReadRangesToRowGroupIds(read_ranges)); - for (const auto& row_group_id : src_row_groups) { - if (row_groups_to_read.find(row_group_id) != row_groups_to_read.end()) { - target_row_groups.emplace(row_group_id); - } +Status FileReaderWrapper::ApplyReadRanges( + const std::vector>& read_ranges) { + if (read_ranges.empty()) { + target_row_groups_.clear(); + reader_initialized_ = false; + return Status::OK(); } - return target_row_groups; -} - -Result> FileReaderWrapper::ReadRangesToRowGroupIds( - const std::vector>& read_ranges) const { - std::set selected_row_group_ids; + // Build a set of row group indices whose range matches one of the read ranges. + std::set matching_rg_indices; for (const auto& read_range : read_ranges) { - PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, GetRowGroupId(read_range)); - selected_row_group_ids.emplace(row_group_id); + for (size_t i = 0; i < all_row_group_ranges_.size(); i++) { + if (all_row_group_ranges_[i] == read_range) { + matching_rg_indices.insert(static_cast(i)); + } + } } - return selected_row_group_ids; -} - -Result FileReaderWrapper::GetRowGroupId(std::pair target_range) const { - for (size_t i = 0; i < all_row_group_ranges_.size(); i++) { - if (all_row_group_ranges_[i] == target_range) { - return i; + // Keep only target row groups whose row_group_index is in matching set. + std::vector filtered; + for (const auto& trg : target_row_groups_) { + if (matching_rg_indices.count(trg.row_group_index) > 0) { + filtered.push_back(trg); } } - return Status::Invalid(fmt::format( - "not expected failure. target range bound '{},{}' not match with row group range bound", - target_range.first, target_range.second)); + target_row_groups_ = std::move(filtered); + reader_initialized_ = false; + return Status::OK(); } std::shared_ptr<::parquet::PageIndexReader> FileReaderWrapper::GetPageIndexReader() { diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index c023a4cf..503091a9 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -109,24 +109,18 @@ class FileReaderWrapper { /// Prepare for lazy reading of the specified row groups and columns. /// Actual reader initialization is deferred until the first Next() call. - Status PrepareForReadingLazy(const std::set& row_group_indices, + Status PrepareForReadingLazy(const std::vector& target_row_groups, const std::vector& column_indices); /// Prepare for immediate reading of the specified row groups and columns. /// Initializes the reader and starts pre-buffering I/O. - Status PrepareForReading(const std::set& row_group_indices, + Status PrepareForReading(const std::vector& target_row_groups, const std::vector& column_indices); - /// Filter row groups by read ranges, returning only those that overlap. - Result> FilterRowGroupsByReadRanges( - const std::vector>& read_ranges, - const std::vector& src_row_groups) const; - - /// Set per-row-group RowRanges for page-level filtering. - /// Only partially matched row groups should have entries. - void SetRowGroupRowRanges(const std::map& ranges) { - row_group_row_ranges_ = ranges; - } + /// Apply read ranges to the current target_row_groups_, keeping only those + /// whose row-group range is equal to one of the given read ranges. + /// Resets reader state so that the next Next() call will re-initialize. + Status ApplyReadRanges(const std::vector>& read_ranges); /// Get the page index reader for the file. /// Returns nullptr if page index is not available. @@ -146,16 +140,10 @@ class FileReaderWrapper { const std::vector>& all_row_group_ranges, uint64_t num_rows, ::arrow::MemoryPool* pool, int64_t batch_size); - Result> ReadRangesToRowGroupIds( - const std::vector>& read_ranges) const; - Result GetRowGroupId(std::pair target_range) const; - std::unique_ptr<::parquet::arrow::FileReader> file_reader_; std::unique_ptr batch_reader_; std::vector> all_row_group_ranges_; - std::set target_row_group_indices_; - std::vector> target_row_groups_; std::vector target_column_indices_; ::arrow::MemoryPool* pool_; @@ -175,13 +163,8 @@ class FileReaderWrapper { RowRanges current_filtered_row_ranges_; // RowRanges for the active page-filtered RG uint64_t current_filtered_rg_start_ = 0; // Absolute row-group start row number - // Page-level filtering state. Externally injected via SetRowGroupRowRanges and - // looked up by row group index when entering a page-filtered RG. - std::map row_group_row_ranges_; - - // Set of target_row_groups_ positional indices that use page-filtered reading. - // Built in PrepareForReading from row_group_row_ranges_. - std::set page_filtered_indices_; + // Target row groups with row ranges for none page-level filtering and page-level filtering + std::vector target_row_groups_; // Arrow schema covering target_column_indices_, used when constructing the per-RG // page-filtered reader. Cached in PrepareForReading because it's identical across @@ -193,6 +176,26 @@ class FileReaderWrapper { /// Wait for all pending PreBuffer operations to complete. void WaitForPendingPreBuffer(); + + /// Advance current_row_group_idx_ to the next row group and update next_row_to_read_. + void AdvanceToNextRowGroup(); + + /// Read next batch from a page-filtered row group. Returns nullptr when the RG is exhausted. + Result> NextPageFiltered(); + + /// Read next batch from the fully-matched batch_reader_. Returns nullptr when exhausted. + Result> NextFullyMatched(); + + /// Build page_filtered_read_schema_ from the given column indices. No-op if already built. + Status BuildPageFilteredSchema(const std::vector& column_indices); + + /// Collect all byte ranges that need pre-buffering (page-filtered + fully-matched). + std::vector<::arrow::io::ReadRange> CollectPreBufferRanges( + const std::vector& fully_matched_row_groups, + const std::vector& column_indices); + + /// Dispatch a single PreBufferRanges call with merged ranges. + void DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges); }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp b/src/paimon/format/parquet/file_reader_wrapper_test.cpp index b4c3d588..dc3c5a91 100644 --- a/src/paimon/format/parquet/file_reader_wrapper_test.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp @@ -261,11 +261,9 @@ TEST_F(FileReaderWrapperTest, PageFilteredZeroBatchSizeDoesNotHang) { // contiguous ranges keep the test honest about RowRanges semantics; the actual // numbers don't matter as long as their total falls inside the row group. RowRanges rr({RowRanges::Range(0, 49), RowRanges::Range(100, 149)}); - reader_wrapper->SetRowGroupRowRanges({{0, rr}}); std::vector all_columns = {0, 1, 2}; - ASSERT_OK(reader_wrapper->PrepareForReading({0}, all_columns)); - + ASSERT_OK(reader_wrapper->PrepareForReading({TargetRowGroup(0, true, rr)}, all_columns)); int64_t total = 0; int64_t batch_count = 0; while (true) { @@ -295,10 +293,11 @@ TEST_F(FileReaderWrapperTest, SeekBackToConsumedPageFilteredRowGroup) { std::map row_ranges_map; row_ranges_map[0] = RowRanges(RowRanges::Range(10, 49)); row_ranges_map[1] = RowRanges(RowRanges::Range(100, 149)); - reader_wrapper->SetRowGroupRowRanges(row_ranges_map); std::vector all_columns = {0, 1, 2}; - ASSERT_OK(reader_wrapper->PrepareForReading({0, 1}, all_columns)); + ASSERT_OK(reader_wrapper->PrepareForReading( + {TargetRowGroup(0, true, row_ranges_map[0]), TargetRowGroup(1, true, row_ranges_map[1])}, + all_columns)); auto count_all_rows = [&](int64_t* out_total) { int64_t total = 0; @@ -348,8 +347,7 @@ TEST_F(FileReaderWrapperTest, PageFilteredRespectsBatchSize) { for (int64_t batch_size : {int64_t{1}, int64_t{2}, int64_t{3}, int64_t{5}, int64_t{10}}) { SCOPED_TRACE("batch_size=" + std::to_string(batch_size)); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path, batch_size)); - reader_wrapper->SetRowGroupRowRanges({{0, rr}}); - ASSERT_OK(reader_wrapper->PrepareForReading({0}, {0, 1, 2})); + ASSERT_OK(reader_wrapper->PrepareForReading({TargetRowGroup(0, true, rr)}, {0, 1, 2})); int64_t total = 0; int64_t batch_count = 0; @@ -380,45 +378,47 @@ TEST_F(FileReaderWrapperTest, GetRowGroupRanges) { ASSERT_TRUE(ranges.empty()); } -TEST_F(FileReaderWrapperTest, ReadRangesToRowGroupIds) { +TEST_F(FileReaderWrapperTest, ApplyReadRanges) { std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); PrepareParquetFile(file_path, /*row_count=*/5500); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - std::set expected_row_group_ids = {0, 3, 5}; - std::vector> read_ranges = { - {0, 1000}, {3000, 4000}, {5000, 5500}}; - ASSERT_OK_AND_ASSIGN(auto row_group_ids, reader_wrapper->ReadRangesToRowGroupIds(read_ranges)); - ASSERT_EQ(expected_row_group_ids, row_group_ids); - std::vector> invalid_ranges = { - {0, 1000}, {3000, 4000}, {5000, 5600}}; - ASSERT_NOK_WITH_MSG(reader_wrapper->ReadRangesToRowGroupIds(invalid_ranges), - "not match with row group range bound"); - ASSERT_OK_AND_ASSIGN(row_group_ids, reader_wrapper->ReadRangesToRowGroupIds({})); - ASSERT_TRUE(row_group_ids.empty()); -} -TEST_F(FileReaderWrapperTest, FilterRowGroupsByReadRanges) { - std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); - PrepareParquetFile(file_path, /*row_count=*/5500); - ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - std::set expected_row_group_ids = {0, 5}; + // Prepare with a subset of row groups: {0, 1, 2, 4, 5} + std::vector initial_targets = { + TargetRowGroup(0, false, RowRanges()), TargetRowGroup(1, false, RowRanges()), + TargetRowGroup(2, false, RowRanges()), TargetRowGroup(4, false, RowRanges()), + TargetRowGroup(5, false, RowRanges())}; + std::vector all_columns = {0, 1, 2}; + ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns)); + + // Apply read ranges that match RG 0, 3, 5. Only 0 and 5 are in initial targets. std::vector> read_ranges = { {0, 1000}, {3000, 4000}, {5000, 5500}}; - ASSERT_OK_AND_ASSIGN(auto row_group_ids, - reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {0, 1, 2, 4, 5})); - ASSERT_EQ(expected_row_group_ids, row_group_ids); + ASSERT_OK(reader_wrapper->ApplyReadRanges(read_ranges)); - ASSERT_OK_AND_ASSIGN(row_group_ids, - reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {})); - ASSERT_TRUE(row_group_ids.empty()); + // Verify: reading should only produce rows from RG 0 (1000 rows) and RG 5 (500 rows). + int64_t total_rows = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next()); + if (!batch) break; + total_rows += batch->num_rows(); + } + ASSERT_EQ(1500, total_rows); + + // Apply empty read ranges should result in no data. + ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns)); + ASSERT_OK(reader_wrapper->ApplyReadRanges({})); + ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next()); + ASSERT_FALSE(batch); } TEST_F(FileReaderWrapperTest, PrepareForReading) { std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); PrepareParquetFile(file_path, /*row_count=*/5500); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{1}, - /*column_indices=*/{0})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{TargetRowGroup(1, false, RowRanges())}, + /*column_indices=*/{0})); // seek before actual read range ASSERT_OK(reader_wrapper->SeekToRow(0)); ASSERT_EQ(1000, reader_wrapper->GetNextRowToRead()); @@ -438,8 +438,10 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) { ASSERT_FALSE(record_batch); // empty column indices - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{0, 1}, - /*column_indices=*/{})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{TargetRowGroup(0, false, RowRanges()), + TargetRowGroup(1, false, RowRanges())}, + /*column_indices=*/{})); ASSERT_EQ(0, reader_wrapper->GetNextRowToRead()); ASSERT_EQ(std::numeric_limits::max(), reader_wrapper->GetPreviousBatchFirstRowNumber().value()); @@ -448,8 +450,9 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) { ASSERT_EQ(0, record_batch->num_columns()); // empty row group indices - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{}, - /*column_indices=*/{0})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{}, + /*column_indices=*/{0})); ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead()); ASSERT_EQ(std::numeric_limits::max(), reader_wrapper->GetPreviousBatchFirstRowNumber().value()); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 4594717f..01090d63 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -60,34 +60,30 @@ class TableRecordBatchReader : public arrow::RecordBatchReader { } // namespace +std::pair PageFilteredRowGroupReader::GetPageRowRange( + const std::vector<::parquet::PageLocation>& page_locations, int32_t page_idx, + int64_t row_group_row_count) { + int64_t first_row = page_locations[page_idx].first_row_index; + int64_t last_row = (page_idx + 1 < static_cast(page_locations.size())) + ? page_locations[page_idx + 1].first_row_index - 1 + : row_group_row_count - 1; + return {first_row, last_row}; +} + std::function PageFilteredRowGroupReader::MakePageFilter( const RowRanges& row_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count) { - // Shared counter tracks the current page index as the callback is invoked - // in order for each data page. auto page_counter = std::make_shared(0); - const auto& page_locations = offset_index->page_locations(); auto num_pages = static_cast(page_locations.size()); return [row_ranges, page_locations, num_pages, row_group_row_count, page_counter](const ::parquet::DataPageStats& /*stats*/) -> bool { int32_t page_idx = (*page_counter)++; - if (page_idx >= num_pages) { - // Safety: if more pages than expected, don't skip return false; } - - int64_t first_row = page_locations[page_idx].first_row_index; - int64_t last_row; - if (page_idx + 1 < num_pages) { - last_row = page_locations[page_idx + 1].first_row_index - 1; - } else { - last_row = row_group_row_count - 1; - } - - // Return true to skip this page if it has no overlap with RowRanges + auto [first_row, last_row] = GetPageRowRange(page_locations, page_idx, row_group_row_count); return !row_ranges.IsOverlapping(first_row, last_row); }; } @@ -103,10 +99,7 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa int64_t compressed_offset = 0; for (int32_t page_idx = 0; page_idx < num_pages; ++page_idx) { - int64_t page_from = page_locations[page_idx].first_row_index; - int64_t page_to = (page_idx + 1 < num_pages) - ? page_locations[page_idx + 1].first_row_index - 1 - : row_group_row_count - 1; + auto [page_from, page_to] = GetPageRowRange(page_locations, page_idx, row_group_row_count); int64_t page_size = page_to - page_from + 1; if (!original_ranges.IsOverlapping(page_from, page_to)) { @@ -114,19 +107,13 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa continue; } - // Page is kept. Map overlapping original ranges to compressed row space. for (const auto& range : ranges) { - if (range.to < page_from) { - continue; - } - if (range.from > page_to) { - break; // Ranges are sorted - } + if (range.to < page_from) continue; + if (range.from > page_to) break; int64_t overlap_from = std::max(range.from, page_from); int64_t overlap_to = std::min(range.to, page_to); - int64_t c_from = compressed_offset + (overlap_from - page_from); - int64_t c_to = compressed_offset + (overlap_to - page_from); - compressed.Add(RowRanges::Range(c_from, c_to)); + compressed.Add(RowRanges::Range(compressed_offset + (overlap_from - page_from), + compressed_offset + (overlap_to - page_from))); } compressed_offset += page_size; @@ -135,6 +122,38 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa return {compressed, compressed_offset}; } +Status PageFilteredRowGroupReader::ExecuteSkipReadPattern( + ::parquet::internal::RecordReader* record_reader, const RowRanges& ranges, + int64_t total_row_count, int32_t row_group_index, int32_t column_index) { + int64_t current_row = 0; + for (const auto& range : ranges.GetRanges()) { + if (range.from > current_row) { + int64_t to_skip = range.from - current_row; + int64_t skipped = record_reader->SkipRecords(to_skip); + if (skipped != to_skip) { + return Status::Invalid(fmt::format( + "PageFilteredRowGroupReader: expected to skip {} records but skipped {} " + "(row_group={}, column={})", + to_skip, skipped, row_group_index, column_index)); + } + current_row = range.from; + } + int64_t to_read = range.Count(); + int64_t read = record_reader->ReadRecords(to_read); + if (read != to_read) { + return Status::Invalid( + fmt::format("PageFilteredRowGroupReader: expected to read {} records but read {} " + "(row_group={}, column={}, range=[{},{}])", + to_read, read, row_group_index, column_index, range.from, range.to)); + } + current_row += to_read; + } + if (current_row < total_row_count) { + record_reader->SkipRecords(total_row_count - current_row); + } + return Status::OK(); +} + Result> PageFilteredRowGroupReader::ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, @@ -173,42 +192,9 @@ Result> PageFilteredRowGroupReader::ReadFil auto record_reader = ::parquet::internal::RecordReader::Make(col_descriptor, leaf_info, pool); record_reader->SetPageReader(std::move(page_reader)); - // Execute skip/read pattern based on effective RowRanges - const auto& ranges = effective_ranges.GetRanges(); - int64_t current_row = 0; - - for (const auto& range : ranges) { - // Skip rows before this range - if (range.from > current_row) { - int64_t to_skip = range.from - current_row; - int64_t skipped = record_reader->SkipRecords(to_skip); - if (skipped != to_skip) { - return Status::Invalid(fmt::format( - "PageFilteredRowGroupReader: expected to skip {} records but skipped {} " - "(row_group={}, column={})", - to_skip, skipped, row_group_index, column_index)); - } - current_row = range.from; - } - - // Read rows in this range - int64_t to_read = range.Count(); - int64_t read = record_reader->ReadRecords(to_read); - if (read != to_read) { - return Status::Invalid( - fmt::format("PageFilteredRowGroupReader: expected to read {} records but read {} " - "(row_group={}, column={}, range=[{},{}])", - to_read, read, row_group_index, column_index, range.from, range.to)); - } - current_row += to_read; - } - - // Skip remaining rows after the last range to properly finalize the reader - if (current_row < effective_row_count) { - record_reader->SkipRecords(effective_row_count - current_row); - } + PAIMON_RETURN_NOT_OK(ExecuteSkipReadPattern( + record_reader.get(), effective_ranges, effective_row_count, row_group_index, column_index)); - // Transfer to Arrow ChunkedArray std::shared_ptr chunked_array; PAIMON_RETURN_NOT_OK_FROM_ARROW(::parquet::arrow::TransferColumnData( record_reader.get(), field, col_descriptor, pool, &chunked_array)); @@ -216,12 +202,38 @@ Result> PageFilteredRowGroupReader::ReadFil return chunked_array; } -Result> PageFilteredRowGroupReader::ReadFilteredRowGroup( +Status PageFilteredRowGroupReader::WaitForPreBuffer( ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices, - const std::shared_ptr& arrow_schema, ::arrow::MemoryPool* pool, + const std::vector& column_indices, ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, + const std::vector<::arrow::io::ReadRange>& page_ranges) { + std::vector rg_vec = {row_group_index}; + std::vector col_vec(column_indices.begin(), column_indices.end()); + if (!pre_buffered) { + ::arrow::io::IOContext io_ctx(pool); + parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + } + if (!page_ranges.empty()) { + auto status = parquet_reader->WhenBufferedRanges(page_ranges).status(); + if (!status.ok()) { + ::arrow::io::IOContext io_ctx(pool); + parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); + } + } else { + PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); + } + return Status::OK(); +} + +Result> PageFilteredRowGroupReader::ReadFilteredRowGroup( + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices, const std::shared_ptr& arrow_schema, + ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, const std::vector<::arrow::io::ReadRange>& page_ranges, int64_t max_chunksize) { + const auto& row_ranges = target_row_group.row_ranges; + int32_t row_group_index = target_row_group.row_group_index; + if (row_ranges.IsEmpty()) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr empty_table, arrow::Table::MakeEmpty(arrow_schema, pool)); @@ -230,40 +242,17 @@ Result> PageFilteredRowGroupReader::Re int64_t expected_rows = row_ranges.RowCount(); - // Wait for pre-buffered data to be ready. - // When pre_buffered=true, PreBuffer was already called in PrepareForReading() covering - // all row groups in parallel. We only need to wait. Calling PreBuffer again would create - // a new cached_source_, discarding the parallel I/O already in progress. - { - std::vector rg_vec = {row_group_index}; - std::vector col_vec(column_indices.begin(), column_indices.end()); - if (!pre_buffered) { - ::arrow::io::IOContext io_ctx(pool); - parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); - } - if (!page_ranges.empty()) { - // Page-level PreBuffer: wait on specific page byte ranges - // If pre-buffering failed (e.g., IO error during testing), fall back to on-demand read - auto status = parquet_reader->WhenBufferedRanges(page_ranges).status(); - if (!status.ok()) { - // Pre-buffering failed, fall back to row-group level PreBuffer - ::arrow::io::IOContext io_ctx(pool); - parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); - } - } else { - PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); - } - } + PAIMON_RETURN_NOT_OK(WaitForPreBuffer(parquet_reader, row_group_index, column_indices, pool, + cache_options, pre_buffered, page_ranges)); - // Open row group and page index once, share across all columns auto row_group_reader = parquet_reader->RowGroup(row_group_index); auto rg_metadata = parquet_reader->metadata()->RowGroup(row_group_index); int64_t row_group_row_count = rg_metadata->num_rows(); - auto page_index_reader = parquet_reader->GetPageIndexReader(); // reuse RowGroupPageIndexReader for multiple columns in the same row group to avoid redundant // metadata reads std::shared_ptr<::parquet::RowGroupPageIndexReader> rg_page_index_reader; + auto page_index_reader = parquet_reader->GetPageIndexReader(); if (page_index_reader) { rg_page_index_reader = page_index_reader->RowGroup(row_group_index); } @@ -274,7 +263,7 @@ Result> PageFilteredRowGroupReader::Re for (size_t i = 0; i < column_indices.size(); ++i) { PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr chunked_array, + auto chunked_array, ReadFilteredColumn(row_group_reader, parquet_reader, rg_page_index_reader, row_group_index, column_indices[i], row_ranges, arrow_schema->field(static_cast(i)), row_group_row_count, @@ -290,18 +279,16 @@ Result> PageFilteredRowGroupReader::Re columns.push_back(std::move(chunked_array)); } - // Wrap columns in a Table and stream zero-copy-sliced batches via TableBatchReader. - // For multi-chunk variable-length columns this avoids the deep copy of CombineChunks: - // each emitted batch contains at most max_chunksize rows (capped further by the - // smallest remaining chunk across columns), and every column's Array is a zero-copy - // Slice of its underlying chunk. auto table = arrow::Table::Make(arrow_schema, std::move(columns), expected_rows); return std::make_unique(std::move(table), max_chunksize); } std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRanges( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices) { + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices) { + int32_t row_group_index = target_row_group.row_group_index; + const auto& row_ranges = target_row_group.row_ranges; + std::vector<::arrow::io::ReadRange> ranges; auto file_metadata = parquet_reader->metadata(); auto rg_metadata = file_metadata->RowGroup(row_group_index); @@ -344,23 +331,17 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange auto num_pages = static_cast(page_locations.size()); for (int32_t page_idx = 0; page_idx < num_pages; ++page_idx) { - int64_t first_row = page_locations[page_idx].first_row_index; - int64_t last_row = (page_idx + 1 < num_pages) - ? page_locations[page_idx + 1].first_row_index - 1 - : row_group_row_count - 1; + auto [first_row, last_row] = + GetPageRowRange(page_locations, page_idx, row_group_row_count); if (!row_ranges.IsOverlapping(first_row, last_row)) { - continue; // Page doesn't overlap with target rows + continue; } - // Compute page byte range int64_t page_offset = page_locations[page_idx].offset; - int64_t page_size; - if (page_idx + 1 < num_pages) { - page_size = page_locations[page_idx + 1].offset - page_offset; - } else { - page_size = chunk_end - page_offset; - } + int64_t page_size = (page_idx + 1 < num_pages) + ? page_locations[page_idx + 1].offset - page_offset + : chunk_end - page_offset; ranges.push_back({page_offset, page_size}); } } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index 24c82e25..7294180c 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -45,8 +45,7 @@ class PageFilteredRowGroupReader { /// Read a row group with page-level filtering. /// @param parquet_reader The underlying ParquetFileReader - /// @param row_group_index Row group to read - /// @param row_ranges Matching row ranges within this row group + /// @param target_row_group Target row group with index and row ranges /// @param column_indices Leaf column indices to read /// @param arrow_schema The target Arrow schema for output columns /// @param pool Memory pool @@ -54,16 +53,11 @@ class PageFilteredRowGroupReader { /// @param pre_buffered If true, assumes PreBuffer was already called externally /// and only waits via WhenBuffered (no redundant PreBuffer). /// @param page_ranges If non-empty, wait via WhenBufferedRanges instead of WhenBuffered - /// @param max_chunksize Per-batch row cap for the returned reader, mirroring Arrow's - /// TableBatchReader::set_chunksize. Each batch yields at most this many rows; - /// actual size may be smaller when an underlying ChunkedArray's chunk boundary - /// is reached first (zero-copy slice). - /// @return A RecordBatchReader streaming the filtered rows. Multi-chunk variable-length - /// columns are emitted as multiple zero-copy-sliced batches along chunk boundaries - /// instead of being concatenated, avoiding the deep copy of CombineChunks. + /// @param max_chunksize Per-batch row cap for the returned reader. + /// @return A RecordBatchReader streaming the filtered rows. static Result> ReadFilteredRowGroup( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices, + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices, const std::shared_ptr& arrow_schema, ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options = ::arrow::io::CacheOptions::Defaults(), bool pre_buffered = false, const std::vector<::arrow::io::ReadRange>& page_ranges = {}, @@ -74,19 +68,35 @@ class PageFilteredRowGroupReader { /// Includes dictionary pages unconditionally. /// Falls back to entire column chunk range if OffsetIndex is unavailable. static std::vector<::arrow::io::ReadRange> ComputePageRanges( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices); + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices); private: + /// Get the [first_row, last_row] range of a page given page locations. + static std::pair GetPageRowRange( + const std::vector<::parquet::PageLocation>& page_locations, int32_t page_idx, + int64_t row_group_row_count); + + /// Wait for pre-buffered data to become available before reading. + static Status WaitForPreBuffer(::parquet::ParquetFileReader* parquet_reader, + int32_t row_group_index, + const std::vector& column_indices, + ::arrow::MemoryPool* pool, + const ::arrow::io::CacheOptions& cache_options, + bool pre_buffered, + const std::vector<::arrow::io::ReadRange>& page_ranges); + + /// Execute the skip/read pattern on a RecordReader based on RowRanges. + static Status ExecuteSkipReadPattern(::parquet::internal::RecordReader* record_reader, + const RowRanges& ranges, int64_t total_row_count, + int32_t row_group_index, int32_t column_index); + /// Create a data_page_filter callback for a column based on RowRanges + OffsetIndex. - /// Returns true (skip) if the page's row range has no overlap with RowRanges. static std::function MakePageFilter( const RowRanges& row_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count); /// Read a single column using skip/read pattern driven by RowRanges. - /// When OffsetIndex is available, uses data_page_filter for I/O-level page skipping - /// and compressed RowRanges for decode-level row skipping. static Result> ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, @@ -96,8 +106,6 @@ class PageFilteredRowGroupReader { ::arrow::MemoryPool* pool); /// Compute compressed RowRanges after data_page_filter skips non-matching pages. - /// Maps original RowRanges to the compressed row space where skipped pages are removed. - /// @return pair of (compressed RowRanges, compressed total row count) static std::pair ComputeCompressedRowRanges( const RowRanges& original_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index bd693730..604d2a4c 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -521,7 +521,7 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesPartialMatch) { row_ranges.Add(RowRanges::Range(50, 59)); auto ranges = PageFilteredRowGroupReader::ComputePageRanges( - parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0}); + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), /*column_indices=*/{0}); // Should have exactly 1 range (page 5 of column 0, no dictionary since disabled) ASSERT_EQ(1, ranges.size()); @@ -544,8 +544,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesAllMatch) { RowRanges row_ranges; row_ranges.Add(RowRanges::Range(0, 99)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); // 10 pages, all matching ASSERT_EQ(10, ranges.size()); @@ -568,8 +568,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesNoMatch) { RowRanges row_ranges; // empty - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); ASSERT_EQ(0, ranges.size()); } @@ -589,8 +589,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesMultiColumn) { RowRanges row_ranges; row_ranges.Add(RowRanges::Range(50, 59)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0, 1}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0, 1}); // 1 matching page per column = 2 ranges total ASSERT_EQ(2, ranges.size()); @@ -615,8 +615,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesMultiplePages) { row_ranges.Add(RowRanges::Range(20, 29)); row_ranges.Add(RowRanges::Range(70, 79)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); // 2 matching pages for 1 column ASSERT_EQ(2, ranges.size()); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index c37570ac..95c0614a 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -48,17 +48,6 @@ #include "parquet/arrow/reader.h" #include "parquet/properties.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) { \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace arrow { class MemoryPool; } // namespace arrow @@ -76,7 +65,6 @@ ParquetFileBatchReader::ParquetFileBatchReader( arrow_pool_(arrow_pool), input_stream_(std::move(input_stream)), reader_(std::move(reader)), - read_ranges_(reader_->GetAllRowGroupRanges()), metrics_(std::make_shared()), logger_(Logger::GetLogger("ParquetFileBatchReader")) {} @@ -159,18 +147,6 @@ Status ParquetFileBatchReader::SetReadSchema( } } - // Build column name to index map for page-level filtering. - // For leaf columns, indices[0] is the correct leaf column index in Parquet. - // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, - // but predicate pushdown only targets leaf columns with simple types, so indices[0] - // is always the correct single leaf index for predicate evaluation. - std::map column_name_to_index; - for (const auto& [name, indices] : field_index_map) { - if (!indices.empty()) { - column_name_to_index[name] = indices[0]; - } - } - std::vector row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups()); if (predicate) { PAIMON_ASSIGN_OR_RAISE(row_groups, @@ -182,44 +158,55 @@ Status ParquetFileBatchReader::SetReadSchema( } // Apply page-level filtering after bitmap pruning so we don't read page index // pages for row groups that the bitmap already excluded. + // If no predicate is provided, skip page-level filtering, row_group_row_ranges will be + // empty + std::map row_group_row_ranges; if (predicate && !row_groups.empty()) { PAIMON_ASSIGN_OR_RAISE( bool enable_page_index_filter, OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); if (enable_page_index_filter) { + // Build column name to index map for page-level filtering. + // For leaf columns, indices[0] is the correct leaf column index in Parquet. + // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, + // but predicate pushdown only targets leaf columns with simple types, so indices[0] + // is always the correct single leaf index for predicate evaluation. + std::map column_name_to_index; + for (const auto& [name, indices] : field_index_map) { + if (!indices.empty()) { + column_name_to_index[name] = indices[0]; + } + } + PAIMON_ASSIGN_OR_RAISE( auto page_filter_result, FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); row_groups = std::move(page_filter_result.first); - reader_->SetRowGroupRowRanges(page_filter_result.second); + row_group_row_ranges = std::move(page_filter_result.second); } } read_data_type_ = arrow::struct_(read_schema->fields()); - read_row_groups_ = row_groups; - read_column_indices_ = column_indices; metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups()); metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, row_groups.size()); - PAIMON_ASSIGN_OR_RAISE( - std::set ordered_row_groups, - reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - - // When predicate or selection is applied, prepare eagerly so PreBuffer I/O - // starts immediately. All file readers are created before consumption begins, - // so eager preparation allows I/O for multiple files to overlap. - Status ret; - if (predicate || selection_bitmap) { - ret = reader_->PrepareForReading(ordered_row_groups, read_column_indices_); - } else { - ret = reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_); + // Build TargetRowGroup list with page-filter info in one shot. + std::vector target_row_groups; + for (int32_t rg_id : row_groups) { + auto it = row_group_row_ranges.find(rg_id); + if (it != row_group_row_ranges.end()) { + target_row_groups.emplace_back(rg_id, true, it->second); + } else { + target_row_groups.emplace_back(rg_id, false, RowRanges()); + } } - return ret; + PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") + return Status::OK(); } Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 632d7762..8dc412c3 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -102,11 +102,7 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { } Status SetReadRanges(const std::vector>& read_ranges) override { - read_ranges_ = read_ranges; - PAIMON_ASSIGN_OR_RAISE( - std::set ordered_row_groups, - reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_); + return reader_->ApplyReadRanges(read_ranges); } std::shared_ptr GetReaderMetrics() const override { @@ -179,17 +175,12 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { std::unique_ptr reader_; std::shared_ptr read_data_type_; - std::vector> read_ranges_; std::shared_ptr metrics_; std::unique_ptr logger_; uint64_t read_rows_ = 0; uint64_t read_batch_count_ = 0; - - // last time set read schema - std::vector read_row_groups_; - std::vector read_column_indices_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_format_defs.h b/src/paimon/format/parquet/parquet_format_defs.h index b979b578..b646ca6c 100644 --- a/src/paimon/format/parquet/parquet_format_defs.h +++ b/src/paimon/format/parquet/parquet_format_defs.h @@ -19,6 +19,17 @@ #include #include +// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a +// Status. Used as the trailing catch clauses of a try block in every public +// method that calls into the parquet C++ API, so the read layer never throws. +#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ + catch (const std::exception& e) { \ + return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ + } \ + catch (...) { \ + return Status::UnknownError((context), ": unknown error"); \ + } + namespace paimon::parquet { // write diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 288fa48f..425f2405 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -43,6 +43,9 @@ class RowRanges { /// Creates a RowRanges from a list of ranges. explicit RowRanges(const std::vector& ranges) : ranges_(ranges) {} + /// Creates a RowRanges from a list of ranges, taking ownership of the vector. + explicit RowRanges(std::vector&& ranges) : ranges_(std::move(ranges)) {} + /// Creates a RowRanges with a single range [0, row_count - 1]. static RowRanges CreateSingle(int64_t row_count) { if (row_count <= 0) { @@ -102,4 +105,16 @@ class RowRanges { std::vector ranges_; }; +struct TargetRowGroup { + int32_t row_group_index{-1}; + bool is_page_filtered{false}; + // page-filtered row ranges, only valid if is_page_filtered is true. + RowRanges row_ranges; + + TargetRowGroup() = default; + TargetRowGroup(int32_t rg_index, bool page_filtered, RowRanges ranges) + : row_group_index(rg_index), + is_page_filtered(page_filtered), + row_ranges(std::move(ranges)) {} +}; } // namespace paimon::parquet