Skip to content

Commit

Permalink
fixed issues 66 (leaking files on disk error) and 68 (no sync of CURR…
Browse files Browse the repository at this point in the history
…ENT file)
  • Loading branch information
ghemawat committed Jan 25, 2012
1 parent c8c5866 commit 3c8be10
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 34 deletions.
20 changes: 6 additions & 14 deletions db/db_impl.cc
Expand Up @@ -655,6 +655,8 @@ void DBImpl::BackgroundCompaction() {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;

Expand All @@ -672,6 +674,9 @@ void DBImpl::BackgroundCompaction() {

if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
Expand Down Expand Up @@ -793,21 +798,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
pending_outputs_.erase(out.number);
}
compact->outputs.clear();

Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
} else {
// Discard any files we may have created during this failed compaction
for (size_t i = 0; i < compact->outputs.size(); i++) {
env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
}
}
return s;
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}

Status DBImpl::DoCompactionWork(CompactionState* compact) {
Expand Down
63 changes: 56 additions & 7 deletions db/db_test.cc
Expand Up @@ -28,8 +28,12 @@ class SpecialEnv : public EnvWrapper {
// sstable Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_;

// Simulate no-space errors while this pointer is non-NULL.
port::AtomicPointer no_space_;

explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL);
no_space_.Release_Store(NULL);
}

Status NewWritableFile(const std::string& f, WritableFile** r) {
Expand All @@ -44,7 +48,14 @@ class SpecialEnv : public EnvWrapper {
base_(base) {
}
~SSTableFile() { delete base_; }
Status Append(const Slice& data) { return base_->Append(data); }
Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != NULL) {
// Drop writes on the floor
return Status::OK();
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
Expand Down Expand Up @@ -239,6 +250,12 @@ class DBTest {
return result;
}

int CountFiles() {
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
return static_cast<int>(files.size());
}

uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
Expand Down Expand Up @@ -1266,6 +1283,37 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}

// Check that number of files does not grow when we are out of space
TEST(DBTest, NoSpace) {
Options options;
options.env = env_;
Reopen(&options);

ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL);
}
}
env_->no_space_.Release_Store(NULL);
ASSERT_LT(CountFiles(), num_files + 5);
}

TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
const int num_files = CountFiles();
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
}
ASSERT_EQ(CountFiles(), num_files);
}

// Multi-threaded test:
namespace {

Expand All @@ -1287,14 +1335,15 @@ struct MTThread {

static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
fprintf(stderr, "... starting thread %d\n", t->id);
Random rnd(1000 + t->id);
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) {
t->state->counter[t->id].Release_Store(reinterpret_cast<void*>(counter));
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));

int key = rnd.Uniform(kNumKeys);
char keybuf[20];
Expand All @@ -1304,7 +1353,7 @@ static void MTThreadBody(void* arg) {
// Write values of the form <key, my id, counter>.
// We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, t->id, static_cast<int>(counter));
key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
Expand All @@ -1325,8 +1374,8 @@ static void MTThreadBody(void* arg) {
}
counter++;
}
t->state->thread_done[t->id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter));
t->state->thread_done[id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}

} // namespace
Expand Down
6 changes: 5 additions & 1 deletion db/filename.cc
Expand Up @@ -11,6 +11,10 @@

namespace leveldb {

// A utility routine: write "data" to the named file and Sync() it.
extern Status WriteStringToFileSync(Env* env, const Slice& data,
const std::string& fname);

static std::string MakeFileName(const std::string& name, uint64_t number,
const char* suffix) {
char buf[100];
Expand Down Expand Up @@ -122,7 +126,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
assert(contents.starts_with(dbname + "/"));
contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number);
Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp);
Status s = WriteStringToFileSync(env, contents.ToString() + "\n", tmp);
if (s.ok()) {
s = env->RenameFile(tmp, CurrentFileName(dbname));
}
Expand Down
18 changes: 16 additions & 2 deletions util/env.cc
Expand Up @@ -33,14 +33,18 @@ void Log(Logger* info_log, const char* format, ...) {
}
}

Status WriteStringToFile(Env* env, const Slice& data,
const std::string& fname) {
static Status DoWriteStringToFile(Env* env, const Slice& data,
const std::string& fname,
bool should_sync) {
WritableFile* file;
Status s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}
s = file->Append(data);
if (s.ok() && should_sync) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
Expand All @@ -51,6 +55,16 @@ Status WriteStringToFile(Env* env, const Slice& data,
return s;
}

Status WriteStringToFile(Env* env, const Slice& data,
const std::string& fname) {
return DoWriteStringToFile(env, data, fname, false);
}

Status WriteStringToFileSync(Env* env, const Slice& data,
const std::string& fname) {
return DoWriteStringToFile(env, data, fname, true);
}

Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
data->clear();
SequentialFile* file;
Expand Down
22 changes: 12 additions & 10 deletions util/env_test.cc
Expand Up @@ -22,29 +22,30 @@ class EnvPosixTest {
};

static void SetBool(void* ptr) {
*(reinterpret_cast<bool*>(ptr)) = true;
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
}

TEST(EnvPosixTest, RunImmediately) {
bool called = false;
port::AtomicPointer called (NULL);
env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called);
ASSERT_TRUE(called.NoBarrier_Load() != NULL);
}

TEST(EnvPosixTest, RunMany) {
int last_id = 0;
port::AtomicPointer last_id (NULL);

struct CB {
int* last_id_ptr; // Pointer to shared slot
int id; // Order# for the execution of this callback
port::AtomicPointer* last_id_ptr; // Pointer to shared slot
uintptr_t id; // Order# for the execution of this callback

CB(int* p, int i) : last_id_ptr(p), id(i) { }
CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }

static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v);
ASSERT_EQ(cb->id-1, *cb->last_id_ptr);
*cb->last_id_ptr = cb->id;
void* cur = cb->last_id_ptr->NoBarrier_Load();
ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
}
};

Expand All @@ -59,7 +60,8 @@ TEST(EnvPosixTest, RunMany) {
env_->Schedule(&CB::Run, &cb4);

Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(4, last_id);
void* cur = last_id.Acquire_Load();
ASSERT_EQ(4, reinterpret_cast<uintptr_t>(cur));
}

struct State {
Expand Down

0 comments on commit 3c8be10

Please sign in to comment.