diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 87d1e8e63da..a20b4839d91 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -448,6 +448,10 @@ bool LevelCompactionBuilder::PickFileToCompact() { // do not pick a file to compact if it is being compacted // from n-1 level. if (f->being_compacted) { + if (ioptions_.compaction_pri == kRoundRobin) { + // Simply move forward the cursor if a file is being compacted + vstorage_->IncrCurrentFileToCursor(start_level_); + } continue; } @@ -460,6 +464,13 @@ bool LevelCompactionBuilder::PickFileToCompact() { // A locked (pending compaction) input-level file was pulled in due to // user-key overlap. start_level_inputs_.clear(); + + // To ensure every files is selcted in a round-robin manner, we cannot + // skip the current file. So we return false and wait for the next time + // we can pick this file to compact + if (ioptions_.compaction_pri == kRoundRobin) { + return false; + } continue; } @@ -479,6 +490,10 @@ bool LevelCompactionBuilder::PickFileToCompact() { !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, &output_level_inputs)) { start_level_inputs_.clear(); + // The same reason as above to ensure the round-robin compaction + if (ioptions_.compaction_pri == kRoundRobin) { + return false; + } continue; } base_index_ = index; @@ -487,7 +502,10 @@ bool LevelCompactionBuilder::PickFileToCompact() { // store where to start the iteration in the next call to PickCompaction vstorage_->SetNextCompactionIndex(start_level_, cmp_idx); - + if (ioptions_.compaction_pri == kRoundRobin) { + // update the compact cursor if it is round-robin + vstorage_->IncrCurrentFileToCursor(start_level_); + } return start_level_inputs_.size() > 0; } diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index fca6ccd69a2..3c0fba06fe6 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -1311,6 +1311,86 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) { ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber()); } +TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { + std::vector test_cursors = {InternalKey("249", 100, kTypeValue), + InternalKey("600", 100, kTypeValue), + InternalKey()}; + std::vector selected_files = {8U, 6U, 6U}; + std::vector next_file_idxes = {0, 1, 1}; + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_bytes_for_level_base = 10000000; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + for (size_t i = 0; i < test_cursors.size(); i++) { + // start a brand new version in each test. + NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the cursor + vstorage_->AddCursorForOneLevel(2, test_cursors[i]); + Add(2, 6U, "150", "199", 50000000U); // Overlap with 26U, 27U + Add(2, 7U, "200", "249", 50000000U); // File not overlapping + Add(2, 8U, "300", "600", 50000000U); // Overlap with 28U, 29U + + Add(3, 26U, "130", "165", 60000000U); + Add(3, 27U, "166", "170", 60000000U); + Add(3, 28U, "270", "340", 60000000U); + Add(3, 29U, "401", "500", 60000000U); + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber()); + const std::vector next_file_to_cursor_by_level = + vstorage_->GetCurrentFileToCursor(); + ASSERT_EQ(next_file_to_cursor_by_level[2], next_file_idxes[i]); + + // release the version storage + DeleteVersionStorage(); + } +} + +TEST_F(CompactionPickerTest, CompactionPriRoundRobinWithCompactingFile) { + InternalKey test_cursor = InternalKey("200", 100, kTypeValue); + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_bytes_for_level_base = 10000000; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + // start a brand new version in each test. + NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the cursor + vstorage_->AddCursorForOneLevel(2, test_cursor); + Add(2, 6U, "150", "199", 50000000U); + Add(2, 7U, "200", "249", 50000000U); // is being compacted + Add(2, 8U, "300", "600", 50000000U); + + Add(3, 26U, "130", "165", 60000000U); + Add(3, 27U, "166", "170", 60000000U); + Add(3, 28U, "270", "340", 60000000U); + Add(3, 29U, "401", "500", 60000000U); + vstorage_->LevelFiles(2)[1]->being_compacted = true; + + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(8U, compaction->input(0, 0)->fd.GetNumber()); + const std::vector next_file_to_cursor_by_level = + vstorage_->GetCurrentFileToCursor(); + ASSERT_EQ(next_file_to_cursor_by_level[2], 0); + + // release the version storage + DeleteVersionStorage(); +} + // This test exhibits the bug where we don't properly reset parent_index in // PickCompaction() TEST_F(CompactionPickerTest, ParentIndexResetBug) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d307cadbbc2..becb0be168a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5219,7 +5219,8 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(CompactionPri::kByCompensatedSize, CompactionPri::kOldestLargestSeqFirst, CompactionPri::kOldestSmallestSeqFirst, - CompactionPri::kMinOverlappingRatio)); + CompactionPri::kMinOverlappingRatio, + CompactionPri::kRoundRobin)); class NoopMergeOperator : public MergeOperator { public: diff --git a/db/version_edit.cc b/db/version_edit.cc index 8e45b353e7d..3f4769ac3fc 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -512,7 +512,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; - case kCompactPointer: + case kCompactCursor: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { // we don't use compact pointers anymore, @@ -520,7 +520,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { // in manifest } else { if (!msg) { - msg = "compaction pointer"; + msg = "compaction cursor"; } } break; diff --git a/db/version_edit.h b/db/version_edit.h index 38e4ad37271..ca716cb180c 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -35,7 +35,7 @@ enum Tag : uint32_t { kLogNumber = 2, kNextFileNumber = 3, kLastSequence = 4, - kCompactPointer = 5, + kCompactCursor = 5, kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs diff --git a/db/version_set.cc b/db/version_set.cc index bbe450a7205..4e2b16971ca 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1796,6 +1796,8 @@ VersionStorageInfo::VersionStorageInfo( compaction_score_(num_levels_), compaction_level_(num_levels_), l0_delay_trigger_count_(0), + compact_cursor_(num_levels_), + current_file_to_cursor_(num_levels_, 0), accumulated_file_size_(0), accumulated_raw_key_size_(0), accumulated_raw_value_size_(0), @@ -3193,6 +3195,80 @@ void SortFileByOverlappingRatio( file_to_order[f2.file->fd.GetNumber()]; }); } + +void MoveFileByRoundRobin(const InternalKeyComparator& icmp, + std::vector* compact_cursor, + std::vector* current_file_to_cursor, + bool level0_non_overlapping, int level, + std::vector* temp) { + if (level == 0 && !level0_non_overlapping) { + // Using kOldestSmallestSeqFirst when level === 0, since the + // files may overlap (not fully sorted) + std::sort(temp->begin(), temp->end(), + [](const Fsize& f1, const Fsize& f2) -> bool { + return f1.file->fd.smallest_seqno < f2.file->fd.smallest_seqno; + }); + return; + } + + bool should_move_files = + compact_cursor->at(level).Valid() && temp->size() > 1; + + // The iterator points to the Fsize with smallest key larger than or equal to + // the given cursor + std::vector::iterator current_file_iter; + if (should_move_files) { + // Find the file of which the smallest key is larger than or equal to + // the cursor (the smallest key in the successor file of the last + // chosen file), skip this if the cursor is invalid or there is only + // one file in this level + current_file_iter = std::lower_bound( + temp->begin(), temp->end(), compact_cursor->at(level), + [&](const Fsize& f, const InternalKey& cursor) -> bool { + return icmp.Compare(cursor, f.file->smallest) > 0; + }); + + should_move_files = current_file_iter != temp->end(); + } + if (should_move_files) { + // Set the next file using the current iterator, the next file will be + // updated/advanced when "UpdateCompactCursor(int level)" is called". + if (current_file_iter == temp->end()) { + // Retart from the begining if we cannot find a qualified file with + // smallest key larger than/equal to the cursor + current_file_to_cursor->at(level) = 0; + } else { + current_file_to_cursor->at(level) = (int)current_file_iter->index; + } + + // Construct a local temporary vector + std::vector local_temp; + local_temp.reserve(temp->size()); + // Move the selected File into the first position and its successors + // into the second, third, ..., positions + for (auto iter = current_file_iter; iter != temp->end(); iter++) { + local_temp.push_back(*iter); + } + // Move the origin predecessors of the selected file in a round-robin + // manner + for (auto iter = temp->begin(); iter != current_file_iter; iter++) { + local_temp.push_back(*iter); + } + // Replace all the items in temp + for (size_t i = 0; i < local_temp.size(); i++) { + temp->at(i) = local_temp[i]; + } + } else if (temp->size() >= 1) { + // Do not move files if the stored cursor is invalid, or cannot find + // file with smallest key larger than or equal to the cursor. Rest + // the cursor as the smalleast key of the second file. + current_file_to_cursor->at(level) = 0; + } else { + // No more than 1 file exists in this level. An empty key will be assigned + // to the compact cursor if the next file index is -1. + current_file_to_cursor->at(level) = -1; + } +} } // namespace void VersionStorageInfo::UpdateFilesByCompactionPri( @@ -3245,6 +3321,11 @@ void VersionStorageInfo::UpdateFilesByCompactionPri( files_[level + 1], ioptions.clock, level, num_non_empty_levels_, options.ttl, &temp); break; + case kRoundRobin: + MoveFileByRoundRobin(*internal_comparator_, &compact_cursor_, + ¤t_file_to_cursor_, level0_non_overlapping_, + level, &temp); + break; default: assert(false); } @@ -5280,6 +5361,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, delete[] vstorage -> files_; vstorage->files_ = new_files_list; vstorage->num_levels_ = new_levels; + vstorage->ResizeCompactCursors(new_levels); MutableCFOptions mutable_cf_options(*options); VersionEdit ve; diff --git a/db/version_set.h b/db/version_set.h index 8f0073a8975..4db7ab1a70e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -130,6 +130,35 @@ class VersionStorageInfo { void AddFile(int level, FileMetaData* f); + // Resize/Initialize the space for compact_cursor_ + void ResizeCompactCursors(int level) { + compact_cursor_.resize(level, InternalKey()); + } + + const std::vector& GetCompactCursors() const { + return compact_cursor_; + } + + const std::vector& GetCurrentFileToCursor() const { + return current_file_to_cursor_; + } + + // REQUIRES: ResizeCompactCursors has been called + void AddCursorForOneLevel(int level, + const InternalKey& smallest_uncompacted_key) { + compact_cursor_[level] = smallest_uncompacted_key; + } + + // REQUIRES: lock is held + // Update the compact cursor and advance the file index so that it can point + // to the next cursor + void IncrCurrentFileToCursor(int level) { + current_file_to_cursor_[level] = + (current_file_to_cursor_[level] + 1) % files_[level].size(); + compact_cursor_[level] = + files_[level][(size_t)current_file_to_cursor_[level]]->smallest; + } + void ReserveBlob(size_t size) { blob_files_.reserve(size); } void AddBlobFile(std::shared_ptr blob_file_meta); @@ -657,6 +686,13 @@ class VersionStorageInfo { int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop // for number of L0 files. + // Compact cursors for round-robin compactions in each level + std::vector compact_cursor_; + // The current compact file index in a round-robin manner. We update the + // current file to be compacted in `UpdateFilesByCompactionPri(...)` instead + // of the current compact cursor. + std::vector current_file_to_cursor_; + // the following are the sampled temporary stats. // the current accumulated size of sampled files. uint64_t accumulated_file_size_; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index e3f4ccee72a..746e468e1da 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -54,6 +54,11 @@ enum CompactionPri : char { // and its size is the smallest. It in many cases can optimize write // amplification. kMinOverlappingRatio = 0x3, + // Keeps a cursor(s) of the successor of the file (key range) was/were + // compacted before, and always picks the next files (key range) in that + // level. The file picking process will cycle through all the files in a + // round-robin manner. + kRoundRobin = 0x4, }; struct CompactionOptionsFIFO { diff --git a/options/options_helper.cc b/options/options_helper.cc index 65eb708c165..38510c7c8d4 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -319,7 +319,8 @@ std::map OptionsHelper::compaction_pri_to_string = { {kByCompensatedSize, "kByCompensatedSize"}, {kOldestLargestSeqFirst, "kOldestLargestSeqFirst"}, {kOldestSmallestSeqFirst, "kOldestSmallestSeqFirst"}, - {kMinOverlappingRatio, "kMinOverlappingRatio"}}; + {kMinOverlappingRatio, "kMinOverlappingRatio"}, + {kRoundRobin, "kRoundRobin"}}; std::map OptionsHelper::compaction_stop_style_to_string = { @@ -829,7 +830,8 @@ std::unordered_map {"kByCompensatedSize", kByCompensatedSize}, {"kOldestLargestSeqFirst", kOldestLargestSeqFirst}, {"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst}, - {"kMinOverlappingRatio", kMinOverlappingRatio}}; + {"kMinOverlappingRatio", kMinOverlappingRatio}, + {"kRoundRobin", kRoundRobin}}; std::unordered_map OptionsHelper::compaction_stop_style_string_map = {