Skip to content

Commit

Permalink
Merge upstream LevelDB 1.15.
Browse files Browse the repository at this point in the history
  • Loading branch information
sipa committed Dec 12, 2013
2 parents 02ac9f1 + 0cfb990 commit e991315
Show file tree
Hide file tree
Showing 22 changed files with 320 additions and 361 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
SHARED_MINOR = 14
SHARED_MINOR = 15
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
Expand Down
10 changes: 10 additions & 0 deletions build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ case "$TARGET_OS" in
# man ld: +h internal_name
PLATFORM_SHARED_LDFLAGS="-shared -Wl,+h -Wl,"
;;
IOS)
PLATFORM=IOS
COMMON_FLAGS="$MEMCMP_FLAG -DOS_MACOSX"
[ -z "$INSTALL_PATH" ] && INSTALL_PATH=`pwd`
PORT_FILE=port/port_posix.cc
PLATFORM_SHARED_EXT=
PLATFORM_SHARED_LDFLAGS=
PLATFORM_SHARED_CFLAGS=
PLATFORM_SHARED_VERSIONED=
;;
OS_WINDOWS_CROSSCOMPILE | NATIVE_WINDOWS)
PLATFORM=OS_WINDOWS
COMMON_FLAGS="-fno-builtin-memcmp -D_REENTRANT -DOS_WINDOWS -DLEVELDB_PLATFORM_WINDOWS -DWINVER=0x0500 -D__USE_MINGW_ANSI_STDIO=1"
Expand Down
26 changes: 24 additions & 2 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ class CorruptionTest {
Slice key = Key(i, &key_space);
batch.Clear();
batch.Put(key, Value(i, &value_space));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
WriteOptions options;
// Corrupt() doesn't work without this sync on windows; stat reports 0 for
// the file size.
if (i == n - 1) {
options.sync = true;
}
ASSERT_OK(db_->Write(options, &batch));
}
}

Expand Down Expand Up @@ -125,7 +131,7 @@ class CorruptionTest {
FileType type;
std::string fname;
int picked_number = -1;
for (int i = 0; i < filenames.size(); i++) {
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type == filetype &&
int(number) > picked_number) { // Pick latest file
Expand Down Expand Up @@ -238,6 +244,22 @@ TEST(CorruptionTest, TableFile) {
Check(90, 99);
}

TEST(CorruptionTest, TableFileRepair) {
options_.block_size = 2 * kValueSize; // Limit scope of corruption
options_.paranoid_checks = true;
Reopen();
Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, NULL, NULL);
dbi->TEST_CompactRange(1, NULL, NULL);

Corrupt(kTableFile, 100, 1);
RepairDB();
Reopen();
Check(95, 99);
}

TEST(CorruptionTest, TableFileIndexData) {
Build(10000); // Enough to build multiple Tables
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
Expand Down
8 changes: 4 additions & 4 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class RandomGenerator {
pos_ = 0;
}

Slice Generate(int len) {
Slice Generate(size_t len) {
if (pos_ + len > data_.size()) {
pos_ = 0;
assert(len < data_.size());
Expand All @@ -139,11 +139,11 @@ class RandomGenerator {
};

static Slice TrimSpace(Slice s) {
int start = 0;
size_t start = 0;
while (start < s.size() && isspace(s[start])) {
start++;
}
int limit = s.size();
size_t limit = s.size();
while (limit > start && isspace(s[limit-1])) {
limit--;
}
Expand Down Expand Up @@ -399,7 +399,7 @@ class Benchmark {
heap_counter_(0) {
std::vector<std::string> files;
Env::Default()->GetChildren(FLAGS_db, &files);
for (int i = 0; i < files.size(); i++) {
for (size_t i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) {
Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
}
Expand Down
99 changes: 57 additions & 42 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL),
consecutive_compaction_errors_(0) {
manual_compaction_(NULL) {
mem_->Ref();
has_imm_.Release_Store(NULL);

Expand Down Expand Up @@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}

void DBImpl::DeleteObsoleteFiles() {
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
return;
}

// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
Expand Down Expand Up @@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s;
}

Status DBImpl::CompactMemTable() {
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL);

Expand Down Expand Up @@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
}

return s;
}

void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
Expand Down Expand Up @@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
}

MutexLock l(&mutex_);
while (!manual.done) {
while (manual_compaction_ != NULL) {
bg_cv_.Wait();
}
manual_compaction_ = &manual;
MaybeScheduleCompaction();
while (manual_compaction_ == &manual) {
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
if (manual_compaction_ == NULL) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
} else { // Running either my compaction or another compaction.
bg_cv_.Wait();
}
}
if (manual_compaction_ == &manual) {
// Cancel my manual compaction since we aborted early for some reason.
manual_compaction_ = NULL;
}
}

Status DBImpl::TEST_CompactMemTable() {
Expand All @@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}

void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
bg_error_ = s;
bg_cv_.SignalAll();
}
}

void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
Expand All @@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction();
if (s.ok()) {
// Success
consecutive_compaction_errors_ = 0;
} else if (shutting_down_.Acquire_Load()) {
// Error most likely due to shutdown; do not wait
} else {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
++consecutive_compaction_errors_;
int seconds_to_sleep = 1;
for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
seconds_to_sleep *= 2;
}
env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();
}

bg_compaction_scheduled_ = false;
Expand All @@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll();
}

Status DBImpl::BackgroundCompaction() {
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();

if (imm_ != NULL) {
return CompactMemTable();
CompactMemTable();
return;
}

Compaction* c;
Expand Down Expand Up @@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
Expand All @@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
Expand All @@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}

if (is_manual) {
Expand All @@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
}
manual_compaction_ = NULL;
}
return status;
}

void DBImpl::CleanupCompaction(CompactionState* compact) {
Expand Down Expand Up @@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) {
status = InstallCompactionResults(compact);
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
Expand Down Expand Up @@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();

Expand Down
9 changes: 5 additions & 4 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Errors are recorded in bg_error_.
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
Expand All @@ -102,10 +102,12 @@ class DBImpl : public DB {
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
WriteBatch* BuildBatchGroup(Writer** last_writer);

void RecordBackgroundError(const Status& s);

void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
Expand Down Expand Up @@ -170,7 +172,6 @@ class DBImpl : public DB {

// Have we encountered a background error in paranoid mode?
Status bg_error_;
int consecutive_compaction_errors_;

// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
Expand Down
Loading

0 comments on commit e991315

Please sign in to comment.