support for concurrent adds to memtable
Summary:
This diff adds support for concurrent adds to the skiplist memtable
implementations.  Memory allocation is made thread-safe by the addition of
a spinlock, with small per-core buffers to avoid contention.  Concurrent
memtable writes are made via an additional method and don't impose a
performance overhead on the non-concurrent case, so parallelism can be
selected on a per-batch basis.
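
The allocation scheme described above can be sketched roughly as follows. This is a hypothetical illustration, not the ConcurrentArena added by this diff: the names SpinLock, SketchArena, kShards and kShardBlock are invented here, shards are chosen by hashing the thread id rather than by CPU core, and alignment is ignored.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal spinlock; usable with std::lock_guard.
class SpinLock {
 public:
  void lock() {
    while (locked_.exchange(true, std::memory_order_acquire)) {
      std::this_thread::yield();
    }
  }
  void unlock() { locked_.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked_{false};
};

// Hypothetical arena: small requests are carved out of one of a few small
// shard buffers, so the main spinlock is only taken for shard refills and
// for large requests.
class SketchArena {
 public:
  static constexpr size_t kShards = 8;         // assumed shard count
  static constexpr size_t kShardBlock = 4096;  // assumed refill size

  char* Allocate(size_t bytes) {
    if (bytes > kShardBlock / 4) {
      return AllocateFromMain(bytes);
    }
    // Assumption: hash of the thread id stands in for a per-core index.
    size_t idx =
        std::hash<std::thread::id>()(std::this_thread::get_id()) % kShards;
    Shard& shard = shards_[idx];
    std::lock_guard<SpinLock> guard(shard.lock);
    if (shard.remaining < bytes) {
      // The unused tail of the old buffer is simply wasted in this sketch.
      shard.cursor = AllocateFromMain(kShardBlock);
      shard.remaining = kShardBlock;
    }
    char* result = shard.cursor;
    shard.cursor += bytes;
    shard.remaining -= bytes;
    return result;
  }

 private:
  struct Shard {
    SpinLock lock;
    char* cursor = nullptr;
    size_t remaining = 0;
  };

  char* AllocateFromMain(size_t bytes) {
    std::lock_guard<SpinLock> guard(main_lock_);
    blocks_.emplace_back(new char[bytes]);
    return blocks_.back().get();
  }

  SpinLock main_lock_;
  std::vector<std::unique_ptr<char[]>> blocks_;  // owns all memory
  Shard shards_[kShards];
};
```

In this sketch, threads that hash to different shards never contend on the same lock for small allocations, which is the effect the per-core buffers are meant to achieve.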

Write thread synchronization is an increasing bottleneck for higher levels
of concurrency, so this diff adds --enable_write_thread_adaptive_yield
(default off).  This feature causes threads joining a write batch
group to spin for a short time (default 100 usec) using sched_yield,
rather than going to sleep on a mutex.  If the timing of the yield calls
indicates that another thread has actually run during the yield then
spinning is avoided.  This option improves performance for concurrent
situations even without parallel adds, although it has the potential to
increase CPU usage (and the heuristic adaptation is not yet mature).
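
A rough sketch of the adaptive yield idea, under stated assumptions: the function name SpinWithAdaptiveYield and the max_slow_yields cutoff are hypothetical, and std::this_thread::yield stands in for sched_yield. The 100 usec and 3 usec defaults mirror the db_bench flags in this diff; everything else is illustrative rather than the code in the write thread.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Spins until `done` is set, the time budget runs out, or several yields
// took long enough to suggest that other threads wanted the core.
// Returns true if `done` was observed; on false the caller would fall
// back to a blocking wait (mutex + condition variable).
bool SpinWithAdaptiveYield(
    const std::atomic<bool>& done,
    std::chrono::microseconds max_yield = std::chrono::microseconds(100),
    std::chrono::microseconds slow_yield = std::chrono::microseconds(3),
    int max_slow_yields = 3 /* hypothetical cutoff */) {
  using Clock = std::chrono::steady_clock;
  const auto start = Clock::now();
  auto before = start;
  int slow_yields = 0;
  while (before - start < max_yield) {
    if (done.load(std::memory_order_acquire)) {
      return true;
    }
    std::this_thread::yield();
    const auto after = Clock::now();
    // A yield that takes a long time implies something else actually ran
    // on this core, so continued spinning is likely to be wasteful.
    if (after - before >= slow_yield && ++slow_yields >= max_slow_yields) {
      break;
    }
    before = after;
  }
  return done.load(std::memory_order_acquire);
}
```

The design trade-off is the one named above: a short spin avoids the wake-up latency of a mutex when the wait is brief, at the cost of extra CPU when it is not.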

Parallel writes are not currently compatible with
in-place updates, update callbacks, or delete filtering.
Enable them with --allow_concurrent_memtable_write (and
--enable_write_thread_adaptive_yield).  Parallel memtable writes
are performance neutral when there is no actual parallelism, and in
my experiments (SSD server-class Linux and varying contention and key
sizes for fillrandom) they are always a performance win when there is
more than one thread.

Statistics are updated earlier in the write path, dropping the number
of DB mutex acquisitions from 2 to 1 for almost all cases.

This diff was motivated and inspired by Yahoo's cLSM work.  It is more
conservative than cLSM: RocksDB's write batch group leader role is
preserved (along with all of the existing flush and write throttling
logic) and concurrent writers are blocked until all memtable insertions
have completed and the sequence number has been advanced, to preserve
linearizability.
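
The blocking rule in the last sentence can be illustrated with a small sketch. SketchWriteGroup and FinishAndWait are hypothetical names, and the mutex plus condition variable used here are an assumption for brevity, not the synchronization this diff implements: no writer returns until every insertion in the group has finished and the new sequence number is visible.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical write group: writers insert into the memtable in parallel,
// then all of them block until the group's last sequence number has been
// published to readers.
class SketchWriteGroup {
 public:
  SketchWriteGroup(int writers, std::atomic<uint64_t>* visible_seq,
                   uint64_t last_seq_in_group)
      : pending_(writers),
        visible_seq_(visible_seq),
        last_seq_(last_seq_in_group) {}

  // Called by each writer after its own memtable insertions are done.
  void FinishAndWait() {
    std::unique_lock<std::mutex> lock(mu_);
    if (--pending_ == 0) {
      // Last writer to finish: every insertion is complete, so it is now
      // safe to advance the sequence number and release the group.
      visible_seq_->store(last_seq_, std::memory_order_release);
      published_ = true;
      cv_.notify_all();
    } else {
      cv_.wait(lock, [this] { return published_; });
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int pending_;
  bool published_ = false;
  std::atomic<uint64_t>* visible_seq_;
  uint64_t last_seq_;
};
```

Because a writer only returns after the sequence number covers the whole group, a caller that reads immediately after its write completes sees that write and every other write in the same group.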

My test config is "db_bench -benchmarks=fillrandom -threads=$T
-batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T
-level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999
-disable_auto_compactions --max_write_buffer_number=8
-max_background_flushes=8 --disable_wal --write_buffer_size=160000000
--block_size=16384 --allow_concurrent_memtable_write" on a two-socket
Xeon E5-2660 @ 2.2 GHz with lots of memory and an SSD.  With 1
thread I get ~440K ops/sec.  Peak performance for 1 socket (numactl
-N1) is slightly more than 1M ops/sec, at 16 threads.  Peak performance
across both sockets happens at 30 threads, and is ~900K ops/sec, although
with fewer threads there is less performance loss when the system has
background work.

Test Plan:
1. concurrent stress tests for InlineSkipList and DynamicBloom
2. make clean; make check
3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench
4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench
5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench
6. make clean; OPT=-DROCKSDB_LITE make check
7. verify no perf regressions when disabled

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba

Differential Revision: https://reviews.facebook.net/D50589
Nathan Bronson committed Dec 25, 2015
1 parent 5b2587b commit 7d87f02
Showing 41 changed files with 1,812 additions and 495 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -191,6 +191,7 @@ set(SOURCES
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc
util/concurrent_arena.cc
util/crc32c.cc
util/db_info_dumper.cc
util/delete_scheduler_impl.cc
21 changes: 14 additions & 7 deletions db/column_family.cc
@@ -110,6 +110,20 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
if (cf_options.inplace_update_support) {
return Status::InvalidArgument(
"In-place memtable updates (inplace_update_support) are not compatible "
"with concurrent writes (allow_concurrent_memtable_write)");
}
if (cf_options.filter_deletes) {
return Status::InvalidArgument(
"Delete filtering (filter_deletes) is not compatible with concurrent "
"memtable writes (allow_concurrent_memtable_write)");
}
return Status::OK();
}

ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
@@ -916,13 +930,6 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
return &handle_;
}

void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) {
flush_scheduler_->ScheduleFlush(current_);
current_->mem()->MarkFlushScheduled();
}
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
49 changes: 29 additions & 20 deletions db/column_family.h
@@ -19,12 +19,10 @@
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/flush_scheduler.h"
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
#include "util/thread_local.h"

@@ -134,6 +132,9 @@ struct SuperVersion {

extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);

extern Status CheckConcurrentWritesSupported(
const ColumnFamilyOptions& cf_options);

extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src);
@@ -158,14 +159,16 @@ class ColumnFamilyData {
// thread-safe
const std::string& GetName() const { return name_; }

// Ref() can only be called while holding a DB mutex or during a
// single-threaded write.
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero. in that case, it can be
// deleted by the caller immediately, or later, by calling
// FreeDeadColumnFamilies()
// Unref() can only be called while holding a DB mutex

// Unref decreases the reference count, but does not handle deletion
// when the count goes to 0. If this method returns true then the
// caller should delete the instance immediately, or later, by calling
// FreeDeadColumnFamilies(). Unref() can only be called while holding
// a DB mutex, or during single-threaded recovery.
bool Unref() {
int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
assert(old_refs > 0);
@@ -497,36 +500,42 @@ class ColumnFamilySet {
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set,
FlushScheduler* flush_scheduler)
: column_family_set_(column_family_set),
current_(nullptr),
flush_scheduler_(flush_scheduler) {}
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), current_(nullptr) {}

// Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed
// with the arguments used to construct *orig.
explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig)
: column_family_set_(orig->column_family_set_), current_(nullptr) {}

// sets current_ to ColumnFamilyData with column_family_id
// returns false if column family doesn't exist
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
bool Seek(uint32_t column_family_id) override;

// Returns log number of the selected column family
// REQUIRES: under a DB mutex OR from a write thread
uint64_t GetLogNumber() const override;

// REQUIRES: Seek() called first
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual MemTable* GetMemTable() const override;

// Returns column family handle for the selected column family
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;

// REQUIRES: under a DB mutex OR from a write thread
virtual void CheckMemtableFull() override;
// Cannot be called while another thread is calling Seek().
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual ColumnFamilyData* current() { return current_; }

private:
ColumnFamilySet* column_family_set_;
ColumnFamilyData* current_;
FlushScheduler* flush_scheduler_;
ColumnFamilyHandleInternal handle_;
};

20 changes: 20 additions & 0 deletions db/db_bench.cc
@@ -646,6 +646,20 @@ DEFINE_uint64(delayed_write_rate, 8388608u,
"Limited bytes allowed to DB when soft_rate_limit or "
"level0_slowdown_writes_trigger triggers");

DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(enable_write_thread_adaptive_yield, false,
"Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
write_thread_max_yield_usec, 100,
"Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
"The threshold at which a slow yield is considered a signal that "
"other processes or threads want the core.");

DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
"When hard_rate_limit is set then this is the max time a put will"
" be stalled.");
@@ -2552,6 +2566,12 @@ class Benchmark {
options.hard_pending_compaction_bytes_limit =
FLAGS_hard_pending_compaction_bytes_limit;
options.delayed_write_rate = FLAGS_delayed_write_rate;
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
options.rate_limit_delay_max_milliseconds =
FLAGS_rate_limit_delay_max_milliseconds;
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;