Skip to content

Commit

Permalink
Trigger read compaction only if seeks to storage are incurred.
Browse files Browse the repository at this point in the history
Summary:
In the current code, a Get() call can trigger compaction if it has to look at more than one file. This causes unnecessary compaction because looking at more than one file is a penalty only if the file is not yet in the cache. Also, the current code counts these files before the bloom filter check is applied.

This patch counts a 'seek' only if the file fails the bloom filter
check and has to read in data block(s) from the storage.

This patch also counts a 'seek' if a file is not present in the file-cache, because opening a file means that its index blocks need to be read into cache.

Test Plan: unit test attached. I will probably add one more unit test.

Reviewers: heyongqiang

Reviewed By: heyongqiang

CC: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D5709
  • Loading branch information
dhruba committed Sep 28, 2012
1 parent 92368ab commit c1bb32e
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 21 deletions.
66 changes: 65 additions & 1 deletion db/db_test.cc
Expand Up @@ -592,7 +592,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
// Step 4: Wait for compaction to finish
env_->SleepForMicroseconds(1000000);

ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(0), 1); // XXX
} while (ChangeOptions());
}

Expand Down Expand Up @@ -1817,6 +1817,70 @@ TEST(DBTest, SnapshotFiles) {
dbfull()->DisableFileDeletions();
}


// Verify that repeated Get() calls which incur storage IO eventually
// trigger a seek-based (read) compaction: after many reads, the file
// count in some level must drop because a compaction merged files.
TEST(DBTest, ReadCompaction) {
  std::string value(4096, '4');  // a string of size 4K
  {
    Options options = CurrentOptions();
    options.create_if_missing = true;
    options.max_open_files = 20;  // small file-cache so table opens count as seeks
    options.target_file_size_base = 512;  // tiny files => many files per level
    options.write_buffer_size = 64 * 1024;
    options.filter_policy = NULL;  // no bloom filter: every Get() must read blocks
    options.block_size = 4096;
    options.block_cache = NewLRUCache(0);  // Prevent block-cache hits

    Reopen(&options);

    // Write 8MB (2000 values, each 4K)
    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    for (int i = 0; i < 2000; i++) {
      ASSERT_OK(Put(Key(i), value));
    }

    // Push everything down so that levels 0 and 1 are empty.
    dbfull()->TEST_CompactMemTable();
    dbfull()->TEST_CompactRange(0, NULL, NULL);
    dbfull()->TEST_CompactRange(1, NULL, NULL);
    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    ASSERT_EQ(NumTableFilesAtLevel(1), 0);

    // Overwrite a subset of the keys so that level 0 gets files whose
    // key ranges overlap the older files in the levels below.
    for (int i = 0; i < 2000; i = i + 16) {
      ASSERT_OK(Put(Key(i), value));
    }
    dbfull()->Flush(FlushOptions());

    // Wait for any write-triggered compaction to finish.
    dbfull()->TEST_WaitForCompact();

    // Remember the number of files in each level.
    // BUGFIX: the third sample previously read NumTableFilesAtLevel(3)
    // but was compared against level 2 below; sample level 2 so the
    // comparison is meaningful.
    int level0_files = NumTableFilesAtLevel(0);
    int level1_files = NumTableFilesAtLevel(1);
    int level2_files = NumTableFilesAtLevel(2);
    ASSERT_NE(NumTableFilesAtLevel(0), 0);
    ASSERT_NE(NumTableFilesAtLevel(1), 0);
    ASSERT_NE(NumTableFilesAtLevel(2), 0);

    // Read every key many times. With an empty block cache and no bloom
    // filter, each read goes to storage and counts as a 'seek',
    // eventually scheduling a read compaction.
    for (int j = 0; j < 100; j++) {
      for (int i = 0; i < 2000; i++) {
        Get(Key(i));
      }
    }
    // Wait for the read compaction to finish.
    env_->SleepForMicroseconds(1000000);

    // Verify that the number of files has decreased in some level,
    // indicating that a compaction took place.
    ASSERT_TRUE(NumTableFilesAtLevel(0) < level0_files ||
                NumTableFilesAtLevel(1) < level1_files ||
                NumTableFilesAtLevel(2) < level2_files);
    delete options.block_cache;
  }
}

// Multi-threaded test:
namespace {

Expand Down
10 changes: 7 additions & 3 deletions db/table_cache.cc
Expand Up @@ -48,14 +48,17 @@ TableCache::~TableCache() {
}

Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Cache::Handle** handle, bool* tableIO) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
DBStatistics* stats = (DBStatistics*) options_->statistics;
*handle = cache_->Lookup(key);
if (*handle == NULL) {
if (tableIO != NULL) {
*tableIO = true; // we had to do IO from storage
}
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = NULL;
Table* table = NULL;
Expand Down Expand Up @@ -109,9 +112,10 @@ Status TableCache::Get(const ReadOptions& options,
uint64_t file_size,
const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
void (*saver)(void*, const Slice&, const Slice&, bool),
bool* tableIO) {
Cache::Handle* handle = NULL;
Status s = FindTable(file_number, file_size, &handle);
Status s = FindTable(file_number, file_size, &handle, tableIO);
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, saver);
Expand Down
6 changes: 4 additions & 2 deletions db/table_cache.h
Expand Up @@ -42,7 +42,8 @@ class TableCache {
uint64_t file_size,
const Slice& k,
void* arg,
void (*handle_result)(void*, const Slice&, const Slice&));
void (*handle_result)(void*, const Slice&, const Slice&, bool),
bool* tableIO);

// Evict any entry for the specified file number
void Evict(uint64_t file_number);
Expand All @@ -53,7 +54,8 @@ class TableCache {
const Options* options_;
Cache* cache_;

Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**,
bool* tableIO = NULL);
};

} // namespace leveldb
Expand Down
35 changes: 25 additions & 10 deletions db/version_set.cc
Expand Up @@ -243,11 +243,13 @@ struct Saver {
const Comparator* ucmp;
Slice user_key;
std::string* value;
bool didIO; // did we do any disk io?
};
}
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
s->didIO = didIO;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
Expand Down Expand Up @@ -335,26 +337,39 @@ Status Version::Get(const ReadOptions& options,
}

for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}

FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;

Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
saver.didIO = false;
bool tableIO = false;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
ikey, &saver, SaveValue, &tableIO);
if (!s.ok()) {
return s;
}

if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}

// If we did any IO as part of the read, then we remember it because
// it is a possible candidate for seek-based compaction. saver.didIO
// is true if the block had to be read in from storage and was not
// pre-exisiting in the block cache. Also, if this file was not pre-
// existing in the table cache and had to be freshly opened that needed
// the index blocks to be read-in, then tableIO is true. One thing
// to note is that the index blocks are not part of the block cache.
if (saver.didIO || tableIO) {
last_file_read = f;
last_file_read_level = level;
}

switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
Expand Down
4 changes: 3 additions & 1 deletion include/leveldb/table.h
Expand Up @@ -61,6 +61,8 @@ class Table {

explicit Table(Rep* rep) { rep_ = rep; }
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO);

// Calls (*handle_result)(arg, ...) with the entry found after a call
// to Seek(key). May not make such a call if filter policy says
Expand All @@ -69,7 +71,7 @@ class Table {
Status InternalGet(
const ReadOptions&, const Slice& key,
void* arg,
void (*handle_result)(void* arg, const Slice& k, const Slice& v));
void (*handle_result)(void* arg, const Slice& k, const Slice& v, bool));


void ReadMeta(const Footer& footer);
Expand Down
20 changes: 16 additions & 4 deletions table/table.cc
Expand Up @@ -153,7 +153,8 @@ static void ReleaseBlock(void* arg, void* h) {
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value) {
const Slice& index_value,
bool* didIO) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = NULL;
Expand Down Expand Up @@ -184,6 +185,9 @@ Iterator* Table::BlockReader(void* arg,
key, block, block->size(), &DeleteCachedBlock);
}
}
if (didIO != NULL) {
*didIO = true; // we did some io from storage
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
Expand All @@ -207,6 +211,12 @@ Iterator* Table::BlockReader(void* arg,
return iter;
}

// Backwards-compatible overload for callers that do not care whether
// reading the block incurred storage IO. Forwards to the four-argument
// BlockReader with didIO == NULL, which suppresses the IO reporting.
Iterator* Table::BlockReader(void* arg,
                             const ReadOptions& options,
                             const Slice& index_value) {
  return BlockReader(arg, options, index_value, NULL);
}

Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
Expand All @@ -215,7 +225,7 @@ Iterator* Table::NewIterator(const ReadOptions& options) const {

Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
void (*saver)(void*, const Slice&, const Slice&, bool)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
Expand All @@ -229,10 +239,12 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
// Not found
} else {
Slice handle = iiter->value();
Iterator* block_iter = BlockReader(this, options, iiter->value());
bool didIO = false;
Iterator* block_iter = BlockReader(this, options, iiter->value(),
&didIO);
block_iter->Seek(k);
if (block_iter->Valid()) {
(*saver)(arg, block_iter->key(), block_iter->value());
(*saver)(arg, block_iter->key(), block_iter->value(), didIO);
}
s = block_iter->status();
delete block_iter;
Expand Down

0 comments on commit c1bb32e

Please sign in to comment.