Skip to content

Commit

Permalink
librados: refactor watch/notify; return notify error code
Browse files Browse the repository at this point in the history
Get rid of a level of intermediate classes with confusing names and put
the notify and notify finish logic in a single place so that it is easier
to follow and understand.

Pass the return value from the notify completion message to the caller.

Fixes: #9193
Signed-off-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Aug 29, 2014
1 parent dd4c262 commit bf40cf1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 115 deletions.
68 changes: 11 additions & 57 deletions src/librados/IoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,8 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,

lock->Lock();

WatchContext *wc = new WatchContext(this, oid, ctx);
WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
wc->watch_ctx = ctx;
client->register_watch_notify_callback(wc, cookie);
prepare_assert_ops(&wr);
wr.watch(*cookie, ver, 1);
Expand All @@ -1093,7 +1094,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,

if (r < 0) {
lock->Lock();
client->unregister_watch_notify_callback(*cookie);
client->unregister_watch_notify_callback(*cookie); // destroys wc
lock->Unlock();
}

Expand All @@ -1111,7 +1112,6 @@ int librados::IoCtxImpl::_notify_ack(
prepare_assert_ops(&rd);
rd.notify_ack(notify_id, ver, cookie);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);

return 0;
}

Expand Down Expand Up @@ -1157,13 +1157,16 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
version_t objver;
uint64_t cookie;
C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);

::ObjectOperation rd;
prepare_assert_ops(&rd);

lock->Lock();
WatchContext *wc = new WatchContext(this, oid, ctx);
WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
wc->notify_done = &done_all;
wc->notify_lock = &mylock_all;
wc->notify_cond = &cond_all;
wc->notify_rval = &r;
client->register_watch_notify_callback(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
Expand All @@ -1180,19 +1183,18 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
cond.Wait(mylock);
mylock.Unlock();

mylock_all.Lock();
if (r == 0) {
mylock_all.Lock();
while (!done_all)
cond_all.Wait(mylock_all);
mylock_all.Unlock();
}
mylock_all.Unlock();

lock->Lock();
client->unregister_watch_notify_callback(cookie);
client->unregister_watch_notify_callback(cookie); // destroys wc
lock->Unlock();

set_sync_op_version(objver);
delete ctx;

return r;
}
Expand Down Expand Up @@ -1310,51 +1312,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)
c->put_unlock();
}

///////////////////////// C_NotifyComplete /////////////////////////////

librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l,
Cond *_c,
bool *_d)
: lock(_l), cond(_c), done(_d)
{
*done = false;
}

void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
uint64_t ver,
bufferlist& bl)
{
lock->Lock();
*done = true;
cond->Signal();
lock->Unlock();
}

/////////////////////////// WatchContext ///////////////////////////////

librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
librados::WatchCtx *_ctx)
: io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0)
{
io_ctx_impl->get();
}

librados::WatchContext::~WatchContext()
{
io_ctx_impl->put();
}

void librados::WatchContext::notify(Mutex *client_lock,
uint8_t opcode,
uint64_t ver,
uint64_t notify_id,
bufferlist& payload)
{
ctx->notify(opcode, ver, payload);
if (opcode != WATCH_NOTIFY_COMPLETE) {
client_lock->Lock();
io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie);
client_lock->Unlock();
}
}
61 changes: 41 additions & 20 deletions src/librados/IoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,30 +211,51 @@ struct librados::IoCtxImpl {
void set_assert_src_version(const object_t& oid, uint64_t ver);
void set_notify_timeout(uint32_t timeout);

struct C_NotifyComplete : public librados::WatchCtx {
Mutex *lock;
Cond *cond;
bool *done;

C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d);
void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
};
};

namespace librados {
struct WatchContext : public RefCountedWaitObject {
IoCtxImpl *io_ctx_impl;
const object_t oid;
librados::WatchCtx *ctx;
uint64_t linger_id;
uint64_t cookie;

WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
librados::WatchCtx *_ctx);
~WatchContext();

/**
* watch/notify info
*
* Capture state about a watch or an in-progress notify
*/
struct WatchNotifyInfo : public RefCountedWaitObject {
IoCtxImpl *io_ctx_impl; // parent
const object_t oid; // the object
uint64_t linger_id; // we use this to unlinger when we are done
uint64_t cookie; // callback cookie

// watcher
librados::WatchCtx *watch_ctx;

// notify that we initiated
Mutex *notify_lock;
Cond *notify_cond;
bool *notify_done;
int *notify_rval;

WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
const object_t& _oc)
: io_ctx_impl(io_ctx_impl_),
oid(_oc),
linger_id(0),
cookie(0),
watch_ctx(NULL),
notify_lock(NULL),
notify_cond(NULL),
notify_done(NULL),
notify_rval(NULL) {
io_ctx_impl->get();
}

~WatchNotifyInfo() {
io_ctx_impl->put();
}

void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
bufferlist& payload);
bufferlist& payload,
int return_code);
};
}
#endif
105 changes: 70 additions & 35 deletions src/librados/RadosClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ librados::RadosClient::RadosClient(CephContext *cct_)
refcnt(1),
log_last_version(0), log_cb(NULL), log_cb_arg(NULL),
finisher(cct),
max_watch_cookie(0)
max_watch_notify_cookie(0)
{
}

Expand Down Expand Up @@ -642,70 +642,105 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti
return r;
}

void librados::RadosClient::blacklist_self(bool set) {
Mutex::Locker l(lock);
objecter->blacklist_self(set);
}


// -----------
// watch/notify

void librados::RadosClient::register_watch_notify_callback(
WatchContext *wc,
WatchNotifyInfo *wc,
uint64_t *cookie)
{
assert(lock.is_locked());
wc->cookie = *cookie = ++max_watch_cookie;
watchers[wc->cookie] = wc;
wc->cookie = *cookie = ++max_watch_notify_cookie;
ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl;
watch_notify_info[wc->cookie] = wc;
}

void librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie)
{
ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
assert(lock.is_locked());
map<uint64_t, WatchContext *>::iterator iter = watchers.find(cookie);
if (iter != watchers.end()) {
WatchContext *ctx = iter->second;
map<uint64_t, WatchNotifyInfo *>::iterator iter =
watch_notify_info.find(cookie);
if (iter != watch_notify_info.end()) {
WatchNotifyInfo *ctx = iter->second;
if (ctx->linger_id)
objecter->unregister_linger(ctx->linger_id);

watchers.erase(iter);
watch_notify_info.erase(iter);
lock.Unlock();
ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" << (void *)ctx << dendl;
ldout(cct, 10) << __func__ << " dropping reference, waiting ctx="
<< (void *)ctx << dendl;
ctx->put_wait();
ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl;
lock.Lock();
}
}

void librados::RadosClient::blacklist_self(bool set) {
Mutex::Locker l(lock);
objecter->blacklist_self(set);
}

class C_WatchNotify : public Context {
librados::WatchContext *ctx;
Mutex *client_lock;
uint8_t opcode;
uint64_t ver;
uint64_t notify_id;
bufferlist bl;

public:
C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) :
ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}

struct C_DoWatchNotify : public Context {
librados::RadosClient *rados;
MWatchNotify *m;
C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {}
void finish(int r) {
ctx->notify(client_lock, opcode, ver, notify_id, bl);
ctx->put();
rados->do_watch_notify(m);
}
};

void librados::RadosClient::handle_watch_notify(MWatchNotify *m)
{
assert(lock.is_locked());
map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie);
if (iter != watchers.end()) {
WatchContext *wc = iter->second;
if (watch_notify_info.count(m->cookie)) {
ldout(cct,10) << __func__ << " queueing async " << *m << dendl;
// deliver this async via a finisher thread
finisher.queue(new C_DoWatchNotify(this, m));
} else {
// drop it on the floor
ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl;
m->put();
}
}

void librados::RadosClient::do_watch_notify(MWatchNotify *m)
{
Mutex::Locker l(lock);
map<uint64_t, WatchNotifyInfo *>::iterator iter =
watch_notify_info.find(m->cookie);
if (iter != watch_notify_info.end()) {
WatchNotifyInfo *wc = iter->second;
assert(wc);
wc->get();
finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
if (wc->notify_lock) {
// we sent a notify and it completed (or failed)
ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
wc->notify_lock->Lock();
*wc->notify_done = true;
*wc->notify_rval = m->return_code;
wc->notify_cond->Signal();
wc->notify_lock->Unlock();
} else {
// we are watcher and got a notify
ldout(cct,10) << __func__ << " got notify " << *m << dendl;
wc->get();

// trigger the callback
lock.Unlock();
wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
lock.Lock();

// send ACK back to the OSD
wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->ver, m->cookie);

ldout(cct,10) << __func__ << " notify done" << dendl;
wc->put();
}
}
m->put();
}


int librados::RadosClient::mon_command(const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
Expand Down
8 changes: 5 additions & 3 deletions src/librados/RadosClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ class librados::RadosClient : public Dispatcher
int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);

// watch/notify
uint64_t max_watch_cookie;
map<uint64_t, librados::WatchContext *> watchers;
uint64_t max_watch_notify_cookie;
map<uint64_t, librados::WatchNotifyInfo *> watch_notify_info;

void register_watch_notify_callback(librados::WatchContext *wc,
void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
uint64_t *cookie);
void unregister_watch_notify_callback(uint64_t cookie);
void handle_watch_notify(MWatchNotify *m);
void do_watch_notify(MWatchNotify *m);

int mon_command(const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
int mon_command(int rank,
Expand Down
2 changes: 2 additions & 0 deletions src/test/librados/watch_notify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ static sem_t sem;

static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
{
std::cout << __func__ << std::endl;
sem_post(&sem);
}

Expand All @@ -27,6 +28,7 @@ class WatchNotifyTestCtx : public WatchCtx
public:
void notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
{
std::cout << __func__ << std::endl;
sem_post(&sem);
}
};
Expand Down

0 comments on commit bf40cf1

Please sign in to comment.