From 55e2bf6ca34322b62ac6a670bbf9ea15e4486f6b Mon Sep 17 00:00:00 2001 From: Rulin Huang Date: Wed, 15 May 2024 04:44:56 -0400 Subject: [PATCH] Parallelize SetState in LaunchParallelMemTableWriters To wake up each writer to write its own memtable, the leader writer first wakes up the (n^0.5-1) caller writers, and then those callers and the leader will wake up n/x separately to write to the memtable. This reduces the number for the leader's to SetState n-1 writers to 2*(n^0.5) writers in turn. --- db/db_impl/db_impl_write.cc | 7 ++++- db/write_thread.cc | 56 +++++++++++++++++++++++++++++++++---- db/write_thread.h | 15 ++++++++++ 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 121bb55e912..7ab95683d6a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -329,6 +329,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { + write_thread_.SetMemWritersEachStride(&w); + } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group @@ -826,7 +829,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, // so we need to set its status to pass ASSERT_STATUS_CHECKED memtable_write_group.status.PermitUncheckedError(); } - + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { + write_thread_.SetMemWritersEachStride(&w); + } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); diff --git a/db/write_thread.cc b/db/write_thread.cc index 39f13c31875..ce80bd52586 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -428,6 +428,7 @@ void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w); AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_CALLER | STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &jbg_ctx); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); @@ -656,12 +657,57 @@ void WriteThread::ExitAsMemTableWriter(Writer* /*self*/, SetState(leader, STATE_COMPLETED); } +void WriteThread::SetMemWritersEachStride(Writer* w) { + WriteGroup* write_group = w->write_group; + Writer* last_writer = write_group->last_writer; + + // The stride is the same for each writer in write_group, so w will + // call the writers with the same number in write_group mod total size + size_t stride = static_cast(sqrt(write_group->size)); + size_t count = 0; + while (w) { + if (count++ % stride == 0) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + } + w = (w == last_writer) ? nullptr : w->link_newer; + } +} + void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { assert(write_group != nullptr); - write_group->running.store(write_group->size); - for (auto w : *write_group) { - SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + size_t group_size = write_group->size; + write_group->running.store(group_size); + + // The minimum number to allow the group use parallel caller mode. + // The number must no lower than 3; + const size_t MinParallelSize = 5; + + // The group_size is too small, and there is no need to have + // the parallel partial callers. + if (group_size < MinParallelSize) { + for (auto w : *write_group) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + } + return; } + + // The stride is equal to sqrt(group_size) which can minimize + // the total number of leader SetSate. + // Set the leader itself STATE_PARALLEL_MEMTABLE_WRITER, and set + // (stride-1) writers to be STATE_PARALLEL_MEMTABLE_CALLER. + size_t stride = static_cast(sqrt(group_size)); + auto w = write_group->leader; + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + + for (size_t i = 1; i < stride; i++) { + w = w->link_newer; + SetState(w, STATE_PARALLEL_MEMTABLE_CALLER); + } + + // After setting all STATE_PARALLEL_MEMTABLE_CALLER, the leader also + // does the job as STATE_PARALLEL_MEMTABLE_CALLER. + w = w->link_newer; + SetMemWritersEachStride(w); } static WriteThread::AdaptationContext cpmtw_ctx( @@ -788,8 +834,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, } AwaitState(leader, - STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER | - STATE_COMPLETED, + STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &eabgl_ctx); } else { Writer* head = newest_writer_.load(std::memory_order_acquire); diff --git a/db/write_thread.h b/db/write_thread.h index dc64601f9f4..dee42c80a8e 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -71,6 +71,12 @@ class WriteThread { // A state indicating that the thread may be waiting using StateMutex() // and StateCondVar() STATE_LOCKED_WAITING = 32, + + // The state used to inform a waiting writer that it has become a + // caller to call some other waiting writers to write to memtable + // by calling SetMemWritersEachStride. After doing + // this, it will also write to memtable. + STATE_PARALLEL_MEMTABLE_CALLER = 64, }; struct Writer; @@ -323,10 +329,19 @@ class WriteThread { // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of // the non-leader members of this write batch group. Sets Writer::sequence // before waking them up. + // If the size of write_group n is not small, the leader will call n^0.5 + // members to be PARALLEL_MEMTABLE_CALLER in the write_group to help to set + // other's status parallel. This ensures that the cost to call SetState + // sequentially does not exceed 2(n^0.5). // // WriteGroup* write_group: Extra state used to coordinate the parallel add void LaunchParallelMemTableWriters(WriteGroup* write_group); + // One of the every stride=N number writer in the WriteGroup are set to the + // MemTableWriters, where N is equal to square of the total number of this + // write_group, and all of these MemTableWriters will write to memtable. + void SetMemWritersEachStride(Writer* w); + // Reports the completion of w's batch to the parallel group leader, and // waits for the rest of the parallel batch to complete. Returns true // if this thread is the last to complete, and hence should advance