Skip to content

Commit

Permalink
Merge pull request #7839 from dillaman/wip-14809
Browse files Browse the repository at this point in the history
librbd: remove last synchronous librados calls from open/close state machine

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
  • Loading branch information
jdurgin committed Mar 2, 2016
2 parents 928c2e4 + 34dd39c commit 2da7196
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 63 deletions.
16 changes: 4 additions & 12 deletions src/librbd/ImageCtx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,8 @@ struct C_InvalidateCache : public Context {
}

void ImageCtx::shutdown() {
if (image_watcher != nullptr) {
unregister_watch();
}
delete image_watcher;
image_watcher = nullptr;

delete asok_hook;
asok_hook = nullptr;
Expand Down Expand Up @@ -780,17 +779,10 @@ struct C_InvalidateCache : public Context {
object_cacher->clear_nonexistence(object_set);
}

int ImageCtx::register_watch() {
void ImageCtx::register_watch(Context *on_finish) {
assert(image_watcher == NULL);
image_watcher = new ImageWatcher(*this);
return image_watcher->register_watch();
}

void ImageCtx::unregister_watch() {
assert(image_watcher != NULL);
image_watcher->unregister_watch();
delete image_watcher;
image_watcher = NULL;
image_watcher->register_watch(on_finish);
}

uint64_t ImageCtx::prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
Expand Down
3 changes: 1 addition & 2 deletions src/librbd/ImageCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ namespace librbd {
int invalidate_cache(bool purge_on_error=false);
void invalidate_cache(Context *on_finish);
void clear_nonexistence_cache();
int register_watch();
void unregister_watch();
void register_watch(Context *on_finish);
uint64_t prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
uint64_t overlap);

Expand Down
75 changes: 59 additions & 16 deletions src/librbd/ImageWatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,43 @@ namespace librbd {
using namespace image_watcher;
using namespace watch_notify;
using util::create_context_callback;
using util::create_rados_safe_callback;

namespace {

struct C_UnwatchAndFlush : public Context {
librados::Rados rados;
Context *on_finish;
bool flushing = false;
int ret_val = 0;

C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
: rados(io_ctx), on_finish(on_finish) {
}

virtual void complete(int r) override {
if (ret_val == 0 && r < 0) {
ret_val = r;
}

if (!flushing) {
flushing = true;

librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
r = rados.aio_watch_flush(aio_comp);
assert(r == 0);
aio_comp->release();
} else {
Context::complete(ret_val);
}
}

virtual void finish(int r) override {
on_finish->complete(r);
}
};

} // anonymous namespace

static const double RETRY_DELAY_SECONDS = 1.0;

Expand All @@ -54,40 +91,46 @@ ImageWatcher::~ImageWatcher()
}
}

int ImageWatcher::register_watch() {
void ImageWatcher::register_watch(Context *on_finish) {
ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;

RWLock::WLocker l(m_watch_lock);
RWLock::RLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_RegisterWatch(this, on_finish));
int r = m_image_ctx.md_ctx.aio_watch(m_image_ctx.header_oid, aio_comp,
&m_watch_handle, &m_watch_ctx);
assert(r == 0);
aio_comp->release();
}

void ImageWatcher::handle_register_watch(int r) {
RWLock::WLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
int r = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid,
&m_watch_handle,
&m_watch_ctx);
if (r < 0) {
return r;
m_watch_handle = 0;
} else if (r >= 0) {
m_watch_state = WATCH_STATE_REGISTERED;
}

m_watch_state = WATCH_STATE_REGISTERED;
return 0;
}

int ImageWatcher::unregister_watch() {
void ImageWatcher::unregister_watch(Context *on_finish) {
ldout(m_image_ctx.cct, 10) << this << " unregistering image watcher" << dendl;

cancel_async_requests();
m_task_finisher->cancel_all();

int r = 0;
{
RWLock::WLocker l(m_watch_lock);
if (m_watch_state == WATCH_STATE_REGISTERED) {
r = m_image_ctx.md_ctx.unwatch2(m_watch_handle);
librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_UnwatchAndFlush(m_image_ctx.md_ctx, on_finish));
int r = m_image_ctx.md_ctx.aio_unwatch(m_watch_handle, aio_comp);
assert(r == 0);
aio_comp->release();
}
m_watch_state = WATCH_STATE_UNREGISTERED;
}

librados::Rados rados(m_image_ctx.md_ctx);
rados.watch_flush();
return r;
}

void ImageWatcher::flush(Context *on_finish) {
Expand Down
18 changes: 16 additions & 2 deletions src/librbd/ImageWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class ImageWatcher {
ImageWatcher(ImageCtx& image_ctx);
~ImageWatcher();

int register_watch();
int unregister_watch();
void register_watch(Context *on_finish);
void unregister_watch(Context *on_finish);
void flush(Context *on_finish);

int notify_flatten(uint64_t request_id, ProgressContext &prog_ctx);
Expand Down Expand Up @@ -149,6 +149,18 @@ class ImageWatcher {
ProgressContext *m_prog_ctx;
};

struct C_RegisterWatch : public Context {
ImageWatcher *image_watcher;
Context *on_finish;

C_RegisterWatch(ImageWatcher *image_watcher, Context *on_finish)
: image_watcher(image_watcher), on_finish(on_finish) {
}
virtual void finish(int r) override {
image_watcher->handle_register_watch(r);
on_finish->complete(r);
}
};
struct C_NotifyAck : public Context {
ImageWatcher *image_watcher;
uint64_t notify_id;
Expand Down Expand Up @@ -224,6 +236,8 @@ class ImageWatcher {

image_watcher::Notifier m_notifier;

void handle_register_watch(int r);

void schedule_cancel_async_requests();
void cancel_async_requests();

Expand Down
12 changes: 8 additions & 4 deletions src/librbd/image/CloseRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ CloseRequest<I>::CloseRequest(I *image_ctx, Context *on_finish)

template <typename I>
void CloseRequest<I>::send() {
// TODO
send_shut_down_aio_queue();
//send_unregister_image_watcher();
send_unregister_image_watcher();
}

template <typename I>
void CloseRequest<I>::send_unregister_image_watcher() {
if (m_image_ctx->image_watcher == nullptr) {
send_shut_down_aio_queue();
return;
}

CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << dendl;

// prevent incoming requests from our peers

m_image_ctx->image_watcher->unregister_watch(create_context_callback<
CloseRequest<I>, &CloseRequest<I>::handle_unregister_image_watcher>(this));
}

template <typename I>
Expand Down
28 changes: 3 additions & 25 deletions src/librbd/image/OpenRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,6 @@
namespace librbd {
namespace image {

namespace {

template <typename I>
class C_RegisterWatch : public Context {
public:
I &image_ctx;
Context *on_finish;

C_RegisterWatch(I &image_ctx, Context *on_finish)
: image_ctx(image_ctx), on_finish(on_finish) {
}

virtual void finish(int r) {
assert(r == 0);
on_finish->complete(image_ctx.register_watch());
}
};

} // anonymous namespace

using util::create_context_callback;
using util::create_rados_ack_callback;

Expand Down Expand Up @@ -291,12 +271,10 @@ void OpenRequest<I>::send_register_watch() {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << dendl;

// no librados async version of watch
using klass = OpenRequest<I>;
Context *ctx = new C_RegisterWatch<I>(
*m_image_ctx,
create_context_callback<klass, &klass::handle_register_watch>(this));
m_image_ctx->op_work_queue->queue(ctx);
Context *ctx = create_context_callback<
klass, &klass::handle_register_watch>(this);
m_image_ctx->register_watch(ctx);
} else {
send_refresh();
}
Expand Down
16 changes: 16 additions & 0 deletions src/test/librados_test_stub/LibradosTestStub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ int IoCtx::aio_remove(const std::string& oid, AioCompletion *c) {
return ctx->aio_remove(oid, c->pc);
}

int IoCtx::aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
librados::WatchCtx2 *watch_ctx) {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
return ctx->aio_watch(o, c->pc, handle, watch_ctx);
}

int IoCtx::aio_unwatch(uint64_t handle, AioCompletion *c) {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
return ctx->aio_unwatch(handle, c->pc);
}

config_t IoCtx::cct() {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
return reinterpret_cast<config_t>(ctx->get_rados_client()->cct());
Expand Down Expand Up @@ -811,6 +822,11 @@ AioCompletion *Rados::aio_create_completion(void *cb_arg,
return new AioCompletion(c);
}

int Rados::aio_watch_flush(AioCompletion* c) {
TestRadosClient *impl = reinterpret_cast<TestRadosClient*>(client);
return impl->aio_watch_flush(c->pc);
}

int Rados::blacklist_add(const std::string& client_address,
uint32_t expire_seconds) {
TestRadosClient *impl = reinterpret_cast<TestRadosClient*>(client);
Expand Down
18 changes: 18 additions & 0 deletions src/test/librados_test_stub/TestIoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ int TestIoCtxImpl::aio_operate_read(const std::string& oid,
return 0;
}

int TestIoCtxImpl::aio_watch(const std::string& o, AioCompletionImpl *c,
uint64_t *handle, librados::WatchCtx2 *watch_ctx) {
m_pending_ops.inc();
c->get();
C_AioNotify *ctx = new C_AioNotify(this, c);
m_client->get_watch_notify().aio_watch(o, get_instance_id(), handle,
watch_ctx, ctx);
return 0;
}

int TestIoCtxImpl::aio_unwatch(uint64_t handle, AioCompletionImpl *c) {
m_pending_ops.inc();
c->get();
C_AioNotify *ctx = new C_AioNotify(this, c);
m_client->get_watch_notify().aio_unwatch(handle, ctx);
return 0;
}

int TestIoCtxImpl::exec(const std::string& oid, TestClassHandler *handler,
const char *cls, const char *method,
bufferlist& inbl, bufferlist* outbl,
Expand Down
4 changes: 3 additions & 1 deletion src/test/librados_test_stub/TestIoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class TestIoCtxImpl {
AioCompletionImpl *c, int flags,
bufferlist *pbl);
virtual int aio_remove(const std::string& oid, AioCompletionImpl *c) = 0;

virtual int aio_watch(const std::string& o, AioCompletionImpl *c,
uint64_t *handle, librados::WatchCtx2 *ctx);
virtual int aio_unwatch(uint64_t handle, AioCompletionImpl *c);
virtual int append(const std::string& oid, const bufferlist &bl,
const SnapContext &snapc) = 0;
virtual int assert_exists(const std::string &oid) = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/test/librados_test_stub/TestRadosClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
}
}

int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
c->get();
Context *ctx = new FunctionContext(boost::bind(
&TestRadosClient::finish_aio_completion, this, c, _1));
get_watch_notify().aio_flush(ctx);
return 0;
}

void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
librados::finish_aio_completion(c, r);
}
Expand Down
1 change: 1 addition & 0 deletions src/test/librados_test_stub/TestRadosClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TestRadosClient {
virtual int64_t pool_lookup(const std::string &name) = 0;
virtual int pool_reverse_lookup(int64_t id, std::string *name) = 0;

virtual int aio_watch_flush(AioCompletionImpl *c);
virtual int watch_flush() = 0;

virtual int blacklist_add(const std::string& client_address,
Expand Down
17 changes: 17 additions & 0 deletions src/test/librados_test_stub/TestWatchNotify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ int TestWatchNotify::list_watchers(const std::string& o,
return 0;
}

void TestWatchNotify::aio_flush(Context *on_finish) {
m_finisher->queue(on_finish);
}

void TestWatchNotify::aio_watch(const std::string& o, uint64_t gid,
uint64_t *handle,
librados::WatchCtx2 *watch_ctx,
Context *on_finish) {
int r = watch(o, gid, handle, nullptr, watch_ctx);
m_finisher->queue(on_finish, r);
}

void TestWatchNotify::aio_unwatch(uint64_t handle, Context *on_finish) {
unwatch(handle);
m_finisher->queue(on_finish);
}

void TestWatchNotify::aio_notify(const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
Expand Down
8 changes: 7 additions & 1 deletion src/test/librados_test_stub/TestWatchNotify.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ class TestWatchNotify : boost::noncopyable {
TestWatchNotify(CephContext *cct, Finisher *finisher);
~TestWatchNotify();

void flush();
int list_watchers(const std::string& o,
std::list<obj_watch_t> *out_watchers);

void aio_flush(Context *on_finish);
void aio_watch(const std::string& o, uint64_t gid, uint64_t *handle,
librados::WatchCtx2 *watch_ctx, Context *on_finish);
void aio_unwatch(uint64_t handle, Context *on_finish);
void aio_notify(const std::string& oid, bufferlist& bl, uint64_t timeout_ms,
bufferlist *pbl, Context *on_notify);

void flush();
int notify(const std::string& o, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(const std::string& o, uint64_t notify_id,
Expand Down

0 comments on commit 2da7196

Please sign in to comment.