Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobDB: handle the case when OnCompactionCompleted precedes OnFlushCompleted #6341

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
// blob file obsolete if there are still unflushed memtables from before
// the time the blob file was closed.
if (blob_file->GetImmutableSequence() > flush_sequence_ ||
!blob_file->GetLinkedSstFiles().empty()) {
!blob_file->GetLinkedSstFiles().empty() ||
!blob_file->GetUnlinkedSstFiles().empty()) {
return false;
}

Expand Down
51 changes: 51 additions & 0 deletions utilities/blob_db/blob_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1683,6 +1684,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1712,6 +1714,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1750,6 +1753,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1780,6 +1784,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1811,6 +1816,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand Down Expand Up @@ -1841,6 +1847,7 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
for (size_t i = 0; i < 5; ++i) {
const auto &blob_file = blob_files[i];
ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
}

Expand All @@ -1857,6 +1864,50 @@ TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
}
}

TEST_F(BlobDBTest, MaintainBlobFileToSstMappingRace) {
BlobDBOptions bdb_options;
bdb_options.enable_garbage_collection = true;
bdb_options.disable_background_tasks = true;
Open(bdb_options);

// Register a dummy blob file.
blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);

auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);

auto blob_file = blob_files[0];

// Simulate the case when we receive the notification for a compaction
// that removes an SST <-> blob file link before the notification for the
// flush that establishes the link.
{
CompactionJobInfo info{};
info.input_file_infos.emplace_back(CompactionFileInfo{1, 10, 1});

blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

ASSERT_TRUE(blob_file->GetLinkedSstFiles().empty());
ASSERT_EQ(blob_file->GetUnlinkedSstFiles(),
std::unordered_set<uint64_t>{10});
ASSERT_FALSE(blob_file->Obsolete());
}

{
FlushJobInfo info{};
info.file_number = 10;
info.oldest_blob_file_number = 1;
info.smallest_seqno = 101;
info.largest_seqno = 200;

blob_db_impl()->TEST_ProcessFlushJobInfo(info);

ASSERT_TRUE(blob_file->GetLinkedSstFiles().empty());
ASSERT_TRUE(blob_file->GetUnlinkedSstFiles().empty());
ASSERT_TRUE(blob_file->Obsolete());
}
}

TEST_F(BlobDBTest, ShutdownWait) {
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 100;
Expand Down
43 changes: 37 additions & 6 deletions utilities/blob_db/blob_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ class BlobFile {
// after that
uint64_t file_number_{0};

// The file numbers of the SST files whose oldest blob file reference
// points to this blob file.
// The file numbers of the SST files that have been linked to/unlinked from
// this blob file, respectively. Note that it is possible for the compaction
// notification that removes an SST <-> blob file link to arrive before the
// flush notification that establishes said link, hence the need for
// tracking these separately.
std::unordered_set<uint64_t> linked_sst_files_;
std::unordered_set<uint64_t> unlinked_sst_files_;

// Info log.
Logger* info_log_{nullptr};
Expand Down Expand Up @@ -126,23 +130,50 @@ class BlobFile {
// once the file is created, this never changes
uint64_t BlobFileNumber() const { return file_number_; }

// Get the set of SST files whose oldest blob file reference points to
// this file.
// Get the set of SST files that have been linked to/unlinked from this
// blob file, respectively.
const std::unordered_set<uint64_t>& GetLinkedSstFiles() const {
return linked_sst_files_;
}

const std::unordered_set<uint64_t>& GetUnlinkedSstFiles() const {
return unlinked_sst_files_;
}

// Link an SST file whose oldest blob file reference points to this file.
void LinkSstFile(uint64_t sst_file_number) {
assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end());

// Note: it is possible for an SST to be unlinked before it is linked due
// to OnFlushCompleted/OnCompactionCompleted notifications arriving out of
// order. If that happens, we simply remove the SST from the unlinked set
// here.

auto it = unlinked_sst_files_.find(sst_file_number);
if (it != unlinked_sst_files_.end()) {
unlinked_sst_files_.erase(it);
return;
}

linked_sst_files_.insert(sst_file_number);
}

// Unlink an SST file whose oldest blob file reference points to this file.
void UnlinkSstFile(uint64_t sst_file_number) {
assert(unlinked_sst_files_.find(sst_file_number) ==
unlinked_sst_files_.end());

// Note: it is possible for an SST to be unlinked before it is linked due
// to OnFlushCompleted/OnCompactionCompleted notifications arriving out of
// order. If that happens, we simply add the SST to the unlinked set here.

auto it = linked_sst_files_.find(sst_file_number);
assert(it != linked_sst_files_.end());
linked_sst_files_.erase(it);
if (it != linked_sst_files_.end()) {
linked_sst_files_.erase(it);
return;
}

unlinked_sst_files_.insert(sst_file_number);
}

// the following functions are atomic, and don't need
Expand Down