Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug causing incorrect data returned by snapshot read #9648

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
* Added BlobDB options to `ldb`

### Bug Fixes
* * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.

### Public API changes
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
Expand Down
56 changes: 56 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ class TestFlushListener : public EventListener {
~TestFlushListener() override {
prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
}

void OnTableFileCreated(const TableFileCreationInfo& info) override {
// remember the info for later checking the FlushJobInfo.
prev_fc_info_ = info;
Expand Down Expand Up @@ -2002,6 +2003,61 @@ TEST_P(DBFlushTestBlobError, FlushError) {
}

#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
class SimpleTestFlushListener : public EventListener {
public:
explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
~SimpleTestFlushListener() override {}

void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);

ASSERT_OK(db->Delete(WriteOptions(), "foo"));
snapshot_ = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));

auto* dbimpl = static_cast_with_check<DBImpl>(db);
assert(dbimpl);

ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
assert(cfhi);
ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
}

DBFlushTest* test_ = nullptr;
const Snapshot* snapshot_ = nullptr;
};

Options options = CurrentOptions();
options.create_if_missing = true;
auto* listener = new SimpleTestFlushListener(this);
options.listeners.emplace_back(listener);
DestroyAndReopen(options);

ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));

ManagedSnapshot snapshot_guard(db_);

ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
ASSERT_OK(db_->Flush(FlushOptions(), default_cf));

const Snapshot* snapshot = listener->snapshot_;
assert(snapshot);

ReadOptions read_opts;
read_opts.snapshot = snapshot;

// Using snapshot should not see "foo".
{
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
ASSERT_TRUE(s.IsNotFound());
}

db_->ReleaseSnapshot(snapshot);
}

TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
Options options = CurrentOptions();
options.create_if_missing = true;
Expand Down
36 changes: 31 additions & 5 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,41 @@ Status DBImpl::FlushMemTableToOutputFile(
const bool needs_to_sync_closed_wals =
logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;

// If needs_to_sync_closed_wals is true, we need to record the current
// maximum memtable ID of this column family so that a later PickMemtables()
// call will not pick memtables whose IDs are higher. This is due to the fact
// that SyncClosedLogs() may release the db mutex, and memtable switch can
// happen for this column family in the meantime. The newly created memtables
// have their data backed by unsynced WALs, thus they cannot be included in
// this flush job.
// Another reason why we must record the current maximum memtable ID of this
// column family: SyncClosedLogs() may release db mutex, thus it's possible
// for application to continue to insert into memtables increasing db's
// sequence number. The application may take a snapshot, but this snapshot is
// not included in `snapshot_seqs` which will be passed to flush job because
// `snapshot_seqs` has already been computed before this function starts.
// Recording the max memtable ID ensures that the flush job does not flush
// a memtable without knowing such snapshot(s).
uint64_t max_memtable_id = needs_to_sync_closed_wals
? cfd->imm()->GetLatestMemTableID()
: port::kMaxUint64;

// If needs_to_sync_closed_wals is false, then the flush job will pick ALL
// existing memtables of the column family when PickMemTable() is called
// later. Although we won't call SyncClosedLogs() in this case, we may still
// call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also
// releases and re-acquires the db mutex. In the meantime, the application
// can still insert into the memtables and increase the db's sequence number.
// The application can take a snapshot, hoping that the latest visible state
// to this snapshto is preserved. This is hard to guarantee since db mutex
// not held. This newly-created snapshot is not included in `snapshot_seqs`
// and the flush job is unaware of its presence. Consequently, the flush job
// may drop certain keys when generating the L0, causing incorrect data to be
// returned for snapshot read using this snapshot.
// To address this, we make sure NotifyOnFlushBegin() executes after memtable
// picking so that no new snapshot can be taken between the two functions.

FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
Expand All @@ -192,11 +217,6 @@ Status DBImpl::FlushMemTableToOutputFile(
&blob_callback_);
FileMetaData file_meta;

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE

Status s;
bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK();
Expand All @@ -221,6 +241,12 @@ Status DBImpl::FlushMemTableToOutputFile(
}
TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE

bool switched_to_mempurge = false;
// Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion.
Expand Down