Skip to content

Commit

Permalink
Fix a race condition in WAL tracking causing DB open failure (#9715)
Browse files Browse the repository at this point in the history
Summary:
There is a race condition if WAL tracking in the MANIFEST is enabled in a database that disables 2PC.

The race condition is between two background flush threads trying to install flush results to the MANIFEST.

Consider an example database with two column families: "default" (cfd0) and "cf1" (cfd1). Initially,
both column families have one mutable (active) memtable whose data backed by 6.log.

1. Trigger a manual flush for "cf1", creating a 7.log
2. Insert another key to "default", and trigger flush for "default", creating 8.log
3. BgFlushThread1 finishes writing 9.sst
4. BgFlushThread2 finishes writing 10.sst

```
Time  BgFlushThread1                                    BgFlushThread2
 |    mutex_.Lock()
 |    precompute min_wal_to_keep as 6
 |    mutex_.Unlock()
 |                                                     mutex_.Lock()
 |                                                     precompute min_wal_to_keep as 6
 |                                                     join MANIFEST write queue and mutex_.Unlock()
 |    write to MANIFEST
 |    mutex_.Lock()
 |    cfd1->log_number = 7
 |    Signal bg_flush_2 and mutex_.Unlock()
 |                                                     wake up and mutex_.Lock()
 |                                                     cfd0->log_number = 8
 |                                                     FindObsoleteFiles() with job_context->log_number == 7
 |                                                     mutex_.Unlock()
 |                                                     PurgeObsoleteFiles() deletes 6.log
 V
```

As shown in the above, BgFlushThread2 thinks that the min wal to keep is 6.log because "cf1" has unflushed data in 6.log (cf1.log_number=6).
Similarly, BgThread1 thinks that min wal to keep is also 6.log because "default" has unflushed data (default.log_number=6).
No WAL deletion will be written to MANIFEST because 6 is equal to `versions_->wals_.min_wal_number_to_keep`,
due to https://github.com/facebook/rocksdb/blob/7.1.fb/db/memtable_list.cc#L513:L514.
The bg flush thread that finishes last will perform file purging. `job_context.log_number` will be evaluated as 7, i.e.
the min wal that contains unflushed data, causing 6.log to be deleted. However, MANIFEST thinks 6.log should still exist.
If you close the db at this point, you won't be able to re-open it if `track_and_verify_wal_in_manifest` is true.

We must handle the case of multiple bg flush threads, and it is difficult for one bg flush thread to know
the correct min wal number until the other bg flush threads have finished committing to the manifest and updated
the `cfd::log_number`.
To fix this issue, we rename an existing variable `min_log_number_to_keep_2pc` to `min_log_number_to_keep`,
and use it to track WAL file deletion in non-2pc mode as well.
This variable is updated only 1) during recovery with mutex held, or 2) in the MANIFEST write thread.
`min_log_number_to_keep` means RocksDB will delete WALs below it, although there may be WALs
above it which are also obsolete. Formally, we will have [min_wal_to_keep, max_obsolete_wal]. During recovery, we
make sure that only WALs above max_obsolete_wal are checked and added back to `alive_log_files_`.

Pull Request resolved: #9715

Test Plan:
```
make check
```
Also ran stress test below (with asan) to make sure it completes successfully.
```
TEST_TMPDIR=/dev/shm/rocksdb OPT=-g ASAN_OPTIONS=disable_coredump=0 \
CRASH_TEST_EXT_ARGS=--compression_type=zstd SKIP_FORMAT_BUCK_CHECKS=1 \
make J=52 -j52 blackbox_asan_crash_test
```

Reviewed By: ltamasi

Differential Revision: D34984412

Pulled By: riversand963

fbshipit-source-id: c7b21a8d84751bb55ea79c9f387103d21b231005
  • Loading branch information
riversand963 authored and ajkr committed Mar 29, 2022
1 parent 6733f75 commit 2de3bfc
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 44 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Fixed a race condition when disable and re-enable manual compaction.
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
* Fixed a race condition when mmaping a WritableFile on POSIX.
* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail.

## 7.0.3 (03/23/2022)
### Bug Fixes
Expand Down
8 changes: 2 additions & 6 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
namespace ROCKSDB_NAMESPACE {

uint64_t DBImpl::MinLogNumberToKeep() {
if (allow_2pc()) {
return versions_->min_log_number_to_keep_2pc();
} else {
return versions_->MinLogNumberWithUnflushedData();
}
return versions_->min_log_number_to_keep();
}

uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
Expand Down Expand Up @@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}

// Add log files in wal_dir

if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
std::vector<std::string> log_files;
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
Expand All @@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
log_file, immutable_db_options_.wal_dir);
}
}

// Add info log files in db_log_dir
if (!immutable_db_options_.db_log_dir.empty() &&
immutable_db_options_.db_log_dir != dbname_) {
Expand Down
25 changes: 22 additions & 3 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
// In non-2pc mode, we skip WALs that do not back unflushed data.
min_wal_number =
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand Down Expand Up @@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}

std::unique_ptr<VersionEdit> wal_deletion;
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
if (flushed) {
wal_deletion = std::make_unique<VersionEdit>();
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
}
if (!allow_2pc()) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
}
edit_lists.back().push_back(wal_deletion.get());
}

Expand Down Expand Up @@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
uint64_t min_wal_with_unflushed_data =
versions_->MinLogNumberWithUnflushedData();
for (auto wal_number : wal_numbers) {
if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
// In non-2pc mode, the WAL files not backing unflushed data are not
// alive, thus should not be added to the alive_log_files_.
continue;
}
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
Expand Down
87 changes: 87 additions & 0 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
}

TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
Options options = CurrentOptions();
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
// The following make sure there are two bg flush threads.
options.max_background_jobs = 8;

const std::string cf1_name("cf1");
CreateAndReopenWithCF({cf1_name}, options);
assert(handles_.size() == 2);

{
dbfull()->TEST_LockMutex();
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
dbfull()->TEST_UnlockMutex();
}

ASSERT_OK(dbfull()->PauseBackgroundWork());

ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));

ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1]));

ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0]));

bool called = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// This callback will be called when the first bg flush thread reaches the
// point before entering the MANIFEST write queue after flushing the SST
// file.
// The purpose of the sync points here is to ensure both bg flush threads
// finish computing `min_wal_number_to_keep` before any of them updates the
// `log_number` for the column family that's being flushed.
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
[&](void* /*arg*/) {
dbfull()->mutex()->AssertHeld();
if (!called) {
// We are the first bg flush thread in the MANIFEST write queue.
// We set up the dependency between sync points for two threads that
// will be executing the same code.
// For the interleaving of events, see
// https://github.com/facebook/rocksdb/pull/9715.
// bg flush thread1 will release the db mutex while in the MANIFEST
// write queue. In the meantime, bg flush thread2 locks db mutex and
// computes the min_wal_number_to_keep (before thread1 writes to
// MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
// the MANIFEST write queue afterwards and bg flush thread1 proceeds
// with writing to MANIFEST.
called = true;
SyncPoint::GetInstance()->LoadDependency({
{"VersionSet::LogAndApply:WriteManifestStart",
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
"VersionSet::LogAndApply:WriteManifest"},
});
} else {
// The other bg flush thread has already been in the MANIFEST write
// queue, and we are after.
TEST_SYNC_POINT(
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
}
});
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(dbfull()->ContinueBackgroundWork());

ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));

ASSERT_TRUE(called);

Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

DB* db1 = nullptr;
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
ASSERT_OK(s);
assert(db1);
delete db1;
}

// Test scope:
// - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered
Expand Down
5 changes: 3 additions & 2 deletions db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
<< "table_file_creation"
<< "file_number" << fd.GetNumber() << "file_size"
<< fd.GetFileSize() << "file_checksum" << file_checksum
<< "file_checksum_func_name" << file_checksum_func_name;
<< fd.GetFileSize() << "file_checksum"
<< Slice(file_checksum).ToString(true) << "file_checksum_func_name"
<< file_checksum_func_name;

// table_properties
{
Expand Down
26 changes: 14 additions & 12 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0);
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
Expand All @@ -504,23 +504,26 @@ Status MemTableList::TryInstallMemtableFlushResults(
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);

// We piggyback the information of earliest log file to keep in the
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);

std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_list.push_back(wal_deletion.get());
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
}

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
Expand Down Expand Up @@ -805,15 +808,14 @@ Status InstallMemtableAtomicFlushResults(
if (vset->db_options()->allow_2pc) {
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfds, edit_lists, mems_list, prep_tracker);
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);

std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
Expand Down
7 changes: 3 additions & 4 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
if (s->ok()) {
version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
version_edit_params_.max_column_family_);
version_set_->MarkMinLogNumberToKeep2PC(
version_set_->MarkMinLogNumberToKeep(
version_edit_params_.min_log_number_to_keep_);
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
Expand Down Expand Up @@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
fprintf(stdout,
"next_file_number %" PRIu64 " last_sequence %" PRIu64
" prev_log_number %" PRIu64 " max_column_family %" PRIu32
" min_log_number_to_keep "
"%" PRIu64 "\n",
" min_log_number_to_keep %" PRIu64 "\n",
version_set_->current_next_file_number(),
version_set_->LastSequence(), version_set_->prev_log_number(),
version_set_->column_family_set_->GetMaxColumnFamily(),
version_set_->min_log_number_to_keep_2pc());
version_set_->min_log_number_to_keep());
}

} // namespace ROCKSDB_NAMESPACE
15 changes: 7 additions & 8 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4141,7 +4141,7 @@ void VersionSet::Reset() {
}
db_id_.clear();
next_file_number_.store(2);
min_log_number_to_keep_2pc_.store(0);
min_log_number_to_keep_.store(0);
manifest_file_number_ = 0;
options_file_number_ = 0;
pending_manifest_file_number_ = 0;
Expand Down Expand Up @@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites(
}

if (last_min_log_number_to_keep != 0) {
// Should only be set in 2PC mode.
MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
MarkMinLogNumberToKeep(last_min_log_number_to_keep);
}

for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
Expand Down Expand Up @@ -4965,7 +4964,7 @@ Status VersionSet::Recover(
",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
last_sequence_.load(), log_number, prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep());

for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
Expand Down Expand Up @@ -5378,9 +5377,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
}
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_.store(number, std::memory_order_relaxed);
}
}

Expand Down Expand Up @@ -5506,7 +5505,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// min_log_number_to_keep is for the whole db, not for specific column family.
// So it does not need to be set for every column family, just need to be set once.
// Since default CF can never be dropped, we set the min_log to the default CF here.
uint64_t min_log = min_log_number_to_keep_2pc();
uint64_t min_log = min_log_number_to_keep();
if (min_log != 0) {
edit.SetMinLogNumberToKeep(min_log);
}
Expand Down
15 changes: 8 additions & 7 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ class VersionSet {

uint64_t current_next_file_number() const { return next_file_number_.load(); }

uint64_t min_log_number_to_keep_2pc() const {
return min_log_number_to_keep_2pc_.load();
uint64_t min_log_number_to_keep() const {
return min_log_number_to_keep_.load();
}

// Allocate and return a new file number
Expand Down Expand Up @@ -1187,7 +1187,7 @@ class VersionSet {
// Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held.
void MarkMinLogNumberToKeep2PC(uint64_t number);
void MarkMinLogNumberToKeep(uint64_t number);

// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
Expand All @@ -1196,10 +1196,12 @@ class VersionSet {
// Returns the minimum log number which still has data not flushed to any SST
// file.
// In non-2PC mode, all the log numbers smaller than this number can be safely
// deleted.
// deleted, although we still use `min_log_number_to_keep_` to determine when
// to delete a WAL file.
uint64_t MinLogNumberWithUnflushedData() const {
return PreComputeMinLogNumberWithUnflushedData(nullptr);
}

// Returns the minimum log number which still has data not flushed to any SST
// file.
// Empty column families' log number is considered to be
Expand Down Expand Up @@ -1399,9 +1401,8 @@ class VersionSet {
const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_;
// Any WAL number smaller than this should be ignored during recovery,
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this
// number is ignored.
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
// and is qualified for being deleted.
std::atomic<uint64_t> min_log_number_to_keep_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t options_file_size_;
Expand Down
5 changes: 3 additions & 2 deletions db/version_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
}

TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
db_options_.allow_2pc = true;
NewDB();

SstInfo sst(100, kDefaultColumnFamilyName, "a");
Expand All @@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
edit.AddFile(0, file_metas[0]);
edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);

for (int i = 0; i < 3; i++) {
CreateNewManifest();
ReopenDB();
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
}
}

Expand Down

0 comments on commit 2de3bfc

Please sign in to comment.