From c16648816ba31f4d48ce8fa25321f1a8ee0f87d7 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Fri, 17 Apr 2026 14:44:16 +0800 Subject: [PATCH 1/3] test(compaction): add ut --- .../deletion_vectors_index_file_test.cpp | 12 +- .../global_index/global_index_write_task.cpp | 1 + .../lookup_merge_tree_compact_rewriter.cpp | 18 +- .../compact/merge_tree_compact_rewriter.cpp | 18 +- .../merge_tree_compact_rewriter_test.cpp | 32 ++++ .../compact/merge_tree_compact_task.cpp | 2 +- .../compact/universal_compaction_test.cpp | 15 ++ src/paimon/core/mergetree/lookup_levels.cpp | 17 +- .../core/mergetree/merge_tree_writer_test.cpp | 179 ++++++++++++++++++ .../append_only_file_store_write.cpp | 18 +- .../data_evolution_file_store_scan_test.cpp | 3 +- src/paimon/core/schema/table_schema.cpp | 17 +- src/paimon/core/schema/table_schema.h | 6 +- src/paimon/core/schema/table_schema_test.cpp | 1 + src/paimon/core/table/table_test.cpp | 25 +++ .../file_store_path_factory_cache_test.cpp | 3 +- src/paimon/core/utils/offset_row_test.cpp | 98 +++++++--- .../format/avro/avro_direct_encoder.cpp | 5 +- .../avro/avro_direct_encoder_decoder_test.cpp | 15 ++ .../parquet/parquet_format_writer_test.cpp | 34 ++++ .../parquet/parquet_writer_builder_test.cpp | 2 + 21 files changed, 447 insertions(+), 74 deletions(-) diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp index bd1889c3c..9d204c9f5 100644 --- a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp @@ -36,7 +36,8 @@ TEST(DeletionVectorsIndexFileTest, Basic) { FileSystemFactory::Get("local", dir->Str(), {})); auto path_factory = std::make_shared(dir->Str()); auto pool = GetDefaultPool(); - DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false, pool); + auto index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, 
pool); std::map> input; RoaringBitmap32 roaring_1; @@ -50,15 +51,18 @@ TEST(DeletionVectorsIndexFileTest, Basic) { } input["dv2"] = std::make_shared(roaring_2); - ASSERT_FALSE(index_file.Bitmap64()); - ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); + ASSERT_FALSE(index_file->Bitmap64()); + ASSERT_OK_AND_ASSIGN(auto meta, index_file->WriteSingleFile(input)); ASSERT_GT(meta->FileSize(), 0); + ASSERT_OK_AND_ASSIGN(auto size, index_file->FileSize(meta)); + ASSERT_EQ(meta->FileSize(), size); ASSERT_EQ(meta->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); ASSERT_EQ(meta->FileName(), "index-0"); + ASSERT_FALSE(index_file->IsExternalPath()); ASSERT_EQ(meta->ExternalPath(), std::nullopt); // Round trip: write then read all deletion vectors from index file. - ASSERT_OK_AND_ASSIGN(auto read_back, index_file.ReadAllDeletionVectors(meta)); + ASSERT_OK_AND_ASSIGN(auto read_back, index_file->ReadAllDeletionVectors(meta)); ASSERT_EQ(read_back.size(), input.size()); ASSERT_EQ(read_back.at("dv1")->GetCardinality(), 10); ASSERT_EQ(read_back.at("dv2")->GetCardinality(), 10); diff --git a/src/paimon/core/global_index/global_index_write_task.cpp b/src/paimon/core/global_index/global_index_write_task.cpp index 6ba17a74c..c92dbe76b 100644 --- a/src/paimon/core/global_index/global_index_write_task.cpp +++ b/src/paimon/core/global_index/global_index_write_task.cpp @@ -79,6 +79,7 @@ Result> CreateBatchReader( const std::shared_ptr& pool) { ReadContextBuilder read_context_builder(table_path); read_context_builder.SetOptions(core_options.ToMap()) + .WithFileSystem(core_options.GetFileSystem()) .EnablePrefetch(true) .WithMemoryPool(pool) .SetReadSchema({field_name}); diff --git a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp index 2c4809283..b9a79b7c3 100644 --- a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp +++ 
b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp @@ -65,21 +65,23 @@ LookupMergeTreeCompactRewriter::Create( auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema); // TODO(xinyu.lxy): set executor - // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory - // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory_cache->RootPath()); read_context_builder.SetOptions(options.ToMap()) + .WithFileSystem(options.GetFileSystem()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) .SetPrefetchBatchCount(3) - .WithMemoryPool(pool) - .AddOption("parquet.read.enable-pre-buffer", "false"); + .WithMemoryPool(pool); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); - - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr internal_context, - InternalReadContext::Create(read_context, table_schema, options.ToMap())); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. 
+ auto new_options = options.ToMap(); + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_context, + InternalReadContext::Create(read_context, table_schema, new_options)); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr path_factory, path_factory_cache->GetOrCreatePathFactory(options.GetFileFormat()->Identifier())); diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp index feae38f42..a88840b46 100644 --- a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp @@ -71,21 +71,23 @@ Result> MergeTreeCompactRewriter::Crea auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema); // TODO(xinyu.lxy): set executor - // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory - // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory_cache->RootPath()); read_context_builder.SetOptions(options.ToMap()) + .WithFileSystem(options.GetFileSystem()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) .SetPrefetchBatchCount(3) - .WithMemoryPool(pool) - .AddOption("parquet.read.enable-pre-buffer", "false"); + .WithMemoryPool(pool); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); - - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr internal_context, - InternalReadContext::Create(read_context, table_schema, options.ToMap())); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. 
+ auto new_options = options.ToMap(); + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_context, + InternalReadContext::Create(read_context, table_schema, new_options)); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr path_factory, path_factory_cache->GetOrCreatePathFactory(options.GetFileFormat()->Identifier())); diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp index a959aed8b..b28596c80 100644 --- a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp @@ -165,6 +165,38 @@ TEST_F(MergeTreeCompactRewriterTest, TestSimple) { CheckResult(compact_file_name, fs, table_schema, expected_array); } +TEST_F(MergeTreeCompactRewriterTest, TestCancel) { + std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/"; + auto table_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(TestUtil::CopyDirectory(origin_table_path, table_dir->Str())); + std::string table_path = table_dir->Str() + "/pk_table_scan_and_read_mor"; + auto fs = table_dir->GetFileSystem(); + + SchemaManager schema_manager(fs, table_path); + ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0)); + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(table_schema->Options())); + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + auto dv_factory = [](const std::string&) -> Result> { + return std::shared_ptr(); + }; + auto cancellation_controller = std::make_shared(); + auto path_factory_cache = + std::make_shared(table_path, table_schema, options, pool_); + ASSERT_OK_AND_ASSIGN( + auto rewriter, + MergeTreeCompactRewriter::Create( + /*bucket=*/1, 
/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool_.get()), + table_schema, dv_factory, path_factory_cache, options, cancellation_controller, pool_)); + + ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns(table_path, table_schema, /*bucket=*/1, + /*partition=*/{{"f1", "10"}})); + + // cancel compaction here + cancellation_controller->Cancel(); + ASSERT_NOK_WITH_MSG(rewriter->Rewrite(/*output_level=*/5, /*drop_delete=*/true, runs), + "Compaction is cancelled"); +} + TEST_F(MergeTreeCompactRewriterTest, TestNotDropDelete) { std::string origin_table_path = GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/"; auto table_dir = UniqueTestDirectory::Create("local"); diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_task.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_task.cpp index e4c33bb8f..b702f3023 100644 --- a/src/paimon/core/mergetree/compact/merge_tree_compact_task.cpp +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_task.cpp @@ -92,7 +92,7 @@ Status MergeTreeCompactTask::Rewrite(std::vector>* candid if (candidate->size() == 1) { const auto& section = (*candidate)[0]; if (section.empty()) { - return Status::OK(); + return Status::Invalid("invalid section, section cannot be empty in candidate"); } if (section.size() == 1) { for (const auto& file : section[0].Files()) { diff --git a/src/paimon/core/mergetree/compact/universal_compaction_test.cpp b/src/paimon/core/mergetree/compact/universal_compaction_test.cpp index ffe83c65b..0e95ac419 100644 --- a/src/paimon/core/mergetree/compact/universal_compaction_test.cpp +++ b/src/paimon/core/mergetree/compact/universal_compaction_test.cpp @@ -143,6 +143,21 @@ TEST_F(UniversalCompactionTest, TestPick) { ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 50, 3})); } +TEST_F(UniversalCompactionTest, TestAllLevelRunsInvolved) { + int64_t current_time = 0; + auto full_compact_trigger = std::make_shared( + /*full_compaction_interval=*/std::nullopt, + 
/*total_size_threshold=*/std::nullopt, + /*incremental_size_threshold=*/1000l, &current_time); + UniversalCompaction compaction(/*max_size_amp=*/100, /*size_ratio=*/1, + /*num_run_compaction_trigger=*/3, full_compact_trigger, nullptr); + ASSERT_OK_AND_ASSIGN(auto pick, compaction.Pick(/*num_levels=*/3, CreateRunsWithLevelAndSize( + /*levels=*/{0, 0, 0}, + /*sizes=*/{1, 1, 3}))); + ASSERT_TRUE(pick); + ASSERT_EQ(GetFileSizeVecFromCompactUnit(pick.value()), std::vector({1, 1, 3})); +} + TEST_F(UniversalCompactionTest, TestOptimizedCompactionInterval) { int64_t current_time = 0; auto full_compact_trigger = std::make_shared( diff --git a/src/paimon/core/mergetree/lookup_levels.cpp b/src/paimon/core/mergetree/lookup_levels.cpp index 2921a5162..9ab74b1d7 100644 --- a/src/paimon/core/mergetree/lookup_levels.cpp +++ b/src/paimon/core/mergetree/lookup_levels.cpp @@ -55,20 +55,23 @@ Result>> LookupLevels::Create( auto partition_schema = DataField::ConvertDataFieldsToArrowSchema(partition_fields); // TODO(xinyu.lxy): set executor - // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory - // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory->RootPath()); read_context_builder.SetOptions(options.ToMap()) + .WithFileSystem(options.GetFileSystem()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) .SetPrefetchBatchCount(3) - .WithMemoryPool(pool) - .AddOption("parquet.read.enable-pre-buffer", "false"); + .WithMemoryPool(pool); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr internal_read_context, - InternalReadContext::Create(read_context, table_schema, options.ToMap())); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. 
+ auto new_options = options.ToMap(); + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_read_context, + InternalReadContext::Create(read_context, table_schema, new_options)); auto split_read = std::make_unique(path_factory, internal_read_context, pool, CreateDefaultExecutor()); diff --git a/src/paimon/core/mergetree/merge_tree_writer_test.cpp b/src/paimon/core/mergetree/merge_tree_writer_test.cpp index 8ec43814f..ad8cd0407 100644 --- a/src/paimon/core/mergetree/merge_tree_writer_test.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer_test.cpp @@ -47,6 +47,7 @@ #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/memory/memory_pool.h" +#include "paimon/testing/mock/mock_file_system.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/io_exception_helper.h" #include "paimon/testing/utils/read_result_collector.h" @@ -60,6 +61,41 @@ class MergeFunctionWrapper; namespace paimon::test { class MergeTreeWriterTest : public ::testing::Test { public: + class FakeCompactManager : public paimon::CompactManager { + public: + Status AddNewFile(const std::shared_ptr& file) override { + return Status::OK(); + } + std::vector> AllFiles() const override { + static std::vector> empty; + return empty; + } + Status TriggerCompaction(bool full_compaction) override { + return Status::OK(); + } + Result>> GetCompactionResult( + bool blocking) override { + get_result_blocking_calls.push_back(blocking); + return std::optional>(); + } + void RequestCancelCompaction() override {} + void WaitForCompactionToExit() override {} + bool CompactNotCompleted() const override { + return false; + } + bool ShouldWaitForLatestCompaction() const override { + return true; + } + bool ShouldWaitForPreparingCheckpoint() const override { + return true; + } + Status Close() override { + 
return Status::OK(); + } + + std::vector get_result_blocking_calls; + }; + void SetUp() override { pool_ = GetDefaultPool(); file_system_ = std::make_shared(); @@ -116,6 +152,18 @@ class MergeTreeWriterTest : public ::testing::Test { ASSERT_TRUE(expected_array->Equals(result_array)) << result_array->ToString(); } + std::shared_ptr CreateMeta(const std::string& name, int32_t level) const { + return std::make_shared( + name, /*file_size=*/100, /*row_count=*/1, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, level, + /*extra_files=*/std::vector>(), Timestamp(), + /*delete_row_count=*/0, + /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + } + private: std::shared_ptr pool_; std::shared_ptr file_system_; @@ -832,4 +880,135 @@ TEST_F(MergeTreeWriterTest, TestBulkData) { } } +TEST_F(MergeTreeWriterTest, TestShouldWait) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}})); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + + auto fake_compact_manager = std::make_shared(); + auto merge_writer = std::make_shared( + /*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, + value_schema_, options, fake_compact_manager, pool_); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 20, 1, null], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + WriteBatch(array, /*row_kinds=*/{}, merge_writer.get()); + 
ASSERT_TRUE(fake_compact_manager->get_result_blocking_calls.empty()); + + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment, + merge_writer->PrepareCommit(/*wait_compaction=*/false)); + ASSERT_EQ(fake_compact_manager->get_result_blocking_calls.size(), 2u); + ASSERT_TRUE(fake_compact_manager->get_result_blocking_calls[0]); + ASSERT_TRUE(fake_compact_manager->get_result_blocking_calls[1]); + ASSERT_OK(merge_writer->Close()); +} + +TEST_F(MergeTreeWriterTest, TestUpdateCompactResultDeleteIntermediateFile) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}})); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + + auto fake_compact_manager = std::make_shared(); + auto merge_writer = std::make_shared( + /*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, + value_schema_, options, fake_compact_manager, pool_); + + // Round 1: Before=[A], After=[X] => compact_before_=[A], compact_after_=[X] + // Round 2: Before=[X], After=[Y] => X is in compact_after_, so it's an intermediate file + auto file_a = CreateMeta("file_a", /*level=*/0); + auto file_x = CreateMeta("file_x", /*level=*/0); + auto file_y = CreateMeta("file_y", /*level=*/1); + + merge_writer->compact_before_ = {file_a}; + merge_writer->compact_after_ = {file_x}; + + auto before = std::vector>({file_x}); + auto after = std::vector>({file_y}); + auto compact_result = std::make_shared(before, after); + ASSERT_OK(merge_writer->UpdateCompactResult(compact_result)); + ASSERT_EQ(merge_writer->compact_before_, std::vector>({file_a})); + ASSERT_EQ(merge_writer->compact_after_, std::vector>({file_y})); +} + +TEST_F(MergeTreeWriterTest, TestUpdateCompactResultWithFileInCompactAfter) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + 
CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}})); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + + auto fake_compact_manager = std::make_shared(); + auto merge_writer = std::make_shared( + /*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, + value_schema_, options, fake_compact_manager, pool_); + + // Round 1: Before=[A], After=[X@level0] => compact_after_ = [X@level0] + // Round 2 (upgrade): Before=[X@level0], After=[X@level1] + // X is in compact_after_, but also in after_files => should NOT be deleted. + auto file_a = CreateMeta("file_a", /*level=*/0); + auto file_x_level0 = CreateMeta("file_x_level0", /*level=*/0); + auto file_x_level1 = CreateMeta("file_x_level1", /*level=*/1); + + merge_writer->compact_before_ = {file_a}; + merge_writer->compact_after_ = {file_x_level0}; + + auto before = std::vector>({file_x_level0}); + auto after = std::vector>({file_x_level1}); + auto compact_result = std::make_shared(before, after); + ASSERT_OK(merge_writer->UpdateCompactResult(compact_result)); + ASSERT_EQ(merge_writer->compact_before_, std::vector>({file_a})); + ASSERT_EQ(merge_writer->compact_after_, + std::vector>({file_x_level1})); +} + +TEST_F(MergeTreeWriterTest, TestUpdateCompactResultWithFileInCompactBefore) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}})); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + + auto fake_compact_manager = std::make_shared(); + auto merge_writer = std::make_shared( + /*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, + 
/*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, + value_schema_, options, fake_compact_manager, pool_); + + // Round 1 (upgrade): Before=[X@level0], After=[X@level1] + // X is not in compact_after_ yet, so it goes to compact_before_ = [X]. + // compact_after_ = [X@level1]. + // Round 2: Before=[X@level1], After=[Y] + // X@level1 is in compact_after_, so it's an intermediate file candidate. + // But in_compact_before(X) is true (from round 1), so X should NOT be deleted. + auto file_x = CreateMeta("file_x", /*level=*/0); + auto file_x_level1 = CreateMeta("file_x_level1", /*level=*/1); + auto file_y = CreateMeta("file_y", /*level=*/1); + + merge_writer->compact_before_ = {file_x}; + merge_writer->compact_after_ = {file_x_level1}; + + auto before = std::vector>({file_x_level1}); + auto after = std::vector>({file_y}); + auto compact_result = std::make_shared(before, after); + ASSERT_OK(merge_writer->UpdateCompactResult(compact_result)); + ASSERT_EQ(merge_writer->compact_before_, std::vector>({file_x})); + ASSERT_EQ(merge_writer->compact_after_, std::vector>({file_y})); +} + } // namespace paimon::test diff --git a/src/paimon/core/operation/append_only_file_store_write.cpp b/src/paimon/core/operation/append_only_file_store_write.cpp index 88dfb9277..e3ffe4473 100644 --- a/src/paimon/core/operation/append_only_file_store_write.cpp +++ b/src/paimon/core/operation/append_only_file_store_write.cpp @@ -250,16 +250,22 @@ Result> AppendOnlyFileStoreWrite::CreateFilesReader const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, const std::vector>& files) const { ReadContextBuilder context_builder(root_path_); - // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory - // usage during compaction. Will fix via parquet format refactor. 
- context_builder.EnablePrefetch(true) + context_builder.SetOptions(options_.ToMap()) + .WithFileSystem(options_.GetFileSystem()) + .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) .SetPrefetchBatchCount(3) - .AddOption("parquet.read.enable-pre-buffer", "false"); + .WithMemoryPool(pool_); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, context_builder.Finish()); - std::map map = options_.ToMap(); + std::map options = options_.ToMap(); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high + // memory usage during compaction. Will fix via parquet format refactor. + auto new_options = options; + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_read_context, - InternalReadContext::Create(read_context, table_schema_, map)); + InternalReadContext::Create(read_context, table_schema_, new_options)); auto read = std::make_unique(file_store_path_factory_, internal_read_context, pool_, compact_executor_); diff --git a/src/paimon/core/operation/data_evolution_file_store_scan_test.cpp b/src/paimon/core/operation/data_evolution_file_store_scan_test.cpp index 3bf369d5c..12e6681ab 100644 --- a/src/paimon/core/operation/data_evolution_file_store_scan_test.cpp +++ b/src/paimon/core/operation/data_evolution_file_store_scan_test.cpp @@ -46,7 +46,8 @@ class DataEvolutionFileStoreScanTest : public ::testing::Test { std::shared_ptr table_schema, TableSchema::InitSchema(schema_id, fields, /*highest_field_id=*/fields.size(), /*partition_keys=*/{}, - /*primary_keys=*/{}, /*options=*/{}, /*time_millis=*/0)); + /*primary_keys=*/{}, /*options=*/{}, /*comment=*/std::nullopt, + /*time_millis=*/0)); return table_schema; } ManifestEntry CreateManifestEntry( diff --git a/src/paimon/core/schema/table_schema.cpp b/src/paimon/core/schema/table_schema.cpp index a79d753e3..e1ab53f81 100644 --- 
a/src/paimon/core/schema/table_schema.cpp +++ b/src/paimon/core/schema/table_schema.cpp @@ -65,7 +65,7 @@ Result> TableSchema::Create( data_fields.push_back(data_field); } return InitSchema(schema_id, data_fields, field_id - 1, partition_keys, primary_keys, options, - DateTimeUtils::GetCurrentUTCTimeUs() / 1000); + /*comment=*/std::nullopt, DateTimeUtils::GetCurrentUTCTimeUs() / 1000); } Result> TableSchema::MakeMetaDataWithFieldId( @@ -142,19 +142,20 @@ rapidjson::Value TableSchema::ToJson(rapidjson::Document::AllocatorType* allocat RapidJsonUtil::SerializeValue(primary_keys_, allocator).Move(), *allocator); obj.AddMember(rapidjson::StringRef("options"), RapidJsonUtil::SerializeValue(options_, allocator).Move(), *allocator); - obj.AddMember(rapidjson::StringRef("timeMillis"), - RapidJsonUtil::SerializeValue(time_millis_, allocator).Move(), *allocator); if (comment_) { obj.AddMember(rapidjson::StringRef("comment"), RapidJsonUtil::SerializeValue(comment_, allocator).Move(), *allocator); } + obj.AddMember(rapidjson::StringRef("timeMillis"), + RapidJsonUtil::SerializeValue(time_millis_, allocator).Move(), *allocator); return obj; } TableSchema::TableSchema(int32_t version, int64_t id, const std::vector& fields, int32_t highest_field_id, const std::vector& partition_keys, const std::vector& primary_keys, - const std::map& options, int64_t time_millis) + const std::map& options, + const std::optional& comment, int64_t time_millis) : version_(version), id_(id), fields_(fields), @@ -162,6 +163,7 @@ TableSchema::TableSchema(int32_t version, int64_t id, const std::vector> TableSchema::CreateFromJson(const std::stri PAIMON_ASSIGN_OR_RAISE(TableSchema table_schema, TableSchema::FromJsonString(json_str)); return InitSchema(table_schema.id_, table_schema.fields_, table_schema.highest_field_id_, table_schema.partition_keys_, table_schema.primary_keys_, - table_schema.options_, table_schema.time_millis_); + table_schema.options_, table_schema.comment_, table_schema.time_millis_); 
} Result> TableSchema::InitSchema( int64_t schema_id, const std::vector& fields, int32_t highest_field_id, const std::vector& partition_keys, const std::vector& primary_keys, - const std::map& options, int64_t time_millis) { + const std::map& options, const std::optional& comment, + int64_t time_millis) { // validate schema first auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(fields); PAIMON_RETURN_NOT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*arrow_schema)); auto table_schema = std::unique_ptr( new TableSchema(TableSchema::CURRENT_VERSION, schema_id, fields, highest_field_id, - partition_keys, primary_keys, options, time_millis)); + partition_keys, primary_keys, options, comment, time_millis)); PAIMON_ASSIGN_OR_RAISE(std::vector keys, table_schema->TrimmedPrimaryKeys()); // Try to validate bucket keys diff --git a/src/paimon/core/schema/table_schema.h b/src/paimon/core/schema/table_schema.h index 6cee5f9e9..60e11b638 100644 --- a/src/paimon/core/schema/table_schema.h +++ b/src/paimon/core/schema/table_schema.h @@ -117,12 +117,14 @@ class TableSchema : public Schema, public Jsonizable { int64_t schema_id, const std::vector& fields, int32_t highest_field_id, const std::vector& partition_keys, const std::vector& primary_keys, - const std::map& options, int64_t time_millis); + const std::map& options, + const std::optional& comment, int64_t time_millis); TableSchema(int32_t version, int64_t id, const std::vector& fields, int32_t highest_field_id, const std::vector& partition_keys, const std::vector& primary_keys, - const std::map& options, int64_t time_millis); + const std::map& options, + const std::optional& comment, int64_t time_millis); Result> OriginalBucketKeys() const; diff --git a/src/paimon/core/schema/table_schema_test.cpp b/src/paimon/core/schema/table_schema_test.cpp index 359113685..f8bd9eeed 100644 --- a/src/paimon/core/schema/table_schema_test.cpp +++ b/src/paimon/core/schema/table_schema_test.cpp @@ -510,6 +510,7 @@ 
TEST_F(TableSchemaTest, TestToJson2) { "file.format" : "orc", "manifest.format" : "orc" }, + "comment" : "this is a comment", "timeMillis" : 1721614341162 })"; ASSERT_OK_AND_ASSIGN(std::unique_ptr schema_result, diff --git a/src/paimon/core/table/table_test.cpp b/src/paimon/core/table/table_test.cpp index 9c8a0f661..f4be04014 100644 --- a/src/paimon/core/table/table_test.cpp +++ b/src/paimon/core/table/table_test.cpp @@ -55,4 +55,29 @@ TEST(TableTest, TestCreateWithUnknownDatabase) { EXPECT_EQ(latest_schema->PrimaryKeys(), primary_keys); } +TEST(TableTest, TestCreateFailedWithNonExistSchema) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto fs = dir->GetFileSystem(); + SchemaManager schema_manager(fs, dir->Str()); + auto schema = + arrow::schema({arrow::field("id", arrow::int32(), /*nullable=*/false), + arrow::field("name", arrow::utf8()), arrow::field("value", arrow::int64())}); + std::vector partition_keys = {"name"}; + std::vector primary_keys = {"id"}; + std::map options = { + {"file.format", "orc"}, + {"commit.force-compact", "true"}, + }; + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::unique_ptr created_schema, + schema_manager.CreateTable(schema, partition_keys, primary_keys, options)); + + // remove schema + std::string schema_path = schema_manager.ToSchemaPath(0); + ASSERT_OK(fs->Delete(schema_path, /*recursive=*/false)); + // check create table failed + ASSERT_NOK_WITH_MSG(Table::Create(fs, dir->Str(), Identifier("tbl1")), "load table schema"); +} + } // namespace paimon::test diff --git a/src/paimon/core/utils/file_store_path_factory_cache_test.cpp b/src/paimon/core/utils/file_store_path_factory_cache_test.cpp index 40a40a762..2e87450d0 100644 --- a/src/paimon/core/utils/file_store_path_factory_cache_test.cpp +++ b/src/paimon/core/utils/file_store_path_factory_cache_test.cpp @@ -27,7 +27,8 @@ TEST(FileStorePathFactoryCacheTest, TestSimple) { std::shared_ptr table_schema, TableSchema::InitSchema(/*schema_id=*/0, fields, 
/*highest_field_id=*/fields.size(), /*partition_keys=*/{}, - /*primary_keys=*/{}, /*options=*/{}, /*time_millis=*/0)); + /*primary_keys=*/{}, /*options=*/{}, /*comment=*/std::nullopt, + /*time_millis=*/0)); ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap({})); FileStorePathFactoryCache cache("/test_root/", table_schema, options, GetDefaultPool()); diff --git a/src/paimon/core/utils/offset_row_test.cpp b/src/paimon/core/utils/offset_row_test.cpp index f8f47b805..fce9c88ed 100644 --- a/src/paimon/core/utils/offset_row_test.cpp +++ b/src/paimon/core/utils/offset_row_test.cpp @@ -19,44 +19,88 @@ #include #include +#include "arrow/api.h" +#include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" +#include "paimon/common/data/columnar/columnar_map.h" #include "paimon/common/data/data_define.h" +#include "paimon/common/data/generic_row.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" #include "paimon/common/types/row_kind.h" -#include "paimon/memory/bytes.h" +#include "paimon/common/utils/decimal_utils.h" #include "paimon/memory/memory_pool.h" #include "paimon/testing/utils/binary_row_generator.h" namespace paimon::test { TEST(OffsetRowTest, TestSimple) { - auto pool = GetDefaultPool().get(); - auto bytes = std::make_shared("world", pool); - Timestamp ts(/*millisecond=*/1000, /*nano_of_millisecond=*/10); - Decimal decimal(/*precision=*/20, /*scale=*/3, 1234567l); - auto inner_row = BinaryRowGenerator::GenerateRow( - {static_cast(0), static_cast(1), static_cast(11), - static_cast(111), static_cast(1111), static_cast(12.3), 12.34, - false, std::string("hello"), bytes, TimestampType(ts, Timestamp::MAX_PRECISION), decimal, - NullType()}, - pool); - OffsetRow row(inner_row, /*arity=*/12, /*offset=*/1); + auto pool = GetDefaultPool(); + // generate internal row + GenericRow internal_row(17); + internal_row.SetField(0, false); + internal_row.SetField(1, true); + 
internal_row.SetField(2, static_cast(1)); + internal_row.SetField(3, static_cast(2)); + internal_row.SetField(4, static_cast(3)); + internal_row.SetField(5, static_cast(4)); + internal_row.SetField(6, static_cast(5.1)); + internal_row.SetField(7, 6.12); + auto str = BinaryString::FromString("abcd", pool.get()); + internal_row.SetField(8, str); + std::shared_ptr bytes = Bytes::AllocateBytes("efgh", pool.get()); + internal_row.SetField(9, bytes); + std::string str9 = "apple"; + internal_row.SetField(10, std::string_view(str9.data(), str9.size())); + + Timestamp ts(100, 20); + internal_row.SetField(11, ts); + Decimal decimal(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("12345678998765432145678").value()); + internal_row.SetField(12, decimal); + + auto array = std::make_shared(BinaryArray::FromLongArray( + {static_cast(10), static_cast(20)}, pool.get())); + internal_row.SetField(13, array); + + std::shared_ptr binary_row = + BinaryRowGenerator::GenerateRowPtr({100, 200}, pool.get()); + internal_row.SetField(14, binary_row); + + auto key = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 3]").ValueOrDie(); + auto value = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), "[2, 4, 6]").ValueOrDie(); + auto map = std::make_shared(key, value, pool, /*offset=*/0, /*length=*/3); + internal_row.SetField(15, map); + // do not set value at pos 16, therefore, pos 16 is null + ASSERT_EQ(internal_row.GetFieldCount(), 17); + + OffsetRow row(internal_row, /*arity=*/16, /*offset=*/1); ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert()); - ASSERT_EQ(row.GetFieldCount(), 12); - ASSERT_FALSE(row.IsNullAt(0)); - ASSERT_EQ(row.GetByte(0), static_cast(1)); - ASSERT_EQ(row.GetShort(1), static_cast(11)); - ASSERT_EQ(row.GetInt(2), static_cast(111)); - ASSERT_EQ(row.GetDate(2), static_cast(111)); - ASSERT_EQ(row.GetLong(3), static_cast(1111)); - ASSERT_EQ(row.GetFloat(4), static_cast(12.3)); - ASSERT_EQ(row.GetDouble(5), static_cast(12.34)); - 
ASSERT_EQ(row.GetBoolean(6), false); - ASSERT_EQ(row.GetString(7).ToString(), "hello"); + ASSERT_EQ(row.GetFieldCount(), 16); + ASSERT_EQ(row.GetBoolean(0), true); + ASSERT_EQ(row.GetByte(1), static_cast(1)); + ASSERT_EQ(row.GetShort(2), static_cast(2)); + ASSERT_EQ(row.GetInt(3), static_cast(3)); + ASSERT_EQ(row.GetDate(3), static_cast(3)); + ASSERT_EQ(row.GetLong(4), static_cast(4)); + ASSERT_EQ(row.GetFloat(5), static_cast(5.1)); + ASSERT_EQ(row.GetDouble(6), static_cast(6.12)); + ASSERT_EQ(row.GetString(7), str); ASSERT_EQ(*row.GetBinary(8), *bytes); - ASSERT_EQ(row.GetTimestamp(9, Timestamp::MAX_PRECISION), ts); - ASSERT_EQ(row.GetDecimal(10, /*precision=*/20, /*scale=*/3), decimal); - ASSERT_TRUE(row.IsNullAt(11)); - ASSERT_EQ(row.ToString(), "OffsetRow, arity 12, offset 1"); + ASSERT_EQ(std::string(row.GetStringView(9)), str9); + ASSERT_EQ(row.GetTimestamp(10, /*precision=*/9), ts); + ASSERT_EQ(row.GetDecimal(11, /*precision=*/30, /*scale=*/20), decimal); + ASSERT_EQ(row.GetArray(12)->ToLongArray().value(), array->ToLongArray().value()); + auto binary_row_result = std::dynamic_pointer_cast(row.GetRow(13, 2)); + auto binary_row_expected = std::dynamic_pointer_cast(binary_row); + ASSERT_EQ(*binary_row_result, *binary_row_expected); + ASSERT_EQ(row.GetMap(14)->KeyArray()->ToIntArray().value(), + map->KeyArray()->ToIntArray().value()); + ASSERT_EQ(row.GetMap(14)->ValueArray()->ToLongArray().value(), + map->ValueArray()->ToLongArray().value()); + ASSERT_TRUE(row.IsNullAt(15)); + ASSERT_EQ(row.ToString(), "OffsetRow, arity 16, offset 1"); } } // namespace paimon::test diff --git a/src/paimon/format/avro/avro_direct_encoder.cpp b/src/paimon/format/avro/avro_direct_encoder.cpp index 20c63fea2..d8ace9f7d 100644 --- a/src/paimon/format/avro/avro_direct_encoder.cpp +++ b/src/paimon/format/avro/avro_direct_encoder.cpp @@ -101,7 +101,7 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, } case ::avro::AVRO_INT: { - // AVRO_INT can represent: 
int32, date (days since epoch) + // AVRO_INT can represent: int8, int16, int32, date (days since epoch) switch (array.type()->id()) { case arrow::Type::INT8: { const auto& int8_array = @@ -130,7 +130,8 @@ Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, } default: return Status::Invalid( - fmt::format("AVRO_INT expects Int32Array or Date32Array, got {}", + fmt::format("AVRO_INT expects Int8Array or Int16Array or Int32Array or " + "Date32Array, got {}", array.type()->ToString())); } } diff --git a/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp index decba3683..1e7e1c195 100644 --- a/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp +++ b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp @@ -265,6 +265,21 @@ TEST_F(AvroDirectEncoderDecoderTest, TestTimestampType) { } } +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidTimestampType) { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-millis"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::NANO), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123L).ok()); // 2021-01-01 00:00:00.123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "Unsupported timestamp type with avro logical type \"logicalType\": " + "\"timestamp-millis\" and arrow time unit NANOSECOND."); +} + TEST_F(AvroDirectEncoderDecoderTest, TestUnionType) { // Test nullable int (union of null and int) std::string schema_json = R"(["null", "int"])"; diff --git a/src/paimon/format/parquet/parquet_format_writer_test.cpp b/src/paimon/format/parquet/parquet_format_writer_test.cpp index a0bd6dd08..55951c21b 100644 --- 
a/src/paimon/format/parquet/parquet_format_writer_test.cpp +++ b/src/paimon/format/parquet/parquet_format_writer_test.cpp @@ -225,6 +225,40 @@ TEST_F(ParquetFormatWriterTest, TestWriteWithVariousBatchSize) { } } +TEST_F(ParquetFormatWriterTest, TestWriteWithV1Version) { + auto schema_pair = PrepareArrowSchema(); + const auto& arrow_schema = schema_pair.first; + const auto& struct_type = schema_pair.second; + std::map options; + auto record_batch_size = 10; + auto batch_capacity = 5; + std::string file_name = "test.parquet"; + std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name); + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs_->Create(file_path, /*overwrite=*/false)); + ::parquet::WriterProperties::Builder builder; + builder.write_batch_size(batch_capacity); + builder.version(::parquet::ParquetVersion::type::PARQUET_1_0); + auto writer_properties = builder.build(); + ASSERT_OK_AND_ASSIGN( + auto format_writer, + ParquetFormatWriter::Create(out, arrow_schema, writer_properties, + DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_)); + auto array = PrepareArray(struct_type, record_batch_size); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); + + auto batch = std::make_shared( + /*partition=*/std::map(), /*bucket=*/-1, + /*row_kinds=*/std::vector(), arrow_array.get()); + ASSERT_OK(format_writer->AddBatch(batch->GetData())); + ASSERT_OK(format_writer->Flush()); + ASSERT_OK(format_writer->Finish()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + CheckResult(file_path, record_batch_size, /*row_group_count=*/1); +} + TEST_F(ParquetFormatWriterTest, TestWriteMultipleTimes) { // arrow array length = 6 + 10 + 15 + 6 = 37 // parquet batch capacity = 10 diff --git a/src/paimon/format/parquet/parquet_writer_builder_test.cpp b/src/paimon/format/parquet/parquet_writer_builder_test.cpp index 870c06e2a..09cefd244 100644 --- a/src/paimon/format/parquet/parquet_writer_builder_test.cpp +++ 
b/src/paimon/format/parquet/parquet_writer_builder_test.cpp @@ -82,11 +82,13 @@ TEST(ParquetWriterBuilderTest, PrepareWriterPropertiesWithZstdLevelPriority) { std::map options; options[Options::FILE_COMPRESSION_ZSTD_LEVEL] = "4"; options[PARQUET_COMPRESSION_CODEC_ZSTD_LEVEL] = "3"; + options[PARQUET_WRITER_VERSION] = "PARQUET_1_0"; options[Options::FILE_FORMAT] = "parquet"; options[Options::MANIFEST_FORMAT] = "parquet"; ParquetWriterBuilder builder(schema, /*batch_size=*/1024 * 1024, options); ASSERT_OK_AND_ASSIGN(auto properties, builder.PrepareWriterProperties("zstd")); ASSERT_EQ(3, properties->default_column_properties().compression_level()); + ASSERT_EQ(::parquet::ParquetVersion::PARQUET_1_0, properties->version()); } { std::map options; From b5a92f504c9996ef4c1900f5bee54bfe10a14d14 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Fri, 17 Apr 2026 08:32:24 +0000 Subject: [PATCH 2/3] fix review --- src/paimon/core/mergetree/merge_tree_writer_test.cpp | 1 + src/paimon/core/table/table_test.cpp | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/paimon/core/mergetree/merge_tree_writer_test.cpp b/src/paimon/core/mergetree/merge_tree_writer_test.cpp index ad8cd0407..1257ace35 100644 --- a/src/paimon/core/mergetree/merge_tree_writer_test.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer_test.cpp @@ -913,6 +913,7 @@ TEST_F(MergeTreeWriterTest, TestShouldWait) { } TEST_F(MergeTreeWriterTest, TestUpdateCompactResultDeleteIntermediateFile) { + // TODO(lisizhuo.lsz): test UpdateCompactResult in inte compaction test. 
ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}})); auto dir = UniqueTestDirectory::Create(); diff --git a/src/paimon/core/table/table_test.cpp b/src/paimon/core/table/table_test.cpp index f4be04014..c9fb5e749 100644 --- a/src/paimon/core/table/table_test.cpp +++ b/src/paimon/core/table/table_test.cpp @@ -77,7 +77,9 @@ TEST(TableTest, TestCreateFailedWithNonExistSchema) { std::string schema_path = schema_manager.ToSchemaPath(0); ASSERT_OK(fs->Delete(schema_path, /*recursive=*/false)); // check create table failed - ASSERT_NOK_WITH_MSG(Table::Create(fs, dir->Str(), Identifier("tbl1")), "load table schema"); + ASSERT_NOK_WITH_MSG( + Table::Create(fs, dir->Str(), Identifier("tbl1")), + "load table schema for Identifier{database='unknown', table='tbl1'} failed"); } } // namespace paimon::test From 4bb4ede81f62f52e45fdc6eb7c808d8cc43b8481 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Fri, 17 Apr 2026 08:34:31 +0000 Subject: [PATCH 3/3] fix field_listagg_agg.h --- src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h index d693c7334..ff6ac149d 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -79,7 +79,7 @@ class FieldListaggAgg : public FieldAggregator { new_result.append(in_str); result_ = std::move(new_result); } - return std::string_view(result_); + return std::string_view{result_}; } private: