Skip to content

Commit

Permalink
librbd: integrate listener for new mirroring notification payloads
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Mar 30, 2016
1 parent e95a383 commit 57db617
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 8 deletions.
47 changes: 47 additions & 0 deletions src/librbd/MirroringWatcher.cc
Expand Up @@ -69,6 +69,53 @@ int MirroringWatcher<I>::notify_image_updated(
return 0;
}

template <typename I>
void MirroringWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl) {
CephContext *cct = this->m_cct;
ldout(cct, 15) << ": notify_id=" << notify_id << ", "
<< "handle=" << handle << dendl;

Context *ctx = new typename ObjectWatcher<I>::C_NotifyAck(this, notify_id,
handle);

NotifyMessage notify_message;
try {
bufferlist::iterator iter = bl.begin();
::decode(notify_message, iter);
} catch (const buffer::error &err) {
lderr(cct) << ": error decoding image notification: " << err.what()
<< dendl;
ctx->complete(0);
return;
}

apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
}

template <typename I>
void MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
Context *on_notify_ack) {
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
handle_mode_updated(payload.mirror_mode, on_notify_ack);
}

template <typename I>
void MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
Context *on_notify_ack) {
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": image state updated" << dendl;
handle_image_updated(payload.mirror_image_state, payload.image_id,
payload.global_image_id, on_notify_ack);
}

template <typename I>
void MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
Context *on_notify_ack) {
on_notify_ack->complete(0);
}

} // namespace librbd

template class librbd::MirroringWatcher<librbd::ImageCtx>;
31 changes: 31 additions & 0 deletions src/librbd/MirroringWatcher.h
Expand Up @@ -27,10 +27,41 @@ class MirroringWatcher : public ObjectWatcher<ImageCtxT> {
const std::string &image_id,
const std::string &global_image_id);

virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode,
Context *on_ack) = 0;
virtual void handle_image_updated(cls::rbd::MirrorImageState state,
const std::string &image_id,
const std::string &global_image_id,
Context *on_ack) = 0;

protected:
virtual std::string get_oid() const;

virtual void handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl);

private:
struct HandlePayloadVisitor : public boost::static_visitor<void> {
MirroringWatcher *mirroring_watcher;
Context *on_notify_ack;

HandlePayloadVisitor(MirroringWatcher *mirroring_watcher,
Context *on_notify_ack)
: mirroring_watcher(mirroring_watcher), on_notify_ack(on_notify_ack) {
}

template <typename Payload>
inline void operator()(const Payload &payload) const {
mirroring_watcher->handle_payload(payload, on_notify_ack);
}
};

void handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
Context *on_notify_ack);
void handle_payload(const mirroring_watcher::ImageUpdatedPayload &payload,
Context *on_notify_ack);
void handle_payload(const mirroring_watcher::UnknownPayload &payload,
Context *on_notify_ack);

};

Expand Down
25 changes: 18 additions & 7 deletions src/librbd/ObjectWatcher.cc
Expand Up @@ -180,13 +180,6 @@ void ObjectWatcher<I>::post_rewatch(Context *on_finish) {
on_finish->complete(0);
}

template <typename I>
void ObjectWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl) {
ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
<< "handle=" << handle << dendl;
}

template <typename I>
void ObjectWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
bufferlist &out) {
Expand Down Expand Up @@ -332,6 +325,24 @@ bool ObjectWatcher<I>::pending_unregister_watch(int r) {
return false;
}

template <typename I>
ObjectWatcher<I>::C_NotifyAck::C_NotifyAck(ObjectWatcher *object_watcher,
uint64_t notify_id, uint64_t handle)
: object_watcher(object_watcher), notify_id(notify_id), handle(handle) {
CephContext *cct = object_watcher->m_cct;
ldout(cct, 10) << ": C_NotifyAck start: id=" << notify_id << ", "
<< "handle=" << handle << dendl;
}

template <typename I>
void ObjectWatcher<I>::C_NotifyAck::finish(int r) {
assert(r == 0);
CephContext *cct = object_watcher->m_cct;
ldout(cct, 10) << ": C_NotifyAck finish: id=" << notify_id << ", "
<< "handle=" << handle << dendl;
object_watcher->acknowledge_notify(notify_id, handle, out);
}

} // namespace librbd

template class librbd::ObjectWatcher<librbd::ImageCtx>;
17 changes: 16 additions & 1 deletion src/librbd/ObjectWatcher.h
Expand Up @@ -29,13 +29,28 @@ class ObjectWatcher {
virtual void unregister_watch(Context *on_finish);

protected:
struct C_NotifyAck : public Context {
ObjectWatcher *object_watcher;
uint64_t notify_id;
uint64_t handle;
bufferlist out;

C_NotifyAck(ObjectWatcher *object_watcher, uint64_t notify_id,
uint64_t handle);
virtual void finish(int r);

std::string get_oid() const {
return object_watcher->get_oid();
}
};

librados::IoCtx &m_io_ctx;
CephContext *m_cct;

virtual std::string get_oid() const = 0;

virtual void handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl);
bufferlist &bl) = 0;
void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);

virtual void pre_unwatch(Context *on_finish);
Expand Down
1 change: 1 addition & 0 deletions src/test/Makefile-client.am
Expand Up @@ -362,6 +362,7 @@ librbd_test_la_SOURCES = \
test/librbd/test_ImageWatcher.cc \
test/librbd/test_internal.cc \
test/librbd/test_mirroring.cc \
test/librbd/test_MirroringWatcher.cc \
test/librbd/test_ObjectMap.cc \
test/librbd/journal/test_Entries.cc \
test/librbd/journal/test_Replay.cc
Expand Down
98 changes: 98 additions & 0 deletions src/test/librbd/test_MirroringWatcher.cc
@@ -0,0 +1,98 @@
// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "test/librbd/test_fixture.h"
#include "test/librbd/test_support.h"
#include "include/rbd_types.h"
#include "librbd/MirroringWatcher.h"
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include <list>

void register_test_mirroring_watcher() {
}

namespace librbd {

namespace {

struct MockMirroringWatcher : public MirroringWatcher<> {
std::string oid;

MockMirroringWatcher(ImageCtx &image_ctx)
: MirroringWatcher<>(image_ctx.md_ctx, image_ctx.op_work_queue) {
}

MOCK_METHOD2(handle_mode_updated, void(cls::rbd::MirrorMode, Context*));
MOCK_METHOD4(handle_image_updated, void(cls::rbd::MirrorImageState,
const std::string &,
const std::string &,
Context*));
};

} // anonymous namespace

using ::testing::_;
using ::testing::Invoke;
using ::testing::StrEq;
using ::testing::WithArg;

class TestMirroringWatcher : public TestFixture {
public:
virtual void SetUp() {
TestFixture::SetUp();

bufferlist bl;
ASSERT_EQ(0, m_ioctx.write_full(RBD_MIRRORING, bl));

librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));

m_image_watcher = new MockMirroringWatcher(*ictx);
C_SaferCond ctx;
m_image_watcher->register_watch(&ctx);
if (ctx.wait() != 0) {
delete m_image_watcher;
m_image_watcher = nullptr;
FAIL();
}
}

virtual void TearDown() {
if (m_image_watcher != nullptr) {
C_SaferCond ctx;
m_image_watcher->unregister_watch(&ctx);
ASSERT_EQ(0, ctx.wait());
delete m_image_watcher;
}
}

MockMirroringWatcher *m_image_watcher = nullptr;
};

TEST_F(TestMirroringWatcher, ModeUpdated) {
EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _))
.WillRepeatedly(WithArg<1>(Invoke([](Context *on_finish) {
on_finish->complete(0);
})));

ASSERT_EQ(0, MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED));

}

TEST_F(TestMirroringWatcher, ImageStatusUpdated) {
EXPECT_CALL(*m_image_watcher,
handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
StrEq("image id"), StrEq("global image id"),
_))
.WillRepeatedly(WithArg<3>(Invoke([](Context *on_finish) {
on_finish->complete(0);
})));

ASSERT_EQ(0, MockMirroringWatcher::notify_image_updated(m_ioctx,
cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
"image id",
"global image id"));
}

} // namespace librbd
2 changes: 2 additions & 0 deletions src/test/librbd/test_main.cc
Expand Up @@ -16,6 +16,7 @@ extern void register_test_journal_entries();
extern void register_test_journal_replay();
extern void register_test_object_map();
extern void register_test_mirroring();
extern void register_test_mirroring_watcher();
#endif // TEST_LIBRBD_INTERNALS

int main(int argc, char **argv)
Expand All @@ -28,6 +29,7 @@ int main(int argc, char **argv)
register_test_journal_replay();
register_test_object_map();
register_test_mirroring();
register_test_mirroring_watcher();
#endif // TEST_LIBRBD_INTERNALS

::testing::InitGoogleTest(&argc, argv);
Expand Down
4 changes: 4 additions & 0 deletions src/test/librbd/test_mock_ObjectWatcher.cc
Expand Up @@ -30,6 +30,10 @@ struct MockObjectWatcher : public ObjectWatcher<MockImageCtx> {
virtual std::string get_oid() const override {
return oid;
}

virtual void handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl) {
}
};

} // anonymous namespace
Expand Down

0 comments on commit 57db617

Please sign in to comment.