diff --git a/be/src/common/config.h b/be/src/common/config.h index f585e92ef4fd1..e1b38cd36e15a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -935,6 +935,7 @@ CONF_mInt64(lake_vacuum_retry_max_attempts, "5"); CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10"); CONF_mBool(enable_primary_key_recover, "false"); CONF_mBool(lake_enable_compaction_async_write, "false"); +CONF_mInt64(lake_pk_compaction_max_input_rowsets, "5"); CONF_mBool(dependency_librdkafka_debug_enable, "false"); diff --git a/be/src/storage/lake/compaction_policy.cpp b/be/src/storage/lake/compaction_policy.cpp index 7f47bbcbe090b..6611e0fd99f6d 100644 --- a/be/src/storage/lake/compaction_policy.cpp +++ b/be/src/storage/lake/compaction_policy.cpp @@ -133,7 +133,7 @@ class PrimaryCompactionPolicy : public CompactionPolicy { StatusOr> pick_rowsets() override; StatusOr> pick_rowsets(const std::shared_ptr& tablet_metadata, - std::vector* has_dels); + bool calc_score, std::vector* has_dels); private: int64_t _get_data_size(const std::shared_ptr& tablet_metadata) { @@ -146,11 +146,11 @@ class PrimaryCompactionPolicy : public CompactionPolicy { }; StatusOr> PrimaryCompactionPolicy::pick_rowsets() { - return pick_rowsets(_tablet_metadata, nullptr); + return pick_rowsets(_tablet_metadata, false, nullptr); } StatusOr> PrimaryCompactionPolicy::pick_rowsets( - const std::shared_ptr& tablet_metadata, std::vector* has_dels) { + const std::shared_ptr& tablet_metadata, bool calc_score, std::vector* has_dels) { std::vector input_rowsets; UpdateManager* mgr = _tablet_mgr->update_mgr(); std::priority_queue rowset_queue; @@ -184,8 +184,13 @@ StatusOr> PrimaryCompactionPolicy::pick_rowsets( input_infos << input_rowsets.back()->id() << "|"; if (cur_compaction_result_bytes > - std::max(config::update_compaction_result_bytes, compaction_data_size_threshold) || - input_rowsets.size() >= config::max_update_compaction_num_singleton_deltas) { + std::max(config::update_compaction_result_bytes, compaction_data_size_threshold)) { + break; + } + // If calc_score is true, we skip `config::lake_pk_compaction_max_input_rowsets` check, + // because `config::lake_pk_compaction_max_input_rowsets` is only used to limit the number + // of rowsets for real compaction merges + if (!calc_score && input_rowsets.size() >= config::lake_pk_compaction_max_input_rowsets) { break; } rowset_queue.pop(); @@ -200,7 +205,7 @@ StatusOr primary_compaction_score_by_policy(TabletManager* tablet_mgr, const std::shared_ptr& metadata) { PrimaryCompactionPolicy policy(tablet_mgr, metadata); std::vector has_dels; - ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, &has_dels)); + ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, true, &has_dels)); uint32_t segment_num_score = 0; for (int i = 0; i < pick_rowsets.size(); i++) { const auto& pick_rowset = pick_rowsets[i]; diff --git a/be/test/storage/lake/primary_key_compaction_task_test.cpp b/be/test/storage/lake/primary_key_compaction_task_test.cpp index 79639b382f7d4..e2b5635cb783a 100644 --- a/be/test/storage/lake/primary_key_compaction_task_test.cpp +++ b/be/test/storage/lake/primary_key_compaction_task_test.cpp @@ -418,15 +418,15 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy) { ASSERT_EQ(kChunkSize * 3, read(version)); ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata)); - config::max_update_compaction_num_singleton_deltas = 1000; + config::lake_pk_compaction_max_input_rowsets = 1000; ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); EXPECT_EQ(3, input_rowsets.size()); - config::max_update_compaction_num_singleton_deltas = 2; + config::lake_pk_compaction_max_input_rowsets = 2; ASSIGN_OR_ABORT(auto input_rowsets2, compaction_policy->pick_rowsets()); EXPECT_EQ(2, input_rowsets2.size()); - config::max_update_compaction_num_singleton_deltas = 1; + config::lake_pk_compaction_max_input_rowsets = 1; ASSIGN_OR_ABORT(auto input_rowsets3, compaction_policy->pick_rowsets()); EXPECT_EQ(1, input_rowsets3.size()); } @@ -484,7 +484,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy2) { } ASSERT_EQ(kChunkSize * 6, read(version)); - config::max_update_compaction_num_singleton_deltas = 4; + config::lake_pk_compaction_max_input_rowsets = 4; ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata)); ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); @@ -559,7 +559,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy3) { config::write_buffer_size = old_size; ASSERT_EQ(kChunkSize * 6, read(version)); - config::max_update_compaction_num_singleton_deltas = 4; + config::lake_pk_compaction_max_input_rowsets = 4; ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata)); ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); @@ -612,21 +612,21 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_score_by_policy) { ASSIGN_OR_ABORT(auto tablet_meta, _tablet_mgr->get_tablet_metadata(tablet_id, version)); ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_meta)); - config::max_update_compaction_num_singleton_deltas = 1000; + config::lake_pk_compaction_max_input_rowsets = 1000; ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); EXPECT_EQ(3, input_rowsets.size()); EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta)); - config::max_update_compaction_num_singleton_deltas = 2; + config::lake_pk_compaction_max_input_rowsets = 2; ASSIGN_OR_ABORT(auto input_rowsets2, compaction_policy->pick_rowsets()); EXPECT_EQ(2, input_rowsets2.size()); - EXPECT_EQ(2, compaction_score(_tablet_mgr.get(), tablet_meta)); + EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta)); - config::max_update_compaction_num_singleton_deltas = 1; + config::lake_pk_compaction_max_input_rowsets = 1; ASSIGN_OR_ABORT(auto input_rowsets3, compaction_policy->pick_rowsets()); EXPECT_EQ(1, input_rowsets3.size()); - EXPECT_EQ(1, compaction_score(_tablet_mgr.get(), tablet_meta)); - config::max_update_compaction_num_singleton_deltas = 1000; + EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta)); + config::lake_pk_compaction_max_input_rowsets = 1000; } TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) {