Skip to content

Commit

Permalink
rbd-mirror A/A: leader should track up/down rbd-mirror instances
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/18784
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
  • Loading branch information
Mykola Golub committed Feb 21, 2017
1 parent cdac0b1 commit eaf9e7c
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 34 deletions.
20 changes: 20 additions & 0 deletions src/test/rbd_mirror/test_mock_LeaderWatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ struct ManagedLock<MockTestImageCtx> {

} // namespace librbd

namespace rbd {
namespace mirror {

// Mock replacement for rbd::mirror::Instances, specialized on the mock image
// context. It exposes the three operations LeaderWatcher invokes on its
// instances object (init, shut_down, notify) as gmock methods.
template <>
class Instances<librbd::MockTestImageCtx> {
public:
  // The arguments exist only to match the two-argument construction
  // performed by the code under test; the mock stores nothing.
  Instances(Threads *, librados::IoCtx &) {}

  MOCK_METHOD1(init, void(Context *));
  MOCK_METHOD1(shut_down, void(Context *));
  MOCK_METHOD1(notify, void(const std::string &));
};

} // namespace mirror
} // namespace rbd


// template definitions
#include "tools/rbd_mirror/LeaderWatcher.cc"

Expand Down Expand Up @@ -132,11 +150,13 @@ class TestMockLeaderWatcher : public TestMockFixture {
}

// Register a gmock expectation that try_acquire_lock() is called on the mock
// managed lock, answered once by the CompleteContext(r) action.
// NOTE(review): CompleteContext presumably completes the supplied Context
// with result r -- confirm against the test fixture helpers.
void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
// TODO: kick post_acquire_lock_handler
EXPECT_CALL(mock_managed_lock, try_acquire_lock(_))
.WillOnce(CompleteContext(r));
}

// Register a gmock expectation that release_lock() is called on the mock
// managed lock, answered once by the CompleteContext(r) action.
// NOTE(review): CompleteContext presumably completes the supplied Context
// with result r -- confirm against the test fixture helpers.
void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
// TODO: kick pre/post_release_lock_handler
EXPECT_CALL(mock_managed_lock, release_lock(_))
.WillOnce(CompleteContext(r));
}
Expand Down
131 changes: 118 additions & 13 deletions src/tools/rbd_mirror/LeaderWatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// Log prefix used in this file: the class name, the address of the current
// object and the name of the enclosing function. Because the expansion
// references `this`, it is only usable inside non-static member functions.
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
<< this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

Expand Down Expand Up @@ -312,7 +311,7 @@ void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
Mutex::Locker locker(m_lock);
assert(m_on_finish == nullptr);
m_on_finish = on_finish;
m_notify_error = 0;
m_ret_val = 0;

init_status_watcher();
}
Expand All @@ -324,7 +323,7 @@ void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
Mutex::Locker locker(m_lock);
assert(m_on_finish == nullptr);
m_on_finish = on_finish;
m_notify_error = 0;
m_ret_val = 0;

notify_listener();
}
Expand Down Expand Up @@ -484,7 +483,7 @@ void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {

m_acquire_attempts = 0;

if (m_notify_error) {
if (m_ret_val) {
dout(5) << "releasing due to error on notify" << dendl;
release_leader_lock();
return;
Expand Down Expand Up @@ -545,7 +544,7 @@ void LeaderWatcher<I>::handle_init_status_watcher(int r) {
Mutex::Locker locker(m_lock);

if (r == 0) {
notify_listener();
init_instances();
return;
}

Expand Down Expand Up @@ -583,10 +582,15 @@ void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
if (r < 0) {
derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
<< dendl;
if (!is_leader(m_lock)) {
// ignore on releasing
r = 0;
}
}

if (m_ret_val != 0) {
r = m_ret_val;
}

if (!is_leader(m_lock)) {
// ignore on releasing
r = 0;
}

assert(m_status_watcher);
Expand All @@ -598,6 +602,65 @@ void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
on_finish->complete(r);
}

// Allocate the Instances object and start its initialization.
//
// Preconditions (asserted): m_lock is held and no Instances object exists
// yet. The result of the initialization is reported through a context that
// invokes handle_init_instances().
template <typename I>
void LeaderWatcher<I>::init_instances() {
  dout(20) << dendl;

  assert(m_lock.is_locked());
  assert(!m_instances);

  m_instances.reset(new Instances<I>(m_threads, m_ioctx));

  using Self = LeaderWatcher<I>;
  Context *on_init =
    create_context_callback<Self, &Self::handle_init_instances>(this);
  m_instances->init(on_init);
}

// Completion handler for the Instances initialization started by
// init_instances().
//
// r >= 0: continue by notifying the listener.
// r <  0: log the error, remember it in m_ret_val, drop the Instances
//         object and shut the status watcher down.
template <typename I>
void LeaderWatcher<I>::handle_init_instances(int r) {
  dout(20) << "r=" << r << dendl;

  Mutex::Locker locker(m_lock);

  if (r >= 0) {
    notify_listener();
    return;
  }

  // failure path: keep the error code in m_ret_val before unwinding
  derr << "error initializing instances: " << cpp_strerror(r) << dendl;
  m_ret_val = r;
  m_instances.reset();
  shut_down_status_watcher();
}

// Start shutting down the Instances object.
//
// Preconditions (asserted): m_lock is held and an Instances object exists.
// Completion is routed to handle_shut_down_instances().
template <typename I>
void LeaderWatcher<I>::shut_down_instances() {
  dout(20) << dendl;

  assert(m_lock.is_locked());
  assert(m_instances);

  using Self = LeaderWatcher<I>;
  Context *on_shut_down =
    create_context_callback<Self, &Self::handle_shut_down_instances>(this);

  // NOTE(review): the async wrapper presumably defers the completion to
  // m_work_queue instead of running it inline -- confirm against
  // create_async_context_callback().
  Context *async_ctx =
    create_async_context_callback(m_work_queue, on_shut_down);

  m_instances->shut_down(async_ctx);
}

// Completion handler for shut_down_instances(): asserts that the shutdown
// reported success, destroys the Instances object while holding m_lock, and
// then continues with shutting down the status watcher.
template <typename I>
void LeaderWatcher<I>::handle_shut_down_instances(int r) {
dout(20) << "r=" << r << dendl;
// any non-zero result aborts here; no error path is implemented
assert(r == 0);

Mutex::Locker locker(m_lock);

// the object must still exist; it is released exactly once, here
assert(m_instances);
m_instances.reset();

shut_down_status_watcher();
}

template <typename I>
void LeaderWatcher<I>::notify_listener() {
dout(20) << dendl;
Expand Down Expand Up @@ -630,13 +693,13 @@ void LeaderWatcher<I>::handle_notify_listener(int r) {

if (r < 0) {
derr << "error notifying listener: " << cpp_strerror(r) << dendl;
m_notify_error = r;
m_ret_val = r;
}

if (is_leader(m_lock)) {
notify_lock_acquired();
} else {
shut_down_status_watcher();
shut_down_instances();
}
}

Expand Down Expand Up @@ -665,7 +728,7 @@ void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
if (r < 0 && r != -ETIMEDOUT) {
derr << "error notifying leader lock acquired: " << cpp_strerror(r)
<< dendl;
m_notify_error = r;
m_ret_val = r;
}

assert(m_on_finish != nullptr);
Expand Down Expand Up @@ -749,6 +812,16 @@ void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
&LeaderWatcher<I>::notify_heartbeat);
}

// Encode a NotifyMessage carrying an (empty) HeartbeatAckPayload and hand
// the encoded buffer to send_notify().
template <typename I>
void LeaderWatcher<I>::notify_heartbeat_ack() {
  dout(20) << dendl;

  const NotifyMessage ack_message{HeartbeatAckPayload{}};

  bufferlist payload_bl;
  ::encode(ack_message, payload_bl);
  send_notify(payload_bl);
}

template <typename I>
void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
dout(20) << dendl;
Expand All @@ -762,6 +835,24 @@ void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
m_acquire_attempts = 0;
cancel_timer_task();
get_locker();
notify_heartbeat_ack();
}
}

on_notify_ack->complete(0);
}

template <typename I>
void LeaderWatcher<I>::handle_heartbeat_ack(uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << dendl;

{
Mutex::Locker locker(m_lock);
if (is_leader(m_lock)) {
assert(m_instances);
std::string instance_id = stringify(notifier_id);
m_instances->notify(instance_id);
}
}

Expand Down Expand Up @@ -829,19 +920,31 @@ void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
return;
}

apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
apply_visitor(HandlePayloadVisitor(this, notifier_id, ctx),
notify_message.payload);
}

// Payload handler for HeartbeatPayload: logs the event and forwards the
// acknowledgement context to handle_heartbeat(). Neither the payload nor
// notifier_id is used here.
template <typename I>
void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << "heartbeat" << dendl;

handle_heartbeat(on_notify_ack);
}

// Payload handler for HeartbeatAckPayload: logs the event and forwards the
// sender's notifier_id together with the acknowledgement context to
// handle_heartbeat_ack(). The payload itself is not inspected.
template <typename I>
void LeaderWatcher<I>::handle_payload(const HeartbeatAckPayload &payload,
uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << "heartbeat ack" << dendl;

handle_heartbeat_ack(notifier_id, on_notify_ack);
}

template <typename I>
void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << "lock_acquired" << dendl;

Expand All @@ -850,6 +953,7 @@ void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,

template <typename I>
void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << "lock_released" << dendl;

Expand All @@ -858,6 +962,7 @@ void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,

template <typename I>
void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
uint64_t notifier_id,
Context *on_notify_ack) {
dout(20) << "unknown" << dendl;

Expand Down
Loading

0 comments on commit eaf9e7c

Please sign in to comment.