Fix MultiGet range deletion handling and a memory leak #10513

Closed

43 changes: 42 additions & 1 deletion db/db_basic_test.cc
@@ -2146,7 +2146,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest {

// Put all keys in the bottommost level, and overwrite some keys
// in L0 and L1
for (int i = 0; i < 128; ++i) {
for (int i = 0; i < 256; ++i) {
EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
@@ -2172,6 +2172,21 @@ class DBMultiGetAsyncIOTest : public DBBasicTest {
EXPECT_OK(Flush());
num_keys = 0;
}
// Put some range deletes in L1
for (int i = 128; i < 256; i += 32) {
std::string range_begin = Key(i);
std::string range_end = Key(i + 16);
EXPECT_OK(dbfull()->DeleteRange(WriteOptions(),
dbfull()->DefaultColumnFamily(),
range_begin, range_end));
// Also do some Puts to force creation of a bloom filter
for (int j = i + 16; j < i + 32; ++j) {
if (j % 3 == 0) {
EXPECT_OK(Put(Key(j), "val_l1_" + std::to_string(j)));
}
}
EXPECT_OK(Flush());
}
MoveFilesToLevel(1);

for (int i = 0; i < 128; i += 5) {
@@ -2366,6 +2381,32 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
// Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}

TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
std::vector<std::string> key_strs;
std::vector<Slice> keys;
std::vector<PinnableSlice> values;
std::vector<Status> statuses;

// 139 and 163 are in L2, but overlap with range deletes in L1
key_strs.push_back(Key(139));
key_strs.push_back(Key(163));
keys.push_back(key_strs[0]);
keys.push_back(key_strs[1]);
values.resize(keys.size());
statuses.resize(keys.size());

ReadOptions ro;
ro.async_io = true;
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values.size(), 2);
ASSERT_EQ(statuses[0], Status::NotFound());
ASSERT_EQ(statuses[1], Status::NotFound());

// Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
#endif // USE_COROUTINES

TEST_F(DBBasicTest, MultiGetStats) {
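For readers of the test diff above, here is a minimal standalone sketch of the behavior the new `GetFromL2WithRangeDelInL1` test guards: a key whose value sits below a newer `DeleteRange` must come back as `NotFound` from `MultiGet`. The DB path and key names are made up for illustration; the snippet uses only public RocksDB APIs and is not part of the patch.

```cpp
// Minimal sketch of the scenario exercised by GetFromL2WithRangeDelInL1,
// assuming a default-configured DB. Not part of the patch itself.
#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  assert(rocksdb::DB::Open(options, "/tmp/rangedel_multiget_demo", &db).ok());

  // An older value for the key, followed by a newer range deletion covering it.
  assert(db->Put(rocksdb::WriteOptions(), "k139", "val_l2_139").ok());
  assert(db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                         "k128", "k144")
             .ok());

  // MultiGet must report NotFound for the covered key, regardless of which
  // level the value and the tombstone end up in after flush/compaction.
  std::vector<rocksdb::Slice> keys{"k139"};
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), keys.size(),
               keys.data(), values.data(), statuses.data());
  assert(statuses[0].IsNotFound());

  delete db;
  return 0;
}
```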
30 changes: 30 additions & 0 deletions db/table_cache.cc
@@ -501,6 +501,22 @@ Status TableCache::Get(
return s;
}

void TableCache::UpdateRangeTombstoneSeqnums(
const ReadOptions& options, TableReader* t,
MultiGetContext::Range& table_range) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
}
}
}

Status TableCache::MultiGetFilter(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
@@ -524,6 +540,8 @@ Status TableCache::MultiGetFilter(
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
MultiGetContext::Range tombstone_range(*mget_range, mget_range->begin(),
mget_range->end());
if (t == nullptr) {
s = FindTable(
options, file_options_, internal_comparator, fd, &handle,
@@ -539,6 +557,18 @@
if (s.ok()) {
s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range);
}
if (mget_range->empty()) {
if (s.ok() && !options.ignore_range_deletions) {
// If all the keys have been filtered out by the bloom filter, then
// update the range tombstone sequence numbers for the keys as
// MultiGet() will not be called for this set of keys.
UpdateRangeTombstoneSeqnums(options, t, tombstone_range);
}
if (handle) {
ReleaseHandle(handle);
*table_handle = nullptr;
}
}

return s;
}
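The `ReleaseHandle(handle)` / `*table_handle = nullptr` branch above is the memory-leak half of the fix: when the bloom filter drops every key in the batch, `MultiGet` is never called for this file, so nothing else would release the pinned table cache handle. Below is a rough sketch of that ownership rule, using hypothetical `Cache`/`Handle` stand-ins rather than the real `TableCache` types.

```cpp
// Illustrative sketch of the handle-lifetime pattern behind the leak fix;
// Cache/Handle here are hypothetical simplifications, not RocksDB interfaces.
#include <cstdio>

struct Handle { const char* name; };

struct Cache {
  Handle* Lookup(const char* name) { return new Handle{name}; }
  void Release(Handle* h) { delete h; }
};

// Returns true if any key survives filtering; on false, the caller will not
// issue a follow-up lookup, so the handle must be released here.
bool FilterBatch(Cache& cache, Handle** out_handle, int keys_left_after_filter) {
  Handle* h = cache.Lookup("sst-42");
  if (keys_left_after_filter == 0) {
    // All keys filtered out: nobody will consume the handle later.
    cache.Release(h);
    *out_handle = nullptr;
    return false;
  }
  *out_handle = h;  // Ownership passes to the lookup path.
  return true;
}

int main() {
  Cache cache;
  Handle* h = nullptr;
  if (!FilterBatch(cache, &h, /*keys_left_after_filter=*/0)) {
    std::printf("all keys filtered; handle released, no leak\n");
  }
  return 0;
}
```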
5 changes: 5 additions & 0 deletions db/table_cache.h
@@ -235,6 +235,11 @@ class TableCache {
size_t max_file_size_for_l0_meta_pin = 0,
Temperature file_temperature = Temperature::kUnknown);

// Update the max_covering_tombstone_seq in the GetContext for each key based
// on the range deletions in the table
void UpdateRangeTombstoneSeqnums(const ReadOptions& options, TableReader* t,
MultiGetContext::Range& table_range);

// Create a key prefix for looking up the row cache. The prefix is of the
// format row_cache_id + fd_number + seq_no. Later, the user key can be
// appended to form the full key
13 changes: 1 addition & 12 deletions db/table_cache_sync_and_async.h
@@ -79,18 +79,7 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
}
}
if (s.ok() && !options.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
for (auto iter = table_range.begin(); iter != table_range.end();
++iter) {
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
}
}
UpdateRangeTombstoneSeqnums(options, t, table_range);
}
if (s.ok()) {
CO_AWAIT(t->MultiGet)
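For context on the helper that both call sites now share, `UpdateRangeTombstoneSeqnums` folds each key's maximum covering range-tombstone sequence number into its `GetContext`. The sketch below reproduces that computation in isolation, with a hypothetical `Tombstone` struct and a linear scan standing in for `FragmentedRangeTombstoneIterator`; it is illustrative only, not the real implementation.

```cpp
// Simplified, self-contained sketch of what UpdateRangeTombstoneSeqnums
// computes per key.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

struct Tombstone {
  std::string begin;  // inclusive
  std::string end;    // exclusive
  uint64_t seqnum;
};

// Highest sequence number among tombstones whose [begin, end) covers `key`.
uint64_t MaxCoveringTombstoneSeqnum(const std::vector<Tombstone>& tombstones,
                                    const std::string& key) {
  uint64_t max_seq = 0;
  for (const auto& t : tombstones) {
    if (key >= t.begin && key < t.end) {
      max_seq = std::max(max_seq, t.seqnum);
    }
  }
  return max_seq;
}

int main() {
  std::vector<Tombstone> tombstones = {{"k128", "k144", 900},
                                       {"k160", "k176", 950}};
  // Each key's running maximum is folded in with std::max, mirroring how the
  // patch updates *max_covering_tombstone_seq in the GetContext.
  uint64_t max_covering_seq = 0;
  max_covering_seq = std::max(max_covering_seq,
                              MaxCoveringTombstoneSeqnum(tombstones, "k139"));
  std::printf("max covering seqnum for k139: %llu\n",
              static_cast<unsigned long long>(max_covering_seq));
  return 0;
}
```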