Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ TEST(DeletionVectorsIndexFileTest, Basic) {
FileSystemFactory::Get("local", dir->Str(), {}));
auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
auto pool = GetDefaultPool();
DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false, pool);
auto index_file =
std::make_shared<DeletionVectorsIndexFile>(fs, path_factory, /*bitmap64=*/false, pool);

std::map<std::string, std::shared_ptr<DeletionVector>> input;
RoaringBitmap32 roaring_1;
Expand All @@ -50,15 +51,18 @@ TEST(DeletionVectorsIndexFileTest, Basic) {
}
input["dv2"] = std::make_shared<BitmapDeletionVector>(roaring_2);

ASSERT_FALSE(index_file.Bitmap64());
ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input));
ASSERT_FALSE(index_file->Bitmap64());
ASSERT_OK_AND_ASSIGN(auto meta, index_file->WriteSingleFile(input));
ASSERT_GT(meta->FileSize(), 0);
ASSERT_OK_AND_ASSIGN(auto size, index_file->FileSize(meta));
ASSERT_EQ(meta->FileSize(), size);
ASSERT_EQ(meta->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
ASSERT_EQ(meta->FileName(), "index-0");
ASSERT_FALSE(index_file->IsExternalPath());
ASSERT_EQ(meta->ExternalPath(), std::nullopt);

// Round trip: write then read all deletion vectors from index file.
ASSERT_OK_AND_ASSIGN(auto read_back, index_file.ReadAllDeletionVectors(meta));
ASSERT_OK_AND_ASSIGN(auto read_back, index_file->ReadAllDeletionVectors(meta));
ASSERT_EQ(read_back.size(), input.size());
ASSERT_EQ(read_back.at("dv1")->GetCardinality(), 10);
ASSERT_EQ(read_back.at("dv2")->GetCardinality(), 10);
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Result<std::unique_ptr<BatchReader>> CreateBatchReader(
const std::shared_ptr<MemoryPool>& pool) {
ReadContextBuilder read_context_builder(table_path);
read_context_builder.SetOptions(core_options.ToMap())
.WithFileSystem(core_options.GetFileSystem())
.EnablePrefetch(true)
.WithMemoryPool(pool)
.SetReadSchema({field_name});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class FieldListaggAgg : public FieldAggregator {
new_result.append(in_str);
result_ = std::move(new_result);
}
return std::string_view(result_);
return std::string_view{result_};
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ LookupMergeTreeCompactRewriter<T>::Create(
auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema);

// TODO(xinyu.lxy): set executor
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
ReadContextBuilder read_context_builder(path_factory_cache->RootPath());
read_context_builder.SetOptions(options.ToMap())
.WithFileSystem(options.GetFileSystem())
.EnablePrefetch(true)
.SetPrefetchMaxParallelNum(1)
.SetPrefetchBatchCount(3)
.WithMemoryPool(pool)
.AddOption("parquet.read.enable-pre-buffer", "false");
.WithMemoryPool(pool);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<ReadContext> read_context,
read_context_builder.Finish());

PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<InternalReadContext> internal_context,
InternalReadContext::Create(read_context, table_schema, options.ToMap()));
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
auto new_options = options.ToMap();
if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) {
new_options["parquet.read.enable-pre-buffer"] = "false";
}
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalReadContext> internal_context,
InternalReadContext::Create(read_context, table_schema, new_options));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<FileStorePathFactory> path_factory,
path_factory_cache->GetOrCreatePathFactory(options.GetFileFormat()->Identifier()));
Expand Down
18 changes: 10 additions & 8 deletions src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,23 @@ Result<std::unique_ptr<MergeTreeCompactRewriter>> MergeTreeCompactRewriter::Crea
auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema);

// TODO(xinyu.lxy): set executor
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
ReadContextBuilder read_context_builder(path_factory_cache->RootPath());
read_context_builder.SetOptions(options.ToMap())
.WithFileSystem(options.GetFileSystem())
.EnablePrefetch(true)
.SetPrefetchMaxParallelNum(1)
.SetPrefetchBatchCount(3)
.WithMemoryPool(pool)
.AddOption("parquet.read.enable-pre-buffer", "false");
.WithMemoryPool(pool);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<ReadContext> read_context,
read_context_builder.Finish());

PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<InternalReadContext> internal_context,
InternalReadContext::Create(read_context, table_schema, options.ToMap()));
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
auto new_options = options.ToMap();
if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) {
new_options["parquet.read.enable-pre-buffer"] = "false";
}
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalReadContext> internal_context,
InternalReadContext::Create(read_context, table_schema, new_options));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<FileStorePathFactory> path_factory,
path_factory_cache->GetOrCreatePathFactory(options.GetFileFormat()->Identifier()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,38 @@ TEST_F(MergeTreeCompactRewriterTest, TestSimple) {
CheckResult(compact_file_name, fs, table_schema, expected_array);
}

TEST_F(MergeTreeCompactRewriterTest, TestCancel) {
std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/";
auto table_dir = UniqueTestDirectory::Create("local");
ASSERT_TRUE(TestUtil::CopyDirectory(origin_table_path, table_dir->Str()));
std::string table_path = table_dir->Str() + "/pk_table_scan_and_read_mor";
auto fs = table_dir->GetFileSystem();

SchemaManager schema_manager(fs, table_path);
ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(table_schema->Options()));
auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields());
auto dv_factory = [](const std::string&) -> Result<std::shared_ptr<DeletionVector>> {
return std::shared_ptr<DeletionVector>();
};
auto cancellation_controller = std::make_shared<CancellationController>();
auto path_factory_cache =
std::make_shared<FileStorePathFactoryCache>(table_path, table_schema, options, pool_);
ASSERT_OK_AND_ASSIGN(
auto rewriter,
MergeTreeCompactRewriter::Create(
/*bucket=*/1, /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool_.get()),
table_schema, dv_factory, path_factory_cache, options, cancellation_controller, pool_));

ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns(table_path, table_schema, /*bucket=*/1,
/*partition=*/{{"f1", "10"}}));

// cancel compaction here
cancellation_controller->Cancel();
ASSERT_NOK_WITH_MSG(rewriter->Rewrite(/*output_level=*/5, /*drop_delete=*/true, runs),
"Compaction is cancelled");
}

TEST_F(MergeTreeCompactRewriterTest, TestNotDropDelete) {
std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/";
auto table_dir = UniqueTestDirectory::Create("local");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Status MergeTreeCompactTask::Rewrite(std::vector<std::vector<SortedRun>>* candid
if (candidate->size() == 1) {
const auto& section = (*candidate)[0];
if (section.empty()) {
return Status::OK();
return Status::Invalid("invalid section, section cannot be empty in candidate");
}
if (section.size() == 1) {
for (const auto& file : section[0].Files()) {
Expand Down
15 changes: 15 additions & 0 deletions src/paimon/core/mergetree/compact/universal_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ TEST_F(UniversalCompactionTest, TestPick) {
ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector<int64_t>({1, 50, 3}));
}

TEST_F(UniversalCompactionTest, TestAllLevelRunsInvolved) {
int64_t current_time = 0;
auto full_compact_trigger = std::make_shared<TestableEarlyFullCompaction>(
/*full_compaction_interval=*/std::nullopt,
/*total_size_threshold=*/std::nullopt,
/*incremental_size_threshold=*/1000l, &current_time);
UniversalCompaction compaction(/*max_size_amp=*/100, /*size_ratio=*/1,
/*num_run_compaction_trigger=*/3, full_compact_trigger, nullptr);
ASSERT_OK_AND_ASSIGN(auto pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize(
/*levels=*/{0, 0, 0},
/*sizes=*/{1, 1, 3})));
ASSERT_TRUE(pick);
ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector<int64_t>({1, 1, 3}));
}

TEST_F(UniversalCompactionTest, TestOptimizedCompactionInterval) {
int64_t current_time = 0;
auto full_compact_trigger = std::make_shared<TestableEarlyFullCompaction>(
Expand Down
17 changes: 10 additions & 7 deletions src/paimon/core/mergetree/lookup_levels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,23 @@ Result<std::unique_ptr<LookupLevels<T>>> LookupLevels<T>::Create(
auto partition_schema = DataField::ConvertDataFieldsToArrowSchema(partition_fields);

// TODO(xinyu.lxy): set executor
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
ReadContextBuilder read_context_builder(path_factory->RootPath());
read_context_builder.SetOptions(options.ToMap())
.WithFileSystem(options.GetFileSystem())
.EnablePrefetch(true)
.SetPrefetchMaxParallelNum(1)
.SetPrefetchBatchCount(3)
.WithMemoryPool(pool)
.AddOption("parquet.read.enable-pre-buffer", "false");
.WithMemoryPool(pool);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<ReadContext> read_context,
read_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<InternalReadContext> internal_read_context,
InternalReadContext::Create(read_context, table_schema, options.ToMap()));
// TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory
// usage during compaction. Will fix via parquet format refactor.
auto new_options = options.ToMap();
if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) {
new_options["parquet.read.enable-pre-buffer"] = "false";
}
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalReadContext> internal_read_context,
InternalReadContext::Create(read_context, table_schema, new_options));
auto split_read = std::make_unique<RawFileSplitRead>(path_factory, internal_read_context, pool,
CreateDefaultExecutor());

Expand Down
Loading
Loading