Skip to content

Commit

Permalink
Push model for flushing memtables
Browse files Browse the repository at this point in the history
Summary:
When memtable is full it calls the registered callback. That callback then registers column family as needing the flush. Every write checks if there are some column families that need to be flushed. This completely eliminates the need for MakeRoomForWrite() function and simplifies our Write code-path.

There is some complexity with the concurrency when the column family is dropped. I made it a bit less complex by dropping the column family from the write thread in https://reviews.facebook.net/D22965. Let me know if you want to discuss this.

Test Plan: make check works. I'll also run db_stress with creating and dropping column families for a while.

Reviewers: yhchiang, sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23067
  • Loading branch information
igorcanadi committed Sep 11, 2014
1 parent 059e584 commit 3d9e6f7
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 111 deletions.
12 changes: 12 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,11 @@ bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id);
column_family_set_->Unlock();
// TODO(icanadi) Maybe remove column family from the hash table when it's
// dropped?
if (current_ != nullptr && current_->IsDropped()) {
current_ = nullptr;
}
}
handle_.SetCFD(current_);
return current_ != nullptr;
Expand All @@ -685,6 +690,13 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
return &handle_;
}

void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) {
flush_scheduler_->ScheduleFlush(current_);
current_->mem()->MarkFlushScheduled();
}
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
Expand Down
11 changes: 9 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
#include "db/flush_scheduler.h"

namespace rocksdb {

Expand Down Expand Up @@ -394,8 +395,11 @@ class ColumnFamilySet {
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), current_(nullptr) {}
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set,
FlushScheduler* flush_scheduler)
: column_family_set_(column_family_set),
current_(nullptr),
flush_scheduler_(flush_scheduler) {}

// sets current_ to ColumnFamilyData with column_family_id
// returns false if column family doesn't exist
Expand All @@ -414,9 +418,12 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
// Returns column family handle for the selected column family
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;

virtual void CheckMemtableFull() override;

private:
ColumnFamilySet* column_family_set_;
ColumnFamilyData* current_;
FlushScheduler* flush_scheduler_;
ColumnFamilyHandleInternal handle_;
};

Expand Down
151 changes: 68 additions & 83 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)

versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
versions_->GetColumnFamilySet(), &flush_scheduler_));

DumpLeveldbBuildVersion(db_options_.info_log.get());
DumpDBFileSummary(db_options_, dbname_);
Expand Down Expand Up @@ -392,6 +392,8 @@ DBImpl::~DBImpl() {
bg_cv_.Wait();
}

flush_scheduler_.Clear();

if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
mutex_.Unlock();
Expand Down Expand Up @@ -1336,28 +1338,30 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}

if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ShouldFlush()) {
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable();
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;

while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
cfd->Unref();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable();
}
}
}

flush_scheduler_.Clear();
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
Expand Down Expand Up @@ -2201,7 +2205,7 @@ void DBImpl::BackgroundCallCompaction() {
}
if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
// signal if
// * madeProgress -- need to wakeup MakeRoomForWrite
// * madeProgress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
Expand Down Expand Up @@ -2622,7 +2626,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
cfd->Ref();
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
mutex_.Unlock();
log_buffer->FlushBufferToLog();
Expand Down Expand Up @@ -3959,10 +3963,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.timeout_hint_us = options.timeout_hint_us;

uint64_t expiration_time = 0;
bool has_timeout = false;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
has_timeout = true;
}

if (!options.disableWAL) {
Expand Down Expand Up @@ -3997,56 +4003,48 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
assert(!single_column_family_mode_ ||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

uint64_t flush_column_family_if_log_file = 0;
uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
? 4 * max_total_in_memory_state_
: db_options_.max_total_wal_size;
if (UNLIKELY(!single_column_family_mode_) &&
alive_log_files_.begin()->getting_flushed == false &&
total_log_size_ > max_total_wal_size) {
flush_column_family_if_log_file = alive_log_files_.begin()->number;
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
Log(db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
}

if (write_controller_.IsStopped() || write_controller_.GetDelay() > 0) {
DelayWrite(expiration_time);
}

if (LIKELY(single_column_family_mode_)) {
// fast path
status = MakeRoomForWrite(default_cf_handle_->cfd(),
&context, expiration_time);
} else {
// refcounting cfd in iteration
bool dead_cfd = false;
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
if (flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file) {
// log size excedded limit and we need to do flush
// SetNewMemtableAndNewLogFie may temporarily unlock and wait
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
status = SetNewMemtableAndNewLogFile(cfd, &context);
if (!status.ok()) {
break;
}
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();
} else {
// May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, &context, expiration_time);
}

if (cfd->Unref()) {
dead_cfd = true;
}
if (!status.ok()) {
break;
}
}
if (dead_cfd) {
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
MaybeScheduleFlushOrCompaction();
}

if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;
}

if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(&context);
}

if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) {
DelayWrite(expiration_time);
}

if (UNLIKELY(status.ok() && has_timeout &&
env_->NowMicros() > expiration_time)) {
status = Status::TimedOut();
}

uint64_t last_sequence = versions_->LastSequence();
Expand Down Expand Up @@ -4241,36 +4239,23 @@ void DBImpl::DelayWrite(uint64_t expiration_time) {
}
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd,
WriteContext* context,
uint64_t expiration_time) {
mutex_.AssertHeld();
assert(!writers_.empty());
Status s;
bool has_timeout = (expiration_time > 0);

while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (has_timeout && env_->NowMicros() > expiration_time) {
s = Status::TimedOut();
break;
} else if (!cfd->mem()->ShouldFlush()) {
// There is room in current memtable
break;
} else {
s = SetNewMemtableAndNewLogFile(cfd, context);
if (!s.ok()) {
break;
}
MaybeScheduleFlushOrCompaction();
Status DBImpl::ScheduleFlushes(WriteContext* context) {
bool schedule_bg_work = false;
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
schedule_bg_work = true;
auto status = SetNewMemtableAndNewLogFile(cfd, context);
if (cfd->Unref()) {
delete cfd;
}
if (!status.ok()) {
return status;
}
}
return s;
if (schedule_bg_work) {
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
}

// REQUIRES: mutex_ is held
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"

namespace rocksdb {

Expand Down Expand Up @@ -399,8 +400,7 @@ class DBImpl : public DB {

void DelayWrite(uint64_t expiration_time);

Status MakeRoomForWrite(ColumnFamilyData* cfd, WriteContext* context,
uint64_t expiration_time);
Status ScheduleFlushes(WriteContext* context);

Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
WriteContext* context);
Expand Down Expand Up @@ -557,6 +557,7 @@ class DBImpl : public DB {
WriteBatch tmp_batch_;

WriteController write_controller_;
FlushScheduler flush_scheduler_;

SnapshotList snapshots_;

Expand Down

0 comments on commit 3d9e6f7

Please sign in to comment.