Skip to content

Commit

Permalink
[Enhancement] Optimize compaction resource usage for cloud native primary table (#39611)
Browse files Browse the repository at this point in the history

Signed-off-by: luohaha <18810541851@163.com>
(cherry picked from commit cc38db6)
  • Loading branch information
luohaha authored and mergify[bot] committed Jan 21, 2024
1 parent 57556ad commit 8f7fa43
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.h
Expand Up @@ -935,6 +935,7 @@ CONF_mInt64(lake_vacuum_retry_max_attempts, "5");
CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10");
CONF_mBool(enable_primary_key_recover, "false");
CONF_mBool(lake_enable_compaction_async_write, "false");
CONF_mInt64(lake_pk_compaction_max_input_rowsets, "5");

CONF_mBool(dependency_librdkafka_debug_enable, "false");

Expand Down
17 changes: 11 additions & 6 deletions be/src/storage/lake/compaction_policy.cpp
Expand Up @@ -133,7 +133,7 @@ class PrimaryCompactionPolicy : public CompactionPolicy {

StatusOr<std::vector<RowsetPtr>> pick_rowsets() override;
StatusOr<std::vector<RowsetPtr>> pick_rowsets(const std::shared_ptr<const TabletMetadataPB>& tablet_metadata,
std::vector<bool>* has_dels);
bool calc_score, std::vector<bool>* has_dels);

private:
int64_t _get_data_size(const std::shared_ptr<const TabletMetadataPB>& tablet_metadata) {
Expand All @@ -146,11 +146,11 @@ class PrimaryCompactionPolicy : public CompactionPolicy {
};

StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets() {
return pick_rowsets(_tablet_metadata, nullptr);
return pick_rowsets(_tablet_metadata, false, nullptr);
}

StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(
const std::shared_ptr<const TabletMetadataPB>& tablet_metadata, std::vector<bool>* has_dels) {
const std::shared_ptr<const TabletMetadataPB>& tablet_metadata, bool calc_score, std::vector<bool>* has_dels) {
std::vector<RowsetPtr> input_rowsets;
UpdateManager* mgr = _tablet_mgr->update_mgr();
std::priority_queue<RowsetCandidate> rowset_queue;
Expand Down Expand Up @@ -184,8 +184,13 @@ StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(
input_infos << input_rowsets.back()->id() << "|";

if (cur_compaction_result_bytes >
std::max(config::update_compaction_result_bytes, compaction_data_size_threshold) ||
input_rowsets.size() >= config::max_update_compaction_num_singleton_deltas) {
std::max(config::update_compaction_result_bytes, compaction_data_size_threshold)) {
break;
}
// If calc_score is true, we skip `config::lake_pk_compaction_max_input_rowsets` check,
// because `config::lake_pk_compaction_max_input_rowsets` is only used to limit the number
// of rowsets for real compaction merges
if (!calc_score && input_rowsets.size() >= config::lake_pk_compaction_max_input_rowsets) {
break;
}
rowset_queue.pop();
Expand All @@ -200,7 +205,7 @@ StatusOr<uint32_t> primary_compaction_score_by_policy(TabletManager* tablet_mgr,
const std::shared_ptr<const TabletMetadataPB>& metadata) {
PrimaryCompactionPolicy policy(tablet_mgr, metadata);
std::vector<bool> has_dels;
ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, &has_dels));
ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, true, &has_dels));
uint32_t segment_num_score = 0;
for (int i = 0; i < pick_rowsets.size(); i++) {
const auto& pick_rowset = pick_rowsets[i];
Expand Down
22 changes: 11 additions & 11 deletions be/test/storage/lake/primary_key_compaction_task_test.cpp
Expand Up @@ -418,15 +418,15 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy) {
ASSERT_EQ(kChunkSize * 3, read(version));
ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata));
config::max_update_compaction_num_singleton_deltas = 1000;
config::lake_pk_compaction_max_input_rowsets = 1000;
ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets());
EXPECT_EQ(3, input_rowsets.size());

config::max_update_compaction_num_singleton_deltas = 2;
config::lake_pk_compaction_max_input_rowsets = 2;
ASSIGN_OR_ABORT(auto input_rowsets2, compaction_policy->pick_rowsets());
EXPECT_EQ(2, input_rowsets2.size());

config::max_update_compaction_num_singleton_deltas = 1;
config::lake_pk_compaction_max_input_rowsets = 1;
ASSIGN_OR_ABORT(auto input_rowsets3, compaction_policy->pick_rowsets());
EXPECT_EQ(1, input_rowsets3.size());
}
Expand Down Expand Up @@ -484,7 +484,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy2) {
}
ASSERT_EQ(kChunkSize * 6, read(version));

config::max_update_compaction_num_singleton_deltas = 4;
config::lake_pk_compaction_max_input_rowsets = 4;
ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata));
ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets());
Expand Down Expand Up @@ -559,7 +559,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy3) {
config::write_buffer_size = old_size;
ASSERT_EQ(kChunkSize * 6, read(version));

config::max_update_compaction_num_singleton_deltas = 4;
config::lake_pk_compaction_max_input_rowsets = 4;
ASSIGN_OR_ABORT(auto tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_metadata));
ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets());
Expand Down Expand Up @@ -612,21 +612,21 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_score_by_policy) {
ASSIGN_OR_ABORT(auto tablet_meta, _tablet_mgr->get_tablet_metadata(tablet_id, version));

ASSIGN_OR_ABORT(auto compaction_policy, CompactionPolicy::create(_tablet_mgr.get(), tablet_meta));
config::max_update_compaction_num_singleton_deltas = 1000;
config::lake_pk_compaction_max_input_rowsets = 1000;
ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets());
EXPECT_EQ(3, input_rowsets.size());
EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta));

config::max_update_compaction_num_singleton_deltas = 2;
config::lake_pk_compaction_max_input_rowsets = 2;
ASSIGN_OR_ABORT(auto input_rowsets2, compaction_policy->pick_rowsets());
EXPECT_EQ(2, input_rowsets2.size());
EXPECT_EQ(2, compaction_score(_tablet_mgr.get(), tablet_meta));
EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta));

config::max_update_compaction_num_singleton_deltas = 1;
config::lake_pk_compaction_max_input_rowsets = 1;
ASSIGN_OR_ABORT(auto input_rowsets3, compaction_policy->pick_rowsets());
EXPECT_EQ(1, input_rowsets3.size());
EXPECT_EQ(1, compaction_score(_tablet_mgr.get(), tablet_meta));
config::max_update_compaction_num_singleton_deltas = 1000;
EXPECT_EQ(3, compaction_score(_tablet_mgr.get(), tablet_meta));
config::lake_pk_compaction_max_input_rowsets = 1000;
}

TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) {
Expand Down

0 comments on commit 8f7fa43

Please sign in to comment.