Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ set(ICEBERG_SOURCES
util/url_encoder.cc
util/uuid.cc)

# Locate the platform threading library (fails configuration if absent) and
# record it in the list of system dependencies for the iceberg library.
find_package(Threads REQUIRED)
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads)

set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
set(ICEBERG_STATIC_INSTALL_INTERFACE_LIBS)
Expand All @@ -128,22 +131,24 @@ list(APPEND
ICEBERG_STATIC_BUILD_INTERFACE_LIBS
"$<IF:$<BOOL:${NANOARROW_VENDORED}>,nanoarrow::nanoarrow_static,$<IF:$<TARGET_EXISTS:nanoarrow::nanoarrow_static>,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>"
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
ZLIB::ZLIB
Threads::Threads)
list(APPEND
ICEBERG_SHARED_BUILD_INTERFACE_LIBS
"$<IF:$<BOOL:${NANOARROW_VENDORED}>,nanoarrow::nanoarrow_static,$<IF:$<TARGET_EXISTS:nanoarrow::nanoarrow_shared>,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>"
nlohmann_json::nlohmann_json
ZLIB::ZLIB)
ZLIB::ZLIB
Threads::Threads)
list(APPEND
ICEBERG_STATIC_INSTALL_INTERFACE_LIBS
"$<IF:$<BOOL:${NANOARROW_VENDORED}>,iceberg::nanoarrow_static,$<IF:$<TARGET_EXISTS:nanoarrow::nanoarrow_static>,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>"
"$<IF:$<BOOL:${NLOHMANN_JSON_VENDORED}>,iceberg::nlohmann_json,$<IF:$<TARGET_EXISTS:nlohmann_json::nlohmann_json>,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>"
)
Threads::Threads)
list(APPEND
ICEBERG_SHARED_INSTALL_INTERFACE_LIBS
"$<IF:$<BOOL:${NANOARROW_VENDORED}>,iceberg::nanoarrow_static,$<IF:$<TARGET_EXISTS:nanoarrow::nanoarrow_shared>,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>"
"$<IF:$<BOOL:${NLOHMANN_JSON_VENDORED}>,iceberg::nlohmann_json,$<IF:$<TARGET_EXISTS:nlohmann_json::nlohmann_json>,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>"
)
Threads::Threads)

add_iceberg_lib(iceberg
SOURCES
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ croaring_dep = dependency('croaring', static: croaring_needs_static)
nanoarrow_dep = dependency('nanoarrow')
nlohmann_json_dep = dependency('nlohmann_json')
zlib_dep = dependency('zlib')
thread_dep = dependency('threads')

iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, zlib_dep]
iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, zlib_dep, thread_dep]

iceberg_lib = library(
'iceberg',
Expand Down
162 changes: 124 additions & 38 deletions src/iceberg/test/expire_snapshots_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

#include "iceberg/update/expire_snapshots.h"

#include <atomic>
#include <chrono>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

#include <gmock/gmock.h>
Expand All @@ -38,9 +42,19 @@

namespace iceberg {

class ExpireSnapshotsTest : public UpdateTestBase {};
/// Test fixture for ExpireSnapshots that adds a mutex-guarded helper for
/// collecting the paths handed to a delete callback.
class ExpireSnapshotsTest : public UpdateTestBase {
 protected:
  /// Appends `path` to `deleted_files` while holding the fixture's mutex, so
  /// that callbacks going through this helper never append concurrently.
  ///
  /// @param deleted_files Caller-owned vector that receives the path.
  /// @param path Path reported by the delete callback.
  void RecordDeletedFile(std::vector<std::string>& deleted_files,
                         const std::string& path) {
    std::scoped_lock guard(deleted_files_mutex_);
    deleted_files.emplace_back(path);
  }

 private:
  /// Serializes every append performed through RecordDeletedFile.
  std::mutex deleted_files_mutex_;
};

class ExpireSnapshotsCleanupTest : public UpdateTestBase {
class ExpireSnapshotsCleanupTest : public ExpireSnapshotsTest {
protected:
static constexpr int64_t kExpiredSnapshotId = 3051729675574597004;
static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
Expand Down Expand Up @@ -289,8 +303,9 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainsUnreferencedSnapshotAtExpireThreshold)
TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

// Apply first so apply_result_ is cached
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
Expand All @@ -308,8 +323,9 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kNone);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
Expand All @@ -323,8 +339,9 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
Expand All @@ -340,8 +357,9 @@ TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->RetainLast(2);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
Expand Down Expand Up @@ -395,8 +413,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
// Force the reachable path.
update->ExpireSnapshotId(kExpiredSnapshotId);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
Expand Down Expand Up @@ -434,10 +453,65 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->ExpireSnapshotId(kExpiredSnapshotId);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
expired_data_manifest_path,
expired_delete_manifest_path,
expired_manifest_list_path));
}

TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFilesInParallel) {
const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
const auto expired_delete_file_path = table_location_ + "/data/expired-delete.parquet";
const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
const auto expired_delete_manifest_path =
table_location_ + "/metadata/expired-delete.avro";
const auto expired_manifest_list_path =
table_location_ + "/metadata/expired-manifest-list.avro";
const auto current_manifest_list_path =
table_location_ + "/metadata/current-manifest-list.avro";

auto expired_data_manifest = WriteDataManifest(
expired_data_manifest_path, kExpiredSnapshotId,
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
MakeDataFile(expired_data_file_path))});
auto expired_delete_manifest = WriteDeleteManifest(
expired_delete_manifest_path, kExpiredSnapshotId,
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
MakePositionDeleteFile(expired_delete_file_path))});
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
/*parent_snapshot_id=*/0, kExpiredSequenceNumber,
{expired_data_manifest, expired_delete_manifest});
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
kCurrentSequenceNumber, {});
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);

std::atomic<int> active_deletes{0};
std::atomic<int> max_active_deletes{0};
std::vector<std::string> deleted_files;

ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->ExpireSnapshotId(kExpiredSnapshotId);
update->DeleteWith([this, &active_deletes, &max_active_deletes,
&deleted_files](const std::string& path) {
int active = active_deletes.fetch_add(1) + 1;
int observed = max_active_deletes.load();
while (active > observed &&
!max_active_deletes.compare_exchange_weak(observed, active)) {
}

std::this_thread::sleep_for(std::chrono::milliseconds(50));

RecordDeletedFile(deleted_files, path);
active_deletes.fetch_sub(1);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_GT(max_active_deletes.load(), 1);
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
expired_data_manifest_path,
expired_delete_manifest_path,
Expand Down Expand Up @@ -473,8 +547,9 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kMetadataOnly);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
Expand Down Expand Up @@ -510,8 +585,9 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
Expand All @@ -535,8 +611,9 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
Expand All @@ -561,8 +638,9 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
Expand All @@ -586,8 +664,9 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
Expand All @@ -612,8 +691,9 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
Expand All @@ -640,8 +720,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFile

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path));
Expand Down Expand Up @@ -673,8 +754,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) {

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(deleted_data_file_path));
Expand Down Expand Up @@ -704,8 +786,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) {
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->ExpireSnapshotId(kExpiredSnapshotId);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
Expand Down Expand Up @@ -742,8 +825,9 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup)

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_TRUE(deleted_files.empty());
Expand Down Expand Up @@ -792,8 +876,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpire
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->ExpireSnapshotId(kExpiredSnapshotId);
update->CleanExpiredMetadata(true);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
Expand Down Expand Up @@ -821,8 +906,9 @@ TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup

std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([this, &deleted_files](const std::string& path) {
RecordDeletedFile(deleted_files, path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_TRUE(deleted_files.empty());
Expand Down
Loading
Loading