diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index ced901aa85718..180ac46be2caf 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -7,7 +7,6 @@ #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Replay.h" -#include "librbd/Utils.h" #include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" #include "journal/Policy.h" @@ -644,16 +643,25 @@ void Journal::close(Context *on_finish) { template bool Journal::is_tag_owner() const { + Mutex::Locker locker(m_lock); + return is_tag_owner(m_lock); +} + +template +bool Journal::is_tag_owner(const Mutex &) const { + assert(m_lock.is_locked()); return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); } template uint64_t Journal::get_tag_tid() const { + Mutex::Locker locker(m_lock); return m_tag_tid; } template journal::TagData Journal::get_tag_data() const { + Mutex::Locker locker(m_lock); return m_tag_data; } @@ -663,7 +671,7 @@ int Journal::demote() { ldout(cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -739,7 +747,7 @@ void Journal::allocate_local_tag(Context *on_finish) { predecessor.mirror_uuid = LOCAL_MIRROR_UUID; { Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -1172,9 +1180,16 @@ void Journal::destroy_journaler(int r) { m_journaler->remove_listener(&m_metadata_listener); transition_state(STATE_CLOSING, r); - m_journaler->shut_down(create_async_context_callback( + + Context *ctx = create_async_context_callback( m_image_ctx, create_context_callback< - Journal, &Journal::handle_journal_destroyed>(this))); + Journal, &Journal::handle_journal_destroyed>(this)); + ctx = new FunctionContext( + [this, ctx](int r) { + Mutex::Locker locker(m_lock); + m_journaler->shut_down(ctx); + }); + m_async_journal_op_tracker.wait(m_image_ctx, ctx); } template @@ -1733,22 +1748,97 @@ int Journal::check_resync_requested(bool *do_resync) { return 0; } +struct C_RefreshTags : public Context { + util::AsyncOpTracker &async_op_tracker; + Context *on_finish = nullptr; + + Mutex lock; + uint64_t tag_tid; + journal::TagData tag_data; + + C_RefreshTags(util::AsyncOpTracker &async_op_tracker) + : async_op_tracker(async_op_tracker), + lock("librbd::Journal::C_RefreshTags::lock") { + async_op_tracker.start_op(); + } + virtual ~C_RefreshTags() { + async_op_tracker.finish_op(); + } + + virtual void finish(int r) { + on_finish->complete(r); + } +}; + template void Journal::handle_metadata_updated() { CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << dendl; - Mutex::Locker locker(m_lock); + if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { return; + } else if (is_tag_owner(m_lock)) { + ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl; + return; + } else if (m_listeners.empty()) { + ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl; + return; } + uint64_t refresh_sequence = ++m_refresh_sequence; + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << dendl; + + // pull the most recent tags from the journal, decode, and + // update the internal tag state + C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker); + refresh_ctx->on_finish = new FunctionContext( + [this, refresh_sequence, refresh_ctx](int r) { + handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid, + refresh_ctx->tag_data, r); + }); + C_DecodeTags *decode_tags_ctx = new C_DecodeTags( + cct, &refresh_ctx->lock, &refresh_ctx->tag_tid, + &refresh_ctx->tag_data, refresh_ctx); + m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class, + &decode_tags_ctx->tags, decode_tags_ctx); +} + +template +void Journal::handle_refresh_metadata(uint64_t refresh_sequence, + uint64_t tag_tid, + journal::TagData tag_data, int r) { + CephContext *cct = m_image_ctx.cct; + Mutex::Locker locker(m_lock); + + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: " + << cpp_strerror(r) << dendl; + return; + } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { + return; + } else if (refresh_sequence != m_refresh_sequence) { + // another, more up-to-date refresh is in-flight + return; + } + + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << ", " + << "tag_tid=" << tag_tid << ", " + << "tag_data=" << tag_data << dendl; while (m_listener_notify) { m_listener_cond.Wait(m_lock); } + bool was_tag_owner = is_tag_owner(m_lock); + if (m_tag_tid < tag_tid) { + m_tag_tid = tag_tid; + m_tag_data = tag_data; + } + bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock)); + bool resync_requested = false; - int r = check_resync_requested(&resync_requested); + r = check_resync_requested(&resync_requested); if (r < 0) { lderr(cct) << this << " " << __func__ << ": " << "failed to check if a resync was requested" << dendl; @@ -1759,7 +1849,11 @@ void Journal::handle_metadata_updated() { m_listener_notify = true; m_lock.Unlock(); - if (resync_requested) { + if (promoted_to_primary) { + for (auto listener : listeners) { + listener->handle_promoted(); + } + } else if (resync_requested) { for (auto listener : listeners) { listener->handle_resync(); } diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index e9a1d349a5dc9..374c0bedce1ac 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -14,6 +14,7 @@ #include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" +#include "librbd/Utils.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include @@ -296,6 +297,8 @@ class Journal { journal::Replay *m_journal_replay; + util::AsyncOpTracker m_async_journal_op_tracker; + struct MetadataListener : public ::journal::JournalMetadataListener { Journal *journal; @@ -314,7 +317,10 @@ class Journal { Cond m_listener_cond; bool m_listener_notify = false; + uint64_t m_refresh_sequence = 0; + bool is_journal_replaying(const Mutex &) const; + bool is_tag_owner(const Mutex &) const; uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, @@ -363,6 +369,9 @@ class Journal { int check_resync_requested(bool *do_resync); void handle_metadata_updated(); + void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, + journal::TagData tag_data, int r); + }; } // namespace librbd diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index c21f85892a8d5..5ea25c8a6e02a 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -257,8 +257,11 @@ class TestMockJournal : public TestMockFixture { void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, ::journal::MockJournaler &mock_journaler, - int r) { + bool primary, int r) { journal::TagData tag_data; + if (!primary) { + tag_data.mirror_uuid = "remote mirror uuid"; + } bufferlist tag_data_bl; ::encode(tag_data, tag_data_bl); @@ -271,6 +274,15 @@ class TestMockJournal : public TestMockFixture { .WillOnce(SaveArg<0>(&m_listener)); } + void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, + ::journal::MockJournaler &mock_journaler, + uint64_t start_after_tag_tid, + ::journal::Journaler::Tags &&tags, int r) { + EXPECT_CALL(mock_journaler, get_tags(start_after_tag_tid, 0, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(tags), + WithArg<3>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); + } + void expect_start_replay(MockJournalImageCtx &mock_image_ctx, ::journal::MockJournaler &mock_journaler, const ReplayActions &actions) { @@ -421,7 +433,8 @@ class TestMockJournal : public TestMockFixture { void open_journal(MockJournalImageCtx &mock_image_ctx, MockJournal &mock_journal, - ::journal::MockJournaler &mock_journaler) { + ::journal::MockJournaler &mock_journaler, + bool primary = true) { expect_op_work_queue(mock_image_ctx); InSequence seq; @@ -429,7 +442,7 @@ class TestMockJournal : public TestMockFixture { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, primary, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -477,7 +490,7 @@ TEST_F(TestMockJournal, StateTransitions) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -566,7 +579,7 @@ TEST_F(TestMockJournal, GetTagsError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, -EBADMSG); expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EBADMSG, when_open(mock_journal)); } @@ -588,7 +601,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, -EINVAL) @@ -604,7 +617,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -637,7 +650,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -658,7 +671,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -691,7 +704,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -711,7 +724,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -743,7 +756,7 @@ TEST_F(TestMockJournal, StopError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -776,7 +789,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); ::journal::ReplayHandler *replay_handler = nullptr; expect_start_replay( @@ -806,7 +819,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -860,7 +873,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -885,7 +898,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -1261,7 +1274,7 @@ TEST_F(TestMockJournal, ResyncRequested) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); ::journal::MockJournaler mock_journaler; - open_journal(mock_image_ctx, mock_journal, mock_journaler); + open_journal(mock_image_ctx, mock_journal, mock_journaler, false); struct Listener : public journal::Listener { C_SaferCond ctx; @@ -1283,6 +1296,15 @@ TEST_F(TestMockJournal, ResyncRequested) { }; InSequence seq; + + journal::TagData tag_data; + tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{0, 0, tag_data_bl}}, 0); + journal::ImageClientMeta image_client_meta; image_client_meta.tag_class = 0; image_client_meta.resync_requested = true; @@ -1293,4 +1315,53 @@ TEST_F(TestMockJournal, ResyncRequested) { ASSERT_EQ(0, listener.ctx.wait()); } +TEST_F(TestMockJournal, ForcePromoted) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + ::journal::MockJournaler mock_journaler; + open_journal(mock_image_ctx, mock_journal, mock_journaler, false); + + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ADD_FAILURE() << "unexpected close action"; + } + virtual void handle_resync() { + ADD_FAILURE() << "unexpected resync event"; + } + virtual void handle_promoted() { + ctx.complete(0); + } + } listener; + mock_journal.add_listener(&listener); + + BOOST_SCOPE_EXIT_ALL(&) { + mock_journal.remove_listener(&listener); + close_journal(mock_journal, mock_journaler); + }; + + InSequence seq; + + journal::TagData tag_data; + tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{100, 0, tag_data_bl}}, 0); + + journal::ImageClientMeta image_client_meta; + image_client_meta.tag_class = 0; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0); + expect_shut_down_journaler(mock_journaler); + + m_listener->handle_update(nullptr); + ASSERT_EQ(0, listener.ctx.wait()); +} + } // namespace librbd diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 4baef7f315fd2..3765dc0867064 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -217,7 +217,7 @@ class ImageReplayer { } virtual void handle_promoted() { - // TODO + img_replayer->on_stop_journal_replay(0, "force promoted"); } virtual void handle_resync() {