Skip to content

Commit

Permalink
Get() with timestamp should respect snapshot (#7227)
Browse files Browse the repository at this point in the history
Summary:
If user-defined timestamp is enabled, current implementation can expose
newer data to queries even if an older sequence number is specified via
read_options.snapshot. This PR makes Get() respect sequence-number-based
snapshot.

Solution is simple. Besides using <ukey, ts, seq> to search the index for the key,
we also verify that the candidate result's seq is smaller than or equal to seq. This
requires passing a seq via `GetContext`, which results in the majority of code
change caused by this PR.

Also added a few unit tests to demonstrate standard visibility during point lookup
and range scan when timestamp and snapshot are both present.

Test plan (devserver):
```
make check
$./db_bench --benchmarks=fillseq,readrandom -cache_size=$[64*1024*1024]
```
Result
this PR: readrandom   :       4.827 micros/op 207180 ops/sec;   22.9 MB/s (1000000 of 1000000 found)
master:  readrandom   :       4.936 micros/op 202610 ops/sec;   22.4 MB/s (1000000 of 1000000 found)

Pull Request resolved: #7227

Reviewed By: ltamasi

Differential Revision: D23015242

Pulled By: riversand963

fbshipit-source-id: ea7b85a728654553ba357d2e6a207b5e40f7376a
  • Loading branch information
riversand963 authored and facebook-github-bot committed Aug 15, 2020
1 parent 510c66f commit d758273
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 9 deletions.
32 changes: 24 additions & 8 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1583,19 +1583,32 @@ Status DBImpl::Get(const ReadOptions& read_options,
return s;
}

namespace {
class GetWithTimestampReadCallback : public ReadCallback {
public:
explicit GetWithTimestampReadCallback(SequenceNumber seq)
: ReadCallback(seq) {}
bool IsVisibleFullCheck(SequenceNumber seq) override {
return seq <= max_visible_seq_;
}
};
} // namespace

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);

#ifndef NDEBUG
assert(get_impl_options.column_family);
ColumnFamilyHandle* cf = get_impl_options.column_family;
const Comparator* const ucmp = cf->GetComparator();
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
size_t ts_sz = ucmp->timestamp_size();
GetWithTimestampReadCallback read_cb(0); // Will call Refresh

#ifndef NDEBUG
if (ts_sz > 0) {
assert(read_options.timestamp);
assert(read_options.timestamp->size() == ucmp->timestamp_size());
assert(read_options.timestamp->size() == ts_sz);
} else {
assert(!read_options.timestamp);
}
Expand Down Expand Up @@ -1661,6 +1674,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
snapshot = get_impl_options.callback->max_visible_seq();
}
}
// If timestamp is used, we use read callback to ensure <key,t,s> is returned
// only if t <= read_opts.timestamp and s <= snapshot.
if (ts_sz > 0 && !get_impl_options.callback) {
read_cb.Refresh(snapshot);
get_impl_options.callback = &read_cb;
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");

Expand All @@ -1678,9 +1697,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
const Comparator* comparator =
get_impl_options.column_family->GetComparator();
size_t ts_sz = comparator->timestamp_size();
std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
if (!skip_memtable) {
// Get value associated with key
Expand Down
318 changes: 318 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,324 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
Close();
}

class DataVisibilityTest : public DBBasicTestWithTimestampBase {
public:
DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {}
};

// Application specifies timestamp but not snapshot.
// reader writer
// ts'=90
// ts=100
// seq=10
// seq'=11
// write finishes
// GetImpl(ts,seq)
// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=1t1 but seq<s1,
// the key should not be returned.
TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::GetImpl:3",
"DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"},
{"DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut",
"DBImpl::GetImpl:4"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
std::string write_ts_str = Timestamp(1, 0);
Slice write_ts = write_ts_str;
WriteOptions write_opts;
write_opts.timestamp = &write_ts;
TEST_SYNC_POINT(
"DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut");
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut");
});
ReadOptions read_opts;
std::string read_ts_str = Timestamp(3, 0);
Slice read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
std::string value;
Status s = db_->Get(read_opts, "foo", &value);

writer_thread.join();
ASSERT_TRUE(s.IsNotFound());
Close();
}

// Application specifies timestamp but not snapshot.
// reader writer
// ts'=90
// ts=100
// seq=10
// seq'=11
// write finishes
// Flush
// GetImpl(ts,seq)
// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=t1 but seq<s1,
// the key should not be returned.
TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot2) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::GetImpl:3",
"DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"},
{"DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut",
"DBImpl::GetImpl:4"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
std::string write_ts_str = Timestamp(1, 0);
Slice write_ts = write_ts_str;
WriteOptions write_opts;
write_opts.timestamp = &write_ts;
TEST_SYNC_POINT(
"DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut");
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
ASSERT_OK(Flush());

write_ts_str = Timestamp(2, 0);
write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
s = db_->Put(write_opts, "bar", "value");
ASSERT_OK(s);
TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut");
});
ReadOptions read_opts;
std::string read_ts_str = Timestamp(3, 0);
Slice read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
writer_thread.join();
ASSERT_TRUE(s.IsNotFound());
Close();
}

// Application specifies both timestamp and snapshot.
// reader writer
// seq=10
// ts'=90
// ts=100
// seq'=11
// write finishes
// GetImpl(ts,seq)
// Since application specifies both timestamp and snapshot, application expects
// to see data that visible in BOTH timestamp and sequence number. Therefore,
// <k, t1, s1> can be returned only if t1<=ts AND s1<=seq.
TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap",
"DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"},
{"DataVisibilityTest::PointLookupWithSnapshot1:AfterPut",
"DBImpl::GetImpl:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
std::string write_ts_str = Timestamp(1, 0);
Slice write_ts = write_ts_str;
WriteOptions write_opts;
write_opts.timestamp = &write_ts;
TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut");
Status s = db_->Put(write_opts, "foo", "value");
TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut");
ASSERT_OK(s);
});
ReadOptions read_opts;
const Snapshot* snap = db_->GetSnapshot();
TEST_SYNC_POINT(
"DataVisibilityTest::PointLookupWithSnapshot1:AfterTakingSnap");
read_opts.snapshot = snap;
std::string read_ts_str = Timestamp(3, 0);
Slice read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
writer_thread.join();

ASSERT_TRUE(s.IsNotFound());

db_->ReleaseSnapshot(snap);
Close();
}

// Application specifies both timestamp and snapshot.
// reader writer
// seq=10
// ts'=90
// ts=100
// seq'=11
// write finishes
// Flush
// GetImpl(ts,seq)
// Since application specifies both timestamp and snapshot, application expects
// to see data that visible in BOTH timestamp and sequence number. Therefore,
// <k, t1, s1> can be returned only if t1<=ts AND s1<=seq.
TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap",
"DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
std::string write_ts_str = Timestamp(1, 0);
Slice write_ts = write_ts_str;
WriteOptions write_opts;
write_opts.timestamp = &write_ts;
TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut");
Status s = db_->Put(write_opts, "foo", "value1");
ASSERT_OK(s);
ASSERT_OK(Flush());

write_ts_str = Timestamp(2, 0);
write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
s = db_->Put(write_opts, "bar", "value2");
ASSERT_OK(s);
});
const Snapshot* snap = db_->GetSnapshot();
TEST_SYNC_POINT(
"DataVisibilityTest::PointLookupWithSnapshot2:AfterTakingSnap");
writer_thread.join();
std::string read_ts_str = Timestamp(3, 0);
Slice read_ts = read_ts_str;
ReadOptions read_opts;
read_opts.snapshot = snap;
read_opts.timestamp = &read_ts;
std::string value;
Status s = db_->Get(read_opts, "foo", &value);
ASSERT_TRUE(s.IsNotFound());
db_->ReleaseSnapshot(snap);
Close();
}

// Application specifies timestamp but not snapshot.
// reader writer
// ts'=90
// ts=100
// seq=10
// seq'=11
// write finishes
// scan(ts,seq)
// <k, t1, s1> can be seen in scan as long as ts>=t1 AND seq>=s1. If ts>=t1 but
// seq<s1, then the key should not be returned.
TEST_F(DataVisibilityTest, RangeScanWithoutSnapshot) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::NewIterator:3",
"DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
WriteOptions write_opts;
TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut");
for (int i = 0; i < 3; ++i) {
std::string write_ts_str = Timestamp(i + 1, 0);
Slice write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
Status s = db_->Put(write_opts, "key" + std::to_string(i),
"value" + std::to_string(i));
ASSERT_OK(s);
}
});
std::string read_ts_str = Timestamp(10, 0);
Slice read_ts = read_ts_str;
ReadOptions read_opts;
read_opts.total_order_seek = true;
read_opts.timestamp = &read_ts;
Iterator* it = db_->NewIterator(read_opts);
ASSERT_NE(nullptr, it);
writer_thread.join();
it->SeekToFirst();
ASSERT_FALSE(it->Valid());
delete it;
Close();
}

// Application specifies both timestamp and snapshot.
// reader writer
// seq=10
// ts'=90
// ts=100 seq'=11
// write finishes
// scan(ts,seq)
// <k, t1, s1> can be seen by the scan only if t1<=ts AND s1<=seq. If t1<=ts
// but s1>seq, then the key should not be returned.
TEST_F(DataVisibilityTest, RangeScanWithSnapshot) {
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot",
"DataVisibilityTest::RangeScanWithSnapshot:BeforePut"},
});
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer_thread([this]() {
WriteOptions write_opts;
TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut");
for (int i = 0; i < 3; ++i) {
std::string write_ts_str = Timestamp(i + 1, 0);
Slice write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
Status s = db_->Put(write_opts, "key" + std::to_string(i),
"value" + std::to_string(i));
ASSERT_OK(s);
}
});
const Snapshot* snap = db_->GetSnapshot();
TEST_SYNC_POINT(
"DataVisibilityTest::RangeScanWithSnapshot:AfterTakingSnapshot");

writer_thread.join();

std::string read_ts_str = Timestamp(10, 0);
Slice read_ts = read_ts_str;
ReadOptions read_opts;
read_opts.snapshot = snap;
read_opts.total_order_seek = true;
read_opts.timestamp = &read_ts;
Iterator* it = db_->NewIterator(read_opts);
ASSERT_NE(nullptr, it);
it->Seek("key0");
ASSERT_FALSE(it->Valid());

delete it;
db_->ReleaseSnapshot(snap);
Close();
}

class DBBasicTestWithTimestampCompressionSettings
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<
Expand Down
2 changes: 1 addition & 1 deletion db/read_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace ROCKSDB_NAMESPACE {

class ReadCallback {
public:
ReadCallback(SequenceNumber last_visible_seq)
explicit ReadCallback(SequenceNumber last_visible_seq)
: max_visible_seq_(last_visible_seq) {}
ReadCallback(SequenceNumber last_visible_seq, SequenceNumber min_uncommitted)
: max_visible_seq_(last_visible_seq), min_uncommitted_(min_uncommitted) {}
Expand Down

0 comments on commit d758273

Please sign in to comment.