Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable a multi-level db to smoothly migrate to FIFO via DB::Open #10348

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions HISTORY.md
Expand Up @@ -4,6 +4,9 @@
* `DeleteRange()` now supports user-defined timestamp.
* Provide support for async_io with tailing iterators when ReadOptions.tailing is enabled during scans.
* Tiered Storage: allow data moving up from the last level to the penultimate level if the input level is penultimate level or above.
* FIFO compaction now supports migrating from a multi-level DB via DB::Open(). During the migration phase, FIFO compaction picker will:
* picks the sst file with the smallest starting key in the bottom-most non-empty level.
* Note that during the migration phase, the file purge order will only be an approximation of "FIFO" as files in lower-level might sometime contain newer keys than files in upper-level.

### Bug Fixes
* Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed.
Expand Down
1 change: 0 additions & 1 deletion db/column_family.cc
Expand Up @@ -291,7 +291,6 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
}

if (result.compaction_style == kCompactionStyleFIFO) {
result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
// of them, these options don't really mean anything
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
Expand Down
113 changes: 87 additions & 26 deletions db/compaction/compaction_picker_fifo.cc
Expand Up @@ -121,20 +121,45 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
return c;
}

// The size-based compaction picker for FIFO.
//
// When the entire column family size exceeds max_table_files_size, FIFO will
// try to delete the oldest sst file(s) until the resulting column family size
// is smaller than max_table_files_size.
//
// This function also takes care the case where a DB is migrating from level /
// universal compaction to FIFO compaction. During the migration, the column
// family will also have non-L0 files while FIFO can only create L0 files.
// In this case, this function will first purge the sst files in the bottom-
// most non-empty level first, and the DB will eventually converge to the
// regular FIFO case where there're only L0 files. Note that during the
// migration case, the purge order will only be an approximation of "FIFO"
// as entries inside lower-level files might sometimes be newer than some
// entries inside upper-level files.
Compaction* FIFOCompactionPicker::PickSizeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer) {
const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
uint64_t total_size = GetTotalFilesSize(level_files);
// compute the total size and identify the last non-empty level
int last_level = 0;
uint64_t total_size = 0;
for (int level = 0; level < vstorage->num_levels(); ++level) {
auto level_size = GetTotalFilesSize(vstorage->LevelFiles(level));
total_size += level_size;
if (level_size > 0) {
last_level = level;
}
}
const std::vector<FileMetaData*>& last_level_files =
vstorage->LevelFiles(last_level);

if (total_size <=
mutable_cf_options.compaction_options_fifo.max_table_files_size ||
level_files.size() == 0) {
// total size not exceeded
if (last_level == 0 &&
total_size <=
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
// total size not exceeded, try to find intra level 0 compaction if enabled
const std::vector<FileMetaData*>& level0_files = vstorage->LevelFiles(0);
if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
level_files.size() > 0) {
level0_files.size() > 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intra-level0 compaction logic is untouched. It will still only do compaction for level 0.

CompactionInputFiles comp_inputs;
// try to prevent same files from being compacted multiple times, which
// could produce large files that may never TTL-expire. Achieve this by
Expand All @@ -146,7 +171,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
1.1));
if (FindIntraL0Compaction(
level_files,
level0_files,
mutable_cf_options
.level0_file_num_compaction_trigger /* min_files_to_compact */
,
Expand Down Expand Up @@ -187,27 +212,58 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(

std::vector<CompactionInputFiles> inputs;
inputs.emplace_back();
inputs[0].level = 0;
inputs[0].level = last_level;

for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
total_size -= f->fd.file_size;
inputs[0].files.push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <=
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
break;
if (last_level == 0) {
// In L0, right-most files are the oldest files.
for (auto ritr = last_level_files.rbegin(); ritr != last_level_files.rend();
++ritr) {
auto f = *ritr;
total_size -= f->fd.file_size;
inputs[0].files.push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <=
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
break;
}
}
} else {
// If the last level is non-L0, we actually don't know which file is
// logically the oldest since the file creation time only represents
// when this file was compacted to this level, which is independent
// to when the entries in this file were first inserted.
//
// As a result, we delete files from the left instead. This means the sst
// file with the smallest key will be deleted first. This design decision
// better serves a major type of FIFO use cases where smaller keys are
// associated with older data.
for (const auto& f : last_level_files) {
total_size -= f->fd.file_size;
inputs[0].files.push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <=
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
break;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are lots of common code in this if-else. Wonder whether there is a way to consolidate the logic. I can't think of a very easy way though. Perhaps define a lambda function and call it in two places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lambda sounds like a good idea. Originally I was trying to see whether I can define an iterator variable and initialize it based on whether it's forward or reverse, but I later found iterator and reverse-iterator are not compatible in their types.

}
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0, 0, 0, kNoCompression,
std::move(inputs), last_level,
/* target_file_size */ 0,
/* max_compaction_bytes */ 0,
/* output_path_id */ 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
/* trim_ts */ "", vstorage->CompactionScore(0),
Expand All @@ -224,6 +280,13 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
return nullptr;
}

// PickCompactionToWarm is only triggered if there is no non-L0 files.
for (int level = 1; level < vstorage->num_levels(); ++level) {
if (GetTotalFilesSize(vstorage->LevelFiles(level)) > 0) {
return nullptr;
}
}

const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);

Expand Down Expand Up @@ -327,8 +390,6 @@ Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) {
assert(vstorage->num_levels() == 1);

Compaction* c = nullptr;
if (mutable_cf_options.ttl > 0) {
c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
Expand Down
6 changes: 3 additions & 3 deletions db/db_bloom_filter_test.cc
Expand Up @@ -1706,15 +1706,15 @@ TEST_F(DBBloomFilterTest, ContextCustomFilterPolicy) {
ASSERT_OK(Put(1, Key(maxKey + 55555), Key(maxKey + 55555)));
Flush(1);
EXPECT_EQ(policy->DumpTestReport(),
fifo ? "cf=abe,s=kCompactionStyleFIFO,n=1,l=0,b=0,r=kFlush\n"
fifo ? "cf=abe,s=kCompactionStyleFIFO,n=7,l=0,b=0,r=kFlush\n"
: "cf=bob,s=kCompactionStyleLevel,n=7,l=0,b=0,r=kFlush\n");

for (int i = maxKey / 2; i < maxKey; i++) {
ASSERT_OK(Put(1, Key(i), Key(i)));
}
Flush(1);
EXPECT_EQ(policy->DumpTestReport(),
fifo ? "cf=abe,s=kCompactionStyleFIFO,n=1,l=0,b=0,r=kFlush\n"
fifo ? "cf=abe,s=kCompactionStyleFIFO,n=7,l=0,b=0,r=kFlush\n"
: "cf=bob,s=kCompactionStyleLevel,n=7,l=0,b=0,r=kFlush\n");

// Check that they can be found
Expand All @@ -1738,7 +1738,7 @@ TEST_F(DBBloomFilterTest, ContextCustomFilterPolicy) {
EXPECT_LE(useful_count, maxKey * 2 * (fifo ? 0.9995 : 0.98));
}

if (!fifo) { // FIFO only has L0
if (!fifo) { // FIFO doesn't fully support CompactRange
// Full compaction
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr));
Expand Down
1 change: 0 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Expand Up @@ -3279,7 +3279,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
assert(c->num_input_files(1) == 0);
assert(c->level() == 0);
assert(c->column_family_data()->ioptions()->compaction_style ==
kCompactionStyleFIFO);

Expand Down
12 changes: 0 additions & 12 deletions db/db_impl/db_impl_open.cc
Expand Up @@ -1944,18 +1944,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,

if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
auto* vstorage = cfd->current()->storage_info();
for (int i = 1; i < vstorage->num_levels(); ++i) {
int num_files = vstorage->NumLevelFiles(i);
if (num_files > 0) {
s = Status::InvalidArgument(
"Not all files are at level 0. Cannot "
"open with FIFO compaction style.");
break;
}
}
}
if (!cfd->mem()->IsSnapshotSupported()) {
impl->is_snapshot_supported_ = false;
}
Expand Down
131 changes: 130 additions & 1 deletion db/db_test.cc
Expand Up @@ -483,6 +483,135 @@ TEST_F(DBTest, LevelLimitReopen) {
}
#endif // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
TEST_F(DBTest, LevelReopenWithFIFO) {
const int kLevelCount = 4;
const int kKeyCount = 5;
const int kTotalSstFileCount = kLevelCount * kKeyCount;
const int kCF = 1;

Options options = CurrentOptions();
// Config level0_file_num_compaction_trigger to prevent L0 files being
// automatically compacted while we are constructing a LSM tree structure
// to test multi-level FIFO compaction.
options.level0_file_num_compaction_trigger = kKeyCount + 1;
CreateAndReopenWithCF({"pikachu"}, options);

// The expected number of files per level after each file creation.
const std::string expected_files_per_level[kLevelCount][kKeyCount] = {
{"0,0,0,1", "0,0,0,2", "0,0,0,3", "0,0,0,4", "0,0,0,5"},
{"0,0,1,5", "0,0,2,5", "0,0,3,5", "0,0,4,5", "0,0,5,5"},
{"0,1,5,5", "0,2,5,5", "0,3,5,5", "0,4,5,5", "0,5,5,5"},
{"1,5,5,5", "2,5,5,5", "3,5,5,5", "4,5,5,5", "5,5,5,5"},
};

const std::string expected_entries[kKeyCount][kLevelCount + 1] = {
{"[ ]", "[ a3 ]", "[ a2, a3 ]", "[ a1, a2, a3 ]", "[ a0, a1, a2, a3 ]"},
{"[ ]", "[ b3 ]", "[ b2, b3 ]", "[ b1, b2, b3 ]", "[ b0, b1, b2, b3 ]"},
{"[ ]", "[ c3 ]", "[ c2, c3 ]", "[ c1, c2, c3 ]", "[ c0, c1, c2, c3 ]"},
{"[ ]", "[ d3 ]", "[ d2, d3 ]", "[ d1, d2, d3 ]", "[ d0, d1, d2, d3 ]"},
{"[ ]", "[ e3 ]", "[ e2, e3 ]", "[ e1, e2, e3 ]", "[ e0, e1, e2, e3 ]"},
};

// The loop below creates the following LSM tree where each (k, v) pair
// represents a file that contains that entry. When a file is created,
// the db is reopend with FIFO compaction and verified the LSM tree
// structure is still the same.
//
// The resulting LSM tree will contain 5 different keys. Each key as
// 4 different versions, located in different level.
//
// L0: (e, e0) (d, d0) (c, c0) (b, b0) (a, a0)
// L1: (a, a1) (b, b1) (c, c1) (d, d1) (e, e1)
// L2: (a, a2) (b, b2) (c, c2) (d, d2) (e, e2)
// L3: (a, a3) (b, b3) (c, c3) (d, d3) (e, e3)
for (int l = 0; l < kLevelCount; ++l) {
int level = kLevelCount - 1 - l;
for (int p = 0; p < kKeyCount; ++p) {
std::string put_key = std::string(1, char('a' + p));
ASSERT_OK(Put(kCF, put_key, put_key + std::to_string(level)));
ASSERT_OK(Flush(kCF));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
for (int g = 0; g < kKeyCount; ++g) {
int entry_count = (p >= g) ? l + 1 : l;
std::string get_key = std::string(1, char('a' + g));
CheckAllEntriesWithFifoReopen(expected_entries[g][entry_count], get_key,
kCF, {"pikachu"}, options);
}
if (level != 0) {
MoveFilesToLevel(level, kCF);
for (int g = 0; g < kKeyCount; ++g) {
int entry_count = (p >= g) ? l + 1 : l;
std::string get_key = std::string(1, char('a' + g));
CheckAllEntriesWithFifoReopen(expected_entries[g][entry_count],
get_key, kCF, {"pikachu"}, options);
}
}
ASSERT_EQ(expected_files_per_level[l][p], FilesPerLevel(kCF));
}
}

// The expected number of sst files in each level after each FIFO compaction
// that deletes the oldest sst file.
const std::string expected_files_per_level_after_fifo[] = {
"5,5,5,4", "5,5,5,3", "5,5,5,2", "5,5,5,1", "5,5,5", "5,5,4", "5,5,3",
"5,5,2", "5,5,1", "5,5", "5,4", "5,3", "5,2", "5,1",
"5", "4", "3", "2", "1", "",
};

// The expected value entries of each key after each FIFO compaction.
// This verifies whether FIFO removes the file with the smallest key in non-L0
// files first then the oldest files in L0.
const std::string expected_entries_after_fifo[kKeyCount][kLevelCount + 1] = {
{"[ a0, a1, a2, a3 ]", "[ a0, a1, a2 ]", "[ a0, a1 ]", "[ a0 ]", "[ ]"},
{"[ b0, b1, b2, b3 ]", "[ b0, b1, b2 ]", "[ b0, b1 ]", "[ b0 ]", "[ ]"},
{"[ c0, c1, c2, c3 ]", "[ c0, c1, c2 ]", "[ c0, c1 ]", "[ c0 ]", "[ ]"},
{"[ d0, d1, d2, d3 ]", "[ d0, d1, d2 ]", "[ d0, d1 ]", "[ d0 ]", "[ ]"},
{"[ e0, e1, e2, e3 ]", "[ e0, e1, e2 ]", "[ e0, e1 ]", "[ e0 ]", "[ ]"},
};

// In the 2nd phase, we reopen the DB with FIFO compaction. In each reopen,
// we config max_table_files_size so that FIFO will remove exactly one file
// at a time upon compaction, and we will use it to verify whether the sst
// files are deleted in the correct order.
for (int i = 0; i < kTotalSstFileCount; ++i) {
uint64_t total_sst_files_size = 0;
ASSERT_TRUE(dbfull()->GetIntProperty(
handles_[1], "rocksdb.total-sst-files-size", &total_sst_files_size));
ASSERT_TRUE(total_sst_files_size > 0);

Options fifo_options(options);
fifo_options.compaction_style = kCompactionStyleFIFO;
options.create_if_missing = false;
fifo_options.max_open_files = -1;
fifo_options.disable_auto_compactions = false;
// Config max_table_files_size to be total_sst_files_size - 1 so that
// FIFO will delete one file.
fifo_options.compaction_options_fifo.max_table_files_size =
total_sst_files_size - 1;
ASSERT_OK(
TryReopenWithColumnFamilies({"default", "pikachu"}, fifo_options));
// For FIFO to pick a compaction
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForCompact(false));
for (int g = 0; g < kKeyCount; ++g) {
std::string get_key = std::string(1, char('a' + g));
int status_index = i / kKeyCount;
if ((i % kKeyCount) >= g) {
// If true, then it means the sst file containing the get_key in the
// current level has already been deleted, so we need to move the
// status_index for checking the expected value.
status_index++;
}
CheckAllEntriesWithFifoReopen(
expected_entries_after_fifo[g][status_index], get_key, kCF,
{"pikachu"}, options);
}
ASSERT_EQ(expected_files_per_level_after_fifo[i], FilesPerLevel(kCF));
}
}
#endif // !ROCKSDB_LITE

TEST_F(DBTest, PutSingleDeleteGet) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
Expand Down Expand Up @@ -668,7 +797,7 @@ TEST_F(DBTest, SingleDeletePutFlush) {
ASSERT_OK(Flush(1));

ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
// Skip FIFO and universal compaction beccaus they do not apply to the test
// Skip FIFO and universal compaction because they do not apply to the test
// case. Skip MergePut because single delete does not get removed when it
// encounters a merge.
} while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
Expand Down