Merge pull request #4854 from ceph/wip-11769-firefly
librbd: aio calls may block

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
smithfarm committed Sep 3, 2015
2 parents 7f8a397 + 4a709a4 commit 99612e7
Showing 17 changed files with 446 additions and 173 deletions.
6 changes: 6 additions & 0 deletions PendingReleaseNotes
@@ -1,3 +1,9 @@
v0.80.11
--------

- librbd's rbd_aio_read and Image::aio_read API methods no longer return the
  number of bytes read upon success. Instead, they return 0 upon success and a
  negative value upon failure.

v0.81
-----
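To make the release note above concrete, here is a minimal caller-side sketch of the new contract using the public librbd C API; the image, off, len, and buf variables are assumed to be set up elsewhere, and the snippet is not part of this commit:

    /* illustrative only, not from this commit */
    rbd_completion_t c;
    int r = rbd_aio_create_completion(NULL, NULL, &c);
    assert(r == 0);

    r = rbd_aio_read(image, off, len, buf, c);  /* now 0 or -errno, never a byte count */
    if (r < 0) {
      rbd_aio_release(c);
      /* handle submission failure */
    } else {
      rbd_aio_wait_for_complete(c);
      ssize_t bytes = rbd_aio_get_return_value(c);  /* bytes read, or -errno */
      rbd_aio_release(c);
    }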
6 changes: 6 additions & 0 deletions src/common/Throttle.cc
@@ -267,6 +267,12 @@ void SimpleThrottle::end_op(int r)
m_cond.Signal();
}

bool SimpleThrottle::pending_error() const
{
Mutex::Locker l(m_lock);
return (m_ret < 0);
}

int SimpleThrottle::wait_for_ret()
{
Mutex::Locker l(m_lock);
3 changes: 2 additions & 1 deletion src/common/Throttle.h
@@ -76,9 +76,10 @@ class SimpleThrottle {
~SimpleThrottle();
void start_op();
void end_op(int r);
bool pending_error() const;
int wait_for_ret();
private:
Mutex m_lock;
mutable Mutex m_lock;
Cond m_cond;
uint64_t m_max;
uint64_t m_current;
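The new pending_error() accessor (and the now-mutable m_lock it relies on) lets a submitter stop queueing work as soon as an earlier operation has failed, rather than discovering the error only in wait_for_ret(). A hypothetical producer loop, where submit_async() and its completion path are assumed:

    // illustrative only, not from this commit
    SimpleThrottle throttle(max_concurrent, false);
    for (size_t i = 0; i < requests.size(); ++i) {
      if (throttle.pending_error())
        break;                               // an earlier end_op(r) saw r < 0
      throttle.start_op();
      submit_async(requests[i], &throttle);  // completion calls throttle.end_op(r)
    }
    int r = throttle.wait_for_ret();         // first recorded error, or 0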
33 changes: 32 additions & 1 deletion src/common/WorkQueue.h
@@ -330,7 +330,7 @@ class ThreadPool : public md_config_obs_t {

public:
ThreadPool(CephContext *cct_, string nm, int n, const char *option = NULL);
~ThreadPool();
virtual ~ThreadPool();

/// return number of threads currently running
int get_num_threads() {
@@ -433,4 +433,35 @@ class C_QueueInWQ : public Context {
}
};

class ContextWQ : public ThreadPool::WorkQueueVal<Context *> {
public:
ContextWQ(const string &name, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueueVal<Context *>(name, ti, 0, tp) {}

void queue(Context *ctx) {
ThreadPool::WorkQueueVal<Context *>::queue(ctx);
}

protected:
virtual void _enqueue(Context *item) {
_queue.push_back(item);
}
virtual void _enqueue_front(Context *item) {
_queue.push_front(item);
}
virtual bool _empty() {
return _queue.empty();
}
virtual Context *_dequeue() {
Context *item = _queue.front();
_queue.pop_front();
return item;
}
virtual void _process(Context *item) {
item->complete(0);
}
private:
list<Context *> _queue;
};

#endif
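ContextWQ is a small work queue that completes each queued Context with r == 0 on a ThreadPool worker thread. A rough wiring sketch; the C_Notify type and the pool/queue names are illustrative, not taken from this commit:

    // illustrative only, not from this commit
    struct C_Notify : public Context {
      virtual void finish(int r) {
        // runs on a thread-pool worker; ContextWQ::_process() always passes 0
      }
    };

    ThreadPool thread_pool(cct, "librbd::thread_pool", 1);
    ContextWQ work_queue("librbd::op_work_queue", 60, &thread_pool);
    thread_pool.start();
    work_queue.queue(new C_Notify());  // completed (and deleted) asynchronously
    // ...
    thread_pool.stop();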
6 changes: 6 additions & 0 deletions src/common/ceph_context.cc
@@ -265,6 +265,7 @@ CephContext::CephContext(uint32_t module_type_)
_crypto_aes(NULL)
{
ceph_spin_init(&_service_thread_lock);
ceph_spin_init(&_associated_objs_lock);

_log = new ceph::log::Log(&_conf->subsys);
_log->start();
@@ -298,6 +299,10 @@ CephContext::~CephContext()
{
join_service_thread();

for (map<string, AssociatedSingletonObject*>::iterator it = _associated_objs.begin();
it != _associated_objs.end(); it++)
delete it->second;

if (_conf->lockdep) {
lockdep_unregister_ceph_context(this);
}
@@ -335,6 +340,7 @@ CephContext::~CephContext()

delete _conf;
ceph_spin_destroy(&_service_thread_lock);
ceph_spin_destroy(&_associated_objs_lock);

delete _crypto_none;
delete _crypto_aes;
19 changes: 19 additions & 0 deletions src/common/ceph_context.h
@@ -17,6 +17,7 @@

#include <iostream>
#include <stdint.h>
#include <string>

#include "include/buffer.h"
#include "include/atomic.h"
@@ -58,6 +59,10 @@ class CephContext {
~CephContext();
atomic_t nref;
public:
class AssociatedSingletonObject {
public:
virtual ~AssociatedSingletonObject() {}
};
CephContext *get() {
nref.inc();
return this;
@@ -102,6 +107,17 @@ class CephContext {
void do_command(std::string command, cmdmap_t& cmdmap, std::string format,
bufferlist *out);

template<typename T>
void lookup_or_create_singleton_object(T*& p, const std::string &name) {
ceph_spin_lock(&_associated_objs_lock);
if (!_associated_objs.count(name)) {
p = new T(this);
_associated_objs[name] = reinterpret_cast<AssociatedSingletonObject*>(p);
} else {
p = reinterpret_cast<T*>(_associated_objs[name]);
}
ceph_spin_unlock(&_associated_objs_lock);
}
/**
* get a crypto handler
*/
@@ -138,6 +154,9 @@ class CephContext {

ceph::HeartbeatMap *_heartbeat_map;

ceph_spinlock_t _associated_objs_lock;
std::map<std::string, AssociatedSingletonObject*> _associated_objs;

// crypto
CryptoNone *_crypto_none;
CryptoAES *_crypto_aes;
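lookup_or_create_singleton_object() gives each CephContext at most one instance per name, created on first use and deleted in ~CephContext(). Note that new T(this) requires T to accept a CephContext* constructor argument. A hedged usage sketch; the ThreadPoolSingleton type here is illustrative:

    // illustrative only, not from this commit
    struct ThreadPoolSingleton : public CephContext::AssociatedSingletonObject {
      explicit ThreadPoolSingleton(CephContext *cct) {
        // construct the shared resource; freed when cct is destroyed
      }
    };

    ThreadPoolSingleton *tp;
    cct->lookup_or_create_singleton_object(tp, "librbd::thread_pool");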
3 changes: 3 additions & 0 deletions src/common/config_opts.h
@@ -727,6 +727,9 @@ OPTION(journal_ignore_corruption, OPT_BOOL, false) // assume journal is not corrupt
OPTION(rados_mon_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from the monitor before returning an error from a rados operation. 0 means no limit.
OPTION(rados_osd_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from osds before returning an error from a rados operation. 0 means no limit.

OPTION(rbd_op_threads, OPT_INT, 1)
OPTION(rbd_op_thread_timeout, OPT_INT, 60)
OPTION(rbd_non_blocking_aio, OPT_BOOL, true) // process AIO ops from a worker thread to prevent blocking
OPTION(rbd_cache, OPT_BOOL, false) // whether to enable caching (writeback unless rbd_cache_max_dirty is 0)
OPTION(rbd_cache_writethrough_until_flush, OPT_BOOL, false) // whether to make writeback caching writethrough until flush is called, to be sure the user of librbd will send flushes so that writeback is safe
OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size in bytes
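These are client-side options and can be overridden in ceph.conf; the values below simply restate the defaults from this hunk:

    [client]
        rbd op threads = 1
        rbd op thread timeout = 60
        rbd non blocking aio = true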
48 changes: 46 additions & 2 deletions src/librbd/AioCompletion.cc
@@ -5,6 +5,7 @@

#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/errno.h"

#include "librbd/AioRequest.h"
#include "librbd/internal.h"
@@ -25,7 +26,7 @@ namespace librbd {
building = false;
if (!pending_count) {
finalize(cct, rval);
complete();
complete(cct);
}
lock.Unlock();
}
@@ -54,6 +55,49 @@ namespace librbd {
}
}

void AioCompletion::complete(CephContext *cct) {
utime_t elapsed;
assert(lock.is_locked());
elapsed = ceph_clock_now(cct) - start_time;
switch (aio_type) {
case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
case AIO_TYPE_FLUSH:
ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
}

if (ictx != NULL) {
Mutex::Locker l(ictx->aio_lock);
assert(ictx->pending_aio != 0);
--ictx->pending_aio;
ictx->pending_aio_cond.Signal();
}

if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
}
done = true;
cond.Signal();
}

void AioCompletion::fail(CephContext *cct, int r)
{
lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
<< dendl;
lock.Lock();
assert(pending_count == 0);
rval = r;
complete(cct);
put_unlock();
}

void AioCompletion::complete_request(CephContext *cct, ssize_t r)
{
ldout(cct, 20) << "AioCompletion::complete_request() "
@@ -70,7 +114,7 @@ namespace librbd {
int count = --pending_count;
if (!count && !building) {
finalize(cct, rval);
complete();
complete(cct);
}
put_unlock();
}
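fail() gives librbd's AIO entry points a way to report early errors through the user's callback instead of a synchronous return value; note the assert that no requests are pending, so it is only valid before any object request has been issued. A hypothetical call site (clip_io() is existing librbd validation; this wiring is assumed, not shown in this hunk):

    // illustrative only, not from this commit
    uint64_t clipped_len = len;
    int r = clip_io(ictx, off, &clipped_len);
    if (r < 0) {
      comp->fail(ictx->cct, r);  // runs the user callback with rval = r, then put_unlock()
      return;
    }
    // ...otherwise issue the object requests as usual...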
33 changes: 2 additions & 31 deletions src/librbd/AioCompletion.h
@@ -101,37 +101,8 @@ namespace librbd {
start_time = ceph_clock_now(ictx->cct);
}

void complete() {
utime_t elapsed;
assert(lock.is_locked());
elapsed = ceph_clock_now(ictx->cct) - start_time;
switch (aio_type) {
case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
case AIO_TYPE_FLUSH:
ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
}

{
Mutex::Locker l(ictx->aio_lock);
assert(ictx->pending_aio != 0);
--ictx->pending_aio;
ictx->pending_aio_cond.Signal();
}

if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
}
done = true;
cond.Signal();
}
void complete(CephContext *cct);
void fail(CephContext *cct, int r);

void set_complete_cb(void *cb_arg, callback_t cb) {
complete_cb = cb;
15 changes: 9 additions & 6 deletions src/librbd/AioRequest.cc
@@ -85,8 +85,9 @@ namespace librbd {
return true;
}

int AioRead::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
void AioRead::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " "
<< m_object_off << "~" << m_object_len << dendl;

librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
@@ -99,10 +100,11 @@ namespace librbd {
} else {
op.read(m_object_off, m_object_len, &m_read_data, NULL);
}

r = m_ioctx->aio_operate(m_oid, rados_completion, &op, flags, NULL);
assert(r == 0);

rados_completion->release();
return r;
}

/** write **/
@@ -224,16 +226,17 @@ namespace librbd {
return finished;
}

int AbstractWrite::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
void AbstractWrite::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " "
<< m_object_off << "~" << m_object_len << dendl;
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
int r;
assert(m_write.size());
r = m_ioctx->aio_operate(m_oid, rados_completion, &m_write,
m_snap_seq, m_snaps);
assert(r == 0);
rados_completion->release();
return r;
}

void AbstractWrite::send_copyup() {
6 changes: 3 additions & 3 deletions src/librbd/AioRequest.h
@@ -43,7 +43,7 @@ namespace librbd {
}

virtual bool should_complete(int r) = 0;
virtual int send() = 0;
virtual void send() = 0;

protected:
void read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents);
@@ -73,7 +73,7 @@ namespace librbd {
}
virtual ~AioRead() {}
virtual bool should_complete(int r);
virtual int send();
virtual void send();

ceph::bufferlist &data() {
return m_read_data;
@@ -100,7 +100,7 @@ namespace librbd {
bool hide_enoent);
virtual ~AbstractWrite() {}
virtual bool should_complete(int r);
virtual int send();
virtual void send();
void guard_write();

bool has_parent() const {
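With send() returning void, submission can no longer fail softly: success of aio_operate() is asserted, and any I/O error reaches the request through the rados callback and should_complete(). The before/after contrast for a hypothetical caller:

    // illustrative only, not from this commit
    //
    // before: callers had to check a synchronous return
    //   int r = req->send();
    //   if (r < 0) { /* complete the aio with r */ }
    //
    // after: fire and forget; errors arrive via rados_req_cb -> should_complete(r)
    req->send();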
