Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");

// cumulative compaction policy: min and max delta file's number
CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "100");

// if compaction of a tablet failed, this tablet should not be chosen to
// compaction until this interval passes.
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", cumulative_point=" << _tablet->cumulative_layer_point()
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy="
<< (cumu_policy == nullptr ? "quick" : cumu_policy->name());
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
CHECK((*base_rowset_meta)->start_version() == 0);

int64_t promotion_size = 0;
_calc_promotion_size(*base_rowset_meta, &promotion_size);
_calc_promotion_size(tablet, *base_rowset_meta, &promotion_size);

int64_t prev_version = -1;
for (const RowsetMetaSharedPtr& rs : existing_rss) {
Expand Down Expand Up @@ -118,7 +118,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
}
}

void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta,
void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(Tablet* tablet, RowsetMetaSharedPtr base_rowset_meta,
int64_t* promotion_size) {
int64_t base_size = base_rowset_meta->total_disk_size();
*promotion_size = base_size * _size_based_promotion_ratio;
Expand All @@ -129,12 +129,12 @@ void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedP
} else if (*promotion_size <= _size_based_promotion_min_size) {
*promotion_size = _size_based_promotion_min_size;
}
_refresh_tablet_size_based_promotion_size(*promotion_size);
_refresh_tablet_size_based_promotion_size(tablet, *promotion_size);
}

void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_size_based_promotion_size(
int64_t promotion_size) {
_tablet_size_based_promotion_size = promotion_size;
Tablet* tablet, int64_t promotion_size) {
tablet->set_cumulative_promotion_size(promotion_size);
}

void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
Expand All @@ -151,13 +151,13 @@ void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
// if rowsets have no delete version, check output_rowset total disk size
// satisfies promotion size.
size_t total_size = output_rowset->rowset_meta()->total_disk_size();
if (total_size >= _tablet_size_based_promotion_size) {
if (total_size >= tablet->cumulative_promotion_size()) {
tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
}
}
}

void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(TabletState state,
void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t current_cumulative_point,
uint32_t* score) {
bool base_rowset_exist = false;
Expand Down Expand Up @@ -199,7 +199,7 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Table

// Use "first"(not base) version to calc promotion size
// because some tablet do not have base version(under alter operation)
_calc_promotion_size(first_meta, &promotion_size);
_calc_promotion_size(tablet, first_meta, &promotion_size);

// If base version does not exist, but its state is RUNNING.
// It is abnormal, do not select it and set *score = 0
Expand Down Expand Up @@ -239,7 +239,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score) {
size_t promotion_size = _tablet_size_based_promotion_size;
size_t promotion_size = tablet->cumulative_promotion_size();
int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;
Expand Down Expand Up @@ -396,7 +396,7 @@ int NumBasedCumulativeCompactionPolicy::pick_input_rowsets(
return transient_size;
}

void NumBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(TabletState state,
void NumBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
const std::vector<RowsetMetaSharedPtr>& all_rowsets, const int64_t current_cumulative_point,
uint32_t* score) {
const int64_t point = current_cumulative_point;
Expand Down
12 changes: 5 additions & 7 deletions be/src/olap/cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class CumulativeCompactionPolicy {
/// param current_cumulative_point, current cumulative point value.
/// return score, the result score after calculate.
virtual void calc_cumulative_compaction_score(
TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets,
Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point, uint32_t* score) = 0;

/// This function implements the policy which represents how to pick the candidate rowsets for compaction.
Expand Down Expand Up @@ -154,7 +154,7 @@ class NumBasedCumulativeCompactionPolicy final : public CumulativeCompactionPoli

/// Num based cumulative compaction policy implements calc cumulative compaction score function.
/// Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
void calc_cumulative_compaction_score(TabletState state,
void calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point,
uint32_t* score) override;
Expand Down Expand Up @@ -210,7 +210,7 @@ class SizeBasedCumulativeCompactionPolicy final : public CumulativeCompactionPol

/// Num based cumulative compaction policy implements calc cumulative compaction score function.
/// Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
void calc_cumulative_compaction_score(TabletState state,
void calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point,
uint32_t* score) override;
Expand All @@ -219,15 +219,15 @@ class SizeBasedCumulativeCompactionPolicy final : public CumulativeCompactionPol

private:
/// calculate promotion size using current base rowset meta size and promotion configs
void _calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta, int64_t* promotion_size);
void _calc_promotion_size(Tablet* tablet, RowsetMetaSharedPtr base_rowset_meta, int64_t* promotion_size);

/// calculate the disk size belong to which level, the level is divide by power of 2
/// between cumulative_size_based_promotion_min_size_mbytes
/// and cumulative_size_based_promotion_size_mbytes
int _level_size(const int64_t size);

/// when policy calculate cumulative_compaction_score, update promotion size at the same time
void _refresh_tablet_size_based_promotion_size(int64_t promotion_size);
void _refresh_tablet_size_based_promotion_size(Tablet* tablet, int64_t promotion_size);

private:
/// cumulative compaction promotion size, unit is byte.
Expand All @@ -238,8 +238,6 @@ class SizeBasedCumulativeCompactionPolicy final : public CumulativeCompactionPol
int64_t _size_based_promotion_min_size;
/// lower bound size to do compaction compaction.
int64_t _size_based_compaction_lower_bound_size;
/// record tablet promotion size, it is updated each time when calculate cumulative_compaction_score
int64_t _tablet_size_based_promotion_size;
/// levels division of disk size, same level rowsets can do compaction
std::vector<int64_t> _levels;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ const uint32_t Tablet::_calc_cumulative_compaction_score(
}
#endif
uint32_t score = 0;
_cumulative_compaction_policy->calc_cumulative_compaction_score(tablet_state(),
_cumulative_compaction_policy->calc_cumulative_compaction_score(this, tablet_state(),
_tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
return score;
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class Tablet : public BaseTablet {

inline const int64_t cumulative_layer_point() const;
inline void set_cumulative_layer_point(int64_t new_point);
inline const int64_t cumulative_promotion_size() const;
inline void set_cumulative_promotion_size(int64_t new_size);

inline size_t tablet_footprint(); // disk space occupied by tablet
inline size_t num_rows();
Expand Down Expand Up @@ -338,6 +340,7 @@ class Tablet : public BaseTablet {
std::atomic<int64_t> _last_base_compaction_success_millis;
std::atomic<int64_t> _last_quick_compaction_success_time_millis;
std::atomic<int64_t> _cumulative_point;
std::atomic<int64_t> _cumulative_promotion_size;
std::atomic<int32_t> _newly_created_rowset_num;
std::atomic<int64_t> _last_checkpoint_time;

Expand Down Expand Up @@ -402,6 +405,13 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
<< ", origin: " << _cumulative_point.load();
}
}
inline const int64_t Tablet::cumulative_promotion_size() const {
return _cumulative_promotion_size;
}

inline void Tablet::set_cumulative_promotion_size(int64_t new_size) {
_cumulative_promotion_size = new_size;
}

// TODO(lingbin): Why other methods that need to get information from _tablet_meta
// are not locked, here needs a comment to explain.
Expand Down
11 changes: 2 additions & 9 deletions be/test/olap/cumulative_compaction_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,11 +978,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_big) {
_tablet->init();
_tablet->calculate_cumulative_point();

SizeBasedCumulativeCompactionPolicy* policy =
dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
_tablet->_cumulative_compaction_policy.get());

EXPECT_EQ(1073741824, policy->_tablet_size_based_promotion_size);
EXPECT_EQ(1073741824, _tablet->cumulative_promotion_size());
}

TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
Expand All @@ -997,10 +993,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
_tablet->init();
_tablet->calculate_cumulative_point();

SizeBasedCumulativeCompactionPolicy* policy =
dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
_tablet->_cumulative_compaction_policy.get());
EXPECT_EQ(67108864, policy->_tablet_size_based_promotion_size);
EXPECT_EQ(67108864, _tablet->cumulative_promotion_size());
}

TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
Expand Down