Skip to content

Commit

Permalink
Merge pull request #7568 from liewegas/wip-bluestore-mutex
Browse files (browse the repository at this point in the history)
osd: bluestore/blockdevice: use std::mutex et al
  • Loading branch information
liewegas committed Feb 9, 2016
2 parents 278acb6 + 37b517e commit be5544b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 49 deletions.
26 changes: 13 additions & 13 deletions src/os/bluestore/BlockDevice.cc
Expand Up @@ -29,16 +29,16 @@

// Block the calling thread until this IOContext has no running aios and
// no in-progress reads, i.e. until num_running and num_reading are both 0.
//
// NOTE: this is a diff rendering; each pre-change line (Mutex / Cond /
// atomic_t API) is immediately followed by its std::mutex /
// std::condition_variable / std::atomic replacement.
void IOContext::aio_wait()
{
Mutex::Locker l(lock);
std::unique_lock<std::mutex> l(lock);
// see _aio_thread for waker logic
// num_waiting is raised for the duration of the wait; wakers test it
// before taking the lock and notifying the condition.
num_waiting.inc();
while (num_running.read() > 0 || num_reading.read() > 0) {
++num_waiting;
while (num_running.load() > 0 || num_reading.load() > 0) {
dout(10) << __func__ << " " << this
<< " waiting for " << num_running.read() << " aios and/or "
<< num_reading.read() << " readers to complete" << dendl;
cond.Wait(lock);
<< " waiting for " << num_running.load() << " aios and/or "
<< num_reading.load() << " readers to complete" << dendl;
// wait releases the lock while blocked; the enclosing loop re-checks
// both counters after every wakeup.
cond.wait(l);
}
num_waiting.dec();
--num_waiting;
dout(20) << __func__ << " " << this << " done" << dendl;
}

Expand Down Expand Up @@ -68,21 +68,21 @@ BlockDevice *BlockDevice::create(const string& path, aio_callback_t cb, void *cb

// Append ioc to ioc_reap_queue, under ioc_reap_lock, so that a later
// reap_ioc() call deletes it.
//
// NOTE: this is a diff rendering; each pre-change line is immediately
// followed by its std::mutex / std::atomic replacement.
void BlockDevice::queue_reap_ioc(IOContext *ioc)
{
Mutex::Locker l(ioc_reap_lock);
if (ioc_reap_count.read() == 0)
ioc_reap_count.inc();
std::lock_guard<std::mutex> l(ioc_reap_lock);
// The counter is only bumped when it is currently zero, so as used here
// it behaves as a 0/1 "queue is non-empty" flag rather than a length.
if (ioc_reap_count.load() == 0)
++ioc_reap_count;
ioc_reap_queue.push_back(ioc);
}

// Delete every IOContext previously handed to queue_reap_ioc() and empty
// the queue.
//
// NOTE: this is a diff rendering; each pre-change line is immediately
// followed by its std::mutex / std::atomic replacement.
void BlockDevice::reap_ioc()
{
if (ioc_reap_count.read()) {
Mutex::Locker l(ioc_reap_lock);
// The atomic counter is tested before locking, so ioc_reap_lock is only
// taken when something has been queued.
if (ioc_reap_count.load()) {
std::lock_guard<std::mutex> l(ioc_reap_lock);
for (auto p : ioc_reap_queue) {
dout(20) << __func__ << " reap ioc " << p << dendl;
delete p;
}
ioc_reap_queue.clear();
// single decrement pairs with the single increment in queue_reap_ioc()
ioc_reap_count.dec();
--ioc_reap_count;
}
}
39 changes: 24 additions & 15 deletions src/os/bluestore/BlockDevice.h
Expand Up @@ -17,6 +17,10 @@
#ifndef CEPH_OS_BLUESTORE_BLOCKDEVICE_H
#define CEPH_OS_BLUESTORE_BLOCKDEVICE_H

#include <atomic>
#include <condition_variable>
#include <mutex>

#include "acconfig.h"
#include "os/fs/FS.h"

Expand All @@ -30,43 +34,48 @@ struct IOContext {
void *nvme_task_last = nullptr;
#endif

Mutex lock;
Cond cond;
//interval_set<uint64_t> blocks; ///< blocks with aio in flight
std::mutex lock;
std::condition_variable cond;

list<FS::aio_t> pending_aios; ///< not yet submitted
list<FS::aio_t> running_aios; ///< submitting or submitted
atomic_t num_pending;
atomic_t num_running;
atomic_t num_reading;
atomic_t num_waiting;
std::atomic_int num_pending = {0};
std::atomic_int num_running = {0};
std::atomic_int num_reading = {0};
std::atomic_int num_waiting = {0};

// Construct an IOContext that stores the caller-supplied pointer p in priv.
// Diff rendering: the pre-change initializer list, which also passed a name
// string to the old Mutex, is followed by the post-change one-line
// initializer list that initializes priv only.
explicit IOContext(void *p)
: priv(p),
lock("IOContext::lock")
: priv(p)
{}

// no copying
IOContext(const IOContext& other);
IOContext &operator=(const IOContext& other);

// Return true if this context has any aios that are pending or running
// (num_pending or num_running non-zero). The counters are read while
// holding lock.
// Diff rendering: each pre-change line is followed by its replacement.
bool has_aios() {
Mutex::Locker l(lock);
return num_pending.read() || num_running.read();
std::lock_guard<std::mutex> l(lock);
return num_pending.load() || num_running.load();
}

void aio_wait();

// Wake every thread blocked on cond (see aio_wait()).
// num_waiting is checked first so the lock is only taken when at least one
// waiter has registered; the notification is issued with lock held.
void aio_wake() {
if (num_waiting.load()) {
std::lock_guard<std::mutex> l(lock);
cond.notify_all();
}
}
};


class BlockDevice {
Mutex ioc_reap_lock;
std::mutex ioc_reap_lock;
vector<IOContext*> ioc_reap_queue;
atomic_t ioc_reap_count;
std::atomic_int ioc_reap_count = {0};

public:
BlockDevice(): ioc_reap_lock("BlockDevice::ioc_reap_lock") {}
virtual ~BlockDevice() {}
BlockDevice() = default;
virtual ~BlockDevice() = default;
typedef void (*aio_callback_t)(void *handle, void *aio);

static BlockDevice *create(
Expand Down
2 changes: 1 addition & 1 deletion src/os/bluestore/BlueFS.cc
Expand Up @@ -1045,7 +1045,7 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
x_off = 0;
}
for (unsigned i = 0; i < bdev.size(); ++i) {
if (h->iocv[i]->num_pending.read()) {
if (h->iocv[i]->has_aios()) {
bdev[i]->aio_submit(h->iocv[i]);
}
}
Expand Down
32 changes: 12 additions & 20 deletions src/os/bluestore/KernelDevice.cc
Expand Up @@ -240,7 +240,7 @@ void KernelDevice::_aio_thread()
for (int i = 0; i < r; ++i) {
IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
_aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
int left = ioc->num_running.dec();
int left = --ioc->num_running;
int r = aio[i]->get_return_value();
dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
<< " ioc " << ioc
Expand All @@ -249,11 +249,7 @@ void KernelDevice::_aio_thread()
if (left == 0) {
// check waiting count before doing callback (which may
// destroy this ioc).
if (ioc->num_waiting.read()) {
dout(20) << __func__ << " waking waiter" << dendl;
Mutex::Locker l(ioc->lock);
ioc->cond.Signal();
}
ioc->aio_wake();
if (ioc->priv) {
aio_callback(aio_callback_priv, ioc->priv);
}
Expand Down Expand Up @@ -308,8 +304,8 @@ void KernelDevice::_aio_log_finish(
void KernelDevice::aio_submit(IOContext *ioc)
{
dout(20) << __func__ << " ioc " << ioc
<< " pending " << ioc->num_pending.read()
<< " running " << ioc->num_running.read()
<< " pending " << ioc->num_pending.load()
<< " running " << ioc->num_running.load()
<< dendl;
// move these aside, and get our end iterator position now, as the
// aios might complete as soon as they are submitted and queue more
Expand All @@ -318,10 +314,10 @@ void KernelDevice::aio_submit(IOContext *ioc)
ioc->running_aios.splice(e, ioc->pending_aios);
list<FS::aio_t>::iterator p = ioc->running_aios.begin();

int pending = ioc->num_pending.read();
ioc->num_running.add(pending);
ioc->num_pending.sub(pending);
assert(ioc->num_pending.read() == 0); // we should be only thread doing this
int pending = ioc->num_pending.load();
ioc->num_running += pending;
ioc->num_pending -= pending;
assert(ioc->num_pending.load() == 0); // we should be only thread doing this

bool done = false;
while (!done) {
Expand Down Expand Up @@ -381,7 +377,7 @@ int KernelDevice::aio_write(
#ifdef HAVE_LIBAIO
if (aio && dio && !buffered) {
ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct));
ioc->num_pending.inc();
++ioc->num_pending;
FS::aio_t& aio = ioc->pending_aios.back();
if (g_conf->bdev_inject_crash &&
rand() % g_conf->bdev_inject_crash == 0) {
Expand Down Expand Up @@ -473,7 +469,7 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
assert(off + len <= size);

_aio_log_start(ioc, off, len);
ioc->num_reading.inc();;
++ioc->num_reading;

bufferptr p = buffer::create_page_aligned(len);
int r = ::pread(buffered ? fd_buffered : fd_direct,
Expand All @@ -492,12 +488,8 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,

out:
_aio_log_finish(ioc, off, len);
ioc->num_reading.dec();
if (ioc->num_waiting.read()) {
dout(20) << __func__ << " waking waiter" << dendl;
Mutex::Locker l(ioc->lock);
ioc->cond.Signal();
}
--ioc->num_reading;
ioc->aio_wake();
return r < 0 ? r : 0;
}

Expand Down

0 comments on commit be5544b

Please sign in to comment.