Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ on:
- '!dependabot/**'

permissions:
contents: read
contents: write

jobs:
pre-commit:
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/merging_snapshot_update.cc
update/pending_update.cc
update/set_snapshot.cc
update/snapshot_manager.cc
Expand Down
37 changes: 36 additions & 1 deletion src/iceberg/manifest/manifest_filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,24 @@ bool ManifestFilterManager::ContainsDeletes() const {
!drop_partitions_.empty();
}

void ManifestFilterManager::DropDeleteFilesOlderThan(int64_t sequence_number) {
min_sequence_number_ = sequence_number;
}

/// Remembers the paths of the given data files in removed_data_file_paths_.
///
/// Only the path string of each file is copied into the set; the file objects
/// themselves are not retained. Paths accumulate across calls (nothing is
/// cleared here), and inserting a path that is already present is a no-op.
///
/// \param deleted_files data files whose paths should be recorded
void ManifestFilterManager::RemoveDanglingDeletesFor(const DataFileSet& deleted_files) {
  for (const auto& removed : deleted_files) {
    const auto& path = removed->file_path;
    removed_data_file_paths_.insert(path);
  }
}

Result<bool> ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const {
// TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
// manifests once object-delete partitions are tracked separately.
// Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based delete,
// which forces scanning all manifests.
return !delete_paths_.empty();
// Also open delete manifests when a minimum sequence number is set for cleanup.
return !delete_paths_.empty() || !removed_data_file_paths_.empty() ||
(manifest_content_ == ManifestContent::kDeletes && min_sequence_number_ > 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java does not open delete manifests solely because minSequenceNumber is set; old deletes are dropped only when a delete manifest is already being filtered.

}

Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
Expand Down Expand Up @@ -219,6 +231,25 @@ Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
return true;
}

// Delete-manifest-specific cleanup (only for ManifestContent::kDeletes).
if (manifest_content_ == ManifestContent::kDeletes) {
// Drop delete files whose data sequence number is older than the minimum
// retained by the table (they can no longer match any live data rows).
// seq == 0 (kInitialSequenceNumber / nullopt) is intentionally excluded:
// those entries predate sequence number assignment and must not be pruned.
int64_t seq = entry.sequence_number.value_or(0);
if (min_sequence_number_ > 0 && seq > 0 && seq < min_sequence_number_) {
return true;
}

// Drop DVs that reference a data file that has been removed (dangling DV).
if (!removed_data_file_paths_.empty() && file.IsDeletionVector() &&
file.referenced_data_file.has_value() &&
removed_data_file_paths_.count(*file.referenced_data_file)) {
return true;
}
}

if (HasRowFilterExpression(delete_expr_)) {
ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
GetResidualEvaluator(schema, specs_by_id, spec_id));
Expand Down Expand Up @@ -403,6 +434,7 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
bool trust_manifest_references = CanTrustManifestReferences(manifests);
manifest_evaluator_cache_.clear();
residual_evaluator_cache_.clear();
replaced_manifests_count_ = 0;

// TODO(Guotao): Parallelize manifest filtering with per-manifest results, then
// merge found paths and deleted files after the loop.
Expand All @@ -413,6 +445,9 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
auto filtered_manifest,
FilterManifest(schema, specs_by_id, *manifest_ptr, trust_manifest_references,
writer_factory, found_paths));
if (filtered_manifest.manifest_path != manifest_ptr->manifest_path) {
++replaced_manifests_count_;
}
filtered.push_back(std::move(filtered_manifest));
}

Expand Down
34 changes: 34 additions & 0 deletions src/iceberg/manifest/manifest_filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,36 @@ class ICEBERG_EXPORT ManifestFilterManager {
/// manifest entry matches a delete condition.
void FailAnyDelete();

/// \brief Returns the number of manifests rewritten (replaced) by the last
/// FilterManifests() call. A manifest is replaced when it contained deleted entries
/// and was rewritten with those entries marked DELETED.
int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }

/// \brief Returns true if any delete condition has been registered.
bool ContainsDeletes() const;

/// \brief Set the minimum data sequence number for delete files to retain.
///
/// Only valid for ManifestContent::kDeletes managers. Delete entries whose
/// data_sequence_number is positive and less than \p sequence_number will be
/// marked DELETED. This continuously removes delete files that cannot match
/// any remaining data rows (i.e. all data written before that sequence number
/// has itself been deleted).
///
/// \param sequence_number the inclusive lower bound; delete files older than
/// this value are dropped
void DropDeleteFilesOlderThan(int64_t sequence_number);

/// \brief Register data files that have been removed so their dangling DVs
/// can be cleaned up.
///
/// Only valid for ManifestContent::kDeletes managers. For each DV whose
/// referenced_data_file path appears in \p deleted_files, the DV entry is
/// marked DELETED because the data file it targets no longer exists.
///
/// \param deleted_files set of data files that have been marked for deletion
void RemoveDanglingDeletesFor(const DataFileSet& deleted_files);

/// \brief Apply all accumulated delete conditions to the base snapshot's manifests.
///
/// Manifests that cannot possibly contain deleted files are returned unchanged.
Expand Down Expand Up @@ -220,6 +247,13 @@ class ICEBERG_EXPORT ManifestFilterManager {
bool fail_any_delete_{false};
bool case_sensitive_{true};

int32_t replaced_manifests_count_{0};

// minimum data sequence number; delete entries older than this are dropped
int64_t min_sequence_number_{0};
// paths of data files that were removed; DVs referencing these are dangling
std::unordered_set<std::string> removed_data_file_paths_;

std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
manifest_evaluator_cache_;
std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/manifest/manifest_merge_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
std::ranges::copy(manifest_ranges | std::views::join, std::back_inserter(all));

if (all.empty() || !merge_enabled_) {
replaced_manifests_count_ = 0;
return all |
std::views::transform([](const ManifestFile* manifest) { return *manifest; }) |
std::ranges::to<std::vector<ManifestFile>>();
Expand All @@ -82,6 +83,7 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(

std::vector<ManifestFile> result;
result.reserve(all.size());
replaced_manifests_count_ = 0;
for (auto& [key, group] : by_spec) {
const auto* first = first_by_content.at(key.second);
ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, metadata,
Expand Down Expand Up @@ -140,6 +142,8 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
} else {
ICEBERG_ASSIGN_OR_RAISE(
auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory));
// Each manifest consumed into the merged output (beyond the 1 output) is replaced.
replaced_manifests_count_ += static_cast<int32_t>(bin.size()) - 1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java counts each previous-snapshot manifest consumed by the merge as replaced, not bin.size() - 1. This can undercount old-only bins and overcount current-snapshot inputs.

result.push_back(std::move(merged));
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/manifest/manifest_merge_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class ICEBERG_EXPORT ManifestMergeManager {
const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
const ManifestWriterFactory& writer_factory);

/// \brief Returns the number of manifests replaced (consumed into merged outputs)
/// by the last MergeManifests() call.
int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }

private:
/// \brief Merge a group of manifests sharing the same spec_id.
///
Expand All @@ -109,6 +113,7 @@ class ICEBERG_EXPORT ManifestMergeManager {
const int64_t target_size_bytes_;
const int32_t min_count_to_merge_;
const bool merge_enabled_;
int32_t replaced_manifests_count_{0};
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ iceberg_sources = files(
'type.cc',
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/merging_snapshot_update.cc',
'update/pending_update.cc',
'update/set_snapshot.cc',
'update/snapshot_manager.cc',
Expand Down
24 changes: 24 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ void SnapshotSummaryBuilder::Clear() {
metrics_.Clear();
deleted_duplicate_files_ = 0;
trust_partition_metrics_ = true;
manifests_counts_set_ = false;
manifests_created_ = 0;
manifests_kept_ = 0;
manifests_replaced_ = 0;
}

void SnapshotSummaryBuilder::SetPartitionSummaryLimit(int32_t max) {
Expand Down Expand Up @@ -475,6 +479,14 @@ void SnapshotSummaryBuilder::Set(const std::string& property, const std::string&
properties_[property] = value;
}

void SnapshotSummaryBuilder::SetManifestCounts(int32_t created, int32_t kept,
int32_t replaced) {
manifests_counts_set_ = true;
manifests_created_ = created;
manifests_kept_ = kept;
manifests_replaced_ = replaced;
}

void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
for (const auto& [key, value] : other.properties_) {
properties_[key] = value;
Expand All @@ -491,6 +503,10 @@ void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
}

deleted_duplicate_files_ += other.deleted_duplicate_files_;
// Manifest counts (manifests_counts_set_ / manifests_created_ / manifests_kept_ /
// manifests_replaced_) are intentionally not merged here. They are set directly
// on the root summary builder by Apply() after all manifests are finalized, and
// are never populated on sub-builders that get Merge()d in.
}

std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() const {
Expand All @@ -504,6 +520,14 @@ std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() con
SetIf(deleted_duplicate_files_ > 0, builder,
SnapshotSummaryFields::kDeletedDuplicatedFiles, deleted_duplicate_files_);

// Always emit all three manifest count fields together when they have been set.
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsCreated,
manifests_created_);
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsKept,
manifests_kept_);
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsReplaced,
manifests_replaced_);

SetIf(trust_partition_metrics_, builder,
SnapshotSummaryFields::kChangedPartitionCountProp, partition_metrics_.size());

Expand Down
13 changes: 13 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ class ICEBERG_EXPORT SnapshotSummaryBuilder {
/// \param value Property value
void Set(const std::string& property, const std::string& value);

/// \brief Set manifest count summary fields.
///
/// Records how many manifests were created, kept, and replaced in this snapshot.
///
/// \param created Manifests written by this snapshot
/// \param kept Manifests carried over unchanged from the previous snapshot
/// \param replaced Manifests rewritten or merged away
void SetManifestCounts(int32_t created, int32_t kept, int32_t replaced);

/// \brief Merge another builder's metrics into this one
///
/// \param other The builder to merge from
Expand All @@ -359,6 +368,10 @@ class ICEBERG_EXPORT SnapshotSummaryBuilder {
int32_t max_changed_partitions_for_summaries_{0};
int64_t deleted_duplicate_files_{0};
bool trust_partition_metrics_{true};
bool manifests_counts_set_{false};
int32_t manifests_created_{0};
int32_t manifests_kept_{0};
int32_t manifests_replaced_{0};
};

/// \brief Data operation that produce snapshots.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ if(ICEBERG_BUILD_BUNDLE)
expire_snapshots_test.cc
fast_append_test.cc
manifest_filter_manager_test.cc
merging_snapshot_update_test.cc
name_mapping_update_test.cc
snapshot_manager_test.cc
transaction_test.cc
Expand Down
Loading
Loading