Skip to content

Commit

Permalink
Add basic kRoundRobin compaction policy
Browse files Browse the repository at this point in the history
  • Loading branch information
zczhu committed Jun 10, 2022
1 parent ecfd4ae commit 5911b87
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 7 deletions.
20 changes: 19 additions & 1 deletion db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// do not pick a file to compact if it is being compacted
// from n-1 level.
if (f->being_compacted) {
if (ioptions_.compaction_pri == kRoundRobin) {
// Simply move forward the cursor if a file is being compacted
vstorage_->IncrCurrentFileToCursor(start_level_);
}
continue;
}

Expand All @@ -460,6 +464,13 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// A locked (pending compaction) input-level file was pulled in due to
// user-key overlap.
start_level_inputs_.clear();

// To ensure every files is selcted in a round-robin manner, we cannot
// skip the current file. So we return false and wait for the next time
// we can pick this file to compact
if (ioptions_.compaction_pri == kRoundRobin) {
return false;
}
continue;
}

Expand All @@ -479,6 +490,10 @@ bool LevelCompactionBuilder::PickFileToCompact() {
!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&output_level_inputs)) {
start_level_inputs_.clear();
// The same reason as above to ensure the round-robin compaction
if (ioptions_.compaction_pri == kRoundRobin) {
return false;
}
continue;
}
base_index_ = index;
Expand All @@ -487,7 +502,10 @@ bool LevelCompactionBuilder::PickFileToCompact() {

// store where to start the iteration in the next call to PickCompaction
vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

if (ioptions_.compaction_pri == kRoundRobin) {
// update the compact cursor if it is round-robin
vstorage_->IncrCurrentFileToCursor(start_level_);
}
return start_level_inputs_.size() > 0;
}

Expand Down
80 changes: 80 additions & 0 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,86 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) {
ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber());
}

TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
std::vector<InternalKey> test_cursors = {InternalKey("249", 100, kTypeValue),
InternalKey("600", 100, kTypeValue),
InternalKey()};
std::vector<uint32_t> selected_files = {8U, 6U, 6U};
std::vector<int> next_file_idxes = {0, 1, 1};
ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_bytes_for_level_base = 10000000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
for (size_t i = 0; i < test_cursors.size(); i++) {
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor
vstorage_->AddCursorForOneLevel(2, test_cursors[i]);
Add(2, 6U, "150", "199", 50000000U); // Overlap with 26U, 27U
Add(2, 7U, "200", "249", 50000000U); // File not overlapping
Add(2, 8U, "300", "600", 50000000U); // Overlap with 28U, 29U

Add(3, 26U, "130", "165", 60000000U);
Add(3, 27U, "166", "170", 60000000U);
Add(3, 28U, "270", "340", 60000000U);
Add(3, 29U, "401", "500", 60000000U);
UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber());
const std::vector<int> next_file_to_cursor_by_level =
vstorage_->GetCurrentFileToCursor();
ASSERT_EQ(next_file_to_cursor_by_level[2], next_file_idxes[i]);

// release the version storage
DeleteVersionStorage();
}
}

TEST_F(CompactionPickerTest, CompactionPriRoundRobinWithCompactingFile) {
InternalKey test_cursor = InternalKey("200", 100, kTypeValue);
ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_bytes_for_level_base = 10000000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor
vstorage_->AddCursorForOneLevel(2, test_cursor);
Add(2, 6U, "150", "199", 50000000U);
Add(2, 7U, "200", "249", 50000000U); // is being compacted
Add(2, 8U, "300", "600", 50000000U);

Add(3, 26U, "130", "165", 60000000U);
Add(3, 27U, "166", "170", 60000000U);
Add(3, 28U, "270", "340", 60000000U);
Add(3, 29U, "401", "500", 60000000U);
vstorage_->LevelFiles(2)[1]->being_compacted = true;

UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(8U, compaction->input(0, 0)->fd.GetNumber());
const std::vector<int> next_file_to_cursor_by_level =
vstorage_->GetCurrentFileToCursor();
ASSERT_EQ(next_file_to_cursor_by_level[2], 0);

// release the version storage
DeleteVersionStorage();
}

// This test exhibits the bug where we don't properly reset parent_index in
// PickCompaction()
TEST_F(CompactionPickerTest, ParentIndexResetBug) {
Expand Down
3 changes: 2 additions & 1 deletion db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5219,7 +5219,8 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(CompactionPri::kByCompensatedSize,
CompactionPri::kOldestLargestSeqFirst,
CompactionPri::kOldestSmallestSeqFirst,
CompactionPri::kMinOverlappingRatio));
CompactionPri::kMinOverlappingRatio,
CompactionPri::kRoundRobin));

class NoopMergeOperator : public MergeOperator {
public:
Expand Down
4 changes: 2 additions & 2 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,15 +512,15 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kCompactPointer:
case kCompactCursor:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
// we don't use compact pointers anymore,
// but we should not fail if they are still
// in manifest
} else {
if (!msg) {
msg = "compaction pointer";
msg = "compaction cursor";
}
}
break;
Expand Down
2 changes: 1 addition & 1 deletion db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ enum Tag : uint32_t {
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kCompactCursor = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
Expand Down
82 changes: 82 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,8 @@ VersionStorageInfo::VersionStorageInfo(
compaction_score_(num_levels_),
compaction_level_(num_levels_),
l0_delay_trigger_count_(0),
compact_cursor_(num_levels_),
current_file_to_cursor_(num_levels_, 0),
accumulated_file_size_(0),
accumulated_raw_key_size_(0),
accumulated_raw_value_size_(0),
Expand Down Expand Up @@ -3193,6 +3195,80 @@ void SortFileByOverlappingRatio(
file_to_order[f2.file->fd.GetNumber()];
});
}

void MoveFileByRoundRobin(const InternalKeyComparator& icmp,
std::vector<InternalKey>* compact_cursor,
std::vector<int>* current_file_to_cursor,
bool level0_non_overlapping, int level,
std::vector<Fsize>* temp) {
if (level == 0 && !level0_non_overlapping) {
// Using kOldestSmallestSeqFirst when level === 0, since the
// files may overlap (not fully sorted)
std::sort(temp->begin(), temp->end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->fd.smallest_seqno < f2.file->fd.smallest_seqno;
});
return;
}

bool should_move_files =
compact_cursor->at(level).Valid() && temp->size() > 1;

// The iterator points to the Fsize with smallest key larger than or equal to
// the given cursor
std::vector<Fsize>::iterator current_file_iter;
if (should_move_files) {
// Find the file of which the smallest key is larger than or equal to
// the cursor (the smallest key in the successor file of the last
// chosen file), skip this if the cursor is invalid or there is only
// one file in this level
current_file_iter = std::lower_bound(
temp->begin(), temp->end(), compact_cursor->at(level),
[&](const Fsize& f, const InternalKey& cursor) -> bool {
return icmp.Compare(cursor, f.file->smallest) > 0;
});

should_move_files = current_file_iter != temp->end();
}
if (should_move_files) {
// Set the next file using the current iterator, the next file will be
// updated/advanced when "UpdateCompactCursor(int level)" is called".
if (current_file_iter == temp->end()) {
// Retart from the begining if we cannot find a qualified file with
// smallest key larger than/equal to the cursor
current_file_to_cursor->at(level) = 0;
} else {
current_file_to_cursor->at(level) = (int)current_file_iter->index;
}

// Construct a local temporary vector
std::vector<Fsize> local_temp;
local_temp.reserve(temp->size());
// Move the selected File into the first position and its successors
// into the second, third, ..., positions
for (auto iter = current_file_iter; iter != temp->end(); iter++) {
local_temp.push_back(*iter);
}
// Move the origin predecessors of the selected file in a round-robin
// manner
for (auto iter = temp->begin(); iter != current_file_iter; iter++) {
local_temp.push_back(*iter);
}
// Replace all the items in temp
for (size_t i = 0; i < local_temp.size(); i++) {
temp->at(i) = local_temp[i];
}
} else if (temp->size() >= 1) {
// Do not move files if the stored cursor is invalid, or cannot find
// file with smallest key larger than or equal to the cursor. Rest
// the cursor as the smalleast key of the second file.
current_file_to_cursor->at(level) = 0;
} else {
// No more than 1 file exists in this level. An empty key will be assigned
// to the compact cursor if the next file index is -1.
current_file_to_cursor->at(level) = -1;
}
}
} // namespace

void VersionStorageInfo::UpdateFilesByCompactionPri(
Expand Down Expand Up @@ -3245,6 +3321,11 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
files_[level + 1], ioptions.clock, level,
num_non_empty_levels_, options.ttl, &temp);
break;
case kRoundRobin:
MoveFileByRoundRobin(*internal_comparator_, &compact_cursor_,
&current_file_to_cursor_, level0_non_overlapping_,
level, &temp);
break;
default:
assert(false);
}
Expand Down Expand Up @@ -5280,6 +5361,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
delete[] vstorage -> files_;
vstorage->files_ = new_files_list;
vstorage->num_levels_ = new_levels;
vstorage->ResizeCompactCursors(new_levels);

MutableCFOptions mutable_cf_options(*options);
VersionEdit ve;
Expand Down
36 changes: 36 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,35 @@ class VersionStorageInfo {

void AddFile(int level, FileMetaData* f);

// Resize/Initialize the space for compact_cursor_
void ResizeCompactCursors(int level) {
compact_cursor_.resize(level, InternalKey());
}

const std::vector<InternalKey>& GetCompactCursors() const {
return compact_cursor_;
}

const std::vector<int>& GetCurrentFileToCursor() const {
return current_file_to_cursor_;
}

// REQUIRES: ResizeCompactCursors has been called
void AddCursorForOneLevel(int level,
const InternalKey& smallest_uncompacted_key) {
compact_cursor_[level] = smallest_uncompacted_key;
}

// REQUIRES: lock is held
// Update the compact cursor and advance the file index so that it can point
// to the next cursor
void IncrCurrentFileToCursor(int level) {
current_file_to_cursor_[level] =
(current_file_to_cursor_[level] + 1) % files_[level].size();
compact_cursor_[level] =
files_[level][(size_t)current_file_to_cursor_[level]]->smallest;
}

void ReserveBlob(size_t size) { blob_files_.reserve(size); }

void AddBlobFile(std::shared_ptr<BlobFileMetaData> blob_file_meta);
Expand Down Expand Up @@ -657,6 +686,13 @@ class VersionStorageInfo {
int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop
// for number of L0 files.

// Compact cursors for round-robin compactions in each level
std::vector<InternalKey> compact_cursor_;
// The current compact file index in a round-robin manner. We update the
// current file to be compacted in `UpdateFilesByCompactionPri(...)` instead
// of the current compact cursor.
std::vector<int> current_file_to_cursor_;

// the following are the sampled temporary stats.
// the current accumulated size of sampled files.
uint64_t accumulated_file_size_;
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ enum CompactionPri : char {
// and its size is the smallest. It in many cases can optimize write
// amplification.
kMinOverlappingRatio = 0x3,
// Keeps a cursor(s) of the successor of the file (key range) was/were
// compacted before, and always picks the next files (key range) in that
// level. The file picking process will cycle through all the files in a
// round-robin manner.
kRoundRobin = 0x4,
};

struct CompactionOptionsFIFO {
Expand Down
6 changes: 4 additions & 2 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ std::map<CompactionPri, std::string> OptionsHelper::compaction_pri_to_string = {
{kByCompensatedSize, "kByCompensatedSize"},
{kOldestLargestSeqFirst, "kOldestLargestSeqFirst"},
{kOldestSmallestSeqFirst, "kOldestSmallestSeqFirst"},
{kMinOverlappingRatio, "kMinOverlappingRatio"}};
{kMinOverlappingRatio, "kMinOverlappingRatio"},
{kRoundRobin, "kRoundRobin"}};

std::map<CompactionStopStyle, std::string>
OptionsHelper::compaction_stop_style_to_string = {
Expand Down Expand Up @@ -829,7 +830,8 @@ std::unordered_map<std::string, CompactionPri>
{"kByCompensatedSize", kByCompensatedSize},
{"kOldestLargestSeqFirst", kOldestLargestSeqFirst},
{"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst},
{"kMinOverlappingRatio", kMinOverlappingRatio}};
{"kMinOverlappingRatio", kMinOverlappingRatio},
{"kRoundRobin", kRoundRobin}};

std::unordered_map<std::string, CompactionStopStyle>
OptionsHelper::compaction_stop_style_string_map = {
Expand Down

0 comments on commit 5911b87

Please sign in to comment.