Add a unit test to verify iterators release data blocks after using them (#4170)

Summary:
Add a unit test to check that iterators release data blocks after they have moved away from them. Verify the same for compaction input iterators.
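For readers who want the property in isolation, here is a minimal standalone sketch (added for this write-up, not part of the commit) of what the new test asserts: while an iterator is positioned on a key, roughly one data block stays pinned in the block cache, and pinned usage drops back to zero once the iterator is destroyed. The database path, cache size, and block size below are illustrative assumptions.

#include <cassert>
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::BlockBasedTableOptions bbto;
  bbto.block_cache = rocksdb::NewLRUCache(100000);
  bbto.block_size = 400;  // Small blocks: each 400-byte value fills one block.

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/iterator_pinned_demo", &db);
  assert(s.ok());

  std::string v(400, 'x');
  for (const char* k : {"1", "3", "5", "7"}) {
    db->Put(rocksdb::WriteOptions(), k, v);
  }
  db->Flush(rocksdb::FlushOptions());  // Persist the keys into an SST file.

  {
    std::unique_ptr<rocksdb::Iterator> iter(
        db->NewIterator(rocksdb::ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      // Expect roughly one pinned block at each position.
      std::cout << "pinned while iterating: "
                << bbto.block_cache->GetPinnedUsage() << "\n";
    }
  }
  // The iterator is gone, so pinned usage should be back to zero.
  std::cout << "pinned after iterator: " << bbto.block_cache->GetPinnedUsage()
            << "\n";

  delete db;
  return 0;
}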
Pull Request resolved: #4170

Differential Revision: D8962513

Pulled By: siying

fbshipit-source-id: 05a5b604d7d29887fb488f2cda7286f554a14407
siying authored and facebook-github-bot committed Aug 14, 2018
1 parent 999d955 commit f3d91a0
Showing 4 changed files with 110 additions and 1 deletion.
2 changes: 2 additions & 0 deletions db/compaction_job.cc
@@ -600,6 +600,8 @@ Status CompactionJob::Run() {
  compaction_stats_.micros = env_->NowMicros() - start_micros;
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  for (const auto& state : compact_->sub_compact_states) {
104 changes: 104 additions & 0 deletions db/db_test2.cc
@@ -2649,6 +2649,110 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}

TEST_F(DBTest2, IteratorPinnedMemory) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = false;
  bbto.cache_index_and_filter_blocks = false;
  bbto.block_cache = NewLRUCache(100000);
  bbto.block_size = 400;  // small block size
  options.table_factory.reset(new BlockBasedTableFactory(bbto));
  Reopen(options);

  Random rnd(301);
  std::string v = RandomString(&rnd, 400);

  // Since v is the size of a block, each key should take a block
  // of 400+ bytes.
  Put("1", v);
  Put("3", v);
  Put("5", v);
  Put("7", v);
  ASSERT_OK(Flush());

  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // Verify that iterators don't pin more than one data block in block cache
  // at a time.
  {
    unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->SeekToFirst();

    for (int i = 0; i < 4; i++) {
      ASSERT_TRUE(iter->Valid());
      // Block cache should contain exactly one block.
      ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
      ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());

    iter->Seek("4");
    ASSERT_TRUE(iter->Valid());

    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);

    iter->Seek("3");
    ASSERT_TRUE(iter->Valid());

    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
  }
  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());

  // Test compaction case
  Put("2", v);
  Put("5", v);
  Put("6", v);
  Put("8", v);
  ASSERT_OK(Flush());

  // Clear existing data in block cache
  bbto.block_cache->SetCapacity(0);
  bbto.block_cache->SetCapacity(100000);

  // Verify that compaction input iterators don't hold more than one data
  // block at a time.
  std::atomic<bool> finished(false);
  std::atomic<int> block_newed(0);
  std::atomic<int> block_destroyed(0);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Block::Block:0", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Two iterators. At most 2 outstanding blocks.
        EXPECT_GE(block_newed.load(), block_destroyed.load());
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
        block_newed.fetch_add(1);
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "Block::~Block", [&](void* /*arg*/) {
        if (finished) {
          return;
        }
        // Two iterators. At most 2 outstanding blocks.
        EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
        EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
        block_destroyed.fetch_add(1);
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run:BeforeVerify",
      [&](void* /*arg*/) { finished = true; });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Two input files. Each of them has 4 data blocks.
  ASSERT_EQ(8, block_newed.load());
  ASSERT_EQ(8, block_destroyed.load());

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBTest2, TestBBTTailPrefetch) {
  std::atomic<bool> called(false);
  size_t expected_lower_bound = 512 * 1024;
3 changes: 3 additions & 0 deletions table/block.cc
@@ -611,6 +611,8 @@ uint32_t Block::NumRestarts() const {
  return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}

Block::~Block() { TEST_SYNC_POINT("Block::~Block"); }

Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
             size_t read_amp_bytes_per_bit, Statistics* statistics)
    : contents_(std::move(contents)),
@@ -619,6 +621,7 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
      restart_offset_(0),
      num_restarts_(0),
      global_seqno_(_global_seqno) {
  TEST_SYNC_POINT("Block::Block:0");
  if (size_ < sizeof(uint32_t)) {
    size_ = 0;  // Error marker
  } else {
2 changes: 1 addition & 1 deletion table/block.h
@@ -147,7 +147,7 @@ class Block {
                 size_t read_amp_bytes_per_bit = 0,
                 Statistics* statistics = nullptr);

-  ~Block() = default;
+  ~Block();

  size_t size() const { return size_; }
  const char* data() const { return data_; }
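A note on the header change (editorial context, not part of the commit): the destructor can no longer be "= default" because its body now calls the TEST_SYNC_POINT hook, so the declaration stays in block.h and the definition moves to block.cc, where the sync-point header is available. A minimal sketch of the pattern, using a hypothetical Widget class:

// widget.h (hypothetical)
class Widget {
 public:
  ~Widget();  // Declared only; the body needs the sync-point header.
};

// widget.cc (hypothetical)
#include "util/sync_point.h"  // Defines TEST_SYNC_POINT; expands to a no-op in release builds.

Widget::~Widget() { TEST_SYNC_POINT("Widget::~Widget"); }

A test can then hook the named point with SyncPoint::GetInstance()->SetCallBack, exactly as the new test in db_test2.cc does for "Block::~Block" above.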
