librbd: aio calls may block #4854

Merged
merged 14 commits into from Sep 3, 2015
6 changes: 6 additions & 0 deletions PendingReleaseNotes
@@ -1,3 +1,9 @@
v0.80.11
--------

- librbd's rbd_aio_read and Image::aio_read API methods no longer return the
  number of bytes read upon success. Instead, they return 0 upon success and a
  negative value upon failure.

v0.81
-----
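To illustrate the change described in the note above, here is a minimal sketch of a caller adapting to the new return semantics, using the public librbd C++ API; the helper name and image setup are illustrative and not part of this patch.

```cpp
// Sketch only: assumes an already-open librbd::Image.  With this patch the
// completion's return value is 0 on success instead of the byte count, so a
// caller can no longer learn the read length from the return value.
#include <rbd/librbd.hpp>
#include <cassert>

ssize_t read_and_check(librbd::Image &image, uint64_t off, size_t len)
{
  ceph::bufferlist bl;
  librbd::RBD::AioCompletion *c = new librbd::RBD::AioCompletion(NULL, NULL);
  int r = image.aio_read(off, len, bl, c);
  assert(r == 0);                      // submission itself should not fail here
  c->wait_for_complete();
  ssize_t ret = c->get_return_value(); // old: bytes read; new: 0 on success
  c->release();
  return ret < 0 ? ret : (ssize_t)bl.length();
}
```

In other words, the number of bytes actually read must now be taken from the bufferlist (or the requested length) rather than from the completion's return value.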
6 changes: 6 additions & 0 deletions src/common/Throttle.cc
@@ -267,6 +267,12 @@ void SimpleThrottle::end_op(int r)
m_cond.Signal();
}

bool SimpleThrottle::pending_error() const
{
Mutex::Locker l(m_lock);
return (m_ret < 0);
}

int SimpleThrottle::wait_for_ret()
{
Mutex::Locker l(m_lock);
3 changes: 2 additions & 1 deletion src/common/Throttle.h
@@ -76,9 +76,10 @@ class SimpleThrottle {
~SimpleThrottle();
void start_op();
void end_op(int r);
bool pending_error() const;
int wait_for_ret();
private:
Mutex m_lock;
mutable Mutex m_lock;
Cond m_cond;
uint64_t m_max;
uint64_t m_current;
33 changes: 32 additions & 1 deletion src/common/WorkQueue.h
@@ -330,7 +330,7 @@ class ThreadPool : public md_config_obs_t {

public:
ThreadPool(CephContext *cct_, string nm, int n, const char *option = NULL);
~ThreadPool();
virtual ~ThreadPool();

/// return number of threads currently running
int get_num_threads() {
@@ -433,4 +433,35 @@ class C_QueueInWQ : public Context {
}
};

class ContextWQ : public ThreadPool::WorkQueueVal<Context *> {
public:
ContextWQ(const string &name, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueueVal<Context *>(name, ti, 0, tp) {}

void queue(Context *ctx) {
ThreadPool::WorkQueueVal<Context *>::queue(ctx);
}

protected:
virtual void _enqueue(Context *item) {
_queue.push_back(item);
}
virtual void _enqueue_front(Context *item) {
_queue.push_front(item);
}
virtual bool _empty() {
return _queue.empty();
}
virtual Context *_dequeue() {
Context *item = _queue.front();
_queue.pop_front();
return item;
}
virtual void _process(Context *item) {
item->complete(0);
}
private:
list<Context *> _queue;
};

#endif
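As a rough sketch of how the new ContextWQ might be driven (the pool/queue names and the C_Example context are hypothetical, not taken from this patch): a ThreadPool runs the queue, and each queued Context is completed with r == 0 on a worker thread.

```cpp
// Hypothetical usage sketch of ContextWQ; assumes a valid CephContext.
#include "common/WorkQueue.h"
#include "common/ceph_context.h"
#include "include/Context.h"

struct C_Example : public Context {
  virtual void finish(int r) {
    // runs asynchronously on a ThreadPool worker; r is always 0 for ContextWQ items
  }
};

void queue_example(CephContext *cct)
{
  ThreadPool thread_pool(cct, "example::thread_pool", 1);
  ContextWQ work_queue("example::work_queue", 60, &thread_pool);
  thread_pool.start();
  work_queue.queue(new C_Example());   // ownership passes to the queue
  thread_pool.drain(&work_queue);      // wait for queued work to finish
  thread_pool.stop();
}
```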
6 changes: 6 additions & 0 deletions src/common/ceph_context.cc
@@ -265,6 +265,7 @@ CephContext::CephContext(uint32_t module_type_)
_crypto_aes(NULL)
{
ceph_spin_init(&_service_thread_lock);
ceph_spin_init(&_associated_objs_lock);

_log = new ceph::log::Log(&_conf->subsys);
_log->start();
@@ -298,6 +299,10 @@ CephContext::~CephContext()
{
join_service_thread();

for (map<string, AssociatedSingletonObject*>::iterator it = _associated_objs.begin();
it != _associated_objs.end(); it++)
delete it->second;

if (_conf->lockdep) {
lockdep_unregister_ceph_context(this);
}
@@ -335,6 +340,7 @@ CephContext::~CephContext()

delete _conf;
ceph_spin_destroy(&_service_thread_lock);
ceph_spin_destroy(&_associated_objs_lock);

delete _crypto_none;
delete _crypto_aes;
19 changes: 19 additions & 0 deletions src/common/ceph_context.h
@@ -17,6 +17,7 @@

#include <iostream>
#include <stdint.h>
#include <string>

#include "include/buffer.h"
#include "include/atomic.h"
@@ -58,6 +59,10 @@ class CephContext {
~CephContext();
atomic_t nref;
public:
class AssociatedSingletonObject {
public:
virtual ~AssociatedSingletonObject() {}
};
CephContext *get() {
nref.inc();
return this;
@@ -102,6 +107,17 @@ class CephContext {
void do_command(std::string command, cmdmap_t& cmdmap, std::string format,
bufferlist *out);

template<typename T>
void lookup_or_create_singleton_object(T*& p, const std::string &name) {
ceph_spin_lock(&_associated_objs_lock);
if (!_associated_objs.count(name)) {
p = new T(this);
_associated_objs[name] = reinterpret_cast<AssociatedSingletonObject*>(p);
} else {
p = reinterpret_cast<T*>(_associated_objs[name]);
}
ceph_spin_unlock(&_associated_objs_lock);
}
/**
* get a crypto handler
*/
@@ -138,6 +154,9 @@ class CephContext {

ceph::HeartbeatMap *_heartbeat_map;

ceph_spinlock_t _associated_objs_lock;
std::map<std::string, AssociatedSingletonObject*> _associated_objs;

// crypto
CryptoNone *_crypto_none;
CryptoAES *_crypto_aes;
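A hypothetical consumer of the new lookup_or_create_singleton_object() helper might look like the sketch below; the MyPerContextState type and lookup key are illustrative only. The singleton type takes the CephContext* in its constructor and should derive from AssociatedSingletonObject so the context can delete it at shutdown.

```cpp
// Hypothetical usage sketch of CephContext::lookup_or_create_singleton_object().
#include "common/ceph_context.h"

struct MyPerContextState : public CephContext::AssociatedSingletonObject {
  explicit MyPerContextState(CephContext *cct) : m_cct(cct) {}
  CephContext *m_cct;
};

void singleton_example(CephContext *cct)
{
  MyPerContextState *state;
  cct->lookup_or_create_singleton_object(state, "example::per-context-state");
  // Every caller passing the same key on this CephContext now shares `state`;
  // it is created once and destroyed in ~CephContext().
}
```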
3 changes: 3 additions & 0 deletions src/common/config_opts.h
@@ -727,6 +727,9 @@ OPTION(journal_ignore_corruption, OPT_BOOL, false) // assume journal is not corrupt
OPTION(rados_mon_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from the monitor before returning an error from a rados operation. 0 means no limit.
OPTION(rados_osd_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from osds before returning an error from a rados operation. 0 means no limit.

OPTION(rbd_op_threads, OPT_INT, 1)
OPTION(rbd_op_thread_timeout, OPT_INT, 60)
OPTION(rbd_non_blocking_aio, OPT_BOOL, true) // process AIO ops from a worker thread to prevent blocking
OPTION(rbd_cache, OPT_BOOL, false) // whether to enable caching (writeback unless rbd_cache_max_dirty is 0)
OPTION(rbd_cache_writethrough_until_flush, OPT_BOOL, false) // whether to make writeback caching writethrough until flush is called, to be sure the user of librbd will send flushes so that writeback is safe
OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size in bytes
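For reference, the new options could be set from a client via the standard librados C++ API (or equivalently in the [client] section of ceph.conf); a minimal sketch using the defaults introduced by this patch, with cluster setup and error handling omitted:

```cpp
// Sketch only: programmatic configuration of the new librbd AIO options.
#include <rados/librados.hpp>

void configure_rbd_aio(librados::Rados &cluster)
{
  cluster.conf_set("rbd_non_blocking_aio", "true"); // dispatch AIO via worker threads
  cluster.conf_set("rbd_op_threads", "1");          // size of the librbd op thread pool
  cluster.conf_set("rbd_op_thread_timeout", "60");  // worker heartbeat timeout, seconds
}
```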
48 changes: 46 additions & 2 deletions src/librbd/AioCompletion.cc
@@ -5,6 +5,7 @@

#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/errno.h"

#include "librbd/AioRequest.h"
#include "librbd/internal.h"
@@ -25,7 +26,7 @@ namespace librbd {
building = false;
if (!pending_count) {
finalize(cct, rval);
complete();
complete(cct);
}
lock.Unlock();
}
@@ -54,6 +55,49 @@ namespace librbd {
}
}

void AioCompletion::complete(CephContext *cct) {
utime_t elapsed;
assert(lock.is_locked());
elapsed = ceph_clock_now(cct) - start_time;
switch (aio_type) {
case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
case AIO_TYPE_FLUSH:
ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
}

if (ictx != NULL) {
Mutex::Locker l(ictx->aio_lock);
assert(ictx->pending_aio != 0);
--ictx->pending_aio;
ictx->pending_aio_cond.Signal();
}

if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
}
done = true;
cond.Signal();
}

void AioCompletion::fail(CephContext *cct, int r)
{
lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
<< dendl;
lock.Lock();
assert(pending_count == 0);
rval = r;
complete(cct);
put_unlock();
}

void AioCompletion::complete_request(CephContext *cct, ssize_t r)
{
ldout(cct, 20) << "AioCompletion::complete_request() "
@@ -70,7 +114,7 @@ namespace librbd {
int count = --pending_count;
if (!count && !building) {
finalize(cct, rval);
complete();
complete(cct);
}
put_unlock();
}
33 changes: 2 additions & 31 deletions src/librbd/AioCompletion.h
@@ -101,37 +101,8 @@ namespace librbd {
start_time = ceph_clock_now(ictx->cct);
}

void complete() {
utime_t elapsed;
assert(lock.is_locked());
elapsed = ceph_clock_now(ictx->cct) - start_time;
switch (aio_type) {
case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
case AIO_TYPE_FLUSH:
ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
}

{
Mutex::Locker l(ictx->aio_lock);
assert(ictx->pending_aio != 0);
--ictx->pending_aio;
ictx->pending_aio_cond.Signal();
}

if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
}
done = true;
cond.Signal();
}
void complete(CephContext *cct);
void fail(CephContext *cct, int r);

void set_complete_cb(void *cb_arg, callback_t cb) {
complete_cb = cb;
15 changes: 9 additions & 6 deletions src/librbd/AioRequest.cc
@@ -85,8 +85,9 @@ namespace librbd {
return true;
}

int AioRead::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
void AioRead::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " "
<< m_object_off << "~" << m_object_len << dendl;

librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
@@ -99,10 +100,11 @@ namespace librbd {
} else {
op.read(m_object_off, m_object_len, &m_read_data, NULL);
}

r = m_ioctx->aio_operate(m_oid, rados_completion, &op, flags, NULL);
assert(r == 0);

rados_completion->release();
return r;
}

/** write **/
@@ -224,16 +226,17 @@ namespace librbd {
return finished;
}

int AbstractWrite::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
void AbstractWrite::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " "
<< m_object_off << "~" << m_object_len << dendl;
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
int r;
assert(m_write.size());
r = m_ioctx->aio_operate(m_oid, rados_completion, &m_write,
m_snap_seq, m_snaps);
assert(r == 0);
rados_completion->release();
return r;
}

void AbstractWrite::send_copyup() {
6 changes: 3 additions & 3 deletions src/librbd/AioRequest.h
@@ -43,7 +43,7 @@ namespace librbd {
}

virtual bool should_complete(int r) = 0;
virtual int send() = 0;
virtual void send() = 0;

protected:
void read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents);
@@ -73,7 +73,7 @@ namespace librbd {
}
virtual ~AioRead() {}
virtual bool should_complete(int r);
virtual int send();
virtual void send();

ceph::bufferlist &data() {
return m_read_data;
@@ -100,7 +100,7 @@ namespace librbd {
bool hide_enoent);
virtual ~AbstractWrite() {}
virtual bool should_complete(int r);
virtual int send();
virtual void send();
void guard_write();

bool has_parent() const {