From 3098c818d4129f9dd53b82e3834e65ffe100581d Mon Sep 17 00:00:00 2001 From: Minh Vu <38443830+fallintoplace@users.noreply.github.com> Date: Wed, 20 May 2026 19:23:42 +0200 Subject: [PATCH] fix: evaluate ManifestGroup file filters --- src/iceberg/manifest/manifest_group.cc | 81 +++++++++++-- src/iceberg/row/manifest_wrapper.cc | 146 ++++++++++++++++++++++++ src/iceberg/row/manifest_wrapper.h | 21 ++++ src/iceberg/test/manifest_group_test.cc | 132 ++++++++++++++++++++- 4 files changed, 370 insertions(+), 10 deletions(-) diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 8af717b25..3459057d1 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -19,8 +19,12 @@ #include "iceberg/manifest/manifest_group.h" +#include +#include +#include #include +#include "iceberg/expression/binder.h" #include "iceberg/expression/evaluator.h" #include "iceberg/expression/expression.h" #include "iceberg/expression/manifest_evaluator.h" @@ -29,6 +33,7 @@ #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/partition_spec.h" +#include "iceberg/row/manifest_wrapper.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" #include "iceberg/util/checked_cast.h" @@ -265,10 +270,45 @@ Result> ManifestGroup::MakeReader( ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(manifest, io_, schema_, specs_by_id_)); + auto columns = columns_; + if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue && + !columns.empty() && + std::ranges::find(columns, Schema::kAllColumns) == columns.end()) { + auto spec_iter = specs_by_id_.find(manifest.partition_spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Cannot find partition spec for ID {}", manifest.partition_spec_id); + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, + spec_iter->second->PartitionType(*schema_)); + auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema(); + ICEBERG_ASSIGN_OR_RAISE( + auto bound_file_filter, + Binder::Bind(*data_file_schema, file_filter_, case_sensitive_)); + ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids, + ReferenceVisitor::GetReferencedFieldIds(bound_file_filter)); + + std::unordered_set selected_columns(columns.cbegin(), columns.cend()); + for (const auto field_id : referenced_field_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto column_name, + data_file_schema->FindColumnNameById(field_id)); + if (column_name.has_value()) { + std::string column_name_str(column_name.value()); + if (column_name_str.starts_with(DataFile::kPartitionField + ".")) { + column_name_str = DataFile::kPartitionField; + } + if (selected_columns.contains(column_name_str)) { + continue; + } + columns.push_back(std::move(column_name_str)); + selected_columns.insert(columns.back()); + } + } + } + reader->FilterRows(data_filter_) .FilterPartitions(partition_filter_) .CaseSensitive(case_sensitive_) - .Select(columns_); + .Select(std::move(columns)); return reader; } @@ -299,11 +339,31 @@ ManifestGroup::ReadEntries() { return eval_cache[spec_id].get(); }; - std::unique_ptr data_file_evaluator; - if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) { - // TODO(gangwu): create an Evaluator on the DataFile schema with empty - // partition type - } + const bool has_file_filter = + file_filter_ && file_filter_->op() != Expression::Operation::kTrue; + std::unordered_map> data_file_eval_cache; + auto get_data_file_evaluator = [&](int32_t spec_id) -> Result { + if (!has_file_filter) { + return nullptr; + } + if (data_file_eval_cache.contains(spec_id)) { + return data_file_eval_cache[spec_id].get(); + } + + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Cannot find partition spec for ID {}", spec_id); + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, + spec_iter->second->PartitionType(*schema_)); + auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema(); + ICEBERG_ASSIGN_OR_RAISE( + auto data_file_evaluator, + Evaluator::Make(*data_file_schema, file_filter_, case_sensitive_)); + data_file_eval_cache[spec_id] = std::move(data_file_evaluator); + + return data_file_eval_cache[spec_id].get(); + }; std::unordered_map> result; @@ -336,6 +396,7 @@ ManifestGroup::ReadEntries() { ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); ICEBERG_ASSIGN_OR_RAISE(auto entries, ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); + ICEBERG_ASSIGN_OR_RAISE(auto data_file_evaluator, get_data_file_evaluator(spec_id)); for (auto& entry : entries) { if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { @@ -343,8 +404,12 @@ ManifestGroup::ReadEntries() { } if (data_file_evaluator != nullptr) { - // TODO(gangwu): implement data_file_evaluator to evaluate StructLike on - // top of entry.data_file + DataFileStructLike data_file(*entry.data_file); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + data_file_evaluator->Evaluate(data_file)); + if (!should_match) { + continue; + } } if (!manifest_entry_predicate_(entry)) { diff --git a/src/iceberg/row/manifest_wrapper.cc b/src/iceberg/row/manifest_wrapper.cc index 851f9e72f..a55acce56 100644 --- a/src/iceberg/row/manifest_wrapper.cc +++ b/src/iceberg/row/manifest_wrapper.cc @@ -19,18 +19,55 @@ #include "iceberg/row/manifest_wrapper.h" +#include +#include +#include +#include +#include + #include "iceberg/manifest/manifest_reader_internal.h" #include "iceberg/util/macros.h" namespace iceberg { namespace { + +enum class DataFileFieldPosition : size_t { + kContent = 0, + kFilePath = 1, + kFileFormat = 2, + kPartition = 3, + kRecordCount = 4, + kFileSize = 5, + kColumnSizes = 6, + kValueCounts = 7, + kNullValueCounts = 8, + kNanValueCounts = 9, + kLowerBounds = 10, + kUpperBounds = 11, + kKeyMetadata = 12, + kSplitOffsets = 13, + kEqualityIds = 14, + kSortOrderId = 15, + kFirstRowId = 16, + kReferencedDataFile = 17, + kContentOffset = 18, + kContentSize = 19, + kNextUnusedId = 20, +}; + template requires std::is_same_v> || std::is_same_v std::string_view ToView(const T& value) { return {reinterpret_cast(value.data()), value.size()}; // NOLINT } +Scalar ToScalar(const int32_t value) { return value; } + +Scalar ToScalar(const int64_t value) { return value; } + +Scalar ToScalar(const std::vector& value) { return ToView(value); } + template Result FromOptional(const std::optional& value) { if (value.has_value()) { @@ -39,6 +76,56 @@ Result FromOptional(const std::optional& value) { return std::monostate{}; } +Result FromOptionalString(const std::optional& value) { + if (value.has_value()) { + return ToView(value.value()); + } + return std::monostate{}; +} + +template +class VectorArrayLike : public ArrayLike { + public: + explicit VectorArrayLike(const std::vector& values) : values_(values) {} + + Result GetElement(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid array index: {}", pos); + } + return ToScalar(values_.get()[pos]); + } + + size_t size() const override { return values_.get().size(); } + + private: + std::reference_wrapper> values_; +}; + +template +class IntMapLike : public MapLike { + public: + explicit IntMapLike(const std::map& values) : values_(values) {} + + Result GetKey(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid map index: {}", pos); + } + return std::next(values_.get().cbegin(), pos)->first; + } + + Result GetValue(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid map index: {}", pos); + } + return ToScalar(std::next(values_.get().cbegin(), pos)->second); + } + + size_t size() const override { return values_.get().size(); } + + private: + std::reference_wrapper> values_; +}; + } // namespace Result PartitionFieldSummaryStructLike::GetField(size_t pos) const { @@ -134,4 +221,63 @@ std::unique_ptr FromManifestFile(const ManifestFile& file) { return std::make_unique(file); } +Result DataFileStructLike::GetField(size_t pos) const { + if (pos >= num_fields()) { + return InvalidArgument("Invalid data file field index: {}", pos); + } + + const auto& data_file = data_file_.get(); + switch (static_cast(pos)) { + case DataFileFieldPosition::kContent: + return static_cast(data_file.content); + case DataFileFieldPosition::kFilePath: + return ToView(data_file.file_path); + case DataFileFieldPosition::kFileFormat: + return ToString(data_file.file_format); + case DataFileFieldPosition::kPartition: { + partition_ = std::make_shared(data_file.partition); + return partition_; + } + case DataFileFieldPosition::kRecordCount: + return data_file.record_count; + case DataFileFieldPosition::kFileSize: + return data_file.file_size_in_bytes; + case DataFileFieldPosition::kColumnSizes: + return std::make_shared>(data_file.column_sizes); + case DataFileFieldPosition::kValueCounts: + return std::make_shared>(data_file.value_counts); + case DataFileFieldPosition::kNullValueCounts: + return std::make_shared>(data_file.null_value_counts); + case DataFileFieldPosition::kNanValueCounts: + return std::make_shared>(data_file.nan_value_counts); + case DataFileFieldPosition::kLowerBounds: + return std::make_shared>>(data_file.lower_bounds); + case DataFileFieldPosition::kUpperBounds: + return std::make_shared>>(data_file.upper_bounds); + case DataFileFieldPosition::kKeyMetadata: + return ToView(data_file.key_metadata); + case DataFileFieldPosition::kSplitOffsets: + return std::make_shared>(data_file.split_offsets); + case DataFileFieldPosition::kEqualityIds: + return std::make_shared>(data_file.equality_ids); + case DataFileFieldPosition::kSortOrderId: + return FromOptional(data_file.sort_order_id); + case DataFileFieldPosition::kFirstRowId: + return FromOptional(data_file.first_row_id); + case DataFileFieldPosition::kReferencedDataFile: + return FromOptionalString(data_file.referenced_data_file); + case DataFileFieldPosition::kContentOffset: + return FromOptional(data_file.content_offset); + case DataFileFieldPosition::kContentSize: + return FromOptional(data_file.content_size_in_bytes); + case DataFileFieldPosition::kNextUnusedId: + return InvalidArgument("Invalid data file field index: {}", pos); + } + return InvalidArgument("Invalid data file field index: {}", pos); +} + +size_t DataFileStructLike::num_fields() const { + return static_cast(DataFileFieldPosition::kNextUnusedId); +} + } // namespace iceberg diff --git a/src/iceberg/row/manifest_wrapper.h b/src/iceberg/row/manifest_wrapper.h index bc04c1e8b..20c2165b2 100644 --- a/src/iceberg/row/manifest_wrapper.h +++ b/src/iceberg/row/manifest_wrapper.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/row/struct_like.h" @@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public StructLike { mutable std::shared_ptr summaries_; }; +/// \brief StructLike wrapper for DataFile metadata. +class ICEBERG_EXPORT DataFileStructLike : public StructLike { + public: + explicit DataFileStructLike(const DataFile& file) : data_file_(file) {} + ~DataFileStructLike() override = default; + + DataFileStructLike(const DataFileStructLike&) = delete; + DataFileStructLike& operator=(const DataFileStructLike&) = delete; + + Result GetField(size_t pos) const override; + + size_t num_fields() const override; + + void Reset(const DataFile& file) { data_file_ = std::cref(file); } + + private: + std::reference_wrapper data_file_; + mutable std::shared_ptr partition_; +}; + } // namespace iceberg diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 017f98036..788d97e1d 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -76,13 +76,14 @@ class ManifestGroupTest : public testing::TestWithParam { std::shared_ptr MakeDataFile(const std::string& path, const PartitionValues& partition, - int32_t spec_id, int64_t record_count = 1) { + int32_t spec_id, int64_t record_count = 1, + int64_t file_size_in_bytes = 10) { return std::make_shared(DataFile{ .file_path = path, .file_format = FileFormatType::kParquet, .partition = partition, .record_count = record_count, - .file_size_in_bytes = 10, + .file_size_in_bytes = file_size_in_bytes, .sort_order_id = 0, .partition_spec_id = spec_id, }); @@ -404,6 +405,133 @@ TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) { "/path/to/data3.parquet")); } +TEST_P(ManifestGroupTest, FilterFilesByRecordCount) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/small.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/5)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/boundary.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/10)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/large.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/15))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterFiles(Expressions::GreaterThanOrEqual("record_count", Literal::Long(10))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), + testing::UnorderedElementsAre("/path/to/boundary.parquet", + "/path/to/large.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesByPartitionMetadata) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto partition_bucket_0 = PartitionValues({Literal::Int(0)}); + const auto partition_bucket_1 = PartitionValues({Literal::Int(1)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/bucket0.parquet", partition_bucket_0, + partitioned_spec_->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/bucket1.parquet", partition_bucket_1, + partitioned_spec_->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), testing::ElementsAre("/path/to/bucket1.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesReadsFilteredColumnsWhenSelected) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/too-small.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/1, + /*file_size_in_bytes=*/5)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/matching.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/1, + /*file_size_in_bytes=*/20))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->Select({"file_path"}) + .FilterFiles(Expressions::GreaterThan("file_size_in_bytes", Literal::Long(10))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), testing::ElementsAre("/path/to/matching.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesReadsPartitionMetadataWhenSelected) { + auto version = GetParam(); + + std::shared_ptr multi_field_spec; + ICEBERG_UNWRAP_OR_FAIL( + multi_field_spec, + PartitionSpec::Make( + /*spec_id=*/2, {PartitionField(/*source_id=*/1, /*field_id=*/1001, + "id_identity", Transform::Identity()), + PartitionField(/*source_id=*/2, /*field_id=*/1002, + "data_identity", Transform::Identity())})); + + constexpr int64_t kSnapshotId = 1000L; + const auto keep_partition = PartitionValues({Literal::Int(1), Literal::String("keep")}); + const auto drop_partition = PartitionValues({Literal::Int(2), Literal::String("drop")}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/drop.parquet", drop_partition, + multi_field_spec->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/keep.parquet", keep_partition, + multi_field_spec->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), multi_field_spec); + + auto specs_by_id = GetSpecsById(); + specs_by_id[multi_field_spec->spec_id()] = multi_field_spec; + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL(auto group, + ManifestGroup::Make(file_io_, schema_, std::move(specs_by_id), + std::move(manifests))); + group->Select({"file_path"}) + .FilterFiles( + Expressions::Equal("partition.data_identity", Literal::String("keep"))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), testing::ElementsAre("/path/to/keep.parquet")); +} + TEST_P(ManifestGroupTest, EmptyManifestGroup) { std::vector manifests; ICEBERG_UNWRAP_OR_FAIL(