Skip to content

Commit

Permalink
librados: add timeout parameter to rados_watch
Browse files Browse the repository at this point in the history
Signed-off-by: Ryne Li <lizhenqiangsnake@gmail.com>
  • Loading branch information
ryneli committed Oct 11, 2016
1 parent 6fef17b commit e467337
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 21 deletions.
65 changes: 57 additions & 8 deletions src/include/rados/librados.h
Expand Up @@ -2137,7 +2137,6 @@ typedef void (*rados_watchcb2_t)(void *arg,
* after 30 seconds. Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
* @note BUG: watch timeout should be configurable
* @note BUG: librados should provide a way for watchers to notice connection resets
* @note BUG: the ver parameter does not work, and -ERANGE will never be returned
* (See URL tracker.ceph.com/issues/2592)
Expand All @@ -2163,13 +2162,12 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
* A watch operation registers the client as being interested in
* notifications on an object. OSDs keep track of watches on
* persistent storage, so they are preserved across cluster changes by
* the normal recovery process. If the client loses its connection to
* the primary OSD for a watched object, the watch will be removed
* after 30 seconds. Watches are automatically reestablished when a new
* the normal recovery process. If the client loses its connection to the
* primary OSD for a watched object, the watch will be removed after
* a timeout configured with osd_client_watch_timeout.
* Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
* @note BUG: watch timeout should be configurable
*
* @param io the pool the object is in
* @param o the object to watch
* @param cookie where to store the internal id assigned to this watch
Expand All @@ -2183,6 +2181,30 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki
rados_watcherrcb_t watcherrcb,
void *arg);

/**
* Register an interest in an object
*
* A watch operation registers the client as being interested in
* notifications on an object. OSDs keep track of watches on
* persistent storage, so they are preserved across cluster changes by
* the normal recovery process. Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
* @param io the pool the object is in
* @param o the object to watch
* @param cookie where to store the internal id assigned to this watch
* @param watchcb what to do when a notify is received on this object
* @param watcherrcb what to do when the watch session encounters an error
* @param timeout how many seconds the connection will keep after disconnection
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
CEPH_RADOS_API int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
uint32_t timeout,
void *arg);

/**
* Asynchronous register an interest in an object
*
Expand All @@ -2194,8 +2216,6 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki
* after 30 seconds. Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
* @note BUG: watch timeout should be configurable
*
* @param io the pool the object is in
* @param o the object to watch
* @param completion what to do when operation has been attempted
Expand All @@ -2211,6 +2231,35 @@ CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o,
rados_watcherrcb_t watcherrcb,
void *arg);

/**
* Asynchronous register an interest in an object
*
* A watch operation registers the client as being interested in
* notifications on an object. OSDs keep track of watches on
* persistent storage, so they are preserved across cluster changes by
* the normal recovery process. If the client loses its connection to
* the primary OSD for a watched object, the watch will be removed
* after the number of seconds that configured in timeout parameter.
* Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
* @param io the pool the object is in
* @param o the object to watch
* @param completion what to do when operation has been attempted
* @param handle where to store the internal id assigned to this watch
* @param watchcb what to do when a notify is received on this object
* @param watcherrcb what to do when the watch session encounters an error
* @param timeout how many seconds the connection will keep after disconnection
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
CEPH_RADOS_API int rados_aio_watch2(rados_ioctx_t io, const char *o,
rados_completion_t completion, uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
uint32_t timeout,
void *arg);

/**
* Check on the status of a watch
*
Expand Down
4 changes: 4 additions & 0 deletions src/include/rados/librados.hpp
Expand Up @@ -1019,8 +1019,12 @@ namespace librados
// watch/notify
int watch2(const std::string& o, uint64_t *handle,
librados::WatchCtx2 *ctx);
int watch3(const std::string& o, uint64_t *handle,
librados::WatchCtx2 *ctx, uint32_t timeout);
int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
librados::WatchCtx2 *ctx);
int aio_watch2(const std::string& o, AioCompletion *c, uint64_t *handle,
librados::WatchCtx2 *ctx, uint32_t timeout);
int unwatch2(uint64_t handle);
int aio_unwatch(uint64_t handle, AioCompletion *c);
/**
Expand Down
23 changes: 21 additions & 2 deletions src/librados/IoCtxImpl.cc
Expand Up @@ -1419,6 +1419,15 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
bool internal)
{
return watch(oid, handle, ctx, ctx2, 0, internal);
}

int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
uint32_t timeout,
bool internal)
{
::ObjectOperation wr;
version_t objver;
Expand All @@ -1430,7 +1439,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
oid, ctx, ctx2, internal);

prepare_assert_ops(&wr);
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
snapc, ceph::real_clock::now(), bl,
Expand All @@ -1454,6 +1463,16 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid,
uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
bool internal) {
return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
}

int librados::IoCtxImpl::aio_watch(const object_t& oid,
AioCompletionImpl *c,
uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
uint32_t timeout,
bool internal)
{
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
Expand All @@ -1465,7 +1484,7 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid,
linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);

prepare_assert_ops(&wr);
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
snapc, ceph::real_clock::now(), bl,
Expand Down
5 changes: 5 additions & 0 deletions src/librados/IoCtxImpl.h
Expand Up @@ -234,9 +234,14 @@ struct librados::IoCtxImpl {
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2, bool internal = false);
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal = false);
int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
bool internal = false);
int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
uint32_t timeout, bool internal = false);
int watch_check(uint64_t cookie);
int unwatch(uint64_t cookie);
int aio_unwatch(uint64_t cookie, AioCompletionImpl *c);
Expand Down
46 changes: 39 additions & 7 deletions src/librados/librados.cc
Expand Up @@ -1878,6 +1878,13 @@ int librados::IoCtx::watch2(const string& oid, uint64_t *cookie,
return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
}

int librados::IoCtx::watch3(const string& oid, uint64_t *cookie,
librados::WatchCtx2 *ctx2, uint32_t timeout)
{
object_t obj(oid);
return io_ctx_impl->watch(obj, cookie, NULL, ctx2, timeout);
}

int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
uint64_t *cookie,
librados::WatchCtx2 *ctx2)
Expand All @@ -1886,6 +1893,14 @@ int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2);
}

int librados::IoCtx::aio_watch2(const string& oid, AioCompletion *c,
uint64_t *cookie,
librados::WatchCtx2 *ctx2,
uint32_t timeout)
{
object_t obj(oid);
return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2, timeout);
}

int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
{
Expand Down Expand Up @@ -4520,9 +4535,17 @@ struct C_WatchCB2 : public librados::WatchCtx2 {
extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
void *arg) {
return rados_watch3(io, o, handle, watchcb, watcherrcb, 0, arg);
}

extern "C" int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
uint32_t timeout,
void *arg)
{
tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
tracepoint(librados, rados_watch3_enter, io, o, handle, watchcb, arg);
int ret;
if (!watchcb || !o || !handle) {
ret = -EINVAL;
Expand All @@ -4531,19 +4554,28 @@ extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
ret = ctx->watch(oid, cookie, NULL, wc, true);
ret = ctx->watch(oid, cookie, NULL, wc, timeout, true);
}
tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
tracepoint(librados, rados_watch3_exit, ret, handle ? *handle : 0);
return ret;
}

extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb, void *arg)
rados_watcherrcb_t watcherrcb, void *arg) {
return rados_aio_watch2(io, o, completion, handle, watchcb, watcherrcb, 0, arg);
}

extern "C" int rados_aio_watch2(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
uint32_t timeout, void *arg)
{
tracepoint(librados, rados_aio_watch_enter, io, o, completion, handle, watchcb, arg);
tracepoint(librados, rados_aio_watch2_enter, io, o, completion, handle, watchcb, timeout, arg);
int ret;
if (!completion || !watchcb || !o || !handle) {
ret = -EINVAL;
Expand All @@ -4554,9 +4586,9 @@ extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o,
librados::AioCompletionImpl *c =
reinterpret_cast<librados::AioCompletionImpl*>(completion);
C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
ret = ctx->aio_watch(oid, c, cookie, NULL, wc, true);
ret = ctx->aio_watch(oid, c, cookie, NULL, wc, timeout, true);
}
tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
tracepoint(librados, rados_aio_watch2_exit, ret, handle ? *handle : 0);
return ret;
}

Expand Down
73 changes: 73 additions & 0 deletions src/test/librados/watch_notify.cc
Expand Up @@ -770,6 +770,79 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
comp->release();
}

TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
notify_oid = "foo";
notify_ioctx = &ioctx;
notify_cookies.clear();
uint32_t timeout = 3; // configured timeout
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
uint64_t handle;
WatchNotifyTestCtx2 ctx;
ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
ASSERT_GT(ioctx.watch_check(handle), 0);
std::list<obj_watch_t> watches;
ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
ASSERT_EQ(watches.size(), 1u);
std::cout << "List watches" << std::endl;
for (std::list<obj_watch_t>::iterator it = watches.begin();
it != watches.end(); ++it) {
ASSERT_EQ(it->timeout_seconds, timeout);
}
bufferlist bl2, bl_reply;
ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
bufferlist::iterator p = bl_reply.begin();
std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
std::set<std::pair<uint64_t,uint64_t> > missed_map;
::decode(reply_map, p);
::decode(missed_map, p);
ASSERT_EQ(1u, notify_cookies.size());
ASSERT_EQ(1u, notify_cookies.count(handle));
ASSERT_EQ(1u, reply_map.size());
ASSERT_EQ(5u, reply_map.begin()->second.length());
ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
ASSERT_EQ(0u, missed_map.size());
ASSERT_GT(ioctx.watch_check(handle), 0);
ioctx.unwatch2(handle);
}

TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
notify_io = ioctx;
notify_oid = "foo";
notify_err = 0;
char buf[128];
uint32_t timeout = 3;
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));


rados_completion_t comp;
uint64_t handle;
ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
rados_aio_watch2(ioctx, notify_oid, comp, &handle,
watch_notify2_test_cb, watch_notify2_test_errcb, timeout, NULL);
ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
ASSERT_EQ(0, rados_aio_get_return_value(comp));
rados_aio_release(comp);
ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
int left = 30;
std::cout << "waiting up to " << left << " for disconnect notification ..."
<< std::endl;
while (notify_err == 0 && --left) {
sleep(1);
}
ASSERT_TRUE(left > 0);
ASSERT_EQ(-ENOTCONN, notify_err);
ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
rados_aio_unwatch(ioctx, handle, comp);
ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
rados_aio_release(comp);
}
// --

INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
Expand Down

0 comments on commit e467337

Please sign in to comment.