Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadOptions.iter_start_ts should support tombstones #7178

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
}

assert(ikey_.user_key.size() >= timestamp_size_);
Slice ts;
Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
ikey_.user_key, timestamp_size_)
: Slice();
bool more_recent = false;
if (timestamp_size_ > 0) {
ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
}
if (IsVisible(ikey_.sequence, ts, &more_recent)) {
// If the previous entry is of seqnum 0, the current entry will not
// possibly be skipped. This condition can potentially be relaxed to
Expand Down Expand Up @@ -285,7 +284,20 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// 2) return ikey only if ikey.seqnum >= start_seqnum_
// note that if deletion seqnum is < start_seqnum_ we
// just skip it like in normal iterator.
if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
if (start_seqnum_ > 0) {
if (ikey_.sequence >= start_seqnum_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
skipping_saved_key = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
}
} else if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return true;
Expand All @@ -300,10 +312,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
case kTypeValue:
case kTypeBlobIndex:
if (start_seqnum_ > 0) {
// we are taking incremental snapshot here
// incremental snapshots aren't supported on DB with range deletes
assert(ikey_.type != kTypeBlobIndex);
if (ikey_.sequence >= start_seqnum_) {
assert(ikey_.type != kTypeBlobIndex);
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return true;
Expand All @@ -316,6 +326,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
!iter_.iter()->IsKeyPinned() /* copy */);
skipping_saved_key = true;
}
} else if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key, !pin_thru_lifetime_ ||
Expand Down
2 changes: 1 addition & 1 deletion db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class DBIter final : public Iterator {
bool Valid() const override { return valid_; }
Slice key() const override {
assert(valid_);
if (start_seqnum_ > 0) {
if (start_seqnum_ > 0 || timestamp_lb_) {
return saved_key_.GetInternalKey();
} else {
const Slice ukey_and_ts = saved_key_.GetUserKey();
Expand Down
144 changes: 112 additions & 32 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,15 @@ class DBBasicTestWithTimestampBase : public DBTestBase {
}

void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
ValueType expected_value_type,
const Slice& expected_value,
const Slice& expected_ts) const {
ASSERT_TRUE(it->Valid());
ASSERT_OK(it->status());
ASSERT_EQ(expected_key, it->key());
ASSERT_EQ(expected_value, it->value());
if (kTypeValue == expected_value_type) {
ASSERT_EQ(expected_value, it->value());
}
ASSERT_EQ(expected_ts, it->timestamp());
}

Expand All @@ -137,10 +140,30 @@ class DBBasicTestWithTimestampBase : public DBTestBase {
std::string ukey_and_ts;
ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
ukey_and_ts.append(expected_ts.data(), expected_ts.size());
ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type);
std::string ikey;
AppendInternalKey(&ikey, parsed_ikey);
ASSERT_EQ(Slice(ikey), it->key());
ParsedInternalKey parsed_ikey;
ASSERT_TRUE(ParseInternalKey(it->key(), &parsed_ikey));
ASSERT_EQ(ukey_and_ts, parsed_ikey.user_key);
ASSERT_EQ(expected_val_type, parsed_ikey.type);
ASSERT_EQ(expected_seq, parsed_ikey.sequence);
if (expected_val_type == kTypeValue) {
ASSERT_EQ(expected_value, it->value());
}
ASSERT_EQ(expected_ts, it->timestamp());
}

void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
ValueType expected_val_type, const Slice& expected_value,
const Slice& expected_ts) {
ASSERT_TRUE(it->Valid());
ASSERT_OK(it->status());
std::string ukey_and_ts;
ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
ukey_and_ts.append(expected_ts.data(), expected_ts.size());

ParsedInternalKey parsed_ikey;
ASSERT_TRUE(ParseInternalKey(it->key(), &parsed_ikey));
ASSERT_EQ(expected_val_type, parsed_ikey.type);
ASSERT_EQ(Slice(ukey_and_ts), parsed_ikey.user_key);
if (expected_val_type == kTypeValue) {
ASSERT_EQ(expected_value, it->value());
}
Expand Down Expand Up @@ -188,8 +211,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
uint64_t key = 0;
for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
it->Next(), ++count, ++key) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
}
size_t expected_count = kMaxKey - start_keys[i] + 1;
ASSERT_EQ(expected_count, count);
Expand All @@ -208,8 +231,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
it.reset(db_->NewIterator(read_opts));
for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
it->Valid(); it->Next(), ++key, ++count) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
}
ASSERT_EQ(r - std::max(l, start_keys[i]), count);
l += (kMaxKey / 100);
Expand All @@ -220,8 +243,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
}

TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
constexpr int kNumKeysPerFile = 128;
constexpr uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
Expand Down Expand Up @@ -255,17 +278,47 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
int count = 0;
uint64_t key = 0;
for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
CheckIterEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
if (i > 0) {
it->Next();
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i - 1),
write_timestamps[i - 1]);
CheckIterEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i - 1),
write_timestamps[i - 1]);
}
}
size_t expected_count = kMaxKey + 1;
ASSERT_EQ(expected_count, count);
}
// Delete all keys@ts=5 and check iteration result with start ts set
{
std::string write_timestamp = Timestamp(5, 0);
WriteOptions write_opts;
Slice write_ts = write_timestamp;
write_opts.timestamp = &write_ts;
for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
Status s = db_->Delete(write_opts, Key1(key));
ASSERT_OK(s);
}

std::string read_timestamp = Timestamp(6, 0);
ReadOptions read_opts;
Slice read_ts = read_timestamp;
read_opts.timestamp = &read_ts;
std::string read_timestamp_lb = Timestamp(2, 0);
Slice read_ts_lb = read_timestamp_lb;
read_opts.iter_start_ts = &read_ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = 0;
for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
write_ts);
// Skip key@ts=3 and land on tombstone key@ts=5
it->Next();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this is to skip the Put with timestamp 3, right? If so, I think a comment saying so would be useful here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

}
ASSERT_EQ(kMaxKey + 1, count);
}
Close();
}

Expand Down Expand Up @@ -296,34 +349,61 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
for (size_t i = 0; i != write_ts_list.size(); ++i) {
Slice write_ts = write_ts_list[i];
write_opts.timestamp = &write_ts;
uint64_t k = kMinKey;
do {
Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
ASSERT_OK(s);
if (k == kMaxKey) {
break;
for (uint64_t k = kMaxKey; k >= kMinKey; --k) {
Status s;
if (k % 2) {
s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
} else {
s = db_->Delete(write_opts, Key1(k));
}
++k;
} while (k != 0);
ASSERT_OK(s);
}
start_seqs.push_back(db_->GetLatestSequenceNumber());
}
std::vector<std::string> read_ts_list;
for (int t = 0; t != kNumTimestamps - 1; ++t) {
read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
}

ReadOptions read_opts;
// Scan with only read_opts.iter_start_seqnum set.
for (size_t i = 0; i != read_ts_list.size(); ++i) {
Slice read_ts = read_ts_list[i];
read_opts.timestamp = &read_ts;
read_opts.iter_start_seqnum = start_seqs[i];
read_opts.iter_start_seqnum = start_seqs[i] + 1;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
SequenceNumber expected_seq = start_seqs[i] + 1;
SequenceNumber expected_seq = start_seqs[i] + (kMaxKey - kMinKey) + 1;
uint64_t key = kMinKey;
for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue,
CheckIterEntry(
iter.get(), Key1(key), expected_seq,
(key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
(key % 2) ? "value" + std::to_string(i + 1) : std::string(),
write_ts_list[i + 1]);
++key;
--expected_seq;
}
}
// Scan with both read_opts.iter_start_seqnum and read_opts.iter_start_ts set.
std::vector<std::string> read_ts_lb_list;
for (int t = 0; t < kNumTimestamps - 1; ++t) {
read_ts_lb_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
}
for (size_t i = 0; i < read_ts_list.size(); ++i) {
Slice read_ts = read_ts_list[i];
Slice read_ts_lb = read_ts_lb_list[i];
read_opts.timestamp = &read_ts;
read_opts.iter_start_ts = &read_ts_lb;
read_opts.iter_start_seqnum = start_seqs[i] + 1;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
uint64_t key = kMinKey;
SequenceNumber expected_seq = start_seqs[i] + (kMaxKey - kMinKey) + 1;
for (it->Seek(Key1(kMinKey)); it->Valid(); it->Next()) {
CheckIterEntry(it.get(), Key1(key), expected_seq,
(key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
"value" + std::to_string(i + 1), write_ts_list[i + 1]);
++key;
++expected_seq;
--expected_seq;
}
}
Close();
Expand Down Expand Up @@ -357,7 +437,7 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
read_opts.timestamp = &ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
iter->SeekToFirst();
CheckIterUserEntry(iter.get(), "foo", "value0", ts_str);
CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value0", ts_str);
ASSERT_EQ(
1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
}
Expand Down Expand Up @@ -403,7 +483,7 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
iter->Seek("a");
iter->Next();
CheckIterUserEntry(iter.get(), "b", "new_value", ts_str);
CheckIterUserEntry(iter.get(), "b", kTypeValue, "new_value", ts_str);
ASSERT_EQ(
1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
}
Expand Down Expand Up @@ -1049,8 +1129,8 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {

// Seek to kMaxKey
iter->Seek(Key1(kMaxKey));
CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i),
write_ts_list[i]);
CheckIterUserEntry(iter.get(), Key1(kMaxKey), kTypeValue,
"value" + std::to_string(i), write_ts_list[i]);
iter->Next();
ASSERT_FALSE(iter->Valid());
}
Expand Down Expand Up @@ -1084,7 +1164,7 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
pe->Transform(saved_prev_key) != pe->Transform(start_key)) {
break;
}
CheckIterUserEntry(it.get(), Key1(expected_key),
CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue,
"value" + std::to_string(i), write_ts_list[i]);
++count;
++expected_key;
Expand Down