diff --git a/src/os/bluestore/BlockDevice.cc b/src/os/bluestore/BlockDevice.cc index ac17b6494cbe7..8286070bb11fd 100644 --- a/src/os/bluestore/BlockDevice.cc +++ b/src/os/bluestore/BlockDevice.cc @@ -29,16 +29,16 @@ void IOContext::aio_wait() { - Mutex::Locker l(lock); + std::unique_lock l(lock); // see _aio_thread for waker logic - num_waiting.inc(); - while (num_running.read() > 0 || num_reading.read() > 0) { + ++num_waiting; + while (num_running.load() > 0 || num_reading.load() > 0) { dout(10) << __func__ << " " << this - << " waiting for " << num_running.read() << " aios and/or " - << num_reading.read() << " readers to complete" << dendl; - cond.Wait(lock); + << " waiting for " << num_running.load() << " aios and/or " + << num_reading.load() << " readers to complete" << dendl; + cond.wait(l); } - num_waiting.dec(); + --num_waiting; dout(20) << __func__ << " " << this << " done" << dendl; } @@ -68,21 +68,21 @@ BlockDevice *BlockDevice::create(const string& path, aio_callback_t cb, void *cb void BlockDevice::queue_reap_ioc(IOContext *ioc) { - Mutex::Locker l(ioc_reap_lock); - if (ioc_reap_count.read() == 0) - ioc_reap_count.inc(); + std::lock_guard l(ioc_reap_lock); + if (ioc_reap_count.load() == 0) + ++ioc_reap_count; ioc_reap_queue.push_back(ioc); } void BlockDevice::reap_ioc() { - if (ioc_reap_count.read()) { - Mutex::Locker l(ioc_reap_lock); + if (ioc_reap_count.load()) { + std::lock_guard l(ioc_reap_lock); for (auto p : ioc_reap_queue) { dout(20) << __func__ << " reap ioc " << p << dendl; delete p; } ioc_reap_queue.clear(); - ioc_reap_count.dec(); + --ioc_reap_count; } } diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h index 955f77c3ca00d..5815d4000fa11 100644 --- a/src/os/bluestore/BlockDevice.h +++ b/src/os/bluestore/BlockDevice.h @@ -17,6 +17,10 @@ #ifndef CEPH_OS_BLUESTORE_BLOCKDEVICE_H #define CEPH_OS_BLUESTORE_BLOCKDEVICE_H +#include +#include +#include + #include "acconfig.h" #include "os/fs/FS.h" @@ -30,20 +34,18 @@ struct IOContext { void *nvme_task_last = nullptr; #endif - Mutex lock; - Cond cond; - //interval_set blocks; ///< blocks with aio in flight + std::mutex lock; + std::condition_variable cond; list pending_aios; ///< not yet submitted list running_aios; ///< submitting or submitted - atomic_t num_pending; - atomic_t num_running; - atomic_t num_reading; - atomic_t num_waiting; + std::atomic_int num_pending = {0}; + std::atomic_int num_running = {0}; + std::atomic_int num_reading = {0}; + std::atomic_int num_waiting = {0}; explicit IOContext(void *p) - : priv(p), - lock("IOContext::lock") + : priv(p) {} // no copying @@ -51,22 +53,29 @@ struct IOContext { IOContext &operator=(const IOContext& other); bool has_aios() { - Mutex::Locker l(lock); - return num_pending.read() || num_running.read(); + std::lock_guard l(lock); + return num_pending.load() || num_running.load(); } void aio_wait(); + + void aio_wake() { + if (num_waiting.load()) { + std::lock_guard l(lock); + cond.notify_all(); + } + } }; class BlockDevice { - Mutex ioc_reap_lock; + std::mutex ioc_reap_lock; vector ioc_reap_queue; - atomic_t ioc_reap_count; + std::atomic_int ioc_reap_count = {0}; public: - BlockDevice(): ioc_reap_lock("BlockDevice::ioc_reap_lock") {} - virtual ~BlockDevice() {} + BlockDevice() = default; + virtual ~BlockDevice() = default; typedef void (*aio_callback_t)(void *handle, void *aio); static BlockDevice *create( diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index a79af94bcf5e0..0fbc8c97d0cb1 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -1045,7 +1045,7 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length) x_off = 0; } for (unsigned i = 0; i < bdev.size(); ++i) { - if (h->iocv[i]->num_pending.read()) { + if (h->iocv[i]->has_aios()) { bdev[i]->aio_submit(h->iocv[i]); } } diff --git a/src/os/bluestore/KernelDevice.cc b/src/os/bluestore/KernelDevice.cc index b85b47393762c..ae2d612ef41f7 100644 --- a/src/os/bluestore/KernelDevice.cc +++ b/src/os/bluestore/KernelDevice.cc @@ -240,7 +240,7 @@ void KernelDevice::_aio_thread() for (int i = 0; i < r; ++i) { IOContext *ioc = static_cast(aio[i]->priv); _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); - int left = ioc->num_running.dec(); + int left = --ioc->num_running; int r = aio[i]->get_return_value(); dout(10) << __func__ << " finished aio " << aio[i] << " r " << r << " ioc " << ioc @@ -249,11 +249,7 @@ void KernelDevice::_aio_thread() if (left == 0) { // check waiting count before doing callback (which may // destroy this ioc). - if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } + ioc->aio_wake(); if (ioc->priv) { aio_callback(aio_callback_priv, ioc->priv); } @@ -308,8 +304,8 @@ void KernelDevice::_aio_log_finish( void KernelDevice::aio_submit(IOContext *ioc) { dout(20) << __func__ << " ioc " << ioc - << " pending " << ioc->num_pending.read() - << " running " << ioc->num_running.read() + << " pending " << ioc->num_pending.load() + << " running " << ioc->num_running.load() << dendl; // move these aside, and get our end iterator position now, as the // aios might complete as soon as they are submitted and queue more @@ -318,10 +314,10 @@ void KernelDevice::aio_submit(IOContext *ioc) ioc->running_aios.splice(e, ioc->pending_aios); list::iterator p = ioc->running_aios.begin(); - int pending = ioc->num_pending.read(); - ioc->num_running.add(pending); - ioc->num_pending.sub(pending); - assert(ioc->num_pending.read() == 0); // we should be only thread doing this + int pending = ioc->num_pending.load(); + ioc->num_running += pending; + ioc->num_pending -= pending; + assert(ioc->num_pending.load() == 0); // we should be only thread doing this bool done = false; while (!done) { @@ -381,7 +377,7 @@ int KernelDevice::aio_write( #ifdef HAVE_LIBAIO if (aio && dio && !buffered) { ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); - ioc->num_pending.inc(); + ++ioc->num_pending; FS::aio_t& aio = ioc->pending_aios.back(); if (g_conf->bdev_inject_crash && rand() % g_conf->bdev_inject_crash == 0) { @@ -473,7 +469,7 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, assert(off + len <= size); _aio_log_start(ioc, off, len); - ioc->num_reading.inc();; + ++ioc->num_reading; bufferptr p = buffer::create_page_aligned(len); int r = ::pread(buffered ? fd_buffered : fd_direct, @@ -492,12 +488,8 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, out: _aio_log_finish(ioc, off, len); - ioc->num_reading.dec(); - if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } + --ioc->num_reading; + ioc->aio_wake(); return r < 0 ? r : 0; }