Bugfixes for Get(); don't hold mutex while writing the log.

- Fix bug in Get: when it triggers a compaction, it could sometimes
  mark the compaction with the wrong level (if there was a gap
  in the set of levels examined for the Get).
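
  As an illustration only (a standalone sketch with hypothetical
  simplified types, not the patch itself): the fragile shape is
  deriving the charged level from bookkeeping that assumes every
  level contributed files; the fix is to record the level together
  with the file it charges:

    #include <cstddef>
    #include <vector>

    struct FileMetaData { int allowed_seeks; };

    struct GetStats {
      FileMetaData* seek_file;   // file to charge for this Get's seek
      int seek_file_level;       // level the file actually lives in
    };

    // Search levels 0..N-1 for a key; any level may be empty.
    void SearchLevels(const std::vector<std::vector<FileMetaData*> >& files,
                      GetStats* stats) {
      stats->seek_file = NULL;
      stats->seek_file_level = -1;
      for (size_t level = 0; level < files.size(); level++) {
        for (size_t i = 0; i < files[level].size(); i++) {
          if (stats->seek_file == NULL) {
            // Record (file, level) as a pair; an empty level in
            // between can no longer shift the level that gets charged.
            stats->seek_file = files[level][i];
            stats->seek_file_level = static_cast<int>(level);
          }
          // ... read files[level][i] here; stop if the key is found ...
        }
      }
    }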

- Do not hold mutex while writing to the log file or to the
  MANIFEST file.

  Added a new benchmark that runs a writer thread concurrently with
  reader threads.

  Percentiles
  ------------------------------------------------------
  micros/op: avg  median  99   99.9  99.99  99.999  max
  ------------------------------------------------------
  before:    42   38      110  225   32000  42000   48000
  after:     24   20      55   65    130    1100    7000
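
  The mechanism, condensed (a sketch reusing leveldb's port::Mutex,
  port::CondVar, and MutexLock; LogWriterSketch and DoLogIO are
  hypothetical names, with DoLogIO standing in for log_->AddRecord
  plus the optional Sync):

    #include <string>

    #include "port/port.h"
    #include "util/mutexlock.h"

    namespace leveldb {

    struct LoggerId { };  // opaque token identifying the current logger

    class LogWriterSketch {
     public:
      LogWriterSketch() : logger_(NULL), logger_cv_(&mutex_) { }

      void Write(const std::string& record) {
        MutexLock l(&mutex_);
        LoggerId self;
        while (logger_ != NULL) logger_cv_.Wait();  // queue behind prior logger
        logger_ = &self;                            // become the logger

        // Safe to drop the mutex for the slow I/O: logger_ keeps other
        // writers out of the log (and memtable), while readers may
        // still acquire the mutex concurrently.
        mutex_.Unlock();
        DoLogIO(record);  // hypothetical stand-in for AddRecord + Sync
        mutex_.Lock();

        logger_ = NULL;
        logger_cv_.SignalAll();  // wake any queued writers
      }

     private:
      void DoLogIO(const std::string& record) { /* slow file I/O */ }

      port::Mutex mutex_;
      LoggerId* logger_;
      port::CondVar logger_cv_;
    };

    }  // namespace leveldb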

- Fixed race in optimized Get.  It should have been using the
  pinned memtables, not the current memtables.
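
  The shape of that fix, simplified from the db_impl.cc hunk further
  below (error handling elided; a sketch, not the verbatim patch):
  take references under the mutex, then read only through the pinned
  locals once the mutex has been released:

    // Inside DBImpl::Get (simplified):
    Status s;
    MutexLock l(&mutex_);
    MemTable* mem = mem_;                    // pin what we will read
    MemTable* imm = imm_;
    Version* current = versions_->current();
    mem->Ref();
    if (imm != NULL) imm->Ref();
    current->Ref();

    mutex_.Unlock();
    // Reading mem_/imm_ here would race with a writer swapping them in
    // MakeRoomForWrite; the pinned locals cannot be deleted under us.
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
    }
    mutex_.Lock();

    mem->Unref();
    if (imm != NULL) imm->Unref();
    current->Unref();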



git-svn-id: https://leveldb.googlecode.com/svn/trunk@50 62dab493-f737-651d-591e-8d6aee1b9529
commit 72630236513e7384cb0a2e8fffcae232135a5adc (1 parent: e3584f9)
gabor@google.com committed Sep 1, 2011
Showing with 186 additions and 39 deletions.
  1. +40 −5 db/db_bench.cc
  2. +55 −14 db/db_impl.cc
  3. +7 −0 db/db_impl.h
  4. +41 −2 db/db_test.cc
  5. +33 −13 db/version_set.cc
  6. +10 −5 db/version_set.h
db/db_bench.cc
@@ -280,6 +280,7 @@ struct ThreadState {
int tid; // 0..n-1 when running in n threads
Random rand; // Has different seeds for different threads
Stats stats;
+ SharedState* shared;
ThreadState(int index)
: tid(index),
@@ -418,13 +419,14 @@ class Benchmark {
// Reset parameters that may be overridden below
num_ = FLAGS_num;
- reads_ = num_;
+ reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
value_size_ = FLAGS_value_size;
entries_per_batch_ = 1;
write_options_ = WriteOptions();
void (Benchmark::*method)(ThreadState*) = NULL;
bool fresh_db = false;
+ int num_threads = FLAGS_threads;
if (name == Slice("fillseq")) {
fresh_db = true;
@@ -460,6 +462,9 @@ class Benchmark {
} else if (name == Slice("readrandomsmall")) {
reads_ /= 1000;
method = &Benchmark::ReadRandom;
+ } else if (name == Slice("readwhilewriting")) {
+ num_threads++; // Add extra thread for writing
+ method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("compact")) {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
@@ -494,7 +499,7 @@ class Benchmark {
}
if (method != NULL) {
- RunBenchmark(name, method);
+ RunBenchmark(num_threads, name, method);
}
}
}
@@ -535,8 +540,8 @@ class Benchmark {
}
}
- void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) {
- const int n = FLAGS_threads;
+ void RunBenchmark(int n, Slice name,
+ void (Benchmark::*method)(ThreadState*)) {
SharedState shared;
shared.total = n;
shared.num_initialized = 0;
@@ -549,6 +554,7 @@ class Benchmark {
arg[i].method = method;
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
+ arg[i].thread->shared = &shared;
Env::Default()->StartThread(ThreadBody, &arg[i]);
}
@@ -688,7 +694,6 @@ class Benchmark {
RandomGenerator gen;
WriteBatch batch;
Status s;
- std::string val;
int64_t bytes = 0;
for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear();
@@ -760,6 +765,36 @@ class Benchmark {
}
}
+ void ReadWhileWriting(ThreadState* thread) {
+ if (thread->tid > 0) {
+ ReadRandom(thread);
+ } else {
+ // Special thread that keeps writing until other threads are done.
+ RandomGenerator gen;
+ while (true) {
+ {
+ MutexLock l(&thread->shared->mu);
+ if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
+ // Other threads have finished
+ break;
+ }
+ }
+
+ const int k = thread->rand.Next() % FLAGS_num;
+ char key[100];
+ snprintf(key, sizeof(key), "%016d", k);
+ Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
+ if (!s.ok()) {
+ fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+ exit(1);
+ }
+ }
+
+ // Do not count any of the preceding work/delay in stats.
+ thread->stats.Start();
+ }
+ }
+
void Compact(ThreadState* thread) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
db/db_impl.cc
@@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
+ logger_(NULL),
+ logger_cv_(&mutex_),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
mem_->Ref();
@@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) {
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);
+
+ // The previous incarnation may not have written any MANIFEST
+ // records after allocating this log number. So we manually
+ // update the file number allocation counter in VersionSet.
+ versions_->MarkFileNumberUsed(logs[i]);
}
if (s.ok()) {
@@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() {
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
- s = versions_->LogAndApply(&edit);
+ s = versions_->LogAndApply(&edit, &mutex_);
}
if (s.ok()) {
@@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange(
Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_);
+ LoggerId self;
+ AcquireLoggingResponsibility(&self);
Status s = MakeRoomForWrite(true /* force compaction */);
+ ReleaseLoggingResponsibility(&self);
if (s.ok()) {
// Wait until the compaction completes
while (imm_ != NULL && bg_error_.ok()) {
@@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
- status = versions_->LogAndApply(c->edit());
+ status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
@@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
- Status s = versions_->LogAndApply(compact->compaction->edit());
+ Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options,
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
- if (mem_->Get(lkey, value, &s)) {
+ if (mem->Get(lkey, value, &s)) {
// Done
- } else if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
+ } else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
@@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
+// There is at most one thread that is the current logger. This call
+// waits until preceding logger(s) have finished and becomes the
+// current logger.
+void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
+ while (logger_ != NULL) {
+ logger_cv_.Wait();
+ }
+ logger_ = self;
+}
+
+void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
+ assert(logger_ == self);
+ logger_ = NULL;
+ logger_cv_.SignalAll();
+}
+
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status;
MutexLock l(&mutex_);
+ LoggerId self;
+ AcquireLoggingResponsibility(&self);
status = MakeRoomForWrite(false); // May temporarily release lock and wait
uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) {
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
- versions_->SetLastSequence(last_sequence);
- // Add to log and apply to memtable
- status = log_->AddRecord(WriteBatchInternal::Contents(updates));
- if (status.ok() && options.sync) {
- status = logfile_->Sync();
- }
- if (status.ok()) {
- status = WriteBatchInternal::InsertInto(updates, mem_);
+ // Add to log and apply to memtable. We can release the lock during
+ // this phase since the "logger_" flag protects against concurrent
+ // loggers and concurrent writes into mem_.
+ {
+ assert(logger_ == &self);
+ mutex_.Unlock();
+ status = log_->AddRecord(WriteBatchInternal::Contents(updates));
+ if (status.ok() && options.sync) {
+ status = logfile_->Sync();
+ }
+ if (status.ok()) {
+ status = WriteBatchInternal::InsertInto(updates, mem_);
+ }
+ mutex_.Lock();
+ assert(logger_ == &self);
}
+
+ versions_->SetLastSequence(last_sequence);
}
if (options.post_write_snapshot != NULL) {
*options.post_write_snapshot =
status.ok() ? snapshots_.New(last_sequence) : NULL;
}
+ ReleaseLoggingResponsibility(&self);
return status;
}
+// REQUIRES: mutex_ is held
+// REQUIRES: this thread is the current logger
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
+ assert(logger_ != NULL);
bool allow_delay = !force;
Status s;
while (true) {
@@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
- s = impl->versions_->LogAndApply(&edit);
+ s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
db/db_impl.h
@@ -87,6 +87,11 @@ class DBImpl : public DB {
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
+ // Only one thread is allowed to log at a time.
+ struct LoggerId { }; // Opaque identifier for logging thread
+ void AcquireLoggingResponsibility(LoggerId* self);
+ void ReleaseLoggingResponsibility(LoggerId* self);
+
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
struct CompactionState;
@@ -126,6 +131,8 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
+ LoggerId* logger_; // NULL, or the id of the current logging thread
+ port::CondVar logger_cv_; // For threads waiting to log
SnapshotList snapshots_;
// Set of table files to protect from deletion because they are
db/db_test.cc
@@ -10,6 +10,7 @@
#include "leveldb/env.h"
#include "leveldb/table.h"
#include "util/logging.h"
+#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
@@ -345,6 +346,41 @@ TEST(DBTest, GetPicksCorrectFile) {
ASSERT_EQ("vx", Get("x"));
}
+TEST(DBTest, GetEncountersEmptyLevel) {
+ // Arrange for the following to happen:
+ // * sstable A in level 0
+ // * nothing in level 1
+ // * sstable B in level 2
+ // Then do enough Get() calls to arrange for an automatic compaction
+ // of sstable A. A bug would cause the compaction to be marked as
+ // occurring at level 1 (instead of the correct level 0).
+
+ // Step 1: First place sstables in levels 0 and 2
+ int compaction_count = 0;
+ while (NumTableFilesAtLevel(0) == 0 ||
+ NumTableFilesAtLevel(2) == 0) {
+ ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
+ compaction_count++;
+ Put("a", "begin");
+ Put("z", "end");
+ dbfull()->TEST_CompactMemTable();
+ }
+
+ // Step 2: clear level 1 if necessary.
+ dbfull()->TEST_CompactRange(1, "a", "z");
+ ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+ ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(2), 1);
+
+ // Step 3: read until level 0 compaction disappears.
+ int read_count = 0;
+ while (NumTableFilesAtLevel(0) > 0) {
+ ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction";
+ read_count++;
+ ASSERT_EQ("NOT_FOUND", Get("missing"));
+ }
+}
+
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
@@ -1355,6 +1391,9 @@ void BM_LogAndApply(int iters, int num_base_files) {
Env* env = Env::Default();
+ port::Mutex mu;
+ MutexLock l(&mu);
+
InternalKeyComparator cmp(BytewiseComparator());
Options options;
VersionSet vset(dbname, &options, NULL, &cmp);
@@ -1366,7 +1405,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
}
- ASSERT_OK(vset.LogAndApply(&vbase));
+ ASSERT_OK(vset.LogAndApply(&vbase, &mu));
uint64_t start_micros = env->NowMicros();
@@ -1376,7 +1415,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
- vset.LogAndApply(&vedit);
+ vset.LogAndApply(&vedit, &mu);
}
uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros;