Skip to content

Commit

Permalink
Merge pull request #13416 from dillaman/wip-rbd-mirror-cleanup
Browse files Browse the repository at this point in the history
rbd-mirror: track images via global image id

Reviewed-by: Mykola Golub <mgolub@mirantis.com>
  • Loading branch information
Mykola Golub committed Feb 16, 2017
2 parents f91fa5a + 34016f7 commit 321dc61
Show file tree
Hide file tree
Showing 27 changed files with 751 additions and 261 deletions.
77 changes: 53 additions & 24 deletions src/cls/rbd/cls_rbd_client.cc
Expand Up @@ -1006,25 +1006,41 @@ namespace librbd {
return dir_get_name_finish(&it, name);
}

void dir_list_start(librados::ObjectReadOperation *op,
const std::string &start, uint64_t max_return)
{
bufferlist in_bl;
::encode(start, in_bl);
::encode(max_return, in_bl);

op->exec("rbd", "dir_list", in_bl);
}

int dir_list_finish(bufferlist::iterator *it, map<string, string> *images)
{
try {
::decode(*images, *it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}

int dir_list(librados::IoCtx *ioctx, const std::string &oid,
const std::string &start, uint64_t max_return,
map<string, string> *images)
{
bufferlist in, out;
::encode(start, in);
::encode(max_return, in);
int r = ioctx->exec(oid, "rbd", "dir_list", in, out);
if (r < 0)
return r;
librados::ObjectReadOperation op;
dir_list_start(&op, start, max_return);

bufferlist::iterator iter = out.begin();
try {
::decode(*images, iter);
} catch (const buffer::error &err) {
return -EBADMSG;
bufferlist out_bl;
int r = ioctx->operate(oid, &op, &out_bl);
if (r < 0) {
return r;
}

return 0;
bufferlist::iterator iter = out_bl.begin();
return dir_list_finish(&iter, images);
}

void dir_add_image(librados::ObjectWriteOperation *op,
Expand Down Expand Up @@ -1417,29 +1433,42 @@ namespace librbd {
return 0;
}

int mirror_image_list(librados::IoCtx *ioctx,
const std::string &start, uint64_t max_return,
std::map<std::string, std::string> *mirror_image_ids) {
void mirror_image_list_start(librados::ObjectReadOperation *op,
const std::string &start, uint64_t max_return)
{
bufferlist in_bl;
::encode(start, in_bl);
::encode(max_return, in_bl);
op->exec("rbd", "mirror_image_list", in_bl);
}

bufferlist out_bl;
int r = ioctx->exec(RBD_MIRRORING, "rbd", "mirror_image_list", in_bl,
out_bl);
if (r < 0) {
return r;
}

int mirror_image_list_finish(bufferlist::iterator *it,
std::map<string, string> *mirror_image_ids)
{
try {
bufferlist::iterator bl_it = out_bl.begin();
::decode(*mirror_image_ids, bl_it);
::decode(*mirror_image_ids, *it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}

int mirror_image_list(librados::IoCtx *ioctx,
const std::string &start, uint64_t max_return,
std::map<std::string, std::string> *mirror_image_ids) {
librados::ObjectReadOperation op;
mirror_image_list_start(&op, start, max_return);

bufferlist out_bl;
int r = ioctx->operate(RBD_MIRRORING, &op, &out_bl);
if (r < 0) {
return r;
}

bufferlist::iterator bl_it = out_bl.begin();
return mirror_image_list_finish(&bl_it, mirror_image_ids);
}

void mirror_image_get_image_id_start(librados::ObjectReadOperation *op,
const std::string &global_image_id) {
bufferlist in_bl;
Expand Down
7 changes: 7 additions & 0 deletions src/cls/rbd/cls_rbd_client.h
Expand Up @@ -210,6 +210,9 @@ namespace librbd {
int dir_get_name_finish(bufferlist::iterator *it, std::string *name);
int dir_get_name(librados::IoCtx *ioctx, const std::string &oid,
const std::string &id, std::string *name);
void dir_list_start(librados::ObjectReadOperation *op,
const std::string &start, uint64_t max_return);
int dir_list_finish(bufferlist::iterator *it, map<string, string> *images);
int dir_list(librados::IoCtx *ioctx, const std::string &oid,
const std::string &start, uint64_t max_return,
map<string, string> *images);
Expand Down Expand Up @@ -287,6 +290,10 @@ namespace librbd {
int mirror_peer_set_cluster(librados::IoCtx *ioctx,
const std::string &uuid,
const std::string &cluster_name);
void mirror_image_list_start(librados::ObjectReadOperation *op,
const std::string &start, uint64_t max_return);
int mirror_image_list_finish(bufferlist::iterator *it,
std::map<string, string> *mirror_image_ids);
int mirror_image_list(librados::IoCtx *ioctx,
const std::string &start, uint64_t max_return,
std::map<std::string, std::string> *mirror_image_ids);
Expand Down
94 changes: 57 additions & 37 deletions src/librbd/Watcher.cc
Expand Up @@ -12,7 +12,8 @@

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::Watcher: "
#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
<< ": "

namespace librbd {

Expand Down Expand Up @@ -80,56 +81,76 @@ Watcher::~Watcher() {
}

void Watcher::register_watch(Context *on_finish) {
ldout(m_cct, 10) << this << " registering watcher" << dendl;
ldout(m_cct, 10) << dendl;

RWLock::RLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
m_watch_state = WATCH_STATE_REGISTERING;

librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_RegisterWatch(this, on_finish));
int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
assert(r == 0);
aio_comp->release();
}

void Watcher::handle_register_watch(int r) {
ldout(m_cct, 10) << this << " handle register r=" << r << dendl;
RWLock::WLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
if (r < 0) {
lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r) << dendl;
m_watch_handle = 0;
} else if (r >= 0) {
m_watch_state = WATCH_STATE_REGISTERED;
void Watcher::handle_register_watch(int r, Context *on_finish) {
ldout(m_cct, 10) << "r=" << r << dendl;
Context *unregister_watch_ctx = nullptr;
{
RWLock::WLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_REGISTERING);

std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
if (r < 0) {
lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
<< dendl;
m_watch_handle = 0;
m_watch_state = WATCH_STATE_UNREGISTERED;
} else if (r >= 0) {
m_watch_state = WATCH_STATE_REGISTERED;
}
}

on_finish->complete(r);

// wake up pending unregister request
if (unregister_watch_ctx != nullptr) {
unregister_watch_ctx->complete(0);
}
}

void Watcher::unregister_watch(Context *on_finish) {
ldout(m_cct, 10) << this << " unregistering watcher" << dendl;
ldout(m_cct, 10) << dendl;

RWLock::WLocker watch_locker(m_watch_lock);
if (m_watch_state == WATCH_STATE_REWATCHING) {
ldout(m_cct, 10) << this << " delaying unregister until rewatch completed"
<< dendl;
{
RWLock::WLocker watch_locker(m_watch_lock);
if (m_watch_state == WATCH_STATE_REGISTERING ||
m_watch_state == WATCH_STATE_REWATCHING) {
ldout(m_cct, 10) << "delaying unregister until register completed"
<< dendl;

assert(m_unregister_watch_ctx == nullptr);
m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
unregister_watch(on_finish);
});
return;
}

assert(m_unregister_watch_ctx == nullptr);
m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
unregister_watch(on_finish);
});
return;
}
if (m_watch_state == WATCH_STATE_REGISTERED ||
m_watch_state == WATCH_STATE_ERROR) {
m_watch_state = WATCH_STATE_UNREGISTERED;

if (m_watch_state == WATCH_STATE_REGISTERED ||
m_watch_state == WATCH_STATE_ERROR) {
m_watch_state = WATCH_STATE_UNREGISTERED;

librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_UnwatchAndFlush(m_ioctx, on_finish));
int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
assert(r == 0);
aio_comp->release();
} else {
on_finish->complete(0);
librados::AioCompletion *aio_comp = create_rados_safe_callback(
new C_UnwatchAndFlush(m_ioctx, on_finish));
int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
assert(r == 0);
aio_comp->release();
return;
}
}

on_finish->complete(0);
}

void Watcher::flush(Context *on_finish) {
Expand All @@ -144,8 +165,7 @@ void Watcher::set_oid(const string& oid) {
}

void Watcher::handle_error(uint64_t handle, int err) {
lderr(m_cct) << this << " watch failed: " << handle << ", "
<< cpp_strerror(err) << dendl;
lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;

RWLock::WLocker l(m_watch_lock);
if (m_watch_state == WATCH_STATE_REGISTERED) {
Expand All @@ -163,7 +183,7 @@ void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
}

void Watcher::rewatch() {
ldout(m_cct, 10) << this << " re-registering watch" << dendl;
ldout(m_cct, 10) << dendl;

RWLock::WLocker l(m_watch_lock);
if (m_watch_state != WATCH_STATE_ERROR) {
Expand All @@ -180,7 +200,7 @@ void Watcher::rewatch() {
}

void Watcher::handle_rewatch(int r) {
ldout(m_cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
ldout(m_cct, 10) "r=" << r << dendl;

WatchState next_watch_state = WATCH_STATE_REGISTERED;
if (r < 0) {
Expand Down
12 changes: 9 additions & 3 deletions src/librbd/Watcher.h
Expand Up @@ -39,10 +39,15 @@ class Watcher {
RWLock::RLocker locker(m_watch_lock);
return m_watch_state == WATCH_STATE_REGISTERED;
}
bool is_unregistered() const {
RWLock::RLocker locker(m_watch_lock);
return m_watch_state == WATCH_STATE_UNREGISTERED;
}

protected:
enum WatchState {
WATCH_STATE_UNREGISTERED,
WATCH_STATE_REGISTERING,
WATCH_STATE_REGISTERED,
WATCH_STATE_ERROR,
WATCH_STATE_REWATCHING
Expand Down Expand Up @@ -81,6 +86,8 @@ class Watcher {
* |
* | (register_watch)
* |
* REGISTERING
* |
* v (watch error)
* REGISTERED * * * * * * * > ERROR
* | ^ |
Expand Down Expand Up @@ -122,15 +129,14 @@ class Watcher {
: watcher(watcher), on_finish(on_finish) {
}
virtual void finish(int r) override {
watcher->handle_register_watch(r);
on_finish->complete(r);
watcher->handle_register_watch(r, on_finish);
}
};

WatchCtx m_watch_ctx;
Context *m_unregister_watch_ctx = nullptr;

void handle_register_watch(int r);
void handle_register_watch(int r, Context *on_finish);

void rewatch();
void handle_rewatch(int r);
Expand Down
22 changes: 22 additions & 0 deletions src/test/librados_test_stub/LibradosTestStub.cc
Expand Up @@ -356,6 +356,28 @@ IoCtx::~IoCtx() {
close();
}

IoCtx::IoCtx(const IoCtx& rhs) {
io_ctx_impl = rhs.io_ctx_impl;
if (io_ctx_impl) {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
ctx->get();
}
}

IoCtx& IoCtx::operator=(const IoCtx& rhs) {
if (io_ctx_impl) {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
ctx->put();
}

io_ctx_impl = rhs.io_ctx_impl;
if (io_ctx_impl) {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
ctx->get();
}
return *this;
}

int IoCtx::aio_flush() {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
ctx->aio_flush();
Expand Down
1 change: 0 additions & 1 deletion src/test/librbd/operation/test_mock_Request.cc
Expand Up @@ -41,7 +41,6 @@ struct AsyncRequest<librbd::MockTestImageCtx> {
} // namespace librbd

#include "librbd/operation/Request.cc"
template class librbd::operation::Request<librbd::MockTestImageCtx>;

namespace librbd {
namespace journal {
Expand Down
2 changes: 0 additions & 2 deletions src/test/librbd/test_mock_AioImageRequest.cc
Expand Up @@ -129,8 +129,6 @@ AioObjectRead<librbd::MockTestImageCtx>* AioObjectRead<librbd::MockTestImageCtx>

#include "librbd/AioImageRequest.cc"

template class librbd::AioImageRequest<librbd::MockTestImageCtx>;

namespace librbd {

using ::testing::_;
Expand Down
1 change: 0 additions & 1 deletion src/test/librbd/test_mock_ExclusiveLock.cc
Expand Up @@ -136,7 +136,6 @@ struct PreReleaseRequest<MockExclusiveLockImageCtx> : public BaseRequest<PreRele

// template definitions
#include "librbd/ExclusiveLock.cc"
template class librbd::ExclusiveLock<librbd::MockExclusiveLockImageCtx>;

ACTION_P(FinishLockUnlock, request) {
if (request->on_lock_unlock != nullptr) {
Expand Down
1 change: 1 addition & 0 deletions src/test/rbd_mirror/CMakeLists.txt
Expand Up @@ -27,6 +27,7 @@ add_executable(unittest_rbd_mirror
image_sync/test_mock_SnapshotCreateRequest.cc
image_sync/test_mock_SyncPointCreateRequest.cc
image_sync/test_mock_SyncPointPruneRequest.cc
pool_watcher/test_mock_RefreshImagesRequest.cc
)
add_ceph_unittest(unittest_rbd_mirror ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_rbd_mirror)
set_target_properties(unittest_rbd_mirror PROPERTIES COMPILE_FLAGS
Expand Down

0 comments on commit 321dc61

Please sign in to comment.