From 434620bab9c1a57244b6625f53838dd883491dc3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 5 Feb 2016 11:20:15 -0500 Subject: [PATCH 1/2] os/bluestore/BlockDevice: use std::mutex and std::condition_variable Pull the aio waiter wake-up code into an IOContext method (aio_wake) too. Signed-off-by: Sage Weil --- src/os/bluestore/BlockDevice.cc | 8 ++++---- src/os/bluestore/BlockDevice.h | 26 +++++++++++++++++--------- src/os/bluestore/KernelDevice.cc | 12 ++---------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/os/bluestore/BlockDevice.cc b/src/os/bluestore/BlockDevice.cc index ac17b6494cbe7..f683f6fa60585 100644 --- a/src/os/bluestore/BlockDevice.cc +++ b/src/os/bluestore/BlockDevice.cc @@ -29,14 +29,14 @@ void IOContext::aio_wait() { - Mutex::Locker l(lock); + std::unique_lock l(lock); // see _aio_thread for waker logic num_waiting.inc(); while (num_running.read() > 0 || num_reading.read() > 0) { dout(10) << __func__ << " " << this << " waiting for " << num_running.read() << " aios and/or " << num_reading.read() << " readers to complete" << dendl; - cond.Wait(lock); + cond.wait(l); } num_waiting.dec(); dout(20) << __func__ << " " << this << " done" << dendl; @@ -68,7 +68,7 @@ BlockDevice *BlockDevice::create(const string& path, aio_callback_t cb, void *cb void BlockDevice::queue_reap_ioc(IOContext *ioc) { - Mutex::Locker l(ioc_reap_lock); + std::lock_guard l(ioc_reap_lock); if (ioc_reap_count.read() == 0) ioc_reap_count.inc(); ioc_reap_queue.push_back(ioc); @@ -77,7 +77,7 @@ void BlockDevice::queue_reap_ioc(IOContext *ioc) void BlockDevice::reap_ioc() { if (ioc_reap_count.read()) { - Mutex::Locker l(ioc_reap_lock); + std::lock_guard l(ioc_reap_lock); for (auto p : ioc_reap_queue) { dout(20) << __func__ << " reap ioc " << p << dendl; delete p; diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h index 955f77c3ca00d..897c2d01bc6c8 100644 --- a/src/os/bluestore/BlockDevice.h +++ b/src/os/bluestore/BlockDevice.h @@ -17,6 +17,9 @@ #ifndef 
CEPH_OS_BLUESTORE_BLOCKDEVICE_H #define CEPH_OS_BLUESTORE_BLOCKDEVICE_H +#include +#include + #include "acconfig.h" #include "os/fs/FS.h" @@ -30,9 +33,8 @@ struct IOContext { void *nvme_task_last = nullptr; #endif - Mutex lock; - Cond cond; - //interval_set blocks; ///< blocks with aio in flight + std::mutex lock; + std::condition_variable cond; list pending_aios; ///< not yet submitted list running_aios; ///< submitting or submitted @@ -42,8 +44,7 @@ struct IOContext { atomic_t num_waiting; explicit IOContext(void *p) - : priv(p), - lock("IOContext::lock") + : priv(p) {} // no copying @@ -51,22 +52,29 @@ struct IOContext { IOContext &operator=(const IOContext& other); bool has_aios() { - Mutex::Locker l(lock); + std::lock_guard l(lock); return num_pending.read() || num_running.read(); } void aio_wait(); + + void aio_wake() { + if (num_waiting.read()) { + std::lock_guard l(lock); + cond.notify_all(); + } + } }; class BlockDevice { - Mutex ioc_reap_lock; + std::mutex ioc_reap_lock; vector ioc_reap_queue; atomic_t ioc_reap_count; public: - BlockDevice(): ioc_reap_lock("BlockDevice::ioc_reap_lock") {} - virtual ~BlockDevice() {} + BlockDevice() = default; + virtual ~BlockDevice() = default; typedef void (*aio_callback_t)(void *handle, void *aio); static BlockDevice *create( diff --git a/src/os/bluestore/KernelDevice.cc b/src/os/bluestore/KernelDevice.cc index b85b47393762c..98b71e2b73930 100644 --- a/src/os/bluestore/KernelDevice.cc +++ b/src/os/bluestore/KernelDevice.cc @@ -249,11 +249,7 @@ void KernelDevice::_aio_thread() if (left == 0) { // check waiting count before doing callback (which may // destroy this ioc). 
- if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } + ioc->aio_wake(); if (ioc->priv) { aio_callback(aio_callback_priv, ioc->priv); } @@ -493,11 +489,7 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, out: _aio_log_finish(ioc, off, len); ioc->num_reading.dec(); - if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } + ioc->aio_wake(); return r < 0 ? r : 0; } From 37b517ebfe72ba2306cc2676e2e793a7d08d644b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 5 Feb 2016 11:45:23 -0500 Subject: [PATCH 2/2] os/bluestore/BlockDevice: std::atomic_int Signed-off-by: Sage Weil --- src/os/bluestore/BlockDevice.cc | 18 +++++++++--------- src/os/bluestore/BlockDevice.h | 17 +++++++++-------- src/os/bluestore/BlueFS.cc | 2 +- src/os/bluestore/KernelDevice.cc | 20 ++++++++++---------- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/os/bluestore/BlockDevice.cc b/src/os/bluestore/BlockDevice.cc index f683f6fa60585..8286070bb11fd 100644 --- a/src/os/bluestore/BlockDevice.cc +++ b/src/os/bluestore/BlockDevice.cc @@ -31,14 +31,14 @@ void IOContext::aio_wait() { std::unique_lock l(lock); // see _aio_thread for waker logic - num_waiting.inc(); - while (num_running.read() > 0 || num_reading.read() > 0) { + ++num_waiting; + while (num_running.load() > 0 || num_reading.load() > 0) { dout(10) << __func__ << " " << this - << " waiting for " << num_running.read() << " aios and/or " - << num_reading.read() << " readers to complete" << dendl; + << " waiting for " << num_running.load() << " aios and/or " + << num_reading.load() << " readers to complete" << dendl; cond.wait(l); } - num_waiting.dec(); + --num_waiting; dout(20) << __func__ << " " << this << " done" << dendl; } @@ -69,20 +69,20 @@ BlockDevice *BlockDevice::create(const string& path, aio_callback_t cb, void *cb void 
BlockDevice::queue_reap_ioc(IOContext *ioc) { std::lock_guard l(ioc_reap_lock); - if (ioc_reap_count.read() == 0) - ioc_reap_count.inc(); + if (ioc_reap_count.load() == 0) + ++ioc_reap_count; ioc_reap_queue.push_back(ioc); } void BlockDevice::reap_ioc() { - if (ioc_reap_count.read()) { + if (ioc_reap_count.load()) { std::lock_guard l(ioc_reap_lock); for (auto p : ioc_reap_queue) { dout(20) << __func__ << " reap ioc " << p << dendl; delete p; } ioc_reap_queue.clear(); - ioc_reap_count.dec(); + --ioc_reap_count; } } diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h index 897c2d01bc6c8..5815d4000fa11 100644 --- a/src/os/bluestore/BlockDevice.h +++ b/src/os/bluestore/BlockDevice.h @@ -17,8 +17,9 @@ #ifndef CEPH_OS_BLUESTORE_BLOCKDEVICE_H #define CEPH_OS_BLUESTORE_BLOCKDEVICE_H -#include +#include #include +#include #include "acconfig.h" #include "os/fs/FS.h" @@ -38,10 +39,10 @@ struct IOContext { list pending_aios; ///< not yet submitted list running_aios; ///< submitting or submitted - atomic_t num_pending; - atomic_t num_running; - atomic_t num_reading; - atomic_t num_waiting; + std::atomic_int num_pending = {0}; + std::atomic_int num_running = {0}; + std::atomic_int num_reading = {0}; + std::atomic_int num_waiting = {0}; explicit IOContext(void *p) : priv(p) @@ -53,13 +54,13 @@ struct IOContext { bool has_aios() { std::lock_guard l(lock); - return num_pending.read() || num_running.read(); + return num_pending.load() || num_running.load(); } void aio_wait(); void aio_wake() { - if (num_waiting.read()) { + if (num_waiting.load()) { std::lock_guard l(lock); cond.notify_all(); } @@ -70,7 +71,7 @@ struct IOContext { class BlockDevice { std::mutex ioc_reap_lock; vector ioc_reap_queue; - atomic_t ioc_reap_count; + std::atomic_int ioc_reap_count = {0}; public: BlockDevice() = default; diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index a79af94bcf5e0..0fbc8c97d0cb1 100644 --- a/src/os/bluestore/BlueFS.cc +++ 
b/src/os/bluestore/BlueFS.cc @@ -1045,7 +1045,7 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length) x_off = 0; } for (unsigned i = 0; i < bdev.size(); ++i) { - if (h->iocv[i]->num_pending.read()) { + if (h->iocv[i]->has_aios()) { bdev[i]->aio_submit(h->iocv[i]); } } diff --git a/src/os/bluestore/KernelDevice.cc b/src/os/bluestore/KernelDevice.cc index 98b71e2b73930..ae2d612ef41f7 100644 --- a/src/os/bluestore/KernelDevice.cc +++ b/src/os/bluestore/KernelDevice.cc @@ -240,7 +240,7 @@ void KernelDevice::_aio_thread() for (int i = 0; i < r; ++i) { IOContext *ioc = static_cast(aio[i]->priv); _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); - int left = ioc->num_running.dec(); + int left = --ioc->num_running; int r = aio[i]->get_return_value(); dout(10) << __func__ << " finished aio " << aio[i] << " r " << r << " ioc " << ioc @@ -304,8 +304,8 @@ void KernelDevice::_aio_log_finish( void KernelDevice::aio_submit(IOContext *ioc) { dout(20) << __func__ << " ioc " << ioc - << " pending " << ioc->num_pending.read() - << " running " << ioc->num_running.read() + << " pending " << ioc->num_pending.load() + << " running " << ioc->num_running.load() << dendl; // move these aside, and get our end iterator position now, as the // aios might complete as soon as they are submitted and queue more @@ -314,10 +314,10 @@ void KernelDevice::aio_submit(IOContext *ioc) ioc->running_aios.splice(e, ioc->pending_aios); list::iterator p = ioc->running_aios.begin(); - int pending = ioc->num_pending.read(); - ioc->num_running.add(pending); - ioc->num_pending.sub(pending); - assert(ioc->num_pending.read() == 0); // we should be only thread doing this + int pending = ioc->num_pending.load(); + ioc->num_running += pending; + ioc->num_pending -= pending; + assert(ioc->num_pending.load() == 0); // we should be only thread doing this bool done = false; while (!done) { @@ -377,7 +377,7 @@ int KernelDevice::aio_write( #ifdef HAVE_LIBAIO if (aio && dio && !buffered) { 
ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); - ioc->num_pending.inc(); + ++ioc->num_pending; FS::aio_t& aio = ioc->pending_aios.back(); if (g_conf->bdev_inject_crash && rand() % g_conf->bdev_inject_crash == 0) { @@ -469,7 +469,7 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, assert(off + len <= size); _aio_log_start(ioc, off, len); - ioc->num_reading.inc();; + ++ioc->num_reading; bufferptr p = buffer::create_page_aligned(len); int r = ::pread(buffered ? fd_buffered : fd_direct, @@ -488,7 +488,7 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, out: _aio_log_finish(ioc, off, len); - ioc->num_reading.dec(); + --ioc->num_reading; ioc->aio_wake(); return r < 0 ? r : 0; }