Skip to content

Commit

Permalink
librbd: new journal listener event for force promotion
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/16974
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Sep 15, 2016
1 parent 84dec47 commit 5eceee8
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 27 deletions.
112 changes: 103 additions & 9 deletions src/librbd/Journal.cc
Expand Up @@ -7,7 +7,6 @@
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Replay.h"
#include "librbd/Utils.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/Policy.h"
Expand Down Expand Up @@ -644,16 +643,25 @@ void Journal<I>::close(Context *on_finish) {

template <typename I>
bool Journal<I>::is_tag_owner() const {
Mutex::Locker locker(m_lock);
return is_tag_owner(m_lock);
}

template <typename I>
bool Journal<I>::is_tag_owner(const Mutex &) const {
assert(m_lock.is_locked());
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}

template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
Mutex::Locker locker(m_lock);
return m_tag_tid;
}

template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
Mutex::Locker locker(m_lock);
return m_tag_data;
}

Expand All @@ -663,7 +671,7 @@ int Journal<I>::demote() {
ldout(cct, 20) << __func__ << dendl;

Mutex::Locker locker(m_lock);
assert(m_journaler != nullptr && is_tag_owner());
assert(m_journaler != nullptr && is_tag_owner(m_lock));

cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
Expand Down Expand Up @@ -739,7 +747,7 @@ void Journal<I>::allocate_local_tag(Context *on_finish) {
predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
{
Mutex::Locker locker(m_lock);
assert(m_journaler != nullptr && is_tag_owner());
assert(m_journaler != nullptr && is_tag_owner(m_lock));

cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
Expand Down Expand Up @@ -1172,9 +1180,16 @@ void Journal<I>::destroy_journaler(int r) {
m_journaler->remove_listener(&m_metadata_listener);

transition_state(STATE_CLOSING, r);
m_journaler->shut_down(create_async_context_callback(

Context *ctx = create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
ctx = new FunctionContext(
[this, ctx](int r) {
Mutex::Locker locker(m_lock);
m_journaler->shut_down(ctx);
});
m_async_journal_op_tracker.wait(m_image_ctx, ctx);
}

template <typename I>
Expand Down Expand Up @@ -1733,22 +1748,97 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
return 0;
}

struct C_RefreshTags : public Context {
util::AsyncOpTracker &async_op_tracker;
Context *on_finish = nullptr;

Mutex lock;
uint64_t tag_tid;
journal::TagData tag_data;

C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
: async_op_tracker(async_op_tracker),
lock("librbd::Journal::C_RefreshTags::lock") {
async_op_tracker.start_op();
}
virtual ~C_RefreshTags() {
async_op_tracker.finish_op();
}

virtual void finish(int r) {
on_finish->complete(r);
}
};

template <typename I>
void Journal<I>::handle_metadata_updated() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;

Mutex::Locker locker(m_lock);

if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
return;
} else if (is_tag_owner(m_lock)) {
ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
return;
} else if (m_listeners.empty()) {
ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
return;
}

uint64_t refresh_sequence = ++m_refresh_sequence;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "refresh_sequence=" << refresh_sequence << dendl;

// pull the most recent tags from the journal, decode, and
// update the internal tag state
C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
refresh_ctx->on_finish = new FunctionContext(
[this, refresh_sequence, refresh_ctx](int r) {
handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
refresh_ctx->tag_data, r);
});
C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
&refresh_ctx->tag_data, refresh_ctx);
m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
&decode_tags_ctx->tags, decode_tags_ctx);
}

template <typename I>
void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
uint64_t tag_tid,
journal::TagData tag_data, int r) {
CephContext *cct = m_image_ctx.cct;
Mutex::Locker locker(m_lock);

if (r < 0) {
lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
<< cpp_strerror(r) << dendl;
return;
} else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
return;
} else if (refresh_sequence != m_refresh_sequence) {
// another, more up-to-date refresh is in-flight
return;
}

ldout(cct, 20) << this << " " << __func__ << ": "
<< "refresh_sequence=" << refresh_sequence << ", "
<< "tag_tid=" << tag_tid << ", "
<< "tag_data=" << tag_data << dendl;
while (m_listener_notify) {
m_listener_cond.Wait(m_lock);
}

bool was_tag_owner = is_tag_owner(m_lock);
if (m_tag_tid < tag_tid) {
m_tag_tid = tag_tid;
m_tag_data = tag_data;
}
bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));

bool resync_requested = false;
int r = check_resync_requested(&resync_requested);
r = check_resync_requested(&resync_requested);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
<< "failed to check if a resync was requested" << dendl;
Expand All @@ -1759,7 +1849,11 @@ void Journal<I>::handle_metadata_updated() {
m_listener_notify = true;
m_lock.Unlock();

if (resync_requested) {
if (promoted_to_primary) {
for (auto listener : listeners) {
listener->handle_promoted();
}
} else if (resync_requested) {
for (auto listener : listeners) {
listener->handle_resync();
}
Expand Down
9 changes: 9 additions & 0 deletions src/librbd/Journal.h
Expand Up @@ -14,6 +14,7 @@
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
#include <algorithm>
Expand Down Expand Up @@ -296,6 +297,8 @@ class Journal {

journal::Replay<ImageCtxT> *m_journal_replay;

util::AsyncOpTracker m_async_journal_op_tracker;

struct MetadataListener : public ::journal::JournalMetadataListener {
Journal<ImageCtxT> *journal;

Expand All @@ -314,7 +317,10 @@ class Journal {
Cond m_listener_cond;
bool m_listener_notify = false;

uint64_t m_refresh_sequence = 0;

bool is_journal_replaying(const Mutex &) const;
bool is_tag_owner(const Mutex &) const;

uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
Expand Down Expand Up @@ -363,6 +369,9 @@ class Journal {
int check_resync_requested(bool *do_resync);

void handle_metadata_updated();
void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
journal::TagData tag_data, int r);

};

} // namespace librbd
Expand Down

0 comments on commit 5eceee8

Please sign in to comment.