diff --git a/qa/workunits/rbd/rbd_mirror.sh b/qa/workunits/rbd/rbd_mirror.sh index 5b016f0b6f5b4..361c78fc4b484 100755 --- a/qa/workunits/rbd/rbd_mirror.sh +++ b/qa/workunits/rbd/rbd_mirror.sh @@ -21,9 +21,9 @@ create_image ${CLUSTER2} ${POOL} ${image} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} write_image ${CLUSTER2} ${POOL} ${image} 100 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' if [ -z "${RBD_MIRROR_USE_RBD_MIRROR}" ]; then - test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'down+unknown' + wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'down+unknown' fi compare_images ${POOL} ${image} @@ -35,16 +35,16 @@ write_image ${CLUSTER2} ${POOL} ${image1} 100 start_mirror ${CLUSTER1} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image1} wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image1} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image1} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image1} 'up+replaying' 'master_position' if [ -z "${RBD_MIRROR_USE_RBD_MIRROR}" ]; then - test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image1} 'down+unknown' + wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image1} 'down+unknown' fi compare_images ${POOL} ${image1} testlog "TEST: test the first image is replaying after restart" write_image ${CLUSTER2} ${POOL} ${image} 100 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} testlog "TEST: stop/start/restart mirror via admin socket" @@ -102,41 +102,58 @@ start_mirror ${CLUSTER2} # demote and promote same cluster demote_image 
${CLUSTER2} ${POOL} ${image} wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' -test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' promote_image ${CLUSTER2} ${POOL} ${image} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} write_image ${CLUSTER2} ${POOL} ${image} 100 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} # failover demote_image ${CLUSTER2} ${POOL} ${image} wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' -test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' promote_image ${CLUSTER1} ${POOL} ${image} wait_for_image_replay_started ${CLUSTER2} ${POOL} ${image} write_image ${CLUSTER1} ${POOL} ${image} 100 wait_for_replay_complete ${CLUSTER2} ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' -test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} # failback demote_image ${CLUSTER1} ${POOL} ${image} wait_for_image_replay_stopped ${CLUSTER2} ${POOL} ${image} 
-test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' promote_image ${CLUSTER2} ${POOL} ${image} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} write_image ${CLUSTER2} ${POOL} ${image} 100 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' -test_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${image} 'up+stopped' compare_images ${POOL} ${image} +# force promote +force_promote_image=test_force_promote +create_image ${CLUSTER2} ${POOL} ${force_promote_image} +write_image ${CLUSTER2} ${POOL} ${force_promote_image} 100 +wait_for_image_replay_stopped ${CLUSTER2} ${POOL} ${force_promote_image} +wait_for_image_replay_started ${CLUSTER1} ${POOL} ${force_promote_image} +wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${force_promote_image} +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${force_promote_image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${force_promote_image} 'up+stopped' +promote_image ${CLUSTER1} ${POOL} ${force_promote_image} '--force' +wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${force_promote_image} +wait_for_image_replay_stopped ${CLUSTER2} ${POOL} ${force_promote_image} +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${force_promote_image} 'up+stopped' +wait_for_status_in_pool_dir ${CLUSTER2} ${POOL} ${force_promote_image} 'up+stopped' +write_image ${CLUSTER1} ${POOL} ${force_promote_image} 100 +write_image ${CLUSTER2} ${POOL} ${force_promote_image} 100 + testlog "TEST: cloned images" parent_image=test_parent parent_snap=snap @@ -152,12 +169,12 @@ write_image ${CLUSTER2} ${POOL} ${clone_image} 100 enable_mirror ${CLUSTER2} ${PARENT_POOL} 
${parent_image} wait_for_image_replay_started ${CLUSTER1} ${PARENT_POOL} ${parent_image} wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${PARENT_POOL} ${parent_image} -test_status_in_pool_dir ${CLUSTER1} ${PARENT_POOL} ${parent_image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${PARENT_POOL} ${parent_image} 'up+replaying' 'master_position' compare_images ${PARENT_POOL} ${parent_image} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${clone_image} wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${clone_image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${clone_image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${clone_image} 'up+replaying' 'master_position' compare_images ${POOL} ${clone_image} expect_failure "is non-primary" clone_image ${CLUSTER1} ${PARENT_POOL} \ @@ -194,7 +211,7 @@ for i in ${image3} ${image5}; do remove_snapshot ${CLUSTER2} ${POOL} ${i} 'snap2' # workaround #16555: before removing make sure it is not still bootstrapped wait_for_image_replay_started ${CLUSTER1} ${POOL} ${i} - remove_image ${CLUSTER2} ${POOL} ${i} + remove_image_retry ${CLUSTER2} ${POOL} ${i} done for i in ${image2} ${image3} ${image4} ${image5}; do @@ -234,7 +251,7 @@ testlog "TEST: simple image resync" request_resync_image ${CLUSTER1} ${POOL} ${image} wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted' wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} testlog "TEST: image resync while replayer is stopped" @@ -245,7 +262,7 @@ admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image} wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted' admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} 
-test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} testlog "TEST: request image resync while daemon is offline" @@ -253,7 +270,7 @@ stop_mirror ${CLUSTER1} request_resync_image ${CLUSTER1} ${POOL} ${image} start_mirror ${CLUSTER1} wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} testlog "TEST: client disconnect" @@ -268,7 +285,7 @@ test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" disconnect_image ${CLUSTER2} ${POOL} ${image} test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' testlog " - replay started after resync requested" request_resync_image ${CLUSTER1} ${POOL} ${image} @@ -293,7 +310,7 @@ set_image_meta ${CLUSTER2} ${POOL} ${image} \ testlog " - replay is still stopped (disconnected) after restart" admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image} wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' testlog " - replay started after resync requested" request_resync_image ${CLUSTER1} ${POOL} ${image} @@ -317,6 +334,6 @@ set_image_meta ${CLUSTER1} ${POOL} ${image} \ disconnect_image ${CLUSTER2} ${POOL} ${image} test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} -test_status_in_pool_dir ${CLUSTER1} ${POOL} 
${image} 'up+error' 'disconnected' +wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' echo OK diff --git a/qa/workunits/rbd/rbd_mirror_helpers.sh b/qa/workunits/rbd/rbd_mirror_helpers.sh index 2b3e58073948b..1088ae24b0d01 100755 --- a/qa/workunits/rbd/rbd_mirror_helpers.sh +++ b/qa/workunits/rbd/rbd_mirror_helpers.sh @@ -487,6 +487,21 @@ test_status_in_pool_dir() grep "description: .*${description_pattern}" ${status_log} } +wait_for_status_in_pool_dir() +{ + local cluster=$1 + local pool=$2 + local image=$3 + local state_pattern=$4 + local description_pattern=$5 + + for s in 1 2 4 8 8 8 8 8 8 8 8 16 16; do + sleep ${s} + test_status_in_pool_dir ${cluster} ${pool} ${image} ${state_pattern} ${description_pattern} && return 0 + done + return 1 +} + create_image() { local cluster=$1 ; shift @@ -520,6 +535,7 @@ remove_image() local pool=$2 local image=$3 + rbd --cluster=${cluster} -p ${pool} snap purge ${image} rbd --cluster=${cluster} -p ${pool} rm ${image} } @@ -692,8 +708,9 @@ promote_image() local cluster=$1 local pool=$2 local image=$3 + local force=$4 - rbd --cluster=${cluster} mirror image promote ${pool}/${image} + rbd --cluster=${cluster} mirror image promote ${pool}/${image} ${force} } set_pool_mirror_mode() diff --git a/qa/workunits/rbd/rbd_mirror_stress.sh b/qa/workunits/rbd/rbd_mirror_stress.sh index c7cd75ad33141..229169ba2c71d 100755 --- a/qa/workunits/rbd/rbd_mirror_stress.sh +++ b/qa/workunits/rbd/rbd_mirror_stress.sh @@ -100,7 +100,7 @@ for i in `seq 1 10` do stress_write_image ${CLUSTER2} ${POOL} ${image} - test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' + wait_for_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' snap_name="snap${i}" create_snap ${CLUSTER2} ${POOL} ${image} ${snap_name} @@ -114,7 +114,7 @@ do remove_snapshot ${CLUSTER2} ${POOL} ${image} ${snap_name} done -remove_image ${CLUSTER2} ${POOL} ${image} +remove_image_retry ${CLUSTER2} 
${POOL} ${image} wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted' testlog "TEST: create many images" @@ -152,7 +152,7 @@ for i in `seq 1 ${IMAGE_COUNT}` do image="image_${i}" remove_snapshot ${CLUSTER2} ${POOL} ${image} ${snap_name} - remove_image ${CLUSTER2} ${POOL} ${image} + remove_image_retry ${CLUSTER2} ${POOL} ${image} done testlog "TEST: image deletions should propagate" diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 08a7e9307aa45..36dffdfb4d8bc 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -256,21 +256,23 @@ struct C_GetTags : public Context { const std::string &oid; const std::string &client_id; AsyncOpTracker &async_op_tracker; + uint64_t start_after_tag_tid; boost::optional tag_class; JournalMetadata::Tags *tags; Context *on_finish; const uint64_t MAX_RETURN = 64; - uint64_t start_after_tag_tid = 0; bufferlist out_bl; C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, const std::string &client_id, AsyncOpTracker &async_op_tracker, + uint64_t start_after_tag_tid, const boost::optional &tag_class, JournalMetadata::Tags *tags, Context *on_finish) : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id), - async_op_tracker(async_op_tracker), tag_class(tag_class), tags(tags), - on_finish(on_finish) { + async_op_tracker(async_op_tracker), + start_after_tag_tid(start_after_tag_tid), tag_class(tag_class), + tags(tags), on_finish(on_finish) { async_op_tracker.start_op(); } virtual ~C_GetTags() { @@ -579,11 +581,12 @@ void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { ctx->send(); } -void JournalMetadata::get_tags(const boost::optional &tag_class, +void JournalMetadata::get_tags(uint64_t start_after_tag_tid, + const boost::optional &tag_class, Tags *tags, Context *on_finish) { C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, - m_async_op_tracker, tag_class, - tags, on_finish); + m_async_op_tracker, 
start_after_tag_tid, + tag_class, tags, on_finish); ctx->send(); } diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 4055f9937047f..880130126dd9e 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -71,7 +71,8 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable { void allocate_tag(uint64_t tag_class, const bufferlist &data, Tag *tag, Context *on_finish); void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); - void get_tags(const boost::optional &tag_class, Tags *tags, + void get_tags(uint64_t start_after_tag_tid, + const boost::optional &tag_class, Tags *tags, Context *on_finish); inline const Settings &get_settings() const { diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 3487b756178c8..108e428e541d8 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -316,7 +316,12 @@ void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { } void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { - m_metadata->get_tags(tag_class, tags, on_finish); + m_metadata->get_tags(0, tag_class, tags, on_finish); +} + +void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, + Tags *tags, Context *on_finish) { + m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); } void Journaler::start_replay(ReplayHandler *replay_handler) { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 49d1f67aec975..1ba56c62b6cf2 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -96,6 +96,8 @@ class Journaler { cls::journal::Tag *tag, Context *on_finish); void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish); + void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags, + Context *on_finish); void start_replay(ReplayHandler *replay_handler); void start_live_replay(ReplayHandler 
*replay_handler, double interval); diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index b6738c9ee9940..180ac46be2caf 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -7,7 +7,6 @@ #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Replay.h" -#include "librbd/Utils.h" #include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" #include "journal/Policy.h" @@ -317,7 +316,6 @@ Journal::~Journal() { assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED); assert(m_journaler == NULL); assert(m_journal_replay == NULL); - assert(m_on_replay_close_request == nullptr); assert(m_wait_for_state_contexts.empty()); } @@ -561,6 +559,12 @@ bool Journal::is_journal_ready() const { template bool Journal::is_journal_replaying() const { Mutex::Locker locker(m_lock); + return is_journal_replaying(m_lock); +} + +template +bool Journal::is_journal_replaying(const Mutex &) const { + assert(m_lock.is_locked()); return (m_state == STATE_REPLAYING || m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART || @@ -608,6 +612,21 @@ void Journal::close(Context *on_finish) { on_finish = create_async_context_callback(m_image_ctx, on_finish); Mutex::Locker locker(m_lock); + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } + + Listeners listeners(m_listeners); + m_listener_notify = true; + m_lock.Unlock(); + for (auto listener : listeners) { + listener->handle_close(); + } + + m_lock.Lock(); + m_listener_notify = false; + m_listener_cond.Signal(); + assert(m_state != STATE_UNINITIALIZED); if (m_state == STATE_CLOSED) { on_finish->complete(m_error_result); @@ -618,28 +637,31 @@ void Journal::close(Context *on_finish) { stop_recording(); } - // interrupt external replay if active - if (m_on_replay_close_request != nullptr) { - m_on_replay_close_request->complete(0); - m_on_replay_close_request = nullptr; - } - m_close_pending = true; wait_for_steady_state(on_finish); } template bool 
Journal::is_tag_owner() const { + Mutex::Locker locker(m_lock); + return is_tag_owner(m_lock); +} + +template +bool Journal::is_tag_owner(const Mutex &) const { + assert(m_lock.is_locked()); return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); } template uint64_t Journal::get_tag_tid() const { + Mutex::Locker locker(m_lock); return m_tag_tid; } template journal::TagData Journal::get_tag_data() const { + Mutex::Locker locker(m_lock); return m_tag_data; } @@ -649,7 +671,7 @@ int Journal::demote() { ldout(cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -725,7 +747,7 @@ void Journal::allocate_local_tag(Context *on_finish) { predecessor.mirror_uuid = LOCAL_MIRROR_UUID; { Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -1051,16 +1073,13 @@ typename Journal::Future Journal::wait_event(Mutex &lock, uint64_t tid, template void Journal::start_external_replay(journal::Replay **journal_replay, - Context *on_start, - Context *on_close_request) { + Context *on_start) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; Mutex::Locker locker(m_lock); assert(m_state == STATE_READY); assert(m_journal_replay == nullptr); - assert(m_on_replay_close_request == nullptr); - m_on_replay_close_request = on_close_request; on_start = util::create_async_context_callback(m_image_ctx, on_start); on_start = new FunctionContext( @@ -1089,11 +1108,6 @@ void Journal::handle_start_external_replay(int r, << "failed to stop recording: " << cpp_strerror(r) << dendl; *journal_replay = nullptr; - if (m_on_replay_close_request != nullptr) { - 
m_on_replay_close_request->complete(r); - m_on_replay_close_request = nullptr; - } - // get back to a sane-state start_append(); on_finish->complete(r); @@ -1108,15 +1122,13 @@ void Journal::handle_start_external_replay(int r, template void Journal::stop_external_replay() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + Mutex::Locker locker(m_lock); assert(m_journal_replay != nullptr); assert(m_state == STATE_REPLAYING); - if (m_on_replay_close_request != nullptr) { - m_on_replay_close_request->complete(-ECANCELED); - m_on_replay_close_request = nullptr; - } - delete m_journal_replay; m_journal_replay = nullptr; @@ -1168,9 +1180,16 @@ void Journal::destroy_journaler(int r) { m_journaler->remove_listener(&m_metadata_listener); transition_state(STATE_CLOSING, r); - m_journaler->shut_down(create_async_context_callback( + + Context *ctx = create_async_context_callback( m_image_ctx, create_context_callback< - Journal, &Journal::handle_journal_destroyed>(this))); + Journal, &Journal::handle_journal_destroyed>(this)); + ctx = new FunctionContext( + [this, ctx](int r) { + Mutex::Locker locker(m_lock); + m_journaler->shut_down(ctx); + }); + m_async_journal_op_tracker.wait(m_image_ctx, ctx); } template @@ -1685,13 +1704,13 @@ void Journal::wait_for_steady_state(Context *on_state) { } template -int Journal::check_resync_requested(bool *do_resync) { +int Journal::is_resync_requested(bool *do_resync) { Mutex::Locker l(m_lock); - return check_resync_requested_internal(do_resync); + return check_resync_requested(do_resync); } template -int Journal::check_resync_requested_internal(bool *do_resync) { +int Journal::check_resync_requested(bool *do_resync) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; @@ -1729,56 +1748,135 @@ int Journal::check_resync_requested_internal(bool *do_resync) { return 0; } +struct C_RefreshTags : public Context { + util::AsyncOpTracker &async_op_tracker; + Context 
*on_finish = nullptr; + + Mutex lock; + uint64_t tag_tid; + journal::TagData tag_data; + + C_RefreshTags(util::AsyncOpTracker &async_op_tracker) + : async_op_tracker(async_op_tracker), + lock("librbd::Journal::C_RefreshTags::lock") { + async_op_tracker.start_op(); + } + virtual ~C_RefreshTags() { + async_op_tracker.finish_op(); + } + + virtual void finish(int r) { + on_finish->complete(r); + } +}; + template void Journal::handle_metadata_updated() { CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << dendl; + Mutex::Locker locker(m_lock); - std::list resync_private_list; + if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { + return; + } else if (is_tag_owner(m_lock)) { + ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl; + return; + } else if (m_listeners.empty()) { + ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl; + return; + } - { - Mutex::Locker l(m_lock); + uint64_t refresh_sequence = ++m_refresh_sequence; + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << dendl; + + // pull the most recent tags from the journal, decode, and + // update the internal tag state + C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker); + refresh_ctx->on_finish = new FunctionContext( + [this, refresh_sequence, refresh_ctx](int r) { + handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid, + refresh_ctx->tag_data, r); + }); + C_DecodeTags *decode_tags_ctx = new C_DecodeTags( + cct, &refresh_ctx->lock, &refresh_ctx->tag_tid, + &refresh_ctx->tag_data, refresh_ctx); + m_journaler->get_tags(m_tag_tid == 0 ? 
0 : m_tag_tid - 1, m_tag_class, + &decode_tags_ctx->tags, decode_tags_ctx); +} - if (m_state == STATE_CLOSING || m_state == STATE_CLOSED || - m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) { - return; - } +template +void Journal::handle_refresh_metadata(uint64_t refresh_sequence, + uint64_t tag_tid, + journal::TagData tag_data, int r) { + CephContext *cct = m_image_ctx.cct; + Mutex::Locker locker(m_lock); - bool do_resync = false; - int r = check_resync_requested_internal(&do_resync); - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": " - << "failed to check if a resync was requested" << dendl; - return; - } + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: " + << cpp_strerror(r) << dendl; + return; + } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { + return; + } else if (refresh_sequence != m_refresh_sequence) { + // another, more up-to-date refresh is in-flight + return; + } - if (do_resync) { - for (const auto& listener : - m_listener_map[journal::ListenerType::RESYNC]) { - journal::ResyncListener *rsync_listener = - boost::get(listener); - resync_private_list.push_back(rsync_listener); - } - } + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << ", " + << "tag_tid=" << tag_tid << ", " + << "tag_data=" << tag_data << dendl; + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } + + bool was_tag_owner = is_tag_owner(m_lock); + if (m_tag_tid < tag_tid) { + m_tag_tid = tag_tid; + m_tag_data = tag_data; + } + bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock)); + + bool resync_requested = false; + r = check_resync_requested(&resync_requested); + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to check if a resync was requested" << dendl; + return; } - for (const auto& listener : resync_private_list) { - listener->handle_resync(); + Listeners listeners(m_listeners); + m_listener_notify 
= true; + m_lock.Unlock(); + + if (promoted_to_primary) { + for (auto listener : listeners) { + listener->handle_promoted(); + } + } else if (resync_requested) { + for (auto listener : listeners) { + listener->handle_resync(); + } } + + m_lock.Lock(); + m_listener_notify = false; + m_listener_cond.Signal(); } template -void Journal::add_listener(journal::ListenerType type, - journal::JournalListenerPtr listener) { - Mutex::Locker l(m_lock); - m_listener_map[type].push_back(listener); +void Journal::add_listener(journal::Listener *listener) { + Mutex::Locker locker(m_lock); + m_listeners.insert(listener); } template -void Journal::remove_listener(journal::ListenerType type, - journal::JournalListenerPtr listener) { - Mutex::Locker l(m_lock); - m_listener_map[type].remove(listener); +void Journal::remove_listener(journal::Listener *listener) { + Mutex::Locker locker(m_lock); + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } + m_listeners.erase(listener); } } // namespace librbd diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 3961e48e32613..374c0bedce1ac 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -8,11 +8,13 @@ #include "include/atomic.h" #include "include/Context.h" #include "include/interval_set.h" +#include "common/Cond.h" #include "common/Mutex.h" #include "journal/Future.h" #include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" +#include "librbd/Utils.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include @@ -156,15 +158,13 @@ class Journal { } void start_external_replay(journal::Replay **journal_replay, - Context *on_start, Context *on_close_request); + Context *on_start); void stop_external_replay(); - void add_listener(journal::ListenerType type, - journal::JournalListenerPtr listener); - void remove_listener(journal::ListenerType type, - journal::JournalListenerPtr listener); + void add_listener(journal::Listener 
*listener); + void remove_listener(journal::Listener *listener); - int check_resync_requested(bool *do_resync); + int is_resync_requested(bool *do_resync); private: ImageCtxT &m_image_ctx; @@ -296,7 +296,8 @@ class Journal { bool m_blocking_writes; journal::Replay *m_journal_replay; - Context *m_on_replay_close_request = nullptr; + + util::AsyncOpTracker m_async_journal_op_tracker; struct MetadataListener : public ::journal::JournalMetadataListener { Journal *journal; @@ -311,9 +312,15 @@ class Journal { } } m_metadata_listener; - typedef std::map > ListenerMap; - ListenerMap m_listener_map; + typedef std::set Listeners; + Listeners m_listeners; + Cond m_listener_cond; + bool m_listener_notify = false; + + uint64_t m_refresh_sequence = 0; + + bool is_journal_replaying(const Mutex &) const; + bool is_tag_owner(const Mutex &) const; uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, @@ -359,9 +366,12 @@ class Journal { bool is_steady_state() const; void wait_for_steady_state(Context *on_state); - int check_resync_requested_internal(bool *do_resync); + int check_resync_requested(bool *do_resync); void handle_metadata_updated(); + void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, + journal::TagData tag_data, int r); + }; } // namespace librbd diff --git a/src/librbd/Utils.h b/src/librbd/Utils.h index 6bd4320073b9f..b098881d6f5a9 100644 --- a/src/librbd/Utils.h +++ b/src/librbd/Utils.h @@ -161,6 +161,42 @@ inline ImageCtx *get_image_ctx(ImageCtx *image_ctx) { return image_ctx; } +/// helper for tracking in-flight async ops when coordinating +/// a shut down of the invoking class instance +class AsyncOpTracker { +public: + AsyncOpTracker() : m_refs(0) { + } + + void start_op() { + m_refs.inc(); + } + + void finish_op() { + if (m_refs.dec() == 0 && m_on_finish != nullptr) { + Context *on_finish = nullptr; + std::swap(on_finish, m_on_finish); + on_finish->complete(0); + } + } + + template + void wait(I 
&image_ctx, Context *on_finish) { + assert(m_on_finish == nullptr); + + on_finish = create_async_context_callback(image_ctx, on_finish); + if (m_refs.read() == 0) { + on_finish->complete(0); + return; + } + m_on_finish = on_finish; + } + +private: + atomic_t m_refs; + Context *m_on_finish = nullptr; +}; + } // namespace util } // namespace librbd diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h index dbcf5a5499714..4c1319c13daf3 100644 --- a/src/librbd/journal/Types.h +++ b/src/librbd/journal/Types.h @@ -525,17 +525,19 @@ std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta); std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor); std::ostream &operator<<(std::ostream &out, const TagData &tag_data); -enum class ListenerType : int8_t { - RESYNC -}; +struct Listener { + virtual ~Listener() { + } -struct ResyncListener { - virtual ~ResyncListener() {} - virtual void handle_resync() = 0; -}; + /// invoked when journal close is requested + virtual void handle_close() = 0; -typedef boost::variant JournalListenerPtr; + /// invoked when journal is promoted to primary + virtual void handle_promoted() = 0; + /// invoked when journal resync is requested + virtual void handle_resync() = 0; +}; } // namespace journal } // namespace librbd diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index 54867666cd90f..814c90ecd9247 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -110,6 +110,8 @@ struct MockJournaler { MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *)); MOCK_METHOD3(get_tags, void(uint64_t, journal::Journaler::Tags*, Context*)); + MOCK_METHOD4(get_tags, void(uint64_t, uint64_t, journal::Journaler::Tags*, + Context*)); MOCK_METHOD1(start_replay, void(::journal::ReplayHandler *replay_handler)); MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double)); @@ -220,6 +222,11 @@ struct 
MockJournalerProxy { Context *on_finish) { MockJournaler::get_instance().get_tags(tag_class, tags, on_finish); } + void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, + journal::Journaler::Tags *tags, Context *on_finish) { + MockJournaler::get_instance().get_tags(start_after_tag_tid, tag_class, tags, + on_finish); + } void start_replay(::journal::ReplayHandler *replay_handler) { MockJournaler::get_instance().start_replay(replay_handler); diff --git a/src/test/librbd/mock/MockJournal.h b/src/test/librbd/mock/MockJournal.h index 0cd86393c818d..56b32804c41b4 100644 --- a/src/test/librbd/mock/MockJournal.h +++ b/src/test/librbd/mock/MockJournal.h @@ -81,12 +81,10 @@ struct MockJournal { MOCK_METHOD3(commit_op_event, void(uint64_t, int, Context *)); MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *)); - MOCK_METHOD2(add_listener, void(journal::ListenerType, - journal::JournalListenerPtr)); - MOCK_METHOD2(remove_listener, void(journal::ListenerType, - journal::JournalListenerPtr)); + MOCK_METHOD1(add_listener, void(journal::Listener *)); + MOCK_METHOD1(remove_listener, void(journal::Listener *)); - MOCK_METHOD1(check_resync_requested, int(bool *)); + MOCK_METHOD1(is_resync_requested, int(bool *)); }; } // namespace librbd diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 7c69fe8bcfdba..5ea25c8a6e02a 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -236,12 +236,16 @@ class TestMockJournal : public TestMockFixture { } void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) { - journal::ImageClientMeta image_client_meta; image_client_meta.tag_class = 0; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, r); + } + void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, + const journal::ImageClientMeta &client_meta, + int r) { journal::ClientData client_data; - client_data.client_meta = 
image_client_meta; + client_data.client_meta = client_meta; cls::journal::Client client; ::encode(client_data, client.data); @@ -253,8 +257,11 @@ class TestMockJournal : public TestMockFixture { void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, ::journal::MockJournaler &mock_journaler, - int r) { + bool primary, int r) { journal::TagData tag_data; + if (!primary) { + tag_data.mirror_uuid = "remote mirror uuid"; + } bufferlist tag_data_bl; ::encode(tag_data, tag_data_bl); @@ -263,7 +270,17 @@ class TestMockJournal : public TestMockFixture { EXPECT_CALL(mock_journaler, get_tags(0, _, _)) .WillOnce(DoAll(SetArgPointee<1>(tags), WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); - EXPECT_CALL(mock_journaler, add_listener(_)); + EXPECT_CALL(mock_journaler, add_listener(_)) + .WillOnce(SaveArg<0>(&m_listener)); + } + + void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, + ::journal::MockJournaler &mock_journaler, + uint64_t start_after_tag_tid, + ::journal::Journaler::Tags &&tags, int r) { + EXPECT_CALL(mock_journaler, get_tags(start_after_tag_tid, 0, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(tags), + WithArg<3>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); } void expect_start_replay(MockJournalImageCtx &mock_image_ctx, @@ -416,7 +433,8 @@ class TestMockJournal : public TestMockFixture { void open_journal(MockJournalImageCtx &mock_image_ctx, MockJournal &mock_journal, - ::journal::MockJournaler &mock_journaler) { + ::journal::MockJournaler &mock_journaler, + bool primary = true) { expect_op_work_queue(mock_image_ctx); InSequence seq; @@ -424,7 +442,7 @@ class TestMockJournal : public TestMockFixture { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, primary, 0); expect_start_replay( 
mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -451,6 +469,8 @@ class TestMockJournal : public TestMockFixture { static void invoke_replay_complete(::journal::ReplayHandler *handler, int r) { handler->handle_complete(r); } + + ::journal::JournalMetadataListener *m_listener = nullptr; }; TEST_F(TestMockJournal, StateTransitions) { @@ -470,7 +490,7 @@ TEST_F(TestMockJournal, StateTransitions) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -559,7 +579,7 @@ TEST_F(TestMockJournal, GetTagsError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, -EBADMSG); expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EBADMSG, when_open(mock_journal)); } @@ -581,7 +601,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, -EINVAL) @@ -597,7 +617,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + 
expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -630,7 +650,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -651,7 +671,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -684,7 +704,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -704,7 +724,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -736,7 +756,7 @@ TEST_F(TestMockJournal, StopError) { 
expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -769,7 +789,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); ::journal::ReplayHandler *replay_handler = nullptr; expect_start_replay( @@ -799,7 +819,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -853,7 +873,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -878,7 +898,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 
0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -1149,15 +1169,12 @@ TEST_F(TestMockJournal, ExternalReplay) { expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; - C_SaferCond close_request_ctx; journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); + mock_journal.start_external_replay(&journal_replay, &start_ctx); ASSERT_EQ(0, start_ctx.wait()); mock_journal.stop_external_replay(); - ASSERT_EQ(-ECANCELED, close_request_ctx.wait()); } TEST_F(TestMockJournal, ExternalReplayFailure) { @@ -1180,16 +1197,13 @@ TEST_F(TestMockJournal, ExternalReplayFailure) { expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; - C_SaferCond close_request_ctx; journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); + mock_journal.start_external_replay(&journal_replay, &start_ctx); ASSERT_EQ(-EINVAL, start_ctx.wait()); - ASSERT_EQ(-EINVAL, close_request_ctx.wait()); } -TEST_F(TestMockJournal, ExternalReplayCloseRequest) { +TEST_F(TestMockJournal, AppendDisabled) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); librbd::ImageCtx *ictx; @@ -1197,31 +1211,61 @@ TEST_F(TestMockJournal, ExternalReplayCloseRequest) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); + MockJournalPolicy mock_journal_policy; + ::journal::MockJournaler mock_journaler; open_journal(mock_image_ctx, mock_journal, mock_journaler); + BOOST_SCOPE_EXIT_ALL(&) { + close_journal(mock_journal, mock_journaler); + }; InSequence seq; - expect_stop_append(mock_journaler, 0); + RWLock::RLocker snap_locker(mock_image_ctx.snap_lock); + EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( + Return(ictx->get_journal_policy())); + ASSERT_TRUE(mock_journal.is_journal_appending()); + + 
EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( + Return(&mock_journal_policy)); + EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true)); + ASSERT_FALSE(mock_journal.is_journal_appending()); + expect_shut_down_journaler(mock_journaler); +} - C_SaferCond start_ctx; - C_SaferCond close_request_ctx; +TEST_F(TestMockJournal, CloseListenerEvent) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); - journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); - ASSERT_EQ(0, start_ctx.wait()); + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + ::journal::MockJournaler mock_journaler; + open_journal(mock_image_ctx, mock_journal, mock_journaler); - C_SaferCond close_ctx; - mock_journal.close(&close_ctx); + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ctx.complete(0); + } + virtual void handle_resync() { + ADD_FAILURE() << "unexpected resync request"; + } + virtual void handle_promoted() { + ADD_FAILURE() << "unexpected promotion event"; + } + } listener; + mock_journal.add_listener(&listener); - ASSERT_EQ(0, close_request_ctx.wait()); - mock_journal.stop_external_replay(); + expect_shut_down_journaler(mock_journaler); + close_journal(mock_journal, mock_journaler); - ASSERT_EQ(0, close_ctx.wait()); + ASSERT_EQ(0, listener.ctx.wait()); + mock_journal.remove_listener(&listener); } -TEST_F(TestMockJournal, AppendDisabled) { +TEST_F(TestMockJournal, ResyncRequested) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); librbd::ImageCtx *ictx; @@ -1229,26 +1273,95 @@ TEST_F(TestMockJournal, AppendDisabled) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); - MockJournalPolicy mock_journal_policy; + ::journal::MockJournaler mock_journaler; + open_journal(mock_image_ctx, mock_journal, 
mock_journaler, false); + + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ADD_FAILURE() << "unexpected close action"; + } + virtual void handle_resync() { + ctx.complete(0); + } + virtual void handle_promoted() { + ADD_FAILURE() << "unexpected promotion event"; + } + } listener; + mock_journal.add_listener(&listener); + + BOOST_SCOPE_EXIT_ALL(&) { + mock_journal.remove_listener(&listener); + close_journal(mock_journal, mock_journaler); + }; + InSequence seq; + + journal::TagData tag_data; + tag_data.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{0, 0, tag_data_bl}}, 0); + + journal::ImageClientMeta image_client_meta; + image_client_meta.tag_class = 0; + image_client_meta.resync_requested = true; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0); + expect_shut_down_journaler(mock_journaler); + + m_listener->handle_update(nullptr); + ASSERT_EQ(0, listener.ctx.wait()); +} + +TEST_F(TestMockJournal, ForcePromoted) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); ::journal::MockJournaler mock_journaler; - open_journal(mock_image_ctx, mock_journal, mock_journaler); + open_journal(mock_image_ctx, mock_journal, mock_journaler, false); + + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ADD_FAILURE() << "unexpected close action"; + } + virtual void handle_resync() { + ADD_FAILURE() << "unexpected resync event"; + } + virtual void handle_promoted() { + ctx.complete(0); + } + } listener; + mock_journal.add_listener(&listener); + BOOST_SCOPE_EXIT_ALL(&) { + mock_journal.remove_listener(&listener); close_journal(mock_journal, mock_journaler); }; InSequence seq; -
RWLock::RLocker snap_locker(mock_image_ctx.snap_lock); - EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( - Return(ictx->get_journal_policy())); - ASSERT_TRUE(mock_journal.is_journal_appending()); - EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( - Return(&mock_journal_policy)); - EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true)); - ASSERT_FALSE(mock_journal.is_journal_appending()); + journal::TagData tag_data; + tag_data.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID; + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{100, 0, tag_data_bl}}, 0); + + journal::ImageClientMeta image_client_meta; + image_client_meta.tag_class = 0; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0); expect_shut_down_journaler(mock_journaler); + + m_listener->handle_update(nullptr); + ASSERT_EQ(0, listener.ctx.wait()); } } // namespace librbd diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 050764a70786b..8ffd31bde6842 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -225,19 +225,6 @@ class ImageReplayerAdminSocketHook : public AdminSocketHook { Commands commands; }; -template -struct ResyncListener : public librbd::journal::ResyncListener { - ImageReplayer *img_replayer; - - ResyncListener(ImageReplayer *img_replayer) - : img_replayer(img_replayer) { - } - - virtual void handle_resync() { - img_replayer->resync_image(); - } -}; - } // anonymous namespace template @@ -286,7 +273,7 @@ ImageReplayer::ImageReplayer(Threads *threads, m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " + remote_image_id), m_progress_cxt(this), - m_resync_listener(new ResyncListener(this)), + m_journal_listener(new JournalListener(this)), m_remote_listener(this) { // Register asok commands using a temporary "remote_pool_name/global_image_id" @@ -320,7
+307,7 @@ ImageReplayer::~ImageReplayer() assert(m_bootstrap_request == nullptr); assert(m_in_flight_status_updates == 0); - delete m_resync_listener; + delete m_journal_listener; delete m_asok_hook; } @@ -428,7 +415,6 @@ void ImageReplayer::bootstrap() { template void ImageReplayer::handle_bootstrap(int r) { dout(20) << "r=" << r << dendl; - { Mutex::Locker locker(m_lock); m_bootstrap_request->put(); @@ -450,15 +436,25 @@ void ImageReplayer::handle_bootstrap(int r) { return; } + + assert(m_local_journal == nullptr); { - Mutex::Locker locker(m_lock); + RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock); + if (m_local_image_ctx->journal != nullptr) { + m_local_journal = m_local_image_ctx->journal; + m_local_journal->add_listener(m_journal_listener); + } + } - m_local_image_ctx->journal->add_listener( - librbd::journal::ListenerType::RESYNC, - m_resync_listener); + if (m_local_journal == nullptr) { + on_start_fail(-EINVAL, "error accessing local journal"); + return; + } + { + Mutex::Locker locker(m_lock); bool do_resync = false; - r = m_local_image_ctx->journal->check_resync_requested(&do_resync); + r = m_local_journal->is_resync_requested(&do_resync); if (r < 0) { derr << "failed to check if a resync was requested" << dendl; } @@ -479,7 +475,7 @@ void ImageReplayer::handle_bootstrap(int r) { } std::string name = m_local_ioctx.get_pool_name() + "/" + - m_local_image_ctx->name; + m_local_image_ctx->name; if (m_name != name) { m_name = name; if (m_asok_hook) { @@ -547,23 +543,9 @@ template void ImageReplayer::start_replay() { dout(20) << dendl; - assert(m_local_journal == nullptr); - { - RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock); - if (m_local_image_ctx->journal != nullptr) { - m_local_journal = m_local_image_ctx->journal; - - Context *start_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_start_replay>(this); - Context *stop_ctx = create_context_callback< - ImageReplayer,
&ImageReplayer::handle_stop_replay_request>(this); - m_local_journal->start_external_replay(&m_local_replay, start_ctx, - stop_ctx); - return; - } - } - - on_start_fail(-EINVAL, "error starting journal replay"); + Context *start_ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_start_replay>(this); + m_local_journal->start_external_replay(&m_local_replay, start_ctx); } template @@ -571,7 +553,7 @@ void ImageReplayer::handle_start_replay(int r) { dout(20) << "r=" << r << dendl; if (r < 0) { - m_local_journal = nullptr; + assert(m_local_replay == nullptr); derr << "error starting external replay on local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; on_start_fail(r, "error starting replay on local image"); @@ -618,19 +600,6 @@ void ImageReplayer::handle_start_replay(int r) { } -template -void ImageReplayer::handle_stop_replay_request(int r) { - if (r < 0) { - // error starting or we requested the stop -- ignore - return; - } - - // journal close has been requested, stop replay so the journal - // can be closed (since it will wait on replay to finish) - dout(20) << dendl; - on_stop_journal_replay(); -} - template void ImageReplayer::on_start_fail(int r, const std::string &desc) { @@ -931,9 +900,7 @@ void ImageReplayer::replay_flush() { return; } - Context *stop_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_stop_replay_request>(this); - m_local_journal->start_external_replay(&m_local_replay, ctx, stop_ctx); + m_local_journal->start_external_replay(&m_local_replay, ctx); }); m_local_replay->shut_down(false, ctx); } @@ -1385,25 +1352,35 @@ void ImageReplayer::shut_down(int r) { }); } } - if (m_local_replay != nullptr) { + if (m_local_journal != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_journal = nullptr; + ctx->complete(0); + }); + if (m_local_replay != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_journal->stop_external_replay(); + m_local_replay = 
nullptr; + + delete m_event_preprocessor; + m_event_preprocessor = nullptr; + ctx->complete(0); + }); + } ctx = new FunctionContext([this, ctx](int r) { if (r < 0) { derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; } - m_local_journal->stop_external_replay(); - m_local_journal = nullptr; - m_local_replay = nullptr; - - delete m_event_preprocessor; - m_event_preprocessor = nullptr; + // blocks if listener notification is in-progress + m_local_journal->remove_listener(m_journal_listener); ctx->complete(0); }); - ctx = new FunctionContext([this, ctx](int r) { - m_local_journal->remove_listener( - librbd::journal::ListenerType::RESYNC, m_resync_listener); - m_local_replay->shut_down(true, ctx); - }); + if (m_local_replay != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_replay->shut_down(true, ctx); + }); + } } if (m_replay_handler != nullptr) { ctx = new FunctionContext([this, ctx](int r) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index ba81deaa1ee58..3765dc0867064 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -205,6 +205,26 @@ class ImageReplayer { typedef typename librbd::journal::TypeTraits::Journaler Journaler; typedef boost::optional OptionalState; + struct JournalListener : public librbd::journal::Listener { + ImageReplayer *img_replayer; + + JournalListener(ImageReplayer *img_replayer) + : img_replayer(img_replayer) { + } + + virtual void handle_close() { + img_replayer->on_stop_journal_replay(); + } + + virtual void handle_promoted() { + img_replayer->on_stop_journal_replay(0, "force promoted"); + } + + virtual void handle_resync() { + img_replayer->resync_image(); + } + }; + class BootstrapProgressContext : public ProgressContext { public: BootstrapProgressContext(ImageReplayer *replayer) : @@ -242,7 +262,7 @@ class ImageReplayer { librbd::journal::Replay *m_local_replay = nullptr; Journaler* m_remote_journaler = 
nullptr; ::journal::ReplayHandler *m_replay_handler = nullptr; - librbd::journal::ResyncListener *m_resync_listener; + librbd::journal::Listener *m_journal_listener; bool m_stopping_for_resync = false; Context *m_on_start_finish = nullptr; @@ -327,7 +347,6 @@ class ImageReplayer { void start_replay(); void handle_start_replay(int r); - void handle_stop_replay_request(int r); void replay_flush(); void handle_replay_flush(int r);