Skip to content

Commit

Permalink
[Enhancement] make lake pk compaction upper threshold configurable (#…
Browse files Browse the repository at this point in the history
…35129)

Signed-off-by: luohaha <18810541851@163.com>
(cherry picked from commit a1ac17e)

# Conflicts:
#	be/src/storage/lake/compaction_policy.cpp
  • Loading branch information
luohaha authored and mergify[bot] committed Nov 17, 2023
1 parent 9c8c4ce commit 1bd9bd4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ CONF_mInt64(update_compaction_size_threshold, "268435456");
CONF_mInt64(update_compaction_result_bytes, "1073741824");
// This config controls the io amp ratio of delvec files.
CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2");
// This config defines the maximum ratio of a tablet's data size allowed per compaction (e.g. 0.5 means half of the tablet)
CONF_mDouble(update_compaction_ratio_threshold, "0.5");

CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min
CONF_Int32(manual_compaction_threads, "4");
Expand Down
15 changes: 13 additions & 2 deletions be/src/storage/lake/compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,14 @@ StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(const Tab
std::vector<RowsetPtr> input_rowsets;
UpdateManager* mgr = _tablet->update_mgr();
std::priority_queue<RowsetCandidate> rowset_queue;
<<<<<<< HEAD
const auto tablet_data_size = _get_data_size(tablet_metadata);
=======
const auto tablet_id = tablet_metadata->id();
const auto tablet_version = tablet_metadata->version();
const int64_t compaction_data_size_threshold =
static_cast<int64_t>((double)_get_data_size(tablet_metadata) * config::update_compaction_ratio_threshold);
>>>>>>> a1ac17edd6 ([Enhancement] make lake pk compaction upper threshold configurable (#35129))
for (const auto& rowset_pb : tablet_metadata->rowsets()) {
RowsetStat stat;
stat.num_rows = rowset_pb.num_rows();
Expand All @@ -160,19 +167,23 @@ StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(const Tab
while (!rowset_queue.empty()) {
const auto& rowset_candidate = rowset_queue.top();
cur_compaction_result_bytes += rowset_candidate.read_bytes();
<<<<<<< HEAD
if (input_rowsets.size() > 0 &&
cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes * 2, tablet_data_size / 2)) {
break;
}
input_rowsets.emplace_back(
std::make_shared<Rowset>(_tablet.get(), std::move(rowset_candidate.rowset_meta_ptr)));
=======
input_rowsets.emplace_back(std::make_shared<Rowset>(tablet, std::move(rowset_candidate.rowset_meta_ptr)));
>>>>>>> a1ac17edd6 ([Enhancement] make lake pk compaction upper threshold configurable (#35129))
if (has_dels != nullptr) {
has_dels->push_back(rowset_candidate.delete_bytes() > 0);
}
input_infos << input_rowsets.back()->id() << "|";

// Allow to merge half of this tablet
if (cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes, tablet_data_size / 2) ||
if (cur_compaction_result_bytes >
std::max(config::update_compaction_result_bytes, compaction_data_size_threshold) ||
input_rowsets.size() >= config::max_update_compaction_num_singleton_deltas) {
break;
}
Expand Down

0 comments on commit 1bd9bd4

Please sign in to comment.