Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ if(PAIMON_BUILD_TESTS)
core/catalog/catalog_test.cpp
core/catalog/identifier_test.cpp
core/compact/compact_deletion_file_test.cpp
core/compact/compact_future_manager_test.cpp
core/compact/noop_compact_manager_test.cpp
core/core_options_test.cpp
core/options/lookup_strategy_test.cpp
Expand Down
298 changes: 283 additions & 15 deletions src/paimon/core/append/bucketed_append_compact_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
#include <vector>

#include "gtest/gtest.h"
#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h"
#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/manifest/file_source.h"
#include "paimon/core/stats/simple_stats.h"
#include "paimon/executor.h"
#include "paimon/fs/file_system_factory.h"
#include "paimon/result.h"
#include "paimon/testing/mock/mock_index_path_factory.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {
Expand Down Expand Up @@ -111,11 +116,49 @@ class BucketedAppendCompactManagerTest : public testing::Test {
/*write_cols=*/std::nullopt);
}

std::shared_ptr<DataFileMeta> NewNamedFile(const std::string& file_name, int64_t file_size,
int64_t min_sequence_number,
int64_t max_sequence_number) {
return std::make_shared<DataFileMeta>(
file_name, file_size,
/*row_count=*/0,
/*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(),
/*key_stats=*/SimpleStats::EmptyStats(),
/*value_stats=*/SimpleStats::EmptyStats(), min_sequence_number, max_sequence_number,
/*schema_id=*/0,
/*level=*/DataFileMeta::DUMMY_LEVEL,
/*extra_files=*/std::vector<std::optional<std::string>>(),
/*creation_time=*/Timestamp(1724090888706ll, 0),
/*delete_row_count=*/0,
/*embedded_index=*/nullptr, FileSource::Append(),
/*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt,
/*first_row_id=*/std::nullopt,
/*write_cols=*/std::nullopt);
}

std::shared_ptr<BucketedDvMaintainer> CreateTestDvMaintainer(
const std::string& root_path,
const std::map<std::string, std::shared_ptr<DeletionVector>>& deletion_vectors) {
auto pool = GetDefaultPool();
EXPECT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("local", root_path, {}));
auto path_factory = std::make_shared<MockIndexPathFactory>(root_path);
auto dv_index_file =
std::make_shared<DeletionVectorsIndexFile>(fs, path_factory, /*bitmap64=*/false, pool);
return std::make_shared<BucketedDvMaintainer>(dv_index_file, deletion_vectors);
}

std::shared_ptr<DeletionVector> CreateSimpleDeletionVector(int32_t deleted_position) {
RoaringBitmap32 bitmap;
bitmap.Add(deleted_position);
return std::make_shared<BitmapDeletionVector>(bitmap);
}

void ExpectVectorsEqual(const std::vector<std::shared_ptr<DataFileMeta>>& actual,
const std::vector<std::shared_ptr<DataFileMeta>>& expected) {
EXPECT_EQ(actual.size(), expected.size());
ASSERT_EQ(actual.size(), expected.size());
for (size_t i = 0; i < actual.size(); ++i) {
EXPECT_EQ(*actual[i], *expected[i]);
ASSERT_EQ(*actual[i], *expected[i]);
}
}

Expand All @@ -129,10 +172,10 @@ TEST_F(BucketedAppendCompactManagerTest, TestFileComparatorWithoutOverlap) {
auto& file3 = files[2];

auto comparator = BucketedAppendCompactManager::FileComparator(false);
EXPECT_TRUE(comparator(file1, file2));
EXPECT_FALSE(comparator(file2, file1));
EXPECT_TRUE(comparator(file1, file3));
EXPECT_FALSE(comparator(file3, file1));
ASSERT_TRUE(comparator(file1, file2));
ASSERT_FALSE(comparator(file2, file1));
ASSERT_TRUE(comparator(file1, file3));
ASSERT_FALSE(comparator(file3, file1));
}

TEST_F(BucketedAppendCompactManagerTest, TestFileComparatorWithOverlap) {
Expand All @@ -142,10 +185,10 @@ TEST_F(BucketedAppendCompactManagerTest, TestFileComparatorWithOverlap) {
auto& file3 = files[2];

auto comparator = BucketedAppendCompactManager::FileComparator(true);
EXPECT_TRUE(comparator(file1, file2));
EXPECT_FALSE(comparator(file2, file1));
EXPECT_TRUE(comparator(file1, file3));
EXPECT_FALSE(comparator(file3, file1));
ASSERT_TRUE(comparator(file1, file2));
ASSERT_FALSE(comparator(file2, file1));
ASSERT_TRUE(comparator(file1, file3));
ASSERT_FALSE(comparator(file3, file1));
}

TEST_F(BucketedAppendCompactManagerTest, TestIsOverlap) {
Expand All @@ -154,9 +197,9 @@ TEST_F(BucketedAppendCompactManagerTest, TestIsOverlap) {
auto& file2 = files[1];
auto& file3 = files[2];

EXPECT_TRUE(BucketedAppendCompactManager::IsOverlap(file1, file2));
EXPECT_FALSE(BucketedAppendCompactManager::IsOverlap(file1, file3));
EXPECT_FALSE(BucketedAppendCompactManager::IsOverlap(file2, file3));
ASSERT_TRUE(BucketedAppendCompactManager::IsOverlap(file1, file2));
ASSERT_FALSE(BucketedAppendCompactManager::IsOverlap(file1, file3));
ASSERT_FALSE(BucketedAppendCompactManager::IsOverlap(file2, file3));
}

TEST_F(BucketedAppendCompactManagerTest, TestPickEmptyAndNotRelease) {
Expand Down Expand Up @@ -292,7 +335,7 @@ TEST_F(BucketedAppendCompactManagerTest, TestCancelCompactionPropagatesToRewrite
ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
manager.CancelAndWaitCompaction();

EXPECT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready);
ASSERT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready);
}

TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag) {
Expand All @@ -311,7 +354,232 @@ TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag)
/*reporter=*/nullptr, cancellation_controller);

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
EXPECT_FALSE(cancellation_controller->IsCancelled());
ASSERT_FALSE(cancellation_controller->IsCancelled());
}

TEST_F(BucketedAppendCompactManagerTest, TestHasDeletionFileLargeFileWithDvRetained) {
// A large file with a deletion vector should NOT be skipped during full compaction.
// compaction_file_size = 500, so "big_file" (size=2000) is a large file.
// Because it has a deletion vector, HasDeletionFile returns true and keeps it in compaction.
auto dir = UniqueTestDirectory::Create();
ASSERT_TRUE(dir);

std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
deletion_vectors["big_file"] = CreateSimpleDeletionVector(0);
auto dv_maintainer = CreateTestDvMaintainer(dir->Str(), deletion_vectors);

auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

// big_file: size=2000 → exceeds compaction_file_size=500
// small_file1, small_file2: size=100 each → below threshold
auto big_file = NewNamedFile("big_file", 2000, 1, 2000);
auto small_file1 = NewNamedFile("small_file1", 100, 2001, 2100);
auto small_file2 = NewNamedFile("small_file2", 100, 2101, 2200);

BucketedAppendCompactManager manager(
executor_, {big_file, small_file1, small_file2}, dv_maintainer,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true));
ASSERT_TRUE(result.has_value());

// big_file has a deletion vector, so HasDeletionFile returns true and it is retained.
// All 3 files should appear in Before().
const auto& before = result.value()->Before();
ASSERT_EQ(before.size(), 3);
ASSERT_EQ(before[0]->file_name, "big_file");
ASSERT_EQ(before[1]->file_name, "small_file1");
ASSERT_EQ(before[2]->file_name, "small_file2");
}

TEST_F(BucketedAppendCompactManagerTest, TestHasDeletionFileLargeFileWithoutDvSkipped) {
// A large file WITHOUT a deletion vector should be skipped during full compaction.
auto dir = UniqueTestDirectory::Create();
ASSERT_TRUE(dir);

// dv_maintainer exists but has no deletion vector for "big_file"
std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
deletion_vectors["other_file"] = CreateSimpleDeletionVector(0);
auto dv_maintainer = CreateTestDvMaintainer(dir->Str(), deletion_vectors);

auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

// big_file: size=2000, exceeds compaction_file_size=500, but has no DV → skipped
auto big_file = NewNamedFile("big_file", 2000, 1, 2000);
auto small_file1 = NewNamedFile("small_file1", 100, 2001, 2100);
auto small_file2 = NewNamedFile("small_file2", 100, 2101, 2200);

BucketedAppendCompactManager manager(
executor_, {big_file, small_file1, small_file2}, dv_maintainer,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true));
ASSERT_TRUE(result.has_value());

// big_file has no deletion vector, so HasDeletionFile returns false and it is skipped.
// Only small files remain in Before().
const auto& before = result.value()->Before();
ASSERT_EQ(before.size(), 2);
ASSERT_EQ(before[0]->file_name, "small_file1");
ASSERT_EQ(before[1]->file_name, "small_file2");
}

TEST_F(BucketedAppendCompactManagerTest, TestDoCompactWithNullDvMaintainerWithLessBigFile) {
// When dv_maintainer is nullptr, so large files should be skipped.
auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

auto big_file = NewNamedFile("big_file", 2000, 1, 2000);
auto small_file1 = NewNamedFile("small1", 100, 2001, 2100);
auto small_file2 = NewNamedFile("small2", 100, 2101, 2200);
auto small_file3 = NewNamedFile("small3", 100, 2201, 2300);

BucketedAppendCompactManager manager(
executor_, {big_file, small_file1, small_file2, small_file3},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true));
ASSERT_TRUE(result.has_value());

// No dv_maintainer → HasDeletionFile returns false → big_file is skipped.
// Only the 3 small files appear in Before().
const auto& before = result.value()->Before();
ASSERT_EQ(before.size(), 3);
ASSERT_EQ(before[0]->file_name, "small1");
ASSERT_EQ(before[1]->file_name, "small2");
ASSERT_EQ(before[2]->file_name, "small3");
}

TEST_F(BucketedAppendCompactManagerTest, TestDoNoCompact) {
auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

auto small_file = NewNamedFile("small_file", 100, 1, 100);
auto big_file1 = NewNamedFile("big_file1", 2000, 101, 2100);
auto big_file2 = NewNamedFile("big_file2", 2000, 2101, 4100);

BucketedAppendCompactManager manager(
executor_, {small_file, big_file1, big_file2},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true));
ASSERT_TRUE(result.has_value());
ASSERT_TRUE(result.value()->Before().empty());
ASSERT_TRUE(result.value()->After().empty());
}

TEST_F(BucketedAppendCompactManagerTest, TestAllFilesWithoutCompacting) {
// When no compaction is running, AllFiles returns only the to_compact_ files.
auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

auto file1 = NewNamedFile("file1", 100, 1, 100);
auto file2 = NewNamedFile("file2", 200, 101, 300);

BucketedAppendCompactManager manager(
executor_, {file1, file2},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

auto all_files = manager.AllFiles();
// to_compact_ is a min-heap sorted by min_sequence_number, so file1 comes first.
ASSERT_EQ(all_files.size(), 2);
ASSERT_EQ(all_files[0]->file_name, "file1");
ASSERT_EQ(all_files[1]->file_name, "file2");
}

TEST_F(BucketedAppendCompactManagerTest, TestAllFilesWithCompacting) {
// When compaction is running, AllFiles returns compacting_ + to_compact_ files.
auto exit_signal = std::make_shared<std::promise<void>>();
auto proceed_signal = std::make_shared<std::promise<void>>();
auto proceed_future = std::make_shared<std::shared_future<void>>(proceed_signal->get_future());

auto rewriter = [exit_signal,
proceed_future](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> {
// Signal that rewriter has started
exit_signal->set_value();
// Wait for the test to check AllFiles before completing
proceed_future->wait();
return to_compact;
};

auto file1 = NewNamedFile("file1", 100, 1, 100);
auto file2 = NewNamedFile("file2", 100, 101, 200);
auto file3 = NewNamedFile("file3", 100, 201, 300);
auto file4 = NewNamedFile("file4", 100, 301, 400);

BucketedAppendCompactManager manager(
executor_, {file1, file2, file3, file4},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/500,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr,
/*cancellation_controller=*/std::make_shared<CancellationController>());

// Trigger compaction — all 4 small files will be picked
ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));

// Wait until the rewriter is running (compaction in progress)
auto started_future = exit_signal->get_future();
ASSERT_EQ(started_future.wait_for(std::chrono::seconds(5)), std::future_status::ready);

// While compaction is in progress, AllFiles should return all files (compacting_ + to_compact_)
auto all_files = manager.AllFiles();
ASSERT_EQ(all_files.size(), 4);
ASSERT_EQ(all_files[0]->file_name, "file1");
ASSERT_EQ(all_files[1]->file_name, "file2");
ASSERT_EQ(all_files[2]->file_name, "file3");
ASSERT_EQ(all_files[3]->file_name, "file4");

// Let compaction finish
proceed_signal->set_value();

// After getting the result, compacting_ is cleared
ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true));
ASSERT_TRUE(result.has_value());

// Now AllFiles should be empty (no to_compact_, compacting_ cleared)
// But the last file might be put back if it's small enough
auto all_files_after = manager.AllFiles();
// file4 (size=100 < compaction_file_size=500) should be put back to to_compact_
ASSERT_EQ(all_files_after.size(), 1);
ASSERT_EQ(all_files_after[0]->file_name, "file4");
}

} // namespace paimon::test
Loading
Loading