From e342fe0379d48a7c2580a6f37575b03fce497026 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 May 2026 09:05:25 +0800 Subject: [PATCH] feat(update): add parallel deletion for ExpireSnapshots cleanup. --- src/iceberg/CMakeLists.txt | 13 +- src/iceberg/meson.build | 3 +- src/iceberg/test/expire_snapshots_test.cc | 162 +++++++++++++++++----- src/iceberg/update/expire_snapshots.cc | 35 ++++- 4 files changed, 167 insertions(+), 46 deletions(-) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 68cacebeb..3f0fabb8f 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -119,6 +119,9 @@ set(ICEBERG_SOURCES util/url_encoder.cc util/uuid.cc) +find_package(Threads REQUIRED) +list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads) + set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS) set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS) @@ -128,22 +131,24 @@ list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS "$,nanoarrow::nanoarrow_static,$,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>" nlohmann_json::nlohmann_json - ZLIB::ZLIB) + ZLIB::ZLIB + Threads::Threads) list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS "$,nanoarrow::nanoarrow_static,$,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>" nlohmann_json::nlohmann_json - ZLIB::ZLIB) + ZLIB::ZLIB + Threads::Threads) list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "$,iceberg::nanoarrow_static,$,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>" "$,iceberg::nlohmann_json,$,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>" -) + Threads::Threads) list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "$,iceberg::nanoarrow_static,$,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>" "$,iceberg::nlohmann_json,$,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>" -) + Threads::Threads) add_iceberg_lib(iceberg SOURCES diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 03dc24479..cf544d57c 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -166,8 +166,9 @@ croaring_dep = dependency('croaring', static: croaring_needs_static) nanoarrow_dep = dependency('nanoarrow') nlohmann_json_dep = dependency('nlohmann_json') zlib_dep = dependency('zlib') +thread_dep = dependency('threads') -iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, zlib_dep] +iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, zlib_dep, thread_dep] iceberg_lib = library( 'iceberg', diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 3a99b0009..bf3ceea9e 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -19,8 +19,12 @@ #include "iceberg/update/expire_snapshots.h" +#include +#include +#include #include #include +#include #include #include @@ -38,9 +42,19 @@ namespace iceberg { -class ExpireSnapshotsTest : public UpdateTestBase {}; +class ExpireSnapshotsTest : public UpdateTestBase { + protected: + void RecordDeletedFile(std::vector& deleted_files, + const std::string& path) { + std::lock_guard lock(deleted_files_mutex_); + deleted_files.push_back(path); + } + + private: + std::mutex deleted_files_mutex_; +}; -class ExpireSnapshotsCleanupTest : public UpdateTestBase { +class ExpireSnapshotsCleanupTest : public ExpireSnapshotsTest { protected: static constexpr int64_t kExpiredSnapshotId = 3051729675574597004; static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; @@ -289,8 +303,9 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainsUnreferencedSnapshotAtExpireThreshold) TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); // Apply first so apply_result_ is cached ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); @@ -308,8 +323,9 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->CleanupLevel(CleanupLevel::kNone); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1); @@ -323,8 +339,9 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) { TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1); @@ -340,8 +357,9 @@ TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->RetainLast(2); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); EXPECT_TRUE(result.snapshot_ids_to_remove.empty()); @@ -395,8 +413,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); // Force the reachable path. update->ExpireSnapshotId(kExpiredSnapshotId); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, @@ -434,10 +453,65 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->ExpireSnapshotId(kExpiredSnapshotId); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, + expired_data_manifest_path, + expired_delete_manifest_path, + expired_manifest_list_path)); +} + +TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFilesInParallel) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_delete_file_path = table_location_ + "/data/expired-delete.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_delete_manifest_path = + table_location_ + "/metadata/expired-delete.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + auto expired_delete_manifest = WriteDeleteManifest( + expired_delete_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakePositionDeleteFile(expired_delete_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest, expired_delete_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::atomic active_deletes{0}; + std::atomic max_active_deletes{0}; + std::vector deleted_files; + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); + update->DeleteWith([this, &active_deletes, &max_active_deletes, + &deleted_files](const std::string& path) { + int active = active_deletes.fetch_add(1) + 1; + int observed = max_active_deletes.load(); + while (active > observed && + !max_active_deletes.compare_exchange_weak(observed, active)) { + } + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + RecordDeletedFile(deleted_files, path); + active_deletes.fetch_sub(1); + }); EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_GT(max_active_deletes.load(), 1); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, expired_data_manifest_path, expired_delete_manifest_path, @@ -473,8 +547,9 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->CleanupLevel(CleanupLevel::kMetadataOnly); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, @@ -510,8 +585,9 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, @@ -535,8 +611,9 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path)); @@ -561,8 +638,9 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path))); @@ -586,8 +664,9 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path)); @@ -612,8 +691,9 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path))); @@ -640,8 +720,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFile std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path)); @@ -673,8 +754,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::Contains(deleted_data_file_path)); @@ -704,8 +786,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->ExpireSnapshotId(kExpiredSnapshotId); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, @@ -742,8 +825,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_TRUE(deleted_files.empty()); @@ -792,8 +876,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpire ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->ExpireSnapshotId(kExpiredSnapshotId); update->CleanExpiredMetadata(true); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, @@ -821,8 +906,9 @@ TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); - update->DeleteWith( - [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + update->DeleteWith([this, &deleted_files](const std::string& path) { + RecordDeletedFile(deleted_files, path); + }); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_TRUE(deleted_files.empty()); diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index c9ac9e4cd..22cfc5afd 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,8 @@ namespace iceberg { namespace { +constexpr size_t kDefaultDeleteParallelism = 4; + Result> MakeManifestReader( const ManifestFile& manifest, const std::shared_ptr& file_io, const TableMetadata& metadata) { @@ -100,13 +103,39 @@ class FileCleanupStrategy { /// \brief Delete files at the given locations. void DeleteFiles(const std::unordered_set& paths) { + if (paths.empty()) { + return; + } + + std::vector path_list(paths.begin(), paths.end()); + auto parallelism = std::min(kDefaultDeleteParallelism, path_list.size()); + if (parallelism <= 1) { + DeleteFileRange(path_list, 0, path_list.size()); + return; + } + + std::vector workers; + workers.reserve(parallelism); + const size_t chunk_size = (path_list.size() + parallelism - 1) / parallelism; + for (size_t begin = 0; begin < path_list.size(); begin += chunk_size) { + const size_t end = std::min(begin + chunk_size, path_list.size()); + workers.emplace_back( + [this, &path_list, begin, end] { DeleteFileRange(path_list, begin, end); }); + } + + for (auto& worker : workers) { + worker.join(); + } + } + + void DeleteFileRange(const std::vector& paths, size_t begin, size_t end) { try { if (delete_func_) { - for (const auto& path : paths) { - delete_func_(path); + for (size_t i = begin; i < end; ++i) { + delete_func_(paths[i]); } } else { - std::vector path_list(paths.begin(), paths.end()); + std::vector path_list(paths.begin() + begin, paths.begin() + end); std::ignore = file_io_->DeleteFiles(path_list); } } catch (...) {