Skip to content

Commit

Permalink
Squashed 'src/leveldb/' changes from be1b0ff..be91006
Browse files Browse the repository at this point in the history
936b461 Merge upstream LevelDB 1.13.
748539c LevelDB 1.13

git-subtree-dir: src/leveldb
git-subtree-split: be9100673b05cec1662a54d0b7a59e4317fdda86
  • Loading branch information
laanwj authored and gavinandresen committed Nov 28, 2013
1 parent e4be09c commit 83efc91
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 67 deletions.
118 changes: 118 additions & 0 deletions db/autocompact_test.cc
@@ -0,0 +1,118 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "leveldb/db.h"
#include "db/db_impl.h"
#include "leveldb/cache.h"
#include "util/testharness.h"
#include "util/testutil.h"

namespace leveldb {

class AutoCompactTest {
public:
std::string dbname_;
Cache* tiny_cache_;
Options options_;
DB* db_;

AutoCompactTest() {
dbname_ = test::TmpDir() + "/autocompact_test";
tiny_cache_ = NewLRUCache(100);
options_.block_cache = tiny_cache_;
DestroyDB(dbname_, options_);
options_.create_if_missing = true;
options_.compression = kNoCompression;
ASSERT_OK(DB::Open(options_, dbname_, &db_));
}

~AutoCompactTest() {
delete db_;
DestroyDB(dbname_, Options());
delete tiny_cache_;
}

std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}

uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
db_->GetApproximateSizes(&r, 1, &size);
return size;
}

void DoReads(int n);
};

static const int kValueSize = 200 * 1024;
static const int kTotalSize = 100 * 1024 * 1024;
static const int kCount = kTotalSize / kValueSize;

// Read through the first n keys repeatedly and check that they get
// compacted (verified by checking the size of the key space).
void AutoCompactTest::DoReads(int n) {
std::string value(kValueSize, 'x');
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);

// Fill database
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
}
ASSERT_OK(dbi->TEST_CompactMemTable());

// Delete everything
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
}
ASSERT_OK(dbi->TEST_CompactMemTable());

// Get initial measurement of the space we will be reading.
const int64_t initial_size = Size(Key(0), Key(n));
const int64_t initial_other_size = Size(Key(n), Key(kCount));

// Read until size drops significantly.
std::string limit_key = Key(n);
for (int read = 0; true; read++) {
ASSERT_LT(read, 100) << "Taking too long to compact";
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst();
iter->Valid() && iter->key().ToString() < limit_key;
iter->Next()) {
// Drop data
}
delete iter;
// Wait a little bit to allow any triggered compactions to complete.
Env::Default()->SleepForMicroseconds(1000000);
uint64_t size = Size(Key(0), Key(n));
fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
if (size <= initial_size/10) {
break;
}
}

// Verify that the size of the key space not touched by the reads
// is pretty much unchanged.
const int64_t final_other_size = Size(Key(n), Key(kCount));
ASSERT_LE(final_other_size, initial_other_size + 1048576);
ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
}

TEST(AutoCompactTest, ReadAll) {
DoReads(kCount);
}

TEST(AutoCompactTest, ReadHalf) {
DoReads(kCount/2);
}

} // namespace leveldb

int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}
6 changes: 5 additions & 1 deletion src/leveldb/Makefile
Expand Up @@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL)

TESTS = \
arena_test \
autocompact_test \
bloom_test \
c_test \
cache_test \
Expand Down Expand Up @@ -70,7 +71,7 @@ SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
SHARED_MINOR = 12
SHARED_MINOR = 13
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
Expand Down Expand Up @@ -114,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

Expand Down
51 changes: 22 additions & 29 deletions src/leveldb/db/corruption_test.cc
Expand Up @@ -35,6 +35,7 @@ class CorruptionTest {
CorruptionTest() {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_;
options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, options_);

Expand All @@ -50,17 +51,14 @@ class CorruptionTest {
delete tiny_cache_;
}

Status TryReopen(Options* options = NULL) {
Status TryReopen() {
delete db_;
db_ = NULL;
Options opt = (options ? *options : options_);
opt.env = &env_;
opt.block_cache = tiny_cache_;
return DB::Open(opt, dbname_, &db_);
return DB::Open(options_, dbname_, &db_);
}

void Reopen(Options* options = NULL) {
ASSERT_OK(TryReopen(options));
void Reopen() {
ASSERT_OK(TryReopen());
}

void RepairDB() {
Expand Down Expand Up @@ -92,6 +90,10 @@ class CorruptionTest {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
uint64_t key;
Slice in(iter->key());
if (in == "" || in == "~") {
// Ignore boundary keys.
continue;
}
if (!ConsumeDecimalNumber(&in, &key) ||
!in.empty() ||
key < next_expected) {
Expand Down Expand Up @@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
dbi->TEST_CompactRange(1, NULL, NULL);

Corrupt(kTableFile, 100, 1);
Check(99, 99);
Check(90, 99);
}

TEST(CorruptionTest, TableFileIndexData) {
Expand Down Expand Up @@ -299,40 +301,31 @@ TEST(CorruptionTest, CompactionInputError) {
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));

Corrupt(kTableFile, 100, 1);
Check(9, 9);
Check(5, 9);

// Force compactions by writing lots of values
Build(10000);
Check(10000, 10000);
}

TEST(CorruptionTest, CompactionInputErrorParanoid) {
Options options;
options.paranoid_checks = true;
options.write_buffer_size = 1048576;
Reopen(&options);
options_.paranoid_checks = true;
options_.write_buffer_size = 512 << 10;
Reopen();
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);

// Fill levels >= 1 so memtable compaction outputs to level 1
for (int level = 1; level < config::kNumLevels; level++) {
dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end");
// Make multiple inputs so we need to compact.
for (int i = 0; i < 2; i++) {
Build(10);
dbi->TEST_CompactMemTable();
Corrupt(kTableFile, 100, 1);
env_.SleepForMicroseconds(100000);
}
dbi->CompactRange(NULL, NULL);

Build(10);
dbi->TEST_CompactMemTable();
ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));

Corrupt(kTableFile, 100, 1);
Check(9, 9);

// Write must eventually fail because of corrupted table
Status s;
// Write must fail because of corrupted table
std::string tmp1, tmp2;
for (int i = 0; i < 10000 && s.ok(); i++) {
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
}
Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
}

Expand Down
41 changes: 27 additions & 14 deletions src/leveldb/db/db_impl.cc
Expand Up @@ -113,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname,
return result;
}

DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(
dbname, &internal_comparator_, &internal_filter_policy_, options)),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env),
internal_comparator_(raw_options.comparator),
internal_filter_policy_(raw_options.filter_policy),
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
Expand All @@ -130,6 +130,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL),
Expand All @@ -138,7 +139,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
has_imm_.Release_Store(NULL);

// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);

versions_ = new VersionSet(dbname_, &options_, table_cache_,
Expand Down Expand Up @@ -1027,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
} // namespace

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
SequenceNumber* latest_snapshot,
uint32_t* seed) {
IterState* cleanup = new IterState;
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
Expand All @@ -1051,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);

*seed = ++seed_;
mutex_.Unlock();
return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
return NewInternalIterator(ReadOptions(), &ignored);
uint32_t ignored_seed;
return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
Expand Down Expand Up @@ -1114,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options,

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
this, user_comparator(), iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
: latest_snapshot),
seed);
}

void DBImpl::RecordReadSample(Slice key) {
MutexLock l(&mutex_);
if (versions_->current()->RecordReadSample(key)) {
MaybeScheduleCompaction();
}
}

const Snapshot* DBImpl::GetSnapshot() {
Expand Down
9 changes: 8 additions & 1 deletion src/leveldb/db/db_impl.h
Expand Up @@ -59,13 +59,19 @@ class DBImpl : public DB {
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();

// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);

private:
friend class DB;
struct CompactionState;
struct Writer;

Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
SequenceNumber* latest_snapshot,
uint32_t* seed);

Status NewDB();

Expand Down Expand Up @@ -135,6 +141,7 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
uint32_t seed_; // For sampling.

// Queue of writers.
std::deque<Writer*> writers_;
Expand Down

0 comments on commit 83efc91

Please sign in to comment.