Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix corruption with intra-L0 on ingested files #5958

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fix data corruption caused by output of intra-L0 compaction on ingested file not being placed in correct order in L0.

### Public API Change
* Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the
file_creation_time of the oldest SST file in the DB.
Expand Down
6 changes: 5 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -929,8 +929,12 @@ bool ColumnFamilyData::NeedsCompaction() const {

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
SequenceNumber earliest_mem_seqno =
std::min(mem_->GetEarliestSequenceNumber(),
imm_.current()->GetEarliestSequenceNumber(false));
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
GetName(), mutable_options, current_->storage_info(), log_buffer,
earliest_mem_seqno);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Expand Down
44 changes: 32 additions & 12 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,52 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs) {
size_t compact_bytes = static_cast<size_t>(level_files[0]->fd.file_size);
uint64_t compensated_compact_bytes = level_files[0]->compensated_file_size;
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno) {
// Do not pick an ingested file when there is at least one unflushed memtable
// whose seqno range overlaps with that of the SST.
size_t start = 0;
for (; start < level_files.size(); start++) {
if (level_files[start]->being_compacted) {
return false;
}
// If there is no data in the memtable, the earliest sequence number would be
// the largest sequence number in the last memtable.
// All files are sorted in descending order by largest_seqno, so we only need
// to check the first one.
if (level_files[start]->fd.largest_seqno <= earliest_mem_seqno) {
break;
}
}
if (start >= level_files.size()) {
return false;
}
size_t compact_bytes = static_cast<size_t>(level_files[start]->fd.file_size);
uint64_t compensated_compact_bytes =
level_files[start]->compensated_file_size;
size_t compact_bytes_per_del_file = port::kMaxSizet;
// Compaction range will be [0, span_len).
size_t span_len;
// Compaction range will be [start, limit).
size_t limit;
// Pull in files until the amount of compaction work per deleted file begins
// increasing or maximum total compaction size is reached.
size_t new_compact_bytes_per_del_file = 0;
for (span_len = 1; span_len < level_files.size(); ++span_len) {
compact_bytes += static_cast<size_t>(level_files[span_len]->fd.file_size);
compensated_compact_bytes += level_files[span_len]->compensated_file_size;
new_compact_bytes_per_del_file = compact_bytes / span_len;
if (level_files[span_len]->being_compacted ||
for (limit = start + 1; limit < level_files.size(); ++limit) {
compact_bytes += static_cast<size_t>(level_files[limit]->fd.file_size);
compensated_compact_bytes += level_files[limit]->compensated_file_size;
new_compact_bytes_per_del_file = compact_bytes / (limit - start);
if (level_files[limit]->being_compacted ||
new_compact_bytes_per_del_file > compact_bytes_per_del_file ||
compensated_compact_bytes > max_compaction_bytes) {
break;
}
compact_bytes_per_del_file = new_compact_bytes_per_del_file;
}

if (span_len >= min_files_to_compact &&
if ((limit - start) >= min_files_to_compact &&
compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
assert(comp_inputs != nullptr);
comp_inputs->level = 0;
for (size_t i = 0; i < span_len; ++i) {
for (size_t i = start; i < limit; ++i) {
comp_inputs->files.push_back(level_files[i]);
}
return true;
Expand Down
27 changes: 14 additions & 13 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ class CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0;

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
Expand Down Expand Up @@ -247,10 +247,11 @@ class NullCompactionPicker : public CompactionPicker {
virtual ~NullCompactionPicker() {}

// Always return "nullptr"
Compaction* PickCompaction(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/,
LogBuffer* /*log_buffer*/) override {
Compaction* PickCompaction(
const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */,
SequenceNumber /* earliest_memtable_seqno */) override {
return nullptr;
}

Expand Down Expand Up @@ -292,11 +293,11 @@ class NullCompactionPicker : public CompactionPicker {
// files. Cannot be nullptr.
//
// @return true iff compaction was found.
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs);
bool FindIntraL0Compaction(
const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber);

CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(

Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber /*earliest_memtable_seqno*/) {
assert(vstorage->num_levels() == 1);

Compaction* c = nullptr;
Expand Down
8 changes: 4 additions & 4 deletions db/compaction/compaction_picker_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class FIFOCompactionPicker : public CompactionPicker {
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}

virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;

virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
Expand Down
17 changes: 11 additions & 6 deletions db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ class LevelCompactionBuilder {
public:
LevelCompactionBuilder(const std::string& cf_name,
VersionStorageInfo* vstorage,
SequenceNumber earliest_mem_seqno,
CompactionPicker* compaction_picker,
LogBuffer* log_buffer,
const MutableCFOptions& mutable_cf_options,
const ImmutableCFOptions& ioptions)
: cf_name_(cf_name),
vstorage_(vstorage),
earliest_mem_seqno_(earliest_mem_seqno),
compaction_picker_(compaction_picker),
log_buffer_(log_buffer),
mutable_cf_options_(mutable_cf_options),
Expand Down Expand Up @@ -97,6 +99,7 @@ class LevelCompactionBuilder {

const std::string& cf_name_;
VersionStorageInfo* vstorage_;
SequenceNumber earliest_mem_seqno_;
CompactionPicker* compaction_picker_;
LogBuffer* log_buffer_;
int start_level_ = -1;
Expand Down Expand Up @@ -537,17 +540,19 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() {
// resort to L0->L0 compaction yet.
return false;
}
return FindIntraL0Compaction(
level_files, kMinFilesForIntraL0Compaction, port::kMaxUint64,
mutable_cf_options_.max_compaction_bytes, &start_level_inputs_);
return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction,
port::kMaxUint64,
mutable_cf_options_.max_compaction_bytes,
&start_level_inputs_, earliest_mem_seqno_);
}
} // namespace

Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
mutable_cf_options, ioptions_);
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_mem_seqno) {
LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this,
log_buffer, mutable_cf_options, ioptions_);
return builder.PickCompaction();
}
} // namespace rocksdb
8 changes: 4 additions & 4 deletions db/compaction/compaction_picker_level.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class LevelCompactionPicker : public CompactionPicker {
LevelCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;

virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
Expand Down
56 changes: 44 additions & 12 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1486,12 +1486,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) {
// All 5 L0 files will be picked for intra L0 compaction. The one L1 file
// spans entire L0 key range and is marked as being compacted to avoid
// L0->L1 compaction.
Add(0, 1U, "100", "150", 200000U);
Add(0, 2U, "151", "200", 200000U);
Add(0, 3U, "201", "250", 200000U);
Add(0, 4U, "251", "300", 200000U);
Add(0, 5U, "301", "350", 200000U);
Add(1, 6U, "100", "350", 200000U);
Add(0, 1U, "100", "150", 200000U, 0, 100, 101);
Add(0, 2U, "151", "200", 200000U, 0, 102, 103);
Add(0, 3U, "201", "250", 200000U, 0, 104, 105);
Add(0, 4U, "251", "300", 200000U, 0, 106, 107);
Add(0, 5U, "301", "350", 200000U, 0, 108, 109);
Add(1, 6U, "100", "350", 200000U, 0, 110, 111);
vstorage_->LevelFiles(1)[0]->being_compacted = true;
UpdateVersionStorageInfo();

Expand All @@ -1516,12 +1516,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) {
// max_compaction_bytes limit (the minimum number of files for triggering
// intra L0 compaction is 4). The one L1 file spans entire L0 key range and
// is marked as being compacted to avoid L0->L1 compaction.
Add(0, 1U, "100", "150", 200000U);
Add(0, 2U, "151", "200", 200000U);
Add(0, 3U, "201", "250", 200000U);
Add(0, 4U, "251", "300", 200000U);
Add(0, 5U, "301", "350", 200000U);
Add(1, 6U, "100", "350", 200000U);
Add(0, 1U, "100", "150", 200000U, 0, 100, 101);
Add(0, 2U, "151", "200", 200000U, 0, 102, 103);
Add(0, 3U, "201", "250", 200000U, 0, 104, 105);
Add(0, 4U, "251", "300", 200000U, 0, 106, 107);
Add(0, 5U, "301", "350", 200000U, 0, 108, 109);
Add(1, 6U, "100", "350", 200000U, 0, 109, 110);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add several tests to cover more scenarios in FindIntraL0Compaction()? For example, when being_compacted shows up in L0? Also, it will be nice to directly cover the earliest_mem_seqno scenarios in tests here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have added a test for FindIntraL0Compaction, which makes sure that it skips any SST whose largest lsn is larger than earliest_seqno.

vstorage_->LevelFiles(1)[0]->being_compacted = true;
UpdateVersionStorageInfo();

Expand All @@ -1535,6 +1535,38 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) {
ASSERT_EQ(0, compaction->output_level());
}

// Checks that the level compaction picker, when given an earliest memtable
// seqno, leaves the newest L0 file out of an intra-L0 compaction and still
// picks the remaining eligible L0 files.
TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
// Intra L0 compaction triggers only if there are at least
// level0_file_num_compaction_trigger + 2 L0 files.
mutable_cf_options_.level0_file_num_compaction_trigger = 3;
mutable_cf_options_.max_compaction_bytes = 999999u;
NewVersionStorage(6, kCompactionStyleLevel);

// 4 out of 6 L0 files are expected to be picked for intra-L0 compaction:
// one L0 file is marked being_compacted, and the newest L0 file is skipped
// because of the earliest memtable seqno passed to PickCompaction() below.
// The one L1 file spans the entire L0 key range and is marked as being
// compacted to avoid L0->L1 compaction.
// NOTE(review): presumably the last two Add() arguments are the smallest and
// largest seqno of the file -- confirm against the fixture's Add() helper.
Add(1, 1U, "100", "350", 200000U, 0, 110, 111);
Add(0, 2U, "301", "350", 1U, 0, 108, 109);
Add(0, 3U, "251", "300", 1U, 0, 106, 107);
Add(0, 4U, "201", "250", 1U, 0, 104, 105);
Add(0, 5U, "151", "200", 1U, 0, 102, 103);
Add(0, 6U, "100", "150", 1U, 0, 100, 101);
Add(0, 7U, "100", "100", 1U, 0, 99, 100);
// NOTE(review): index 5 is taken before UpdateVersionStorageInfo(); it looks
// like this refers to the last L0 file added (file 7) -- confirm the ordering.
vstorage_->LevelFiles(0)[5]->being_compacted = true;
vstorage_->LevelFiles(1)[0]->being_compacted = true;
UpdateVersionStorageInfo();

// The trailing argument (107) is the earliest memtable seqno handed to the
// picker.
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_, 107));
ASSERT_TRUE(compaction.get() != nullptr);
// A single input level with output level 0 means the compaction stays in L0.
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(4U, compaction->num_input_files(0));
ASSERT_EQ(CompactionReason::kLevelL0FilesNum,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ bool UniversalCompactionPicker::NeedsCompaction(

Compaction* UniversalCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber /* earliest_memtable_seqno */) {
UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
mutable_cf_options, vstorage, this,
log_buffer);
Expand Down
9 changes: 4 additions & 5 deletions db/compaction/compaction_picker_universal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ class UniversalCompactionPicker : public CompactionPicker {
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;

virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }

virtual bool NeedsCompaction(
Expand Down
Loading